reportmonster/src/reportmonster/classes/vessel.py
2022-08-08 10:37:41 +00:00

268 lines
9.1 KiB
Python

from .database import Database
from ..const import *
from configparser import SectionProxy
from typing import Optional, Union
from datetime import datetime
from MySQLdb.cursors import DictCursor
from bcrypt import hashpw, gensalt
class Vessel:
    """Class describing a Vessel

    A Vessel stores the connection settings of one host and owns a
    ``Database`` handle (``self.db``) through which every method below runs
    its queries. The SQL statements are the ``QUERY_*`` names that are not
    defined in this file (presumably supplied by the ``const`` star import).
    """

    # Domain used to build a placeholder address when no email is supplied.
    DEFAULT_EMAIL_DOMAIN = "pin.seachefsacademy.com"

    @classmethod
    def fromConfig(cls, config: SectionProxy):
        """Create Vessel object from a Vessel section in the Config file

        Args:
            config (configparser.SectionProxy): Vessel section defining a
                Vessel. The second whitespace-separated word of the section
                name becomes the Vessel name.

        Raises:
            KeyError: Raised if section does not contain Host parameter
            IndexError: Raised if the section name has no second word
            ValueError: Raised if the SSH parameter is not an integer

        Returns:
            classes.vessel.Vessel: Vessel object for the vessel specified in
                the config section
        """
        # SSH is switched on only by the integer value 1.
        ssh = "SSH" in config and int(config["SSH"]) == 1

        # The remaining SSH settings are not read from the section; these
        # are the fixed values every config-built Vessel receives.
        ssh_username = None
        ssh_password = None
        ssh_timeout = 100
        ssh_passphrase = None

        return cls(
            config.name.split()[1],
            config["Host"],
            config.get("Username"),  # optional keys fall back to None
            config.get("Password"),
            config.get("Database"),
            ssh,
            ssh_username,
            ssh_password,
            ssh_timeout,
            ssh_passphrase,
        )

    def __init__(self, name: str, host: str, username: Optional[str] = None,
                 password: Optional[str] = None, database: Optional[str] = None,
                 ssh=False, ssh_username=None, ssh_password=None,
                 ssh_timeout=None, ssh_passphrase=None) -> None:
        """Initialize new Vessel object

        All arguments are stored on the instance under the same name, then
        ``connect()`` is called and its result kept as ``self.db``.

        Args:
            name (str): Name of the Vessel
            host (str): Host of the Vessel
            username (str, optional): Database user name
            password (str, optional): Database password
            database (str, optional): Database name
            ssh (bool, optional): SSH flag. Defaults to False.
            ssh_username (optional): SSH user name
            ssh_password (optional): SSH password
            ssh_timeout (optional): SSH timeout
            ssh_passphrase (optional): SSH key passphrase
        """
        self.name = name
        self.host = host
        self.username = username
        self.password = password
        self.database = database
        self.ssh = ssh
        self.ssh_username = ssh_username
        self.ssh_password = ssh_password
        self.ssh_timeout = ssh_timeout
        self.ssh_passphrase = ssh_passphrase
        # Must come last: Database receives this object and presumably
        # reads the settings assigned above.
        self.db = self.connect()

    def connect(self):
        """Create the Database handle for this Vessel

        Returns:
            Database: New Database object constructed from this Vessel
        """
        return Database(self)

    @staticmethod
    def getTimestamp() -> int:
        """Return the current time as a Unix timestamp in whole seconds

        Returns:
            int: Current timestamp, fractional part dropped
        """
        return int(datetime.now().timestamp())

    def getCourses(self) -> list:
        """Return the rows produced by QUERY_COURSE"""
        return self.db._execute(QUERY_COURSE, ctype=DictCursor)

    def getCourseContext(self, courseid: int) -> list:
        """Return the rows produced by QUERY_COURSE_CONTEXT for a course

        Args:
            courseid (int): Value bound to the query's single placeholder

        Returns:
            list: Result rows; callers in this class read ``[0]["id"]``
        """
        return self.db._execute(QUERY_COURSE_CONTEXT, (courseid,), ctype=DictCursor)

    def getUserInfoFields(self) -> list:
        """Return the rows produced by QUERY_USER_INFO_FIELD"""
        return self.db._execute(QUERY_USER_INFO_FIELD, ctype=DictCursor)

    def getUserInfoData(self, field: Optional[int] = None, user: Optional[int] = None) -> list:
        """Return user info data rows, optionally filtered

        Args:
            field (int, optional): Restrict to this ``fieldid``. Falsy
                values (None, 0) mean "no filter".
            user (int, optional): Restrict to this ``userid``. Falsy values
                mean "no filter".

        Returns:
            list: Matching rows
        """
        clauses = []
        parameters = []
        if field:
            clauses.append("fieldid = %s")
            parameters.append(int(field))
        if user:
            clauses.append("userid = %s")
            parameters.append(int(user))

        query = QUERY_USER_INFO_DATA
        if clauses:
            query += " WHERE " + " AND ".join(clauses)
        return self.db._execute(query, tuple(parameters), ctype=DictCursor)

    def getUsers(self, username: Optional[str] = None, id: Optional[int] = None) -> dict:
        """Return users keyed by id, each with a ``custom_fields`` dict

        Args:
            username (str, optional): Restrict to this user name. Takes
                precedence over ``id`` when both are given.
            id (int, optional): Restrict to this user id

        Returns:
            dict: Maps user id to the user row. Every row gains a
                ``custom_fields`` dict mapping info field short names to
                that user's stored data.
        """
        query = QUERY_USER
        parameters = tuple()
        if username:
            query += " WHERE username = %s"
            parameters = (username,)
        elif id:
            query += " WHERE id = %s"
            parameters = (int(id),)

        users = dict()
        for row in self.db._execute(query, parameters, ctype=DictCursor):
            row["custom_fields"] = dict()
            users[row["id"]] = row
        if not users:
            # Nothing to decorate, so skip the per-field queries entirely.
            return users

        # Narrow the info data lookup by the user actually found rather than
        # by the ``id`` argument: the argument is unset for a username
        # search and may name a different user when both filters are given.
        only_user = next(iter(users)) if len(users) == 1 else None

        for ofield in self.getUserInfoFields():
            for value in self.getUserInfoData(ofield["id"], only_user):
                try:
                    users[value["userid"]]["custom_fields"][ofield["shortname"]] = value["data"]
                except KeyError:
                    # Data belonging to a user outside the result set.
                    pass
        return users

    def _getIssuedCerts(self, issues_query: str, certs_query: str, cert_key: str,
                        certtype: str, after: int, before: Optional[int]):
        """Fetch certificate issues in a time window and attach their certs

        Args:
            issues_query (str): Query selecting the issues
            certs_query (str): Query selecting the certificate definitions
            cert_key (str): Key of an issue row holding the certificate id
            certtype (str): Value stored under ``certtype`` on every issue
            after (int): Lower timestamp bound
            before (int, optional): Upper timestamp bound; falsy means now

        Returns:
            The issue rows, each extended with ``cert`` (the matching
            certificate row or None) and ``certtype``
        """
        before = before or Vessel.getTimestamp()
        # The bounds are interpolated into the SQL text, so force them to
        # integers instead of trusting the caller.
        window = QUERY_WHERE_TIMESTAMPS % {
            "column": "timecreated",
            "after": int(after),
            "before": int(before),
        }
        results = self.db._execute(f"{issues_query} {window}", ctype=DictCursor)
        certs = {ocert["id"]: ocert for ocert in self.db._execute(certs_query, ctype=DictCursor)}
        for result in results:
            result["cert"] = certs.get(result.get(cert_key))
            result["certtype"] = certtype
        return results

    def getHTMLCerts(self, after: int = 0, before: Optional[int] = None):
        """Return HTML certificate issues created in a time window

        Args:
            after (int, optional): Lower timestamp bound. Defaults to 0.
            before (int, optional): Upper timestamp bound. Defaults to now.

        Returns:
            Issue rows with ``cert`` and ``certtype`` ("htmlcert") added
        """
        return self._getIssuedCerts(QUERY_HTML_CERT_ISSUES, QUERY_HTML_CERT,
                                    "htmlcertid", "htmlcert", after, before)

    def getCustomCerts(self, after: int = 0, before: Optional[int] = None):
        """Return custom certificate issues created in a time window

        Args:
            after (int, optional): Lower timestamp bound. Defaults to 0.
            before (int, optional): Upper timestamp bound. Defaults to now.

        Returns:
            Issue rows with ``cert`` and ``certtype`` ("customcert") added
        """
        return self._getIssuedCerts(QUERY_CUSTOM_CERT_ISSUES, QUERY_CUSTOM_CERT,
                                    "customcertid", "customcert", after, before)

    def getCerts(self, after: int = 0, before: Optional[int] = None):
        """Return HTML and custom certificate issues, oldest first

        Args:
            after (int, optional): Lower timestamp bound. Defaults to 0.
            before (int, optional): Upper timestamp bound. Defaults to now.

        Returns:
            list: All issues sorted by ``timecreated``
        """
        # Resolve "now" once so both queries share the same upper bound.
        before = before or Vessel.getTimestamp()
        # Unpack instead of "+" so list and tuple results can be combined.
        issued = [*self.getHTMLCerts(after, before), *self.getCustomCerts(after, before)]
        return sorted(issued, key=lambda d: d["timecreated"])

    def setPassword(self, username: str, password: str):
        """Store a bcrypt hash of a new password for a user

        Args:
            username (str): User whose password is changed
            password (str): New password in plain text
        """
        hashed = hashpw(password.encode(), gensalt(prefix=b"2b"))
        # Write the hash, never the plain text. bcrypt output is ASCII, so
        # it is decoded to bind as an ordinary string parameter.
        self.db._execute(QUERY_USER_SET_PASSWORD, (hashed.decode(), username))

    def getEnrols(self, enrol: Optional[str] = None):
        """Return enrol rows, optionally restricted to one enrol method

        Args:
            enrol (str, optional): Keep only rows whose ``enrol`` equals
                this value. Falsy means "all rows".

        Returns:
            list: Enrol rows
        """
        results = list(self.db._execute(QUERY_ENROL, ctype=DictCursor))
        if enrol:
            return [row for row in results if row["enrol"] == enrol]
        return results

    def createEnrol(self, courseid: int, enrol: str = "manual"):
        """Insert an enrol row for a course

        Args:
            courseid (int): Course the enrol row belongs to
            enrol (str, optional): Enrol method. Defaults to "manual".
        """
        self.db._execute(QUERY_ENROL_INSERT, (enrol, courseid))

    def createEnrolment(self, userid: int, courseid: int, enrol: str = "manual"):
        """Enrol a user in a course, creating the enrol row if necessary

        Args:
            userid (int): User to enrol
            courseid (int): Course to enrol the user in
            enrol (str, optional): Enrol method. Defaults to "manual".

        Raises:
            AssertionError: Raised if no enrol row for the course exists
                even after trying to create one
        """
        # Keep the matches in their own name: rebinding ``enrol`` would
        # hand a list to createEnrol() and getEnrols() below.
        matches = [row for row in self.getEnrols(enrol) if row["courseid"] == courseid]
        if not matches:
            self.createEnrol(courseid, enrol)
            matches = [row for row in self.getEnrols(enrol) if row["courseid"] == courseid]
        if not matches:
            # Raised explicitly so the check survives ``python -O``.
            raise AssertionError(f"no '{enrol}' enrol row for course {courseid}")

        now = Vessel.getTimestamp()  # read once so both columns agree
        self.db._execute(QUERY_ENROL_USER, (matches[0]["id"], userid, now, now))

    def getEnrolments(self):
        """Return the rows produced by QUERY_ENROLMENTS as a list"""
        return list(self.db._execute(QUERY_ENROLMENTS, ctype=DictCursor))

    def createUser(self, username, password, email, firstname, lastname):
        """Create a user and set its password

        Args:
            username: User name of the new user
            password: Plain text password, passed on to setPassword()
            email: Email address; if falsy, ``<username>@<default domain>``
                is used
            firstname: First name
            lastname: Last name
        """
        email = email or f"{username}@{self.DEFAULT_EMAIL_DOMAIN}"
        now = Vessel.getTimestamp()  # read once so both columns agree
        self.db._execute(QUERY_USER_CREATE, (username, email, firstname, lastname, now, now))
        self.setPassword(username, password)

    def assignRole(self, userid: int, courseid: int, roleid: int = 5):
        """Assign a role to a user in the context of a course

        Args:
            userid (int): User receiving the role
            courseid (int): Course whose first context row is used
            roleid (int, optional): Role to assign. Defaults to 5.

        Raises:
            IndexError: Raised if the course has no context row
        """
        contextid = self.getCourseContext(courseid)[0]["id"]
        self.db._execute(QUERY_ASSIGN_ROLE, (roleid, contextid, userid, Vessel.getTimestamp()))

    def getRole(self, userid: int, courseid: int) -> Optional[int]:
        """Return the role a user holds in the context of a course

        Args:
            userid (int): User to look up
            courseid (int): Course whose first context row is used

        Raises:
            IndexError: Raised if the course has no context row

        Returns:
            Optional[int]: ``roleid`` of the first matching row, or None
        """
        contextid = self.getCourseContext(courseid)[0]["id"]
        results = self.db._execute(QUERY_GET_ROLE, (contextid, userid), ctype=DictCursor)
        if results:
            return results[0]["roleid"]
        return None

    def getUserIdByName(self, username: str) -> Optional[int]:
        """Return the id of the user with the given user name

        Args:
            username (str): User name to look up

        Returns:
            Optional[int]: ``id`` of the first matching row, or None
        """
        results = self.db._execute(QUERY_GET_USERID, (username,), ctype=DictCursor)
        if results:
            return results[0]["id"]
        return None

    def setEmail(self, userid: int, email: str):
        """Set the email address of a user

        Args:
            userid (int): User to update
            email (str): New address; if falsy, ``<username>@<default
                domain>`` is built from the stored user name

        Raises:
            KeyError: Raised if ``email`` is falsy and the user is unknown
        """
        if not email:
            username = self.getUsers(id=userid)[userid]["username"]
            email = f"{username}@{self.DEFAULT_EMAIL_DOMAIN}"
        self.db._execute(QUERY_USER_SET_EMAIL, (email, userid))

    def getCustomCourseFields(self):
        """Return the rows produced by QUERY_COURSE_FIELDS as a list"""
        return list(self.db._execute(QUERY_COURSE_FIELDS, ctype=DictCursor))

    def getCourseByContext(self, contextid: int) -> Optional[int]:
        """Return the course belonging to a context

        Args:
            contextid (int): Context to look up

        Returns:
            Optional[int]: ``instanceid`` of the first matching row, or None
        """
        results = self.db._execute(QUERY_COURSE_CONTEXT_REVERSE, (contextid,), ctype=DictCursor)
        if results:
            return results[0]["instanceid"]
        return None

    def getCourseModules(self, courseid: int):
        """Return the rows produced by QUERY_COURSE_MODULES for a course

        Args:
            courseid (int): Value bound to the query's single placeholder

        Returns:
            list: Result rows
        """
        return list(self.db._execute(QUERY_COURSE_MODULES, (courseid,), ctype=DictCursor))

    def getCourseModuleCompletion(self, moduleid: int):
        """Return the rows produced by QUERY_MODULE_COMPLETION for a module

        Args:
            moduleid (int): Value bound to the query's single placeholder

        Returns:
            list: Result rows
        """
        return list(self.db._execute(QUERY_MODULE_COMPLETION, (moduleid,), ctype=DictCursor))

    def writeLog(self, event, data):
        """Insert a log entry

        Args:
            event: First value bound to QUERY_LOG_INSERT
            data: Second value bound to QUERY_LOG_INSERT
        """
        self.db._execute(QUERY_LOG_INSERT, (event, data))