reportmonster/src/reportmonster/classes/vessel.py

267 lines
9 KiB
Python

from classes.database import Database
from configparser import SectionProxy
from typing import Optional, Union
from datetime import datetime
from MySQLdb.cursors import DictCursor
from bcrypt import hashpw, gensalt
from const import *
class Vessel:
"""Class describing a Vessel
"""
@classmethod
def fromConfig(cls, config: SectionProxy):
"""Create Vessel object from a Vessel section in the Config file
Args:
config (configparser.SectionProxy): Vessel section defining a
Vessel
Raises:
ValueError: Raised if section does not contain Address parameter
Returns:
classes.vessel.Vessel: Vessel object for the vessel specified in
the config section
"""
host = None
username = None
password = None
database = None
ssh = False
ssh_username = None
ssh_password = None
ssh_timeout = 100
ssh_passphrase = None
if "Username" in config.keys():
username = config["Username"]
if "Password" in config.keys():
password = config["Password"]
if "Database" in config.keys():
database = config["Database"]
if "SSH" in config.keys():
if int(config["SSH"]) == 1:
ssh = True
return cls(config.name.split()[1], config["Host"], username, password, database, ssh, ssh_username, ssh_password, ssh_timeout, ssh_passphrase)
def __init__(self, name: str, host: str, username: Optional[str] = None,
password: Optional[str] = None, database: Optional[str] = None,
ssh = False, ssh_username = None, ssh_password = None, ssh_timeout = None, ssh_passphrase = None) -> None:
"""Initialize new Vessel object
Args:
name (str): Name of the Vessel
"""
self.name = name
self.host = host
self.username = username
self.password = password
self.database = database
self.ssh = ssh
self.ssh_username = ssh_username
self.ssh_password = ssh_password
self.ssh_timeout = ssh_timeout
self.ssh_passphrase = ssh_passphrase
self.db = self.connect()
def connect(self):
return Database(self)
@staticmethod
def getTimestamp() -> int:
return int(datetime.now().timestamp())
def getCourses(self) -> list:
results = self.db._execute(QUERY_COURSE, ctype=DictCursor)
return results
def getCourseContext(self, courseid: int) -> Optional[dict]:
results = self.db._execute(QUERY_COURSE_CONTEXT, (courseid,), ctype=DictCursor)
return results
def getUserInfoFields(self) -> list:
results = self.db._execute(QUERY_USER_INFO_FIELD, ctype=DictCursor)
return results
def getUserInfoData(self, field: Optional[int] = None, user: Optional[int] = None) -> list:
query = QUERY_USER_INFO_DATA
parameters = []
if field:
query += " WHERE fieldid = %s"
parameters.append(int(field))
if user:
if query != QUERY_USER_INFO_DATA:
query += " AND "
else:
query += " WHERE "
query += "userid = %s"
parameters.append(int(user))
results = self.db._execute(query, tuple(parameters), ctype=DictCursor)
return results
def getUsers(self, username: Optional[str] = None, id: Optional[int] = None) -> dict:
query = QUERY_USER
parameters = tuple()
if username:
query += f" WHERE username = %s"
parameters = (username,)
elif id:
query += f" WHERE id = %s"
parameters = (int(id),)
results = self.db._execute(query, parameters, ctype=DictCursor)
users = dict()
for result in results:
user = result
result["custom_fields"] = dict()
users[user["id"]] = user
ofields = self.getUserInfoFields()
for ofield in ofields:
odata = self.getUserInfoData(ofield["id"], id)
for value in odata:
try:
users[value["userid"]]["custom_fields"][ofield["shortname"]] = value["data"]
except KeyError:
pass
return users
def getHTMLCerts(self, after: int = 0, before: Optional[int] = None):
before = before or Vessel.getTimestamp()
results = self.db._execute(f"{QUERY_HTML_CERT_ISSUES} {QUERY_WHERE_TIMESTAMPS % {'column': 'timecreated', 'after': after, 'before': before}}", ctype=DictCursor)
ocerts = self.db._execute(QUERY_HTML_CERT, ctype=DictCursor)
certs = dict()
for ocert in ocerts:
certs[ocert["id"]] = ocert
for result in results:
try:
result["cert"] = certs[result["htmlcertid"]]
except KeyError:
result["cert"] = None
result["certtype"] = "htmlcert"
return results
def getCustomCerts(self, after: int = 0, before: Optional[int] = None):
before = before or Vessel.getTimestamp()
results = self.db._execute(f"{QUERY_CUSTOM_CERT_ISSUES} {QUERY_WHERE_TIMESTAMPS % {'column': 'timecreated', 'after': after, 'before': before}}", ctype=DictCursor)
ocerts = self.db._execute(QUERY_CUSTOM_CERT, ctype=DictCursor)
certs = dict()
for ocert in ocerts:
certs[ocert["id"]] = ocert
for result in results:
try:
result["cert"] = certs[result["customcertid"]]
except KeyError:
result["cert"] = None
result["certtype"] = "customcert"
return results
def getCerts(self, after: int = 0, before: Optional[int] = None):
before = before or Vessel.getTimestamp()
return sorted(self.getHTMLCerts(after, before) + self.getCustomCerts(after, before), key=lambda d: d["timecreated"])
def setPassword(self, username: str, password: str):
hashed = hashpw(password.encode(), gensalt(prefix=b"2b"))
query = QUERY_USER_SET_PASSWORD
self.db._execute(query, (password, username))
def getEnrols(self, enrol: Optional[str] = None):
results = list(self.db._execute(QUERY_ENROL, ctype=DictCursor))
if enrol:
return list(filter(lambda x: x["enrol"] == enrol, results))
return results
def createEnrol(self, courseid: int, enrol: str = "manual"):
self.db._execute(QUERY_ENROL_INSERT, (enrol, courseid))
def createEnrolment(self, userid: int, courseid: int, enrol: str = "manual"):
enrol = list(filter(lambda x: x["courseid"] == courseid, self.getEnrols(enrol)))
if not enrol:
self.createEnrol(courseid, enrol)
enrol = list(filter(lambda x: x["courseid"] == courseid , self.getEnrols(enrol)))
assert enrol
self.db._execute(QUERY_ENROL_USER, (enrol[0]["id"], userid, Vessel.getTimestamp(), Vessel.getTimestamp()))
def getEnrolments(self):
results = list(self.db._execute(QUERY_ENROLMENTS, ctype=DictCursor))
return results
def createUser(self, username, password, email, firstname, lastname):
email = email or f"{username}@pin.seachefsacademy.com"
self.db._execute(QUERY_USER_CREATE, (username, email, firstname, lastname, Vessel.getTimestamp(), Vessel.getTimestamp()))
self.setPassword(username, password)
def assignRole(self, userid: int, courseid: int, roleid: int = 5):
contextid = self.getCourseContext(courseid)[0]["id"]
self.db._execute(QUERY_ASSIGN_ROLE, (roleid, contextid, userid, Vessel.getTimestamp()))
def getRole(self, userid: int, courseid: int) -> Optional[int]:
contextid = self.getCourseContext(courseid)[0]["id"]
results = self.db._execute(QUERY_GET_ROLE, (contextid, userid), ctype=DictCursor)
if results:
return results[0]["roleid"]
def getUserIdByName(self, username: str) -> Optional[int]:
results = self.db._execute(QUERY_GET_USERID, (username,), ctype=DictCursor)
if results:
return results[0]["id"]
def setEmail(self, userid: int, email: str):
email = email or f"{self.getUsers(id=userid)[userid]['username']}@pin.seachefsacademy.com"
self.db._execute(QUERY_USER_SET_EMAIL, (email, userid))
def getCustomCourseFields(self):
results = list(self.db._execute(QUERY_COURSE_FIELDS, ctype=DictCursor))
return results
def getCourseByContext(self, contextid: int) -> Optional[int]:
results = self.db._execute(QUERY_COURSE_CONTEXT_REVERSE, (contextid,), ctype=DictCursor)
if results:
return results[0]["instanceid"]
def getCourseModules(self, courseid: int):
results = list(self.db._execute(QUERY_COURSE_MODULES, (courseid,), ctype=DictCursor))
return results
def getCourseModuleCompletion(self, moduleid: int):
results = list(self.db._execute(QUERY_MODULE_COMPLETION, (moduleid,), ctype=DictCursor))
return results