Survive reboot and directory loss

"log" to stdout
Kumi 2021-11-26 11:23:39 +01:00
parent e62af0f71f
commit 82737215fe
9 changed files with 230 additions and 44 deletions

View file

@@ -26,7 +26,7 @@ class MonsterConfig:
             raise ValueError("Config file does not contain a MONSTER section!")
         try:
-            self.chunksize = parser["MONSTER"]["ChunkSize"]
+            self.chunksize = int(parser["MONSTER"]["ChunkSize"])
         except KeyError:
             pass
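The int() cast matters because configparser returns every value as a string, so an unconverted ChunkSize would later break size arithmetic. A minimal sketch, assuming a [MONSTER] section like the one the parser reads above:

from configparser import ConfigParser

parser = ConfigParser()
parser.read_string("[MONSTER]\nChunkSize = 10485760")

raw = parser["MONSTER"]["ChunkSize"]   # configparser always returns str
chunksize = int(raw)                   # cast once at load time, as above
print(type(raw).__name__, chunksize // 1024)   # str 10240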

View file

@@ -25,7 +25,8 @@ class Connection:
         self._client.load_system_host_keys()
         self._client.set_missing_host_key_policy(WarningPolicy)
         self._client.connect(vessel.address, vessel.port, vessel.username,
-                             vessel.password, passphrase=vessel.passphrase)
+                             vessel.password, timeout=vessel.timeout,
+                             passphrase=vessel.passphrase)
         self._transport = self._client.get_transport()
         self._transport.set_keepalive(10)
         self._sftp = self._client.open_sftp()
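Passing timeout to connect() bounds the TCP handshake, so a vessel that is down or rebooting fails fast instead of blocking the thread indefinitely, and the keepalive turns a silently dead session into a prompt exception the retry logic can catch. A condensed sketch of the same pattern, with hostname and username as placeholder values and key-based auth assumed:

from paramiko import SSHClient, WarningPolicy

client = SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(WarningPolicy)

# timeout bounds the TCP connect; an unreachable host raises quickly
client.connect("vessel.example.com", port=22, username="monster", timeout=10)

# keepalive packets every 10s make a dead session raise instead of hanging
client.get_transport().set_keepalive(10)
sftp = client.open_sftp()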
@@ -96,6 +97,21 @@ class Connection:
         """
         return self._sftp.remove(str(path))
 
+    def assertTempDirectory(self) -> None:
+        """Make sure that the temp directory exists on the Vessel
+
+        Raises:
+            ValueError: Raised if the path is already in use on the vessel but
+              is not a directory.
+            IOError: Raised if the directory does not exist but cannot be
+              created.
+        """
+        if not self._exists(self._vessel.tempdir):
+            self._mkdir(self._vessel.tempdir)
+        elif not self._isdir(self._vessel.tempdir):
+            raise ValueError(
+                f"{self._vessel.tempdir} exists but is not a directory on Vessel {self._vessel.name}!")
+
     def assertDirectories(self, directory) -> None:
         """Make sure that destination and temp directories exist on the Vessel
@@ -157,7 +173,7 @@ class Connection:
         """
         path = path or (self._vessel.tempdir / chunk.getTempName())
         flo = BytesIO(chunk.data)
-        self._sftp.putfo(flo, path, len(chunk.data))
+        self._sftp.putfo(flo, str(path), len(chunk.data))
 
     def compileComplete(self, remotefile) -> None:
         """Build a complete File from uploaded Chunks.
@@ -221,8 +237,8 @@ class Connection:
             [type]: [description]
         """
         completefile = remotefile.file.getChunk(-1)
-        destination = remotefile.getFullPath()
-        self._sftp.rename(
+        destination = remotefile.file.getFullPath()
+        self._sftp.posix_rename(
             str(self._vessel.tempdir / completefile.getTempName()), str(destination))
         # Make sure that file has actually been created at destination
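Two details here: paramiko's SFTP calls expect string paths, hence the str() casts around pathlib values, and posix_rename() overwrites an existing destination in one step, where plain rename() raises IOError if the target already exists, which is exactly the case when an interrupted upload is re-finalized after a reboot. A small sketch of the combined pattern, with the helper and its arguments as illustrative stand-ins:

from io import BytesIO
from pathlib import Path

def push_and_finalize(sftp, data: bytes, tempdir: Path, final: Path) -> None:
    temp = tempdir / "upload.part"            # hypothetical temp name
    # SFTP wants str, not pathlib.Path
    sftp.putfo(BytesIO(data), str(temp), len(data))
    # posix_rename overwrites an existing target; rename() would raise
    sftp.posix_rename(str(temp), str(final))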

View file

@@ -1,10 +1,13 @@
 from watchdog.events import (FileSystemEventHandler, FileSystemEvent,
-                             FileCreatedEvent, FileDeletedEvent,
+                             FileCreatedEvent, FileDeletedEvent,
                              FileModifiedEvent, FileMovedEvent)
 from multiprocessing import Queue
+from classes.logger import Logger
 
 import os.path
+import time
 
 
 class DogHandler(FileSystemEventHandler):
@@ -22,6 +25,7 @@ class DogHandler(FileSystemEventHandler):
         super().__init__(*args, **kwargs)
         self._directory = directory
         self._queue = queue
+        self._logger = Logger()
 
     def dispatch(self, event: FileSystemEvent):
         """Dispatch events to the appropriate event handlers
@@ -39,7 +43,12 @@ class DogHandler(FileSystemEventHandler):
             event (watchdog.events.FileCreatedEvent): Event describing the
               created file
         """
-        self._queue.put((self._directory, os.path.basename(event.src_path)))
+        self._logger.debug(f"Detected creation event of {event.src_path}")
+        size = os.path.getsize(event.src_path)
+        time.sleep(5)
+        if size == os.path.getsize(event.src_path):
+            self._queue.put((self._directory, os.path.basename(event.src_path)))
 
     def on_modified(self, event: FileModifiedEvent):
         """Put file modification events on the queue
@@ -48,7 +57,12 @@ class DogHandler(FileSystemEventHandler):
             event (watchdog.events.FileModifiedEvent): Event describing the
               modified file
         """
-        self._queue.put((self._directory, os.path.basename(event.src_path)))
+        self._logger.debug(f"Detected modification event of {event.src_path}")
+        size = os.path.getsize(event.src_path)
+        time.sleep(5)
+        if size == os.path.getsize(event.src_path):
+            self._queue.put((self._directory, os.path.basename(event.src_path)))
 
     def on_moved(self, event: FileMovedEvent):
         """Put file move events on the queue
@@ -57,6 +71,8 @@ class DogHandler(FileSystemEventHandler):
             event (watchdog.events.FileMovedEvent): Event describing the moved
               file (source and destination)
         """
+        self._logger.debug(
+            f"Detected move event of {event.src_path} to {event.dest_path}")
         self._queue.put((self._directory, os.path.basename(event.src_path)))
         self._queue.put((self._directory, os.path.basename(event.dest_path)))
@@ -67,4 +83,5 @@ class DogHandler(FileSystemEventHandler):
             event (watchdog.events.FileDeletedEvent): Event describing the
               deleted file
         """
+        self._logger.debug(f"Detected deletion event of {event.src_path}")
         self._queue.put((self._directory, os.path.basename(event.src_path)))
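The creation and modification handlers now debounce: they sample the file size, wait five seconds, and only queue the event if the size has not changed, so half-written files do not enter the pipeline. Isolated as a helper, the heuristic looks like this (a sketch, not code from the commit):

import os
import time

def size_is_stable(path: str, delay: float = 5.0) -> bool:
    size = os.path.getsize(path)
    time.sleep(delay)
    # a file still being written usually grows between the two samples
    return size == os.path.getsize(path)

If the file is still growing, the event is dropped; the next write fires a fresh modification event, so the check is retried until the size settles. The trade-off is that the sleep blocks the observer's dispatch for those five seconds.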

View file

@@ -28,7 +28,7 @@ class File:
         self.directory = directory
 
         if not self.exists():
-            raise FileNotFoundError(f"File {self.name} does not exist in {self.directory}!")
+            raise FileNotFoundError(f"File {self.name} does not exist in {self.directory.name}!")
 
         self.uuid = uuid or self.getUUID()

classes/logger.py Normal file (9 additions)
View file

@@ -0,0 +1,9 @@
+import logging
+
+
+class Logger:
+    def debug(self, message):
+        print(message)
+
+    def info(self, message):
+        print(message)
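As committed, Logger is a thin stub: the logging import is unused and both methods print to stdout, which is all the commit message promises. One way it could later delegate to the stdlib while keeping the same debug()/info() surface, purely as a hypothetical extension:

import logging

class Logger:
    def __init__(self, name: str = "monster"):   # name is an assumed default
        self._log = logging.getLogger(name)

    def debug(self, message):
        self._log.debug(message)

    def info(self, message):
        self._log.info(message)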

View file

@@ -1,4 +1,4 @@
-from paramiko.ssh_exception import SSHException
+from paramiko.ssh_exception import SSHException, NoValidConnectionsError
 
 class retry:
     """Decorator used to automatically retry operations throwing exceptions
@@ -9,9 +9,10 @@ class retry:
         Args:
             exceptions (tuple, optional): A tuple containing exception classes
               that should be handled by the decorator. If none, handle only
-              paramiko.ssh_exception.SSHException. Defaults to None.
+              paramiko.ssh_exception.SSHException/NoValidConnectionsError.
+              Defaults to None.
         """
-        self.exceptions = exceptions or (SSHException,)
+        self.exceptions = exceptions or (SSHException, NoValidConnectionsError)
 
     def __call__(self, f):
         """Return a function through the retry decorator

View file

@@ -2,6 +2,7 @@ from classes.config import MonsterConfig
 from classes.doghandler import DogHandler
 from classes.directory import Directory
 from classes.database import Database
+from classes.logger import Logger
 
 from watchdog.observers import Observer
@@ -25,6 +26,7 @@ class ShoreThread(Process):
         self._dogs = []
         self._state = state
         self.queue = Queue()
+        self._logger = Logger()
 
     def getAllFiles(self) -> list:
         """Return File objects for all files in all Directories
@@ -32,9 +34,12 @@ class ShoreThread(Process):
         Returns:
             list: List of all File objects discovered
         """
+        self._logger.debug("Getting all files from Shore")
         files = []
 
         for directory in self._state["config"].directories:
+            self._logger.debug(f"Getting all files in {directory.name}")
             for f in directory.getFiles():
                 files.append(f)
@@ -43,13 +48,14 @@ class ShoreThread(Process):
     def clearFiles(self) -> None:
         """Clear the files variable in the application state
         """
+        self._logger.debug("Clearing global files variable")
         del self._state["files"][:]
 
     def monitor(self) -> None:
         """Initialize monitoring of Directories specified in configuration
         """
         for directory in self._state["config"].directories:
-            print("Creating dog for " + str(directory.location))
+            self._logger.debug("Creating dog for " + str(directory.location))
             handler = DogHandler(directory, self.queue)
             dog = Observer()
             dog.schedule(handler, str(directory.location), False)
@@ -59,8 +65,11 @@ class ShoreThread(Process):
     def run(self) -> NoReturn:
         """Launch the ShoreThread and start monitoring for file changes
         """
-        print("Launched Shore Thread")
-        self.getAllFiles()
+        self._logger.info("Launched Shore Thread")
+
+        for f in self.getAllFiles():
+            self._state["files"].append(f)
+
         self.monitor()
 
         while True:
@@ -70,6 +79,7 @@ class ShoreThread(Process):
     def joinDogs(self) -> None:
         """Join observers to receive updates on the queue
         """
+        self._logger.debug("Joining dogs")
         for dog in self._dogs:
             dog.join(1)
@@ -80,12 +90,23 @@ class ShoreThread(Process):
             directory (Directory): Directory (previously) containing the File
             name (str): Filename of the deleted File
         """
-        # Remove file from processing queue
-        for f in self._state["files"]:
-            if f.directory == directory and f.name == name:
-                del(f)
+        self._logger.debug(f"Purging file {name} from {directory.name}")
 
-        # Remove file from database
+        # Remove file from processing queue
+        outlist = []
+        for f in self._state["files"]:
+            if f.directory.name == directory.name and f.name == name:
+                self._logger.debug(f"Found {name} in files queue, deleting.")
+            else:
+                outlist.append(f)
+
+        self.clearFiles()
+        for f in outlist:
+            self._state["files"].append(f)
+
+        # Remove file from database
+        self._logger.debug(f"Purging file {name} from database")
         db = Database()
         db.removeFile(directory, name)
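The rebuild-and-reassign dance replaces the old del(f) loop, which never worked: self._state["files"] is a shared manager list, and deleting the loop variable only unbinds a local name. Filtering into a plain list, clearing the proxy in place, and re-appending is what actually propagates to the other processes. A runnable demonstration of the difference, using a stand-in manager list:

from multiprocessing import Manager

if __name__ == "__main__":
    manager = Manager()
    files = manager.list(["a.bin", "b.bin", "c.bin"])

    for f in files:
        if f == "b.bin":
            del f                  # only unbinds the local name
    print(list(files))             # ['a.bin', 'b.bin', 'c.bin']

    outlist = [f for f in files if f != "b.bin"]
    del files[:]                   # clear the proxy in place
    files.extend(outlist)          # re-append the survivors
    print(list(files))             # ['a.bin', 'c.bin']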
@@ -95,19 +116,27 @@ class ShoreThread(Process):
         Args:
             fileobj (classes.file.File): File object to add to the queue
         """
         found = False
+        self._logger.debug(f"Adding file {fileobj.name} to directory {fileobj.directory.name}")
+        outlist = []
 
         for f in self._state["files"]:
-            if f.directory == fileobj.directory and f.name == fileobj.name:
+            if f.directory.name == fileobj.directory.name and f.name == fileobj.name:
+                self._logger.debug(f"File {fileobj.name} already in processing queue")
                 if f.uuid != fileobj.uuid:
-                    del(f)
+                    self._logger.debug("UUID does not match - deleting entry")
                 else:
                     found = True
+                    self._logger.debug("Found duplicate - deleting")
+            else:
+                outlist.append(f)
 
         if not found:
-            # Do not queue file if it is of size 0
-            if os.path.getsize(str(fileobj.getFullPath())):
-                self._state["files"].append(fileobj)
+            self._logger.debug(f"File {fileobj.name} not in processing queue (anymore) - adding")
+            outlist.append(fileobj)
+
+        self.clearFiles()
+        for f in outlist:
+            self._state["files"].append(f)
 
     def processFile(self, directory: Directory, name: str) -> None:
         """Process a file entry from the observer queue
@@ -118,6 +147,7 @@ class ShoreThread(Process):
             name (str): Filename of the created, deleted, modified or moved
               File
         """
+        self._logger.debug(f"Processing change to file {name} in directory {directory.name}")
         if (fileobj := directory.getFile(name)):
             self.addFile(fileobj)
         else:
@@ -130,12 +160,14 @@ class ShoreThread(Process):
           "directory" is a Directory object, and "basename" is the name of a
           File that has been created, moved, modified or deleted.
         """
+        self._logger.debug("Waiting for new changes...")
         directory, basename = self.queue.get()  # Will block until an event is found
         self.processFile(directory, basename)
 
     def terminate(self, *args, **kwargs) -> NoReturn:
         """Terminate observer threads, then terminate self
         """
+        self._logger.info("Terminating dogs and shore thread")
         for dog in self._dogs:
             dog.terminate()
             dog.join()

View file

@@ -59,7 +59,8 @@ class Vessel:
     def __init__(self, name: str, address: str, username: Optional[str] = None,
                  password: Optional[str] = None, passphrase: Optional[str] = None,
-                 port: Optional[int] = None, tempdir: Optional[Union[str, pathlib.Path]] = None) -> None:
+                 port: Optional[int] = None, timeout: Optional[int] = None,
+                 tempdir: Optional[Union[str, pathlib.Path]] = None) -> None:
         """Initialize new Vessel object
 
         Args:
@@ -75,6 +76,7 @@ class Vessel:
         self.password = password
         self.passphrase = passphrase
         self.port = port or 22
+        self.timeout = timeout or 10
         self._connection = None
         self._uploaded = self.getUploadedFromDB()  # Files already uploaded
@@ -89,8 +91,8 @@ class Vessel:
         if self._connection:
             try:
                 # ... check if it is up
-                self._connection._listdir()
-            except SSHException:
+                self._connection._listdir(".")
+            except (SSHException, OSError):
                 # ... and throw it away if it isn't
                 self._connection = None
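Probing with _listdir(".") gives the liveness check an explicit path, and catching OSError alongside SSHException matters because a vessel that rebooted mid-session tends to surface as a socket-level error rather than a paramiko one. Condensed into a standalone helper (Connection, _listdir and the _connection attribute are the project's own; the rebuild call is an assumption):

from paramiko.ssh_exception import SSHException

def live_connection(vessel):
    if vessel._connection:
        try:
            vessel._connection._listdir(".")   # cheap liveness probe
        except (SSHException, OSError):
            vessel._connection = None          # stale session, discard it
    if not vessel._connection:
        vessel._connection = Connection(vessel)   # assumed constructor
    return vessel._connection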
@@ -107,20 +109,23 @@ class Vessel:
         db = Database()
         return db.getCompletionForVessel(self)
 
-    def currentUpload(self) -> Optional[File]:
+    def currentUpload(self) -> Optional[tuple[str, str, str]]:
         """Get the File that is currently being uploaded to this Vessel
 
         Returns:
-            classes.file.File: File object representing the file currently
-              being uploaded, if any
+            tuple: A tuple consisting of (directory, name, checksum), where
+              "directory" is the name of the Directory object the File is
+              located in, "name" is the filename (basename) of the File and
+              checksum is the SHA256 hash of the file at the time of insertion
+              into the database. None is returned if no such record is found.
         """
+        self.assertTempDirectory()  # After a reboot, the tempdir may be gone
+
         db = Database()
-        output = db.getFileByUUID(
-            fileuuid := self.connection.getCurrentUploadUUID())
-        if output:
-            directory, name, _ = output
-            return File(name, directory, fileuuid)
+        output = db.getFileByUUID(self.connection.getCurrentUploadUUID())
+        del db
+        return output
 
     def clearTempDir(self) -> None:
         """Clean up the temporary directory on the Vessel
@@ -138,7 +143,7 @@ class Vessel:
           Vessel configuration and name provided by Chunk object. Defaults
           to None.
         """
-        self.connection.pushChunk(chunk, path)
+        self.connection.pushChunk(chunk, str(path) if path else None)
 
     def compileComplete(self, remotefile) -> None:
         """Build a complete File from uploaded Chunks.
@@ -148,3 +153,29 @@ class Vessel:
               describing the uploaded File
         """
         self.connection.compileComplete(remotefile)
+
+    def assertDirectories(self, directory) -> None:
+        """Make sure that destination and temp directories exist on the Vessel
+
+        Args:
+            directory (classes.directory.Directory): Directory object
+              representing the directory to check
+
+        Raises:
+            ValueError: Raised if a path is already in use on the vessel but
+              not a directory.
+            IOError: Raised if a directory that does not exist cannot be
+              created.
+        """
+        self.connection.assertDirectories(directory)
+
+    def assertTempDirectory(self) -> None:
+        """Make sure that the temp directory exists on the Vessel
+
+        Raises:
+            ValueError: Raised if the path is already in use on the vessel but
+              is not a directory.
+            IOError: Raised if the directory does not exist but cannot be
+              created.
+        """
+        self.connection.assertTempDirectory()

View file

@@ -5,6 +5,8 @@ from classes.vessel import Vessel
 from classes.remotefile import RemoteFile
 from classes.retry import retry
 from classes.database import Database
+from classes.logger import Logger
+from classes.file import File
 from const import STATUS_COMPLETE, STATUS_START
 
 import time
@@ -24,53 +26,131 @@ class VesselThread(Process):
         super().__init__()
         self.vessel = vessel
         self._state = state
+        self._logger = Logger()
 
     def run(self) -> NoReturn:
         """Run thread and process uploads to the vessel
         """
         print("Launched Vessel Thread for " + self.vessel.name)
+        self.assertDirectories()
         while True:
             try:
                 self.upload()
                 time.sleep(5)
             except Exception as e:
                 print("An exception occurred in the Vessel Thread for " +
                       self.vessel.name)
                 print(repr(e))
 
+    @retry()
+    def assertDirectories(self) -> None:
+        for directory in self._state["config"].directories:
+            print(
+                f"Making sure directory {directory.name} exists on Vessel {self.vessel.name}")
+            self.vessel.connection.assertDirectories(directory)
+
     @retry()
     def upload(self) -> None:
         """Continue uploading process
         """
-        if not (current := self.vessel.currentUpload() or self.processQueue()):
+        if not (current := (self.vessel.currentUpload() or self.processQueue())):
+            self._logger.debug(
+                f"No file needs to be uploaded to Vessel {self.vessel.name} at the moment")
             return
 
-        remotefile = RemoteFile(current, self.vessel,
+        if isinstance(current, tuple):
+            dirname, name, _ = current
+            self._logger.debug(
+                f"Found file {name} in directory {dirname} for vessel {self.vessel.name}")
+            directory = None
+            for d in self._state["config"].directories:
+                if d.name == dirname:
+                    directory = d
+                    break
+            if not directory:
+                self._logger.debug(
+                    f"Directory {dirname} not specified in config - deleting File from Vessel {self.name}")
+                self.vessel.clearTempDir()
+                return
+            try:
+                fileobj = File(name, directory)
+            except FileNotFoundError:
+                self._logger.debug(
+                    f"File {name} does not exist in Directory {dirname} on shore - deleting from Vessel {self.name}")
+                self.vessel.clearTempDir()
+                return
+        else:
+            fileobj = current
+
+        remotefile = RemoteFile(fileobj, self.vessel,
                                 self._state["config"].chunksize)
+        self._logger.debug(
+            f"Start processing file {fileobj.name} in directory {fileobj.directory.name} on vessel {self.vessel.name}")
+
         while True:
+            db = Database()
+            if not db.getFileByUUID(fileobj.uuid):
+                self._logger.debug(
+                    f"File {fileobj.name} in directory {fileobj.directory.name} does not exist anymore - deleting from {self.vessel.name}")
+                self.vessel.clearTempDir()
+            del(db)
+
+            self.vessel.assertDirectories(fileobj.directory)
+
             status = remotefile.getStatus()
             if status == STATUS_COMPLETE:
+                self._logger.debug(
+                    f"File {fileobj.name} uploaded to vessel {self.vessel.name} completely - finalizing")
                 remotefile.finalizeUpload()
                 db = Database()
-                db.logCompletion(current, self.vessel)
+                db.logCompletion(fileobj, self.vessel)
                 del(db)
+                self.vessel._uploaded.append(fileobj.uuid)
+                self._logger.debug(
+                    f"Moved {fileobj.name} to its final destination on {self.vessel.name} - done!")
                 return
+
             nextchunk = 0 if status == STATUS_START else status + 1
+            self._logger.debug(
+                f"Getting chunk #{nextchunk} for file {fileobj.name} for vessel {self.vessel.name}")
             chunk = remotefile.getChunk(nextchunk)
+            self._logger.debug("Got chunk")
             # If the Chunk has no data, the selected range is beyond the end
             # of the file, i.e. the complete file has already been uploaded
             if chunk.data:
+                self._logger.debug(
+                    f"Uploading chunk to vessel {self.vessel.name}")
                 self.vessel.pushChunk(chunk)
             else:
+                self._logger.debug(
+                    f"No more data to upload to vessel {self.vessel.name} for file {fileobj.name} - compiling")
                 self.vessel.compileComplete(remotefile)
 
     def processQueue(self) -> Optional[str]:
         """Return a file from the processing queue
         """
+        self._logger.debug(
+            f"Trying to fetch new file for vessel {self.vessel.name} from queue")
         for f in self._state["files"]:
             if not f.uuid in self.vessel._uploaded:
+                self._logger.debug(
+                    f"Using file {f.name} for vessel {self.vessel.name}")
                 return f
+            self._logger.debug(
+                f"Disregarding file {f.name} for vessel {self.vessel.name} - already uploaded")
+        self._logger.debug(
+            f"Didn't find any new files for vessel {self.vessel.name}")