commit 5f8f94284f13f09fdd8147ac7a5b725f351e88cb
Author: Klaus-Uwe Mitterer
Date:   Sat Nov 20 15:40:07 2021 +0100

    Current code status

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..a4a534e
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+__pycache__/
+*.pyc
+*.swp
+settings.ini
+database.sqlite3
+venv/
+.vscode
\ No newline at end of file
diff --git a/__main__.py b/__main__.py
new file mode 100644
index 0000000..e69de29
diff --git a/classes/__init__.py b/classes/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/classes/chunk.py b/classes/chunk.py
new file mode 100644
index 0000000..fe33592
--- /dev/null
+++ b/classes/chunk.py
@@ -0,0 +1,18 @@
+import hashlib
+
+class Chunk:
+    """A piece of a File: one fixed-size slice, or the whole file if count < 0."""
+
+    def __init__(self, fileobj, count, data):
+        self.file = fileobj
+        # A negative count marks the chunk that holds the complete file.
+        self.count = count if count >= 0 else "complete"
+        self.data = data
+
+    def getTempName(self):
+        """Return the file name used for this chunk in the temp directory."""
+        return f"{self.file.uuid}_{self.count}.part"
+
+    def getHash(self):
+        """Return the hex-encoded SHA-256 digest of the chunk's data."""
+        return hashlib.sha256(self.data).hexdigest()
\ No newline at end of file
diff --git a/classes/config.py b/classes/config.py
new file mode 100644
index 0000000..63d113e
--- /dev/null
+++ b/classes/config.py
@@ -0,0 +1,34 @@
+import configparser
+
+from classes.vessel import Vessel
+from classes.directory import Directory
+
+class MonsterConfig:
+    """Parsed settings: the configured Directory and Vessel objects."""
+
+    @classmethod
+    def fromFile(cls, path):
+        """Build a MonsterConfig from the ini file at path.
+
+        Raises ValueError if the file has no MONSTER section.
+        """
+        parser = configparser.ConfigParser()
+        parser.read(path)
+
+        if "MONSTER" not in parser.sections():
+            raise ValueError("Config file does not contain a MONSTER section!")
+
+        config = cls()
+
+        for section in parser.sections():
+            if section.startswith("Directory"):
+                config.directories.append(Directory.fromConfig(parser[section]))
+            elif section.startswith("Vessel"):
+                config.vessels.append(Vessel.fromConfig(parser[section]))
+
+        # The parsed lists used to be dropped; hand them back to the caller.
+        return config
+
+    def __init__(self):
+        self.directories = []
+        self.vessels = []
\ No newline at end of file
diff --git a/classes/connection.py b/classes/connection.py
new file mode 100644
index 0000000..22c407a
--- /dev/null
+++ b/classes/connection.py
@@ -0,0 +1,110 @@
+import paramiko as pikuniku # :P
+
+from paramiko.client import SSHClient
+
+from io import BytesIO
+
+import errno
+import shlex
+import stat
+
+class Connection:
+    """SSH/SFTP session to a Vessel, used to store and verify file chunks."""
+
+    def __init__(self, vessel):
+        self._vessel = vessel
+        self._client = SSHClient()
+        self._client.load_system_host_keys()
+        self._client.connect(vessel.address)
+        self._transport = self._client.get_transport()
+        self._transport.set_keepalive(10)
+        self._sftp = self._client.open_sftp()
+
+    def _exists(self, path):
+        """Return True if path exists on the Vessel."""
+        try:
+            self._sftp.stat(str(path))
+            return True
+        except FileNotFoundError:
+            return False
+
+    def _isdir(self, path):
+        return stat.S_ISDIR(self._sftp.lstat(str(path)).st_mode)
+
+    def _mkdir(self, path):
+        return self._sftp.mkdir(str(path))
+
+    def _listdir(self, path):
+        return self._sftp.listdir(str(path))
+
+    def _remove(self, path):
+        return self._sftp.remove(str(path))
+
+    def assertTempDirectory(self, directory):
+        """Make sure directory and its temp directory exist on the Vessel.
+
+        Returns the temp directory path. Raises ValueError if either path
+        exists but is not a directory.
+        """
+        # Check the Directory's path, not the Directory object itself.
+        for d in [directory.location, directory.tempdir]:
+            if not self._exists(d):
+                self._mkdir(d)
+            elif not self._isdir(d):
+                raise ValueError(f"{d} exists but is not a directory on Vessel {self._vessel.name}!")
+
+        return directory.tempdir
+
+    def assertChunkComplete(self, chunk):
+        """Return True if chunk is on the Vessel with a matching SHA-256.
+
+        A remote chunk whose checksum does not match is deleted.
+        """
+        path = chunk.file.directory.tempdir / chunk.getTempName()
+
+        if not self._exists(path):
+            return False
+
+        # Quote the path so spaces or shell metacharacters cannot break the command.
+        _,o,_ = self._client.exec_command("sha256sum -b " + shlex.quote(str(path)))
+        o.channel.recv_exit_status()
+        # No output means sha256sum produced no checksum; treat as a mismatch.
+        output = o.readline().split()
+        if output and output[0] == chunk.getHash():
+            return True
+
+        self._remove(path)
+        return False
+
+    def pushChunk(self, chunk):
+        """Upload chunk into the temp directory on the Vessel."""
+        path = chunk.file.directory.tempdir / chunk.getTempName()
+        flo = BytesIO(chunk.data)
+        self._sftp.putfo(flo, str(path), len(chunk.data))
+
+    def compileComplete(self, remotefile):
+        """Concatenate the uploaded chunks into the complete file on the Vessel.
+
+        Returns True if the complete file on the Vessel matches the local one.
+        """
+        numchunks = remotefile.getStatus() + 1
+        completefile = remotefile.file.getChunk(-1)
+
+        if numchunks <= 0:
+            # No numbered chunks to join (nothing uploaded, or already complete).
+            # Without file names "cat" would read from stdin instead.
+            return self.assertChunkComplete(completefile)
+
+        tempdir = remotefile.file.directory.tempdir
+        files = " ".join([shlex.quote(str(tempdir / f"{remotefile.file.uuid}_{i}.part")) for i in range(numchunks)])
+        outpath = tempdir / completefile.getTempName()
+        _,o,_ = self._client.exec_command(f"cat {files} > {shlex.quote(str(outpath))}")
+        o.channel.recv_exit_status()
+
+        return self.assertChunkComplete(completefile)
+
+    def __del__(self):
+        # _client is missing if __init__ failed before creating it.
+        client = getattr(self, "_client", None)
+        if client is not None:
+            client.close()
\ No newline at end of file
diff --git a/classes/database.py b/classes/database.py
new file mode 100644
index 0000000..0470d86
--- /dev/null
+++ b/classes/database.py
@@ -0,0 +1,99 @@
+import sqlite3
+import pathlib
+import uuid
+
+class Database:
+    """SQLite store for known files and their per-Vessel upload state."""
+
+    def __init__(self, filename=None):
+        filename = filename or pathlib.Path(__file__).parent.parent.absolute() / "database.sqlite3"
+        self._con = sqlite3.connect(filename)
+        self.migrate()
+
+    def _execute(self, query, parameters=None):
+        """Run a single statement and commit it."""
+        cur = self.getCursor()
+        # sqlite3 rejects None as the parameter argument.
+        cur.execute(query, parameters or ())
+        self.commit()
+
+    def commit(self):
+        return self._con.commit()
+
+    def getCursor(self):
+        return self._con.cursor()
+
+    def getVersion(self):
+        """Return the schema version, or 0 if the database is uninitialised."""
+        cur = self.getCursor()
+        try:
+            cur.execute("SELECT value FROM contentmonster_settings WHERE key = 'dbversion'")
+        except sqlite3.OperationalError:
+            return 0
+
+        # Checked explicitly: an assert would be stripped under "python -O".
+        version = cur.fetchone()
+        return int(version[0]) if version else 0
+
+    def getFileUUID(self, fileobj):
+        """Return the UUID stored for fileobj, registering the file if needed.
+
+        Entries for the same directory and name with a different checksum
+        are removed.
+        """
+        checksum = fileobj.getHash()
+
+        cur = self.getCursor()
+        cur.execute("SELECT uuid, checksum FROM contentmonster_file WHERE directory = ? AND name = ?", (fileobj.directory.name, fileobj.name))
+
+        fileuuid = None
+        for result in cur.fetchall():
+            if result[1] == checksum:
+                fileuuid = result[0]
+            else:
+                self.removeFileByUUID(result[0])
+
+        return fileuuid or self.addFile(fileobj, checksum)
+
+    def addFile(self, fileobj, hash=None):
+        """Insert fileobj with a new random UUID and return that UUID."""
+        hash = hash or fileobj.getHash()
+        fileuuid = str(uuid.uuid4())
+        self._execute("INSERT INTO contentmonster_file(uuid, directory, name, checksum) VALUES (?, ?, ?, ?)", (fileuuid, fileobj.directory.name, fileobj.name, hash))
+        return fileuuid
+
+    def removeFileByUUID(self, fileuuid):
+        self._execute("DELETE FROM contentmonster_file WHERE uuid = ?", (fileuuid,))
+
+    def logStart(self, file, vessel):
+        """Record that the upload of file to vessel has started."""
+        # OR REPLACE: (file, vessel) is the primary key, so a restarted upload
+        # must overwrite its earlier row instead of raising IntegrityError.
+        self._execute("INSERT OR REPLACE INTO contentmonster_file_log(file, vessel, status) VALUES(?, ?, ?)", (file.uuid, vessel.name, False))
+
+    def logCompletion(self, file, vessel):
+        """Record that the upload of file to vessel has finished."""
+        self._execute("UPDATE contentmonster_file_log SET status = ? WHERE file = ? AND vessel = ?", (True, file.uuid, vessel.name))
+
+    def migrate(self):
+        """Bring the schema up to the current version (2)."""
+        cur = self.getCursor()
+
+        if self.getVersion() == 0:
+            cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_settings(key VARCHAR(64) PRIMARY KEY, value TEXT)")
+            cur.execute("INSERT INTO contentmonster_settings(key, value) VALUES ('dbversion', '1')")
+            self.commit()
+
+        if self.getVersion() == 1:
+            cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file(uuid VARCHAR(36) PRIMARY KEY, directory VARCHAR(128), name VARCHAR(128), checksum VARCHAR(64))")
+            # The foreign key must name contentmonster_file, the table created above.
+            # NOTE(review): SQLite enforces it only with "PRAGMA foreign_keys = ON".
+            cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file_log(file VARCHAR(36), vessel VARCHAR(128), status BOOLEAN, PRIMARY KEY (file, vessel), FOREIGN KEY (file) REFERENCES contentmonster_file(uuid) ON DELETE CASCADE)")
+            cur.execute("UPDATE contentmonster_settings SET value = '2' WHERE key = 'dbversion'")
+            self.commit()
+
+    def __del__(self):
+        # _con is missing if sqlite3.connect() raised in __init__.
+        con = getattr(self, "_con", None)
+        if con is not None:
+            con.close()
\ No newline at end of file
diff --git a/classes/directory.py b/classes/directory.py
new file mode 100644
index 0000000..03c6c05
--- /dev/null
+++ b/classes/directory.py
@@ -0,0 +1,37 @@
+from classes.file import File
+
+import os
+import pathlib
+
+class Directory:
+    """A named local directory taken from the configuration."""
+
+    @classmethod
+    def fromConfig(cls, config):
+        """Build a Directory from a "Directory <name>" config section.
+
+        Raises ValueError if the section has no Location key.
+        """
+        if "Location" in config.keys():
+            return cls(config.name.split()[1], config["Location"])
+        else:
+            raise ValueError("Definition for Directory " + config.name.split()[1] + " does not contain Location!")
+
+    def __init__(self, name, location):
+        self.name = name
+
+        if os.path.isdir(location):
+            self.location = pathlib.Path(location)
+        else:
+            raise ValueError(f"Location {location} for Directory {name} does not exist or is not a directory.")
+
+    def getFiles(self):
+        """Return a File for every regular file directly inside the directory."""
+        # Call isfile on the full path; the bare function object is always truthy.
+        files = [f for f in os.listdir(self.location) if os.path.isfile(self.location / f)]
+        return [File(f, self) for f in files]
+
+    @property
+    def tempdir(self):
+        """Path of the ".temp" subdirectory below the directory's location."""
+        return self.location / ".temp"
\ No newline at end of file
diff --git a/classes/file.py b/classes/file.py
new file mode 100644
index 0000000..8755cb4
--- /dev/null
+++ b/classes/file.py
@@ -0,0 +1,38 @@
+from classes.chunk import Chunk
+from classes.database import Database
+
+import hashlib
+
+class File:
+    """A file inside a Directory, identified by a UUID from the Database."""
+
+    def getUUID(self):
+        """Look up (or create) this file's UUID in the Database."""
+        db = Database()
+        # The lookup result used to be discarded, leaving uuid as None.
+        return db.getFileUUID(self)
+
+    def __init__(self, name, directory, uuid=None):
+        self.name = name
+        self.directory = directory
+        self.uuid = uuid or self.getUUID()
+
+    def getFullPath(self):
+        """Return the file's path below its Directory's location."""
+        # Directory is not a path itself; join onto its pathlib location.
+        return self.directory.location / self.name
+
+    def getHash(self):
+        """Return the SHA-256 hex digest of the whole file."""
+        return self.getChunk(-1).getHash()
+
+    def getChunk(self, count, size=1048576):
+        """Read chunk number count (size bytes each) from the file.
+
+        A negative count reads the complete file.
+        """
+        with open(self.getFullPath(), "rb") as binary:
+            binary.seek((count * size) if count > 0 else 0)
+            data = binary.read(size if count >= 0 else None)
+
+        return Chunk(self, count, data)
\ No newline at end of file
diff --git a/classes/remotefile.py b/classes/remotefile.py
new file mode 100644
index 0000000..16c4691
--- /dev/null
+++ b/classes/remotefile.py
@@ -0,0 +1,55 @@
+STATUS_START = -1
+STATUS_COMPLETE = -2
+
+class RemoteFile:
+    """Upload state of one File on one Vessel."""
+
+    def __init__(self, fileobj, vessel, chunksize=1048576):
+        self.file = fileobj
+        self.vessel = vessel
+        self.tempdir = self.vessel.connection.assertTempDirectory(self.file.directory)
+        self.chunksize = chunksize
+
+    def getStatus(self):
+        """Return how far the upload has progressed.
+
+        STATUS_COMPLETE if a valid complete file exists on the Vessel,
+        otherwise the highest chunk number that validates, or STATUS_START
+        if none does.
+        """
+        ls = self.vessel.connection._listdir(self.tempdir)
+        files = [f for f in ls if f.startswith(self.file.uuid) and f.endswith(".part")]
+
+        ids = [-1]
+
+        for f in files:
+            part = f.split("_")[1].split(".")[0]
+            if part == "complete":
+                if self.validateComplete():
+                    return STATUS_COMPLETE
+                # An invalid complete file is not a numbered chunk; skip it
+                # instead of passing "complete" to int().
+                continue
+            ids.append(int(part))
+
+        count = max(ids)
+
+        while count >= 0:
+            if self.validateChunk(count):
+                return count
+            count -= 1
+
+        return STATUS_START
+
+    def validateChunk(self, count):
+        return self.vessel.connection.assertChunkComplete(self.getChunk(count))
+
+    def validateComplete(self):
+        return self.validateChunk(-1)
+
+    def compileComplete(self):
+        """Assemble the complete file on the Vessel; return whether it is valid."""
+        return self.vessel.connection.compileComplete(self)
+
+    def getChunk(self, count):
+        return self.file.getChunk(count, self.chunksize)
\ No newline at end of file
diff --git a/classes/retry.py b/classes/retry.py
new file mode 100644
index 0000000..298e7bf
--- /dev/null
+++ b/classes/retry.py
@@ -0,0 +1,27 @@
+from paramiko.ssh_exception import SSHException
+
+import functools
+
+class retry:
+    """Decorator that catches the given exceptions raised by the function.
+
+    exceptions: exception types to catch, defaults to (SSHException,).
+    attempts: how many times to call the function before giving up.
+    """
+
+    def __init__(self, exceptions=None, attempts=1):
+        self.exceptions = exceptions or (SSHException,)
+        self.attempts = attempts
+
+    def __call__(self, f):
+        @functools.wraps(f)
+        def wrapped_f(*args, **kwargs):
+            for _ in range(self.attempts):
+                try:
+                    # Pass the wrapped function's result back to the caller.
+                    return f(*args, **kwargs)
+                except self.exceptions as e:
+                    # repr(): concatenating str and an exception raises TypeError.
+                    print("Caught expected exception: " + repr(e))
+            return None
+        return wrapped_f
\ No newline at end of file
diff --git a/classes/vessel.py b/classes/vessel.py
new file mode 100644
index 0000000..dcd6cf4
--- /dev/null
+++ b/classes/vessel.py
@@ -0,0 +1,40 @@
+from classes.connection import Connection
+
+from paramiko.ssh_exception import SSHException
+
+class Vessel:
+    """A remote host, reached through a Connection to its address."""
+
+    @classmethod
+    def fromConfig(cls, config):
+        """Build a Vessel from a "Vessel <name>" config section.
+
+        Raises ValueError if the section has no Address key.
+        """
+        if "Address" in config.keys():
+            return cls(config.name.split()[1], config["Address"])
+        else:
+            raise ValueError("Definition for Vessel " + config.name.split()[1] + " does not contain Address!")
+
+    def __init__(self, name: str, address: str):
+        self.name = name
+        self.address = address
+        self._connection = None
+
+    @property
+    def connection(self):
+        """Return a working Connection, reconnecting if the old one is dead."""
+        if self._connection:
+            try:
+                # Cheap round trip to test the session; _listdir needs a path.
+                self._connection._listdir(".")
+                return self._connection
+            except (SSHException, OSError, EOFError):
+                self._connection = None
+        self._connection = Connection(self)
+        # The new connection used to be stored but never returned.
+        return self._connection
+
+    def currentUpload(self):
+        """Not implemented yet; the original was a header-only stub."""
+        raise NotImplementedError
\ No newline at end of file
diff --git a/classes/vesselthread.py b/classes/vesselthread.py
new file mode 100644
index 0000000..6d1793e
--- /dev/null
+++ b/classes/vesselthread.py
@@ -0,0 +1,13 @@
+from multiprocessing import Process
+
+class VesselThread(Process):
+    """Worker process for a single Vessel (run() is still a placeholder)."""
+
+    def __init__(self, vessel, files):
+        super().__init__()
+        self.vessel = vessel
+        # Keep the files argument; it was accepted but dropped before.
+        self.files = files
+
+    def run(self):
+        pass
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..aa35c71
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+paramiko
\ No newline at end of file
diff --git a/settings.example.ini b/settings.example.ini
new file mode 100644
index 0000000..a6f3b29
--- /dev/null
+++ b/settings.example.ini
@@ -0,0 +1,9 @@
+[MONSTER]
+Database = database.sqlite3
+ChunkSize = 1048576
+
+[Directory sampledir]
+Location = /home/user/replication
+
+[Vessel samplevessel]
+Address = example.com
diff --git a/worker.py b/worker.py
new file mode 100644
index 0000000..b543820
--- /dev/null
+++ b/worker.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python3
+
+from classes.config import MonsterConfig
+from classes.vesselthread import VesselThread
+
+from multiprocessing import Manager
+
+import pathlib
+
+config_path = pathlib.Path(__file__).parent.absolute() / "settings.ini"
+
+# Was "settings_path", a name that is never defined (NameError at startup).
+config = MonsterConfig.fromFile(config_path)
+