contentmonster/src/contentmonster/classes/vesselthread.py

199 lines
6.9 KiB
Python
Raw Normal View History

2021-11-20 14:40:07 +00:00
from multiprocessing import Process
2021-11-25 18:03:58 +00:00
from typing import NoReturn, Optional
from traceback import format_exception
2021-11-20 14:40:07 +00:00
2022-09-19 18:05:58 +00:00
from .vessel import Vessel
from .remotefile import RemoteFile
from .retry import retry
from .database import Database
from .logger import Logger
from .file import File
from ..const import STATUS_COMPLETE, STATUS_START
2021-11-25 15:31:49 +00:00
2021-11-22 10:14:38 +00:00
import time
import sys
2021-11-22 10:14:38 +00:00
2021-11-25 16:09:40 +00:00
2021-11-20 14:40:07 +00:00
class VesselThread(Process):
    """Process handling the uploads to a single vessel"""

    def __init__(self, vessel: Vessel, state: dict, dbclass: type = Database) -> None:
        """Initialize a new VesselThread

        Args:
            vessel (classes.vessel.Vessel): Vessel object to handle uploads for
            state (dict): Dictionary containing the current application state
            dbclass (type): Class instantiated (without arguments) whenever
              database access is needed. Defaults to Database.
        """
        super().__init__()
        self.vessel = vessel
        self._state = state
        self._logger = Logger()
        self._dbclass = dbclass

    def run(self) -> NoReturn:
        """Run thread and process uploads to the vessel

        Loops forever: every exception raised by upload() is logged together
        with its traceback and the loop carries on.
        """
        self._logger.debug("Launched Vessel Thread for " + self.vessel.name)
        self.assertDirectories()

        while True:
            try:
                self.upload()
            except Exception:
                self._logger.error(
                    "An exception occurred in the Vessel Thread for " + self.vessel.name
                )
                # The strings returned by format_exception() already end in
                # newlines, so they are concatenated without a separator.
                self._logger.error("".join(format_exception(*sys.exc_info())))

            # Pause after failed iterations as well, so that a persistent
            # error does not turn this loop into a busy loop flooding the log.
            time.sleep(5)

    @retry()
    def assertDirectories(self) -> None:
        """Make sure all non-ignored directories exist on the vessel"""
        for directory in self._state["config"].directories:
            if directory.name in self.vessel._ignoredirs:
                continue

            self._logger.debug(
                f"Making sure directory {directory.name} exists on Vessel {self.vessel.name}"
            )
            self.vessel.connection.assertDirectories(directory)

    @retry()
    def upload(self) -> None:
        """Continue uploading process

        Picks up the upload currently in progress on the vessel or, if there
        is none, the next file from the queue, then pushes chunk after chunk
        until the upload is complete and has been finalized.
        """
        if not (current := (self.vessel.currentUpload() or self.processQueue())):
            self._logger.debug(
                f"No file needs to be uploaded to Vessel {self.vessel.name} at the moment"
            )
            return

        if isinstance(current, tuple):
            fileobj = self._resolveCurrentUpload(current)
            if fileobj is None:
                return
        else:
            fileobj = current

        remotefile = RemoteFile(fileobj, self.vessel, self._state["config"].chunksize)

        self._logger.debug(
            f"Start processing file {fileobj.name} in directory {fileobj.directory.name} on vessel {self.vessel.name}"
        )

        while True:
            db = self._dbclass()
            known = db.getFileByUUID(fileobj.uuid)
            del db

            if not known:
                self._logger.debug(
                    f"File {fileobj.name} in directory {fileobj.directory.name} does not exist anymore - deleting from {self.vessel.name}"
                )
                self.vessel.clearTempDir()
                # Stop here: carrying on would upload the removed file again
                # right after its chunks have been deleted.
                return

            # NOTE(review): assertDirectories() above goes through
            # self.vessel.connection - confirm Vessel offers this method too.
            self.vessel.assertDirectories(fileobj.directory)

            status = remotefile.getStatus()

            if status == STATUS_COMPLETE:
                self._logger.debug(
                    f"File {fileobj.name} uploaded to vessel {self.vessel.name} completely - finalizing"
                )
                remotefile.finalizeUpload()

                db = self._dbclass()
                db.logCompletion(fileobj, self.vessel)
                del db

                self.vessel._uploaded.append(fileobj.uuid)
                self._logger.debug(
                    f"Moved {fileobj.name} to its final destination on {self.vessel.name} - done!"
                )

                self.checkFileCompletion(fileobj)
                return

            nextchunk = 0 if status == STATUS_START else status + 1

            self._logger.debug(
                f"Getting chunk #{nextchunk} for file {fileobj.name} for vessel {self.vessel.name}"
            )
            chunk = remotefile.getChunk(nextchunk)
            self._logger.debug("Got chunk")

            # If the Chunk has no data, the selected range is beyond the end
            # of the file, i.e. the complete file has already been uploaded
            if chunk.data:
                self._logger.debug(f"Uploading chunk to vessel {self.vessel.name}")
                self.vessel.pushChunk(chunk)
            else:
                self._logger.debug(
                    f"No more data to upload to vessel {self.vessel.name} for file {fileobj.name} - compiling"
                )
                self.vessel.compileComplete(remotefile)

    def _resolveCurrentUpload(self, current: tuple) -> Optional[File]:
        """Turn an upload reported by the vessel into a File object

        Args:
            current (tuple): Tuple whose first two items are the directory
              name and the file name of the upload; the third item is unused

        Returns:
            File object for the upload, or None if the directory is not
            configured or the file is missing on shore - in both cases the
            temporary directory on the vessel has been cleared
        """
        dirname, name, _ = current
        self._logger.debug(
            f"Found file {name} in directory {dirname} for vessel {self.vessel.name}"
        )

        directory = next(
            (d for d in self._state["config"].directories if d.name == dirname),
            None,
        )

        if directory is None:
            self._logger.debug(
                f"Directory {dirname} not specified in config - deleting File from Vessel {self.vessel.name}"
            )
            self.vessel.clearTempDir()
            return None

        try:
            return File(name, directory)
        except FileNotFoundError:
            # self.name would be the name of this process, not of the vessel
            self._logger.debug(
                f"File {name} does not exist in Directory {dirname} on shore - deleting from Vessel {self.vessel.name}"
            )
            self.vessel.clearTempDir()
            return None

    def checkFileCompletion(self, fileobj: File) -> None:
        """Move a file out of the replication directory once it is everywhere

        Does nothing as long as at least one configured vessel that does not
        ignore the file's directory is missing from the completion records.

        Args:
            fileobj (classes.file.File): File object to check
        """
        db = self._dbclass()
        complete = db.getCompletionByFileUUID(fileobj.uuid)
        del db

        for vessel in self._state["config"].vessels:
            if (
                vessel.name not in complete
                and fileobj.directory.name not in vessel._ignoredirs
            ):
                return

        self._logger.debug(
            f"File {fileobj.name} from Directory {fileobj.directory.name} transferred to all Vessels. Moving out of replication directory."
        )

        if fileobj.exists():
            fileobj.moveCompleted()

    def processQueue(self) -> Optional[File]:
        """Return a file from the processing queue

        Returns:
            First file in the application state that has not been uploaded to
            this vessel yet and whose directory the vessel does not ignore,
            or None if there is no such file
        """
        self._logger.debug(
            f"Trying to fetch new file for vessel {self.vessel.name} from queue"
        )

        for f in self._state["files"]:
            uploaded = f.uuid in self.vessel._uploaded

            if not uploaded and f.directory.name not in self.vessel._ignoredirs:
                self._logger.debug(f"Using file {f.name} for vessel {self.vessel.name}")
                return f

            reason = "already uploaded" if uploaded else "Directory ignored"
            self._logger.trace(
                f"Disregarding file {f.name} for vessel {self.vessel.name} - {reason}"
            )

        self._logger.trace(f"Didn't find any new files for vessel {self.vessel.name}")
        return None