From e82ccb270138d1a436d90a7d7cbb14812f190396 Mon Sep 17 00:00:00 2001 From: Kumi Date: Mon, 22 Apr 2024 16:39:33 +0200 Subject: [PATCH] feat: Enhance stability and configurability - Extended copyright to reflect the current year. - Incremented project version to indicate new features and fixes. - Added a new script entry for easier execution, increasing utility and accessibility. - Updated project URLs for better alignment with current infrastructure. - Refactored settings path for simplicity and consistency across deployments. - Improved code readability and maintenance across several modules by cleaning up redundant code, adding missing type annotations, and ensuring consistent code formatting. - Enhanced logging capabilities and error handling to improve diagnostics and troubleshooting, supporting more robust error recovery mechanisms. - Implemented more graceful handling of termination signals to ensure clean shutdown and resource cleanup, enhancing the robustness of the application in production environments. - Introduced command-line argument parsing for configuration file path customization, improving flexibility in different runtime environments. These changes collectively improve the project's maintainability, reliability, and user experience, laying a stronger foundation for future development. --- LICENSE | 2 +- pyproject.toml | 10 +- ...ttings.example.ini => settings.example.ini | 0 src/contentmonster/classes/chunk.py | 9 +- src/contentmonster/classes/config.py | 9 +- src/contentmonster/classes/connection.py | 107 ++++++++++++++---- src/contentmonster/classes/database.py | 96 ++++++++++------ src/contentmonster/classes/directory.py | 30 +++-- src/contentmonster/classes/doghandler.py | 17 ++- src/contentmonster/classes/file.py | 14 ++- src/contentmonster/classes/logger.py | 5 +- src/contentmonster/classes/remotefile.py | 25 ++-- src/contentmonster/classes/retry.py | 17 ++- src/contentmonster/classes/shorethread.py | 52 +++++---- src/contentmonster/classes/vessel.py | 54 ++++++--- src/contentmonster/classes/vesselthread.py | 97 +++++++++------- src/contentmonster/const.py | 2 +- src/contentmonster/worker.py | 55 +++++++-- 18 files changed, 398 insertions(+), 203 deletions(-) rename src/contentmonster/settings.example.ini => settings.example.ini (100%) diff --git a/LICENSE b/LICENSE index 4825fb9..54e798a 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2021-2022 Kumi Systems e.U. +Copyright (c) 2021-2024 Kumi Systems e.U. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pyproject.toml b/pyproject.toml index b3cd5d8..6440baf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "contentmonster" -version = "0.0.6" +version = "0.0.7" authors = [ { name="Kumi Systems e.U.", email="office@kumi.systems" }, ] @@ -22,6 +22,10 @@ dependencies = [ "watchdog" ] +[project.scripts] +contentmonster = "contentmonster.worker:main" + [project.urls] -"Homepage" = "https://kumig.it/kumisystems/contentmonster" -"Bug Tracker" = "https://kumig.it/kumisystems/contentmonster/issues" \ No newline at end of file +"Homepage" = "https://git.private.coffee/kumisystems/contentmonster" +"Bug Tracker" = "https://git.private.coffee/kumisystems/contentmonster/issues" +"Source Code" = "https://git.private.coffee/kumisystems/contentmonster" \ No newline at end of file diff --git a/src/contentmonster/settings.example.ini b/settings.example.ini similarity index 100% rename from src/contentmonster/settings.example.ini rename to settings.example.ini diff --git a/src/contentmonster/classes/chunk.py b/src/contentmonster/classes/chunk.py index 4548740..fb990e8 100644 --- a/src/contentmonster/classes/chunk.py +++ b/src/contentmonster/classes/chunk.py @@ -15,8 +15,8 @@ class Chunk: chunksize (int): Size of each chunk in bytes Returns: - classes.chunk.Chunk: A Chunk object containing the portion of the - File object beginning at (count * chunksize) bytes and ending at + classes.chunk.Chunk: A Chunk object containing the portion of the + File object beginning at (count * chunksize) bytes and ending at ((count + 1) * chunksize - 1) bytes """ return fileobj.getChunk(count, chunksize) @@ -26,7 +26,7 @@ class Chunk: Args: fileobj (classes.file.File): The file object from local storage - count (int): Position of the current chunk in the list of total + count (int): Position of the current chunk in the list of total chunks (first index: 0) or -1 to get the complete file data (bytes): Content of the chunk """ @@ -49,3 +49,6 @@ class Chunk: str: SHA256 hash of Chunk.data """ return hashlib.sha256(self.data).hexdigest() + + def getFileSize(self) -> int: + return len(self.data) diff --git a/src/contentmonster/classes/config.py b/src/contentmonster/classes/config.py index 1a6fb0b..3533e84 100644 --- a/src/contentmonster/classes/config.py +++ b/src/contentmonster/classes/config.py @@ -41,17 +41,14 @@ class MonsterConfig: for section in parser.sections(): # Read Directories from the config file if section.startswith("Directory"): - self.directories.append( - Directory.fromConfig(parser[section])) + self.directories.append(Directory.fromConfig(parser[section])) # Read Vessels from the config file elif section.startswith("Vessel"): - self.vessels.append( - Vessel.fromConfig(parser[section], dbclass)) + self.vessels.append(Vessel.fromConfig(parser[section], dbclass)) def __init__(self) -> None: - """Initialize a new (empty) MonsterConfig object - """ + """Initialize a new (empty) MonsterConfig object""" self.directories = [] self.vessels = [] self.chunksize = 10485760 # Default: 10 MiB diff --git a/src/contentmonster/classes/connection.py b/src/contentmonster/classes/connection.py index e823575..75a5288 100644 --- a/src/contentmonster/classes/connection.py +++ b/src/contentmonster/classes/connection.py @@ -9,10 +9,11 @@ from typing import Union, Optional import errno import stat +from .logger import Logger + class Connection: - """Class representing an SSH/SFTP connection to a Vessel - """ + """Class representing an SSH/SFTP connection to a Vessel""" def __init__(self, vessel): """Initialize a new Connection to a Vessel @@ -24,12 +25,20 @@ class Connection: self._client = SSHClient() self._client.load_system_host_keys() self._client.set_missing_host_key_policy(WarningPolicy) - self._client.connect(vessel.address, vessel.port, vessel.username, - vessel.password, timeout=vessel.timeout, - passphrase=vessel.passphrase) + self._client.connect( + vessel.address, + vessel.port, + vessel.username, + vessel.password, + timeout=vessel.timeout, + passphrase=vessel.passphrase, + auth_timeout=vessel.timeout, + banner_timeout=vessel.timeout, + ) self._transport = self._client.get_transport() self._transport.set_keepalive(10) self._sftp = self._client.open_sftp() + self._logger = Logger() def _exists(self, path: Union[str, Path]) -> bool: """Check if a path exists on the Vessel. Symlinks are not followed. @@ -97,20 +106,35 @@ class Connection: """ return self._sftp.remove(str(path)) + def _size(self, path: Union[str, Path]) -> int: + """Get the size of a file on the Vessel + + Args: + path (str, pathlib.Path): Path of the file to check + + Returns: + int: Size of the file in bytes + + Raises: + FileNotFoundError: Raised if no file is found at the given path + """ + return self._sftp.stat(str(path)).st_size + def assertTempDirectory(self) -> None: """Make sure that the temp directory exists on the Vessel Raises: ValueError: Raised if the path is already in use on the vessel but is not a directory. - IOError: Raised if the directory does not exist but cannot be + IOError: Raised if the directory does not exist but cannot be created. """ if not self._exists(self._vessel.tempdir): self._mkdir(self._vessel.tempdir) elif not self._isdir(self._vessel.tempdir): raise ValueError( - f"{self._vessel.tempdir} exists but is not a directory on Vessel {self._vessel.name}!") + f"{self._vessel.tempdir} exists but is not a directory on Vessel {self._vessel.name}!" + ) def assertDirectories(self, directory) -> None: """Make sure that destination and temp directories exist on the Vessel @@ -122,7 +146,7 @@ class Connection: Raises: ValueError: Raised if a path is already in use on the vessel but not a directory. - IOError: Raised if a directory that does not exist cannot be + IOError: Raised if a directory that does not exist cannot be created. """ for d in [directory.location, self._vessel.tempdir]: @@ -130,15 +154,18 @@ class Connection: self._mkdir(d) elif not self._isdir(d): raise ValueError( - f"{d} exists but is not a directory on Vessel {self._vessel.name}!") + f"{d} exists but is not a directory on Vessel {self._vessel.name}!" + ) - def assertChunkComplete(self, chunk, path: Optional[Union[str, Path]] = None) -> bool: + def assertChunkComplete( + self, chunk: "Chunk", path: Optional[Union[str, Path]] = None + ) -> bool: """Check if a Chunk has been uploaded correctly Args: chunk (classes.chunk.Chunk): Chunk object to verify upload for - path (str, pathlib.Path, optional): Optional path at which to - check. If None, will get default path from Chunk object. + path (str, pathlib.Path, optional): Optional path at which to + check. If None, will get default path from Chunk object. Defaults to None. Returns: @@ -154,10 +181,29 @@ class Connection: o.channel.recv_exit_status() # Remove the file if it is broken - if not o.readline().split()[0] == chunk.getHash(): + complete_output = o.readlines() + filehash = complete_output[0].split()[0] + chunkhash = chunk.getHash() + + if filehash != chunkhash: + self._logger.error( + f"Hash mismatch for {chunk.file.uuid} chunk {chunk.count}: {filehash} != {chunkhash}" + ) + + filesize = self._size(path) + chunksize = chunk.getFileSize() + + self._logger.debug( + f"File size: {filesize} bytes, expected chunk size: {chunksize} bytes" + ) + self._logger.debug("\n".join(complete_output)) + self._remove(path) else: + self._logger.debug( + f"Hash match for {chunk.file.uuid} chunk {chunk.count}: {filehash} == {chunkhash}" + ) return True return False @@ -185,16 +231,30 @@ class Connection: numchunks = remotefile.getStatus() + 1 # Get files in correct order to concatenate - files = " ".join( - [str(self._vessel.tempdir / f"{remotefile.file.uuid}_{i}.part") for i in range(numchunks)]) + files = [ + str(self._vessel.tempdir / f"{remotefile.file.uuid}_{i}.part") + for i in range(numchunks) + ] + + for filepath in files: + if not self._exists(filepath): + self._logger.error(f"{filepath} does not exist...?") + return False completefile = remotefile.file.getChunk(-1) outname = completefile.getTempName() outpath = self._vessel.tempdir / outname - _, o, _ = self._client.exec_command(f"cat {files} > {outpath}") - # Block for command to complete - o.channel.recv_exit_status() + if self._exists(outpath): + self._remove(outpath) + + for f in files: + _, o, _ = self._client.exec_command(f"cat {f} >> {outpath}") + + # Block for command to complete + o.channel.recv_exit_status() + + return True def assertComplete(self, remotefile, allow_retry: bool = False) -> bool: """Check if File has been reassembled from Chunks correctly @@ -230,7 +290,7 @@ class Connection: """Moves reassembled file to output destination Args: - remotefile (classes.remotefile.RemoteFile): RemoteFile object + remotefile (classes.remotefile.RemoteFile): RemoteFile object describing the uploaded File. Returns: @@ -239,7 +299,8 @@ class Connection: completefile = remotefile.file.getChunk(-1) destination = remotefile.file.getFullPath() self._sftp.posix_rename( - str(self._vessel.tempdir / completefile.getTempName()), str(destination)) + str(self._vessel.tempdir / completefile.getTempName()), str(destination) + ) # Make sure that file has actually been created at destination self._sftp.stat(str(destination)) @@ -255,12 +316,10 @@ class Connection: return f.split("_")[0] def clearTempDir(self) -> None: - """Clean up the temporary directory on the Vessel - """ + """Clean up the temporary directory on the Vessel""" for f in self._listdir(self._vessel.tempdir): self._remove(self._vessel.tempdir / f) def __del__(self): - """Close SSH connection when ending Connection - """ + """Close SSH connection when ending Connection""" self._client.close() diff --git a/src/contentmonster/classes/database.py b/src/contentmonster/classes/database.py index 52ba1df..6fd6a86 100644 --- a/src/contentmonster/classes/database.py +++ b/src/contentmonster/classes/database.py @@ -4,10 +4,11 @@ import uuid from typing import Union, Optional +from .logger import Logger + class Database: - """Class wrapping sqlite3 database connection - """ + """Class wrapping sqlite3 database connection""" def __init__(self, filename: Optional[Union[str, pathlib.Path]] = None): """Initialize a new Database object @@ -17,22 +18,34 @@ class Database: database to use. If None, use "database.sqlite3" in project base directory. Defaults to None. """ - filename = filename or pathlib.Path( - __file__).parent.parent.absolute() / "database.sqlite3" + filename = ( + filename + or pathlib.Path(__file__).parent.parent.absolute() / "database.sqlite3" + ) self._con = sqlite3.connect(filename) self.migrate() - def _execute(self, query: str, parameters: Optional[tuple] = None) -> None: + def _execute( + self, query: str, parameters: Optional[tuple] = None, retry: bool = True + ) -> None: """Execute a query on the database Args: query (str): SQL query to execute parameters (tuple, optional): Parameters to use to replace placeholders in the query, if any. Defaults to None. + retry (bool, optional): Whether to retry the query if it fails due to + a locked database. Defaults to True. """ - cur = self.getCursor() - cur.execute(query, parameters) - self.commit() # Instantly commit after every (potential) write action + try: + cur = self.getCursor() + cur.execute(query, parameters) + self.commit() # Instantly commit after every (potential) write action + except sqlite3.OperationalError as e: + self._logger.error(f"An error occurred while writing to the database: {e}") + if retry: + return self._execute(query, parameters, False) + raise def commit(self) -> None: """Commit the current database transaction @@ -60,7 +73,8 @@ class Database: cur = self.getCursor() try: cur.execute( - "SELECT value FROM contentmonster_settings WHERE key = 'dbversion'") + "SELECT value FROM contentmonster_settings WHERE key = 'dbversion'" + ) assert (version := cur.fetchone()) return int(version[0]) except (sqlite3.OperationalError, AssertionError): @@ -78,8 +92,10 @@ class Database: hash = fileobj.getHash() cur = self.getCursor() - cur.execute("SELECT uuid, checksum FROM contentmonster_file WHERE directory = ? AND name = ?", - (fileobj.directory.name, fileobj.name)) + cur.execute( + "SELECT uuid, checksum FROM contentmonster_file WHERE directory = ? AND name = ?", + (fileobj.directory.name, fileobj.name), + ) fileuuid = None @@ -102,7 +118,7 @@ class Database: Args: fileobj (classes.file.File): File object to add to database - hash (str, optional): Checksum of the file, if already known. + hash (str, optional): Checksum of the file, if already known. Defaults to None. Returns: @@ -110,8 +126,10 @@ class Database: """ hash = hash or fileobj.getHash() fileuuid = str(uuid.uuid4()) - self._execute("INSERT INTO contentmonster_file(uuid, directory, name, checksum) VALUES (?, ?, ?, ?)", - (fileuuid, fileobj.directory.name, fileobj.name, hash)) + self._execute( + "INSERT INTO contentmonster_file(uuid, directory, name, checksum) VALUES (?, ?, ?, ?)", + (fileuuid, fileobj.directory.name, fileobj.name, hash), + ) return fileuuid def getFileByUUID(self, fileuuid: str) -> Optional[tuple[str, str, str]]: @@ -129,20 +147,24 @@ class Database: """ cur = self.getCursor() cur.execute( - "SELECT directory, name, checksum FROM contentmonster_file WHERE uuid = ?", (fileuuid,)) - if (result := cur.fetchone()): + "SELECT directory, name, checksum FROM contentmonster_file WHERE uuid = ?", + (fileuuid,), + ) + if result := cur.fetchone(): return result def removeFile(self, directory, name: str) -> None: """Remove a File from the database based on Directory and filename Args: - directory (classes.directory.Directory): Directory object + directory (classes.directory.Directory): Directory object containing the File to remove name (str): Filename of the File to remove """ self._execute( - "DELETE FROM contentmonster_file WHERE directory = ? AND name = ?", (directory.name, name)) + "DELETE FROM contentmonster_file WHERE directory = ? AND name = ?", + (directory.name, name), + ) def removeFileByUUID(self, fileuuid: str) -> None: """Remove a File from the database based on UUID @@ -150,19 +172,20 @@ class Database: Args: fileuuid (str): The UUID of the File to remove from the database """ - self._execute( - "DELETE FROM contentmonster_file WHERE uuid = ?", (fileuuid,)) + self._execute("DELETE FROM contentmonster_file WHERE uuid = ?", (fileuuid,)) def logCompletion(self, file, vessel): """Log the completion of a File upload Args: file (classes.file.File): The File object that has been uploaded - vessel (classes.vessel.Vessel): The Vessel the File has been + vessel (classes.vessel.Vessel): The Vessel the File has been uploaded to """ self._execute( - "INSERT INTO contentmonster_file_log(file, vessel) VALUES(?, ?)", (file.uuid, vessel.name)) + "INSERT INTO contentmonster_file_log(file, vessel) VALUES(?, ?)", + (file.uuid, vessel.name), + ) def getCompletionForVessel(self, vessel) -> list[Optional[str]]: """Get completed uploads for a vessel @@ -176,37 +199,44 @@ class Database: """ cur = self.getCursor() cur.execute( - "SELECT file FROM contentmonster_file_log WHERE vessel = ?", (vessel.name,)) + "SELECT file FROM contentmonster_file_log WHERE vessel = ?", (vessel.name,) + ) return [f[0] for f in cur.fetchall()] def getCompletionByFileUUID(self, fileuuid: str) -> list[Optional[str]]: cur = self.getCursor() - cur.execute("SELECT vessel FROM contentmonster_file_log WHERE file = ?", (fileuuid,)) + cur.execute( + "SELECT vessel FROM contentmonster_file_log WHERE file = ?", (fileuuid,) + ) return [v[0] for v in cur.fetchall()] def migrate(self) -> None: - """Apply database migrations - """ + """Apply database migrations""" cur = self.getCursor() if self.getVersion() == 0: cur.execute( - "CREATE TABLE IF NOT EXISTS contentmonster_settings(key VARCHAR(64) PRIMARY KEY, value TEXT)") + "CREATE TABLE IF NOT EXISTS contentmonster_settings(key VARCHAR(64) PRIMARY KEY, value TEXT)" + ) cur.execute( - "INSERT INTO contentmonster_settings(key, value) VALUES ('dbversion', '1')") + "INSERT INTO contentmonster_settings(key, value) VALUES ('dbversion', '1')" + ) self.commit() if self.getVersion() == 1: cur.execute( - "CREATE TABLE IF NOT EXISTS contentmonster_file(uuid VARCHAR(36) PRIMARY KEY, directory VARCHAR(128), name VARCHAR(128), checksum VARCHAR(64))") - cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file_log(file VARCHAR(36), vessel VARCHAR(128), PRIMARY KEY (file, vessel), FOREIGN KEY (file) REFERENCES contentmonster_files(uuid) ON DELETE CASCADE)") + "CREATE TABLE IF NOT EXISTS contentmonster_file(uuid VARCHAR(36) PRIMARY KEY, directory VARCHAR(128), name VARCHAR(128), checksum VARCHAR(64))" + ) cur.execute( - "UPDATE contentmonster_settings SET value = '2' WHERE key = 'dbversion'") + "CREATE TABLE IF NOT EXISTS contentmonster_file_log(file VARCHAR(36), vessel VARCHAR(128), PRIMARY KEY (file, vessel), FOREIGN KEY (file) REFERENCES contentmonster_files(uuid) ON DELETE CASCADE)" + ) + cur.execute( + "UPDATE contentmonster_settings SET value = '2' WHERE key = 'dbversion'" + ) self.commit() def __del__(self): - """Close database connection on removal of the Database object - """ + """Close database connection on removal of the Database object""" self._con.close() diff --git a/src/contentmonster/classes/directory.py b/src/contentmonster/classes/directory.py index a1274f8..52729ea 100644 --- a/src/contentmonster/classes/directory.py +++ b/src/contentmonster/classes/directory.py @@ -8,14 +8,14 @@ from typing import Union, Optional class Directory: - """Class representing a Directory on the local filesystem - """ + """Class representing a Directory on the local filesystem""" + @classmethod def fromConfig(cls, config: SectionProxy): """Create Directory object from a Directory section in the Config file Args: - config (configparser.SectionProxy): Configuration section defining + config (configparser.SectionProxy): Configuration section defining a Directory Raises: @@ -28,8 +28,11 @@ class Directory: if "Location" in config.keys(): return cls(config.name.split()[1], config["Location"]) else: - raise ValueError("Definition for Directory " + - config.name.split()[1] + " does not contain Location!") + raise ValueError( + "Definition for Directory " + + config.name.split()[1] + + " does not contain Location!" + ) def __init__(self, name: str, location: Union[str, pathlib.Path]): """Initialize a new Directory object @@ -39,7 +42,7 @@ class Directory: location (str, pathlib.Path): Filesystem location of the Directory Raises: - ValueError: Raised if passed location does not exist or is not a + ValueError: Raised if passed location does not exist or is not a directory """ self.name = name @@ -50,7 +53,8 @@ class Directory: else: location = str(location) raise ValueError( - f"Location {location} for Directory {name} does not exist or is not a directory.") + f"Location {location} for Directory {name} does not exist or is not a directory." + ) @property def completeddir(self): @@ -59,7 +63,10 @@ class Directory: def assertCompletedDirectory(self): if not os.path.isdir(self.completeddir): if os.path.isfile(self.completeddir): - raise FileExistsError("Cannot create directory %s - path exists but is not a directory!" % str(self.completeddir)) + raise FileExistsError( + "Cannot create directory %s - path exists but is not a directory!" + % str(self.completeddir) + ) os.mkdir(self.completeddir) @@ -71,8 +78,11 @@ class Directory: Returns: list: List of File objects for files within the Directory """ - files = [f for f in os.listdir(self.location) if os.path.isfile( - self.location / f) and os.path.getsize(self.location / f)] + files = [ + f + for f in os.listdir(self.location) + if os.path.isfile(self.location / f) and os.path.getsize(self.location / f) + ] return [File(f, self) for f in files] def getFile(self, name: str) -> Optional[File]: diff --git a/src/contentmonster/classes/doghandler.py b/src/contentmonster/classes/doghandler.py index 6c6b737..2f7be76 100644 --- a/src/contentmonster/classes/doghandler.py +++ b/src/contentmonster/classes/doghandler.py @@ -1,6 +1,11 @@ -from watchdog.events import (FileSystemEventHandler, FileSystemEvent, - FileCreatedEvent, FileDeletedEvent, - FileModifiedEvent, FileMovedEvent) +from watchdog.events import ( + FileSystemEventHandler, + FileSystemEvent, + FileCreatedEvent, + FileDeletedEvent, + FileModifiedEvent, + FileMovedEvent, +) from multiprocessing import Queue @@ -11,8 +16,7 @@ import time class DogHandler(FileSystemEventHandler): - """Class implementing a watchdog event handler - """ + """Class implementing a watchdog event handler""" def __init__(self, directory, queue: Queue, *args, **kwargs) -> None: """Initialize a new DogHandler object @@ -71,7 +75,8 @@ class DogHandler(FileSystemEventHandler): file (source and destination) """ self._logger.debug( - f"Detected move event of {event.src_path} to {event.dest_path}") + f"Detected move event of {event.src_path} to {event.dest_path}" + ) self._queue.put((self._directory, os.path.basename(event.src_path))) self._queue.put((self._directory, os.path.basename(event.dest_path))) diff --git a/src/contentmonster/classes/file.py b/src/contentmonster/classes/file.py index 2af6cd8..54443a7 100644 --- a/src/contentmonster/classes/file.py +++ b/src/contentmonster/classes/file.py @@ -8,10 +8,11 @@ import os.path class File: - """Object representing a file found in a local Directory - """ + """Object representing a file found in a local Directory""" - def __init__(self, name: str, directory, uuid: Optional[str] = None, dbclass: type = Database) -> None: + def __init__( + self, name: str, directory, uuid: Optional[str] = None, dbclass: type = Database + ) -> None: """Initialize new File object Args: @@ -31,7 +32,9 @@ class File: self.dbclass = dbclass if not self.exists(): - raise FileNotFoundError(f"File {self.name} does not exist in {self.directory.name}!") + raise FileNotFoundError( + f"File {self.name} does not exist in {self.directory.name}!" + ) self.uuid = uuid or self.getUUID() @@ -90,7 +93,8 @@ class File: """ if count != -1 and not size: raise ValueError( - "A Chunk size needs to be passed to getChunk() if not using the complete file (-1)!") + "A Chunk size needs to be passed to getChunk() if not using the complete file (-1)!" + ) with open(self.getFullPath(), "rb") as binary: binary.seek((count * size) if count > 0 else 0) diff --git a/src/contentmonster/classes/logger.py b/src/contentmonster/classes/logger.py index 99deaab..669061b 100644 --- a/src/contentmonster/classes/logger.py +++ b/src/contentmonster/classes/logger.py @@ -3,4 +3,7 @@ from logging import Logger as PyLogger class Logger(PyLogger): def __init__(self, name="contentmonster"): - super().__init__(name) \ No newline at end of file + super().__init__(name) + + def trace(self, msg, *args, **kwargs) -> None: + pass diff --git a/src/contentmonster/classes/remotefile.py b/src/contentmonster/classes/remotefile.py index 3e7e0a4..c201431 100644 --- a/src/contentmonster/classes/remotefile.py +++ b/src/contentmonster/classes/remotefile.py @@ -2,8 +2,7 @@ from ..const import STATUS_COMPLETE, STATUS_START class RemoteFile: - """Class describing the transfer status of a File to a Vessel - """ + """Class describing the transfer status of a File to a Vessel""" def __init__(self, fileobj, vessel, chunksize: int) -> None: """Initialize a new RemoteFile object @@ -29,8 +28,7 @@ class RemoteFile: # Get all files in the vessel's tempdir ls = self.vessel.connection._listdir(self.vessel.tempdir) - files = [f for f in ls if f.startswith( - self.file.uuid) and f.endswith(".part")] + files = [f for f in ls if f.startswith(self.file.uuid) and f.endswith(".part")] # Find the file with the largest chunk number @@ -39,11 +37,14 @@ class RemoteFile: for f in files: part = f.split("_")[1].split(".")[0] if part == "complete": # If a reassembled file is found - if self.validateComplete(True): # and it is not broken - return STATUS_COMPLETE # the upload is complete + if self.validateComplete(True): # and it is not broken + return STATUS_COMPLETE # the upload is complete # Else save the chunk number if it is larger than the previous - count = max(count, int(part)) + try: + count = max(count, int(part)) + except: + pass # Find and return the largest non-corrupt chunk while count >= 0: @@ -80,8 +81,7 @@ class RemoteFile: return self.vessel.connection.assertComplete(self, allow_retry) def compileComplete(self) -> None: - """Reassemble a complete File from the uploaded Chunks - """ + """Reassemble a complete File from the uploaded Chunks""" self.vessel.connection.compileComplete(self) def getChunk(self, count: int): @@ -91,7 +91,7 @@ class RemoteFile: count (int): Number of the Chunk to generate Returns: - classes.chunk.Chunk: A Chunk object containing the portion of the + classes.chunk.Chunk: A Chunk object containing the portion of the File object beginning at (count * chunksize) bytes and ending at ((count + 1) * chunksize - 1) bytes, with chunksize taken from the RemoteFile initialization value @@ -99,7 +99,6 @@ class RemoteFile: return self.file.getChunk(count, self.chunksize) def finalizeUpload(self) -> None: - """Move complete file to its final destination and clean up - """ + """Move complete file to its final destination and clean up""" self.vessel.connection.moveComplete(self) - self.vessel.connection.clearTempDir() \ No newline at end of file + self.vessel.connection.clearTempDir() diff --git a/src/contentmonster/classes/retry.py b/src/contentmonster/classes/retry.py index 60d4b90..610677a 100644 --- a/src/contentmonster/classes/retry.py +++ b/src/contentmonster/classes/retry.py @@ -5,8 +5,8 @@ from .logger import Logger class retry: - """Decorator used to automatically retry operations throwing exceptions - """ + """Decorator used to automatically retry operations throwing exceptions""" + def __init__(self, exceptions: tuple[BaseException] = None): """Initializing the retry decorator @@ -16,10 +16,14 @@ class retry: paramiko.ssh_exception.SSHException/NoValidConnectionsError and socket.timeout/TimeoutError. Defaults to None. """ - self.exceptions = exceptions or (SSHException, NoValidConnectionsError, - timeout, TimeoutError) + self.exceptions = exceptions or ( + SSHException, + NoValidConnectionsError, + timeout, + TimeoutError, + ) self._logger = Logger() - + def __call__(self, f): """Return a function through the retry decorator @@ -29,6 +33,7 @@ class retry: Returns: function: Function wrapping the passed function """ + def wrapped_f(*args, **kwargs): while True: try: @@ -36,4 +41,4 @@ class retry: except self.exceptions as e: self._logger.info("Caught expected exception: " + repr(e)) - return wrapped_f \ No newline at end of file + return wrapped_f diff --git a/src/contentmonster/classes/shorethread.py b/src/contentmonster/classes/shorethread.py index f994bf6..d219501 100644 --- a/src/contentmonster/classes/shorethread.py +++ b/src/contentmonster/classes/shorethread.py @@ -15,8 +15,8 @@ import os.path class ShoreThread(Process): - """Thread handling the discovery of shore-side file changes - """ + """Thread handling the discovery of shore-side file changes""" + def __init__(self, state: dict, dbclass: type = Database) -> None: """Create a new ShoreThread object @@ -48,14 +48,12 @@ class ShoreThread(Process): return files def clearFiles(self) -> None: - """Clear the files variable in the application state - """ + """Clear the files variable in the application state""" self._logger.debug("Clearing global files variable") del self._state["files"][:] def monitor(self) -> None: - """Initialize monitoring of Directories specified in configuration - """ + """Initialize monitoring of Directories specified in configuration""" for directory in self._state["config"].directories: self._logger.debug("Creating dog for " + str(directory.location)) handler = DogHandler(directory, self.queue) @@ -65,8 +63,7 @@ class ShoreThread(Process): self._dogs.append(dog) def run(self) -> NoReturn: - """Launch the ShoreThread and start monitoring for file changes - """ + """Launch the ShoreThread and start monitoring for file changes""" self._logger.info("Launched Shore Thread") for f in self.getAllFiles(): @@ -79,8 +76,7 @@ class ShoreThread(Process): self.processQueue() def joinDogs(self) -> None: - """Join observers to receive updates on the queue - """ + """Join observers to receive updates on the queue""" self._logger.debug("Joining dogs") for dog in self._dogs: dog.join(1) @@ -103,7 +99,7 @@ class ShoreThread(Process): outlist.append(f) self.clearFiles() - + for f in outlist: self._state["files"].append(f) @@ -118,7 +114,9 @@ class ShoreThread(Process): Args: fileobj (classes.file.File): File object to add to the queue """ - self._logger.debug(f"Adding file {fileobj.name} to directory {fileobj.directory.name}") + self._logger.debug( + f"Adding file {fileobj.name} to directory {fileobj.directory.name}" + ) outlist = [] @@ -133,9 +131,13 @@ class ShoreThread(Process): outlist.append(f) if self.checkFileCompletion(fileobj): - self._logger.debug(f"File {fileobj.name} already transferred to all Vessels - not re-adding to queue") + self._logger.debug( + f"File {fileobj.name} already transferred to all Vessels - not re-adding to queue" + ) else: - self._logger.debug(f"File {fileobj.name} not in processing queue (anymore) - adding") + self._logger.debug( + f"File {fileobj.name} not in processing queue (anymore) - adding" + ) outlist.append(fileobj) self.clearFiles() @@ -146,15 +148,18 @@ class ShoreThread(Process): def checkFileCompletion(self, fileobj: File) -> bool: db = self._dbclass() complete = db.getCompletionByFileUUID(fileobj.uuid) - del(db) + del db for vessel in self._state["config"].vessels: - if vessel.name not in complete and fileobj.directory.name not in vessel._ignoredirs: + if ( + vessel.name not in complete + and fileobj.directory.name not in vessel._ignoredirs + ): return False if fileobj.exists(): fileobj.moveCompleted() - + return True def processFile(self, directory: Directory, name: str) -> None: @@ -163,11 +168,13 @@ class ShoreThread(Process): Args: directory (classes.directory.Directory): Directory containing the created, deleted, modified or moved File - name (str): Filename of the created, deleted, modified or moved + name (str): Filename of the created, deleted, modified or moved File """ - self._logger.debug(f"Processing change to file {name} in directory {directory.name}") - if (fileobj := directory.getFile(name)): + self._logger.debug( + f"Processing change to file {name} in directory {directory.name}" + ) + if fileobj := directory.getFile(name): self.addFile(fileobj) else: self.purgeFile(directory, name) @@ -180,12 +187,11 @@ class ShoreThread(Process): File that has been created, moved, modified or deleted. """ self._logger.debug("Waiting for new changes...") - directory, basename = self.queue.get() # Will block until an event is found + directory, basename = self.queue.get() # Will block until an event is found self.processFile(directory, basename) def terminate(self, *args, **kwargs) -> NoReturn: - """Terminate observer threads, then terminate self - """ + """Terminate observer threads, then terminate self""" self._logger.info("Terminating dogs and shore thread") for dog in self._dogs: dog.terminate() diff --git a/src/contentmonster/classes/vessel.py b/src/contentmonster/classes/vessel.py index 5d9669d..289ef1d 100644 --- a/src/contentmonster/classes/vessel.py +++ b/src/contentmonster/classes/vessel.py @@ -11,14 +11,14 @@ import pathlib class Vessel: - """Class describing a Vessel (= a replication destination) - """ + """Class describing a Vessel (= a replication destination)""" + @classmethod def fromConfig(cls, config: SectionProxy, dbclass: type = Database): """Create Vessel object from a Vessel section in the Config file Args: - config (configparser.SectionProxy): Vessel section defining a + config (configparser.SectionProxy): Vessel section defining a Vessel dbclass (type): Class to use for database connections. Defaults to built-in Database using sqlite3. @@ -61,17 +61,38 @@ class Vessel: ignoredirs = [d.strip() for d in config["IgnoreDirs"].split(",")] if "Address" in config.keys(): - return cls(config.name.split()[1], config["Address"], username, - password, passphrase, port, timeout, tempdir, ignoredirs, dbclass) + return cls( + config.name.split()[1], + config["Address"], + username, + password, + passphrase, + port, + timeout, + tempdir, + ignoredirs, + dbclass, + ) else: - raise ValueError("Definition for Vessel " + - config.name.split()[1] + " does not contain Address!") + raise ValueError( + "Definition for Vessel " + + config.name.split()[1] + + " does not contain Address!" + ) - def __init__(self, name: str, address: str, username: Optional[str] = None, - password: Optional[str] = None, passphrase: Optional[str] = None, - port: Optional[int] = None, timeout: Optional[int] = None, - tempdir: Optional[Union[str, pathlib.Path]] = None, - ignoredirs: list[Optional[str]] = [], dbclass: type = Database) -> None: + def __init__( + self, + name: str, + address: str, + username: Optional[str] = None, + password: Optional[str] = None, + passphrase: Optional[str] = None, + port: Optional[int] = None, + timeout: Optional[int] = None, + tempdir: Optional[Union[str, pathlib.Path]] = None, + ignoredirs: list[Optional[str]] = [], + dbclass: type = Database, + ) -> None: """Initialize new Vessel object Args: @@ -141,8 +162,7 @@ class Vessel: return output def clearTempDir(self) -> None: - """Clean up the temporary directory on the Vessel - """ + """Clean up the temporary directory on the Vessel""" self.connection.clearTempDir() def pushChunk(self, chunk, path: Optional[Union[str, pathlib.Path]] = None) -> None: @@ -165,7 +185,7 @@ class Vessel: remotefile (classes.remotefile.RemoteFile): RemoteFile object describing the uploaded File """ - self.connection.compileComplete(remotefile) + return self.connection.compileComplete(remotefile) def assertDirectories(self, directory) -> None: """Make sure that destination and temp directories exist on the Vessel @@ -177,7 +197,7 @@ class Vessel: Raises: ValueError: Raised if a path is already in use on the vessel but not a directory. - IOError: Raised if a directory that does not exist cannot be + IOError: Raised if a directory that does not exist cannot be created. """ self.connection.assertDirectories(directory) @@ -188,7 +208,7 @@ class Vessel: Raises: ValueError: Raised if the path is already in use on the vessel but is not a directory. - IOError: Raised if the directory does not exist but cannot be + IOError: Raised if the directory does not exist but cannot be created. """ self.connection.assertTempDirectory() diff --git a/src/contentmonster/classes/vesselthread.py b/src/contentmonster/classes/vesselthread.py index 8583777..039c0d0 100644 --- a/src/contentmonster/classes/vesselthread.py +++ b/src/contentmonster/classes/vesselthread.py @@ -1,5 +1,6 @@ from multiprocessing import Process from typing import NoReturn, Optional +from traceback import format_exception from .vessel import Vessel from .remotefile import RemoteFile @@ -10,11 +11,11 @@ from .file import File from ..const import STATUS_COMPLETE, STATUS_START import time +import sys class VesselThread(Process): - """Thread processing uploads to a single vessel - """ + """Thread processing uploads to a single vessel""" def __init__(self, vessel: Vessel, state: dict, dbclass: type = Database) -> None: """Initialize a new VesselThread @@ -30,8 +31,7 @@ class VesselThread(Process): self._dbclass = dbclass def run(self) -> NoReturn: - """Run thread and process uploads to the vessel - """ + """Run thread and process uploads to the vessel""" self._logger.debug("Launched Vessel Thread for " + self.vessel.name) self.assertDirectories() while True: @@ -39,31 +39,39 @@ class VesselThread(Process): self.upload() time.sleep(5) except Exception as e: - self._logger.error("An exception occurred in the Vessel Thread for " + - self.vessel.name) - self._logger.error(repr(e)) + exc_type, exc_value, exc_traceback = sys.exc_info() + + self._logger.error( + "An exception occurred in the Vessel Thread for " + self.vessel.name + ) + + self._logger.error( + "\n".join(format_exception(exc_type, exc_value, exc_traceback)) + ) @retry() def assertDirectories(self) -> None: for directory in self._state["config"].directories: if not directory.name in self.vessel._ignoredirs: self._logger.debug( - f"Making sure directory {directory.name} exists on Vessel {self.vessel.name}") + f"Making sure directory {directory.name} exists on Vessel {self.vessel.name}" + ) self.vessel.connection.assertDirectories(directory) @retry() def upload(self) -> None: - """Continue uploading process - """ + """Continue uploading process""" if not (current := (self.vessel.currentUpload() or self.processQueue())): self._logger.debug( - f"No file needs to be uploaded to Vessel {self.vessel.name} at the moment") + f"No file needs to be uploaded to Vessel {self.vessel.name} at the moment" + ) return if isinstance(current, tuple): dirname, name, _ = current self._logger.debug( - f"Found file {name} in directory {dirname} for vessel {self.vessel.name}") + f"Found file {name} in directory {dirname} for vessel {self.vessel.name}" + ) directory = None @@ -74,7 +82,8 @@ class VesselThread(Process): if not directory: self._logger.debug( - f"Directory {dirname} not specified in config - deleting File from Vessel {self.vessel.name}") + f"Directory {dirname} not specified in config - deleting File from Vessel {self.vessel.name}" + ) self.vessel.clearTempDir() return @@ -82,26 +91,28 @@ class VesselThread(Process): fileobj = File(name, directory) except FileNotFoundError: self._logger.debug( - f"File {name} does not exist in Directory {dirname} on shore - deleting from Vessel {self.name}") + f"File {name} does not exist in Directory {dirname} on shore - deleting from Vessel {self.name}" + ) self.vessel.clearTempDir() return else: fileobj = current - remotefile = RemoteFile(fileobj, self.vessel, - self._state["config"].chunksize) + remotefile = RemoteFile(fileobj, self.vessel, self._state["config"].chunksize) self._logger.debug( - f"Start processing file {fileobj.name} in directory {fileobj.directory.name} on vessel {self.vessel.name}") + f"Start processing file {fileobj.name} in directory {fileobj.directory.name} on vessel {self.vessel.name}" + ) while True: db = self._dbclass() if not db.getFileByUUID(fileobj.uuid): self._logger.debug( - f"File {fileobj.name} in directory {fileobj.directory.name} does not exist anymore - deleting from {self.vessel.name}") + f"File {fileobj.name} in directory {fileobj.directory.name} does not exist anymore - deleting from {self.vessel.name}" + ) self.vessel.clearTempDir() - del(db) + del db self.vessel.assertDirectories(fileobj.directory) @@ -109,16 +120,18 @@ class VesselThread(Process): if status == STATUS_COMPLETE: self._logger.debug( - f"File {fileobj.name} uploaded to vessel {self.vessel.name} completely - finalizing") + f"File {fileobj.name} uploaded to vessel {self.vessel.name} completely - finalizing" + ) remotefile.finalizeUpload() db = self._dbclass() db.logCompletion(fileobj, self.vessel) - del(db) + del db self.vessel._uploaded.append(fileobj.uuid) self._logger.debug( - f"Moved {fileobj.name} to its final destination on {self.vessel.name} - done!") + f"Moved {fileobj.name} to its final destination on {self.vessel.name} - done!" + ) self.checkFileCompletion(fileobj) return @@ -126,7 +139,8 @@ class VesselThread(Process): nextchunk = 0 if status == STATUS_START else status + 1 self._logger.debug( - f"Getting chunk #{nextchunk} for file {fileobj.name} for vessel {self.vessel.name}") + f"Getting chunk #{nextchunk} for file {fileobj.name} for vessel {self.vessel.name}" + ) chunk = remotefile.getChunk(nextchunk) self._logger.debug("Got chunk") @@ -135,45 +149,50 @@ class VesselThread(Process): # of the file, i.e. the complete file has already been uploaded if chunk.data: - self._logger.debug( - f"Uploading chunk to vessel {self.vessel.name}") + self._logger.debug(f"Uploading chunk to vessel {self.vessel.name}") self.vessel.pushChunk(chunk) else: self._logger.debug( - f"No more data to upload to vessel {self.vessel.name} for file {fileobj.name} - compiling") + f"No more data to upload to vessel {self.vessel.name} for file {fileobj.name} - compiling" + ) self.vessel.compileComplete(remotefile) def checkFileCompletion(self, fileobj: File) -> None: db = self._dbclass() complete = db.getCompletionByFileUUID(fileobj.uuid) - del(db) + del db for vessel in self._state["config"].vessels: - if vessel.name not in complete and fileobj.directory.name not in vessel._ignoredirs: + if ( + vessel.name not in complete + and fileobj.directory.name not in vessel._ignoredirs + ): return self._logger.debug( - f"File {fileobj.name} from Directory {fileobj.directory.name} transferred to all Vessels. Moving out of replication directory.") + f"File {fileobj.name} from Directory {fileobj.directory.name} transferred to all Vessels. Moving out of replication directory." + ) if fileobj.exists(): fileobj.moveCompleted() def processQueue(self) -> Optional[str]: - """Return a file from the processing queue - """ + """Return a file from the processing queue""" self._logger.debug( - f"Trying to fetch new file for vessel {self.vessel.name} from queue") + f"Trying to fetch new file for vessel {self.vessel.name} from queue" + ) for f in self._state["files"]: - if (not (f.uuid in self.vessel._uploaded)) and (not (f.directory.name in self.vessel._ignoredirs)): - self._logger.debug( - f"Using file {f.name} for vessel {self.vessel.name}") + if (not (f.uuid in self.vessel._uploaded)) and ( + not (f.directory.name in self.vessel._ignoredirs) + ): + self._logger.debug(f"Using file {f.name} for vessel {self.vessel.name}") return f if f.uuid in self.vessel._uploaded: reason = "already uploaded" else: reason = "Directory ignored" - self._logger.debug( - f"Disregarding file {f.name} for vessel {self.vessel.name} - {reason}") + self._logger.trace( + f"Disregarding file {f.name} for vessel {self.vessel.name} - {reason}" + ) - self._logger.debug( - f"Didn't find any new files for vessel {self.vessel.name}") + self._logger.trace(f"Didn't find any new files for vessel {self.vessel.name}") diff --git a/src/contentmonster/const.py b/src/contentmonster/const.py index dcd67b1..91b7768 100644 --- a/src/contentmonster/const.py +++ b/src/contentmonster/const.py @@ -1,4 +1,4 @@ # Constants for remote file status STATUS_START = -1 -STATUS_COMPLETE = -2 \ No newline at end of file +STATUS_COMPLETE = -2 diff --git a/src/contentmonster/worker.py b/src/contentmonster/worker.py index 5ea85ad..169e62c 100755 --- a/src/contentmonster/worker.py +++ b/src/contentmonster/worker.py @@ -1,18 +1,47 @@ #!/usr/bin/env python3 -from .classes.config import MonsterConfig -from .classes.vesselthread import VesselThread -from .classes.shorethread import ShoreThread +from contentmonster.classes.config import MonsterConfig +from contentmonster.classes.vesselthread import VesselThread +from contentmonster.classes.shorethread import ShoreThread from multiprocessing import Manager +from argparse import ArgumentParser import pathlib import time +import logging +import signal -if __name__ == '__main__': - config_path = pathlib.Path(__file__).parent.absolute() / "settings.ini" +# Setup basic logging +logging.basicConfig(level=logging.INFO) + +def setup_signal_handlers(shore, threads): + def signal_handler(signum, frame): + logging.info("Signal received - stopping threads") + shore.terminate() + for thread in threads: + thread.terminate() + exit(0) + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + +def main(): + parser = ArgumentParser(description="ContentMonster Worker") + parser.add_argument("-c", "--config", help="Path to configuration file") + args = parser.parse_args() + + if args.config: + config_path = args.config + else: + config_path = "settings.ini" + config = MonsterConfig() - config.readFile(config_path) + + try: + config.readFile(config_path) + except Exception as e: + logging.error(f"Failed to read configuration: {e}") + return with Manager() as manager: state = manager.dict() @@ -29,12 +58,14 @@ if __name__ == '__main__': shore = ShoreThread(state) shore.start() + setup_signal_handlers(shore, threads) + while True: try: time.sleep(10) - except KeyboardInterrupt: - print("Keyboard interrupt received - stopping threads") - shore.terminate() - for thread in threads: - thread.terminate() - exit() + except Exception as e: + logging.error(f"Unexpected error: {e}") + break + +if __name__ == '__main__': + main() \ No newline at end of file