365 lines
11 KiB
Python
365 lines
11 KiB
Python
from .database import Database
|
|
|
|
from configparser import SectionProxy
|
|
from typing import Optional, Union
|
|
from datetime import datetime
|
|
|
|
from MySQLdb.cursors import DictCursor
|
|
from MySQLdb._exceptions import IntegrityError
|
|
from bcrypt import hashpw, gensalt
|
|
|
|
from ..const import *
|
|
|
|
|
|
class Vessel:
|
|
"""Class describing a Vessel"""
|
|
|
|
@classmethod
|
|
def fromConfig(cls, config: SectionProxy):
|
|
"""Create Vessel object from a Vessel section in the Config file
|
|
|
|
Args:
|
|
config (configparser.SectionProxy): Vessel section defining a
|
|
Vessel
|
|
|
|
Raises:
|
|
ValueError: Raised if section does not contain Address parameter
|
|
|
|
Returns:
|
|
classes.vessel.Vessel: Vessel object for the vessel specified in
|
|
the config section
|
|
"""
|
|
|
|
host = None
|
|
username = None
|
|
password = None
|
|
database = None
|
|
ssh = False
|
|
ssh_username = None
|
|
ssh_password = None
|
|
ssh_timeout = 100
|
|
ssh_passphrase = None
|
|
|
|
if "Username" in config.keys():
|
|
username = config["Username"]
|
|
|
|
if "Password" in config.keys():
|
|
password = config["Password"]
|
|
|
|
if "Database" in config.keys():
|
|
database = config["Database"]
|
|
|
|
if "SSH" in config.keys():
|
|
if config.getint("SSH") == 1:
|
|
ssh = True
|
|
|
|
ssh_username = config.get("SSHUser")
|
|
|
|
return cls(
|
|
config.name.split()[1],
|
|
config["Host"],
|
|
username,
|
|
password,
|
|
database,
|
|
ssh,
|
|
ssh_username,
|
|
ssh_password,
|
|
ssh_timeout,
|
|
ssh_passphrase,
|
|
)
|
|
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
host: str,
|
|
username: Optional[str] = None,
|
|
password: Optional[str] = None,
|
|
database: Optional[str] = None,
|
|
ssh=False,
|
|
ssh_username=None,
|
|
ssh_password=None,
|
|
ssh_timeout=None,
|
|
ssh_passphrase=None,
|
|
) -> None:
|
|
"""Initialize new Vessel object
|
|
|
|
Args:
|
|
name (str): Name of the Vessel
|
|
"""
|
|
self.name = name
|
|
self.host = host
|
|
self.username = username
|
|
self.password = password
|
|
self.database = database
|
|
self.ssh = ssh
|
|
self.ssh_username = ssh_username
|
|
self.ssh_password = ssh_password
|
|
self.ssh_timeout = ssh_timeout
|
|
self.ssh_passphrase = ssh_passphrase
|
|
|
|
self.db = self.connect()
|
|
|
|
def connect(self) -> Database:
|
|
return Database(self)
|
|
|
|
def reconnect(self):
|
|
self.db = self.connect()
|
|
|
|
@staticmethod
|
|
def getTimestamp() -> int:
|
|
return int(datetime.now().timestamp())
|
|
|
|
def getCourses(self) -> list:
|
|
results = self.db._execute(QUERY_COURSE, ctype=DictCursor)
|
|
return results
|
|
|
|
def getCourseContext(self, courseid: int) -> Optional[dict]:
|
|
results = self.db._execute(QUERY_COURSE_CONTEXT, (courseid,), ctype=DictCursor)
|
|
return results
|
|
|
|
def getUserInfoFields(self) -> list:
|
|
results = self.db._execute(QUERY_USER_INFO_FIELD, ctype=DictCursor)
|
|
return results
|
|
|
|
def getUserInfoData(
|
|
self, field: Optional[int] = None, user: Optional[int] = None
|
|
) -> list:
|
|
query = QUERY_USER_INFO_DATA
|
|
parameters = []
|
|
|
|
if field:
|
|
query += " WHERE fieldid = %s"
|
|
parameters.append(int(field))
|
|
|
|
if user:
|
|
if query != QUERY_USER_INFO_DATA:
|
|
query += " AND "
|
|
else:
|
|
query += " WHERE "
|
|
|
|
query += "userid = %s"
|
|
parameters.append(int(user))
|
|
|
|
results = self.db._execute(query, tuple(parameters), ctype=DictCursor)
|
|
|
|
return results
|
|
|
|
def getUsers(
|
|
self, username: Optional[str] = None, id: Optional[int] = None
|
|
) -> dict:
|
|
query = QUERY_USER
|
|
parameters = tuple()
|
|
|
|
if username:
|
|
query += f" WHERE username = %s"
|
|
parameters = (username,)
|
|
|
|
elif id:
|
|
query += f" WHERE id = %s"
|
|
parameters = (int(id),)
|
|
|
|
results = self.db._execute(query, parameters, ctype=DictCursor)
|
|
|
|
users = dict()
|
|
|
|
for result in results:
|
|
user = result
|
|
result["custom_fields"] = dict()
|
|
users[user["id"]] = user
|
|
|
|
ofields = self.getUserInfoFields()
|
|
|
|
for ofield in ofields:
|
|
odata = self.getUserInfoData(ofield["id"], id)
|
|
|
|
for value in odata:
|
|
try:
|
|
users[value["userid"]]["custom_fields"][
|
|
ofield["shortname"]
|
|
] = value["data"]
|
|
except KeyError:
|
|
pass
|
|
|
|
return users
|
|
|
|
def getHTMLCerts(self, after: int = 0, before: Optional[int] = None):
|
|
before = before or Vessel.getTimestamp()
|
|
results = self.db._execute(
|
|
f"{QUERY_HTML_CERT_ISSUES} {QUERY_WHERE_TIMESTAMPS % {'column': 'timecreated', 'after': after, 'before': before}}",
|
|
ctype=DictCursor,
|
|
)
|
|
ocerts = self.db._execute(QUERY_HTML_CERT, ctype=DictCursor)
|
|
|
|
certs = dict()
|
|
|
|
for ocert in ocerts:
|
|
certs[ocert["id"]] = ocert
|
|
|
|
for result in results:
|
|
try:
|
|
result["cert"] = certs[result["htmlcertid"]]
|
|
except KeyError:
|
|
result["cert"] = None
|
|
|
|
result["certtype"] = "htmlcert"
|
|
|
|
return results
|
|
|
|
def getCustomCerts(self, after: int = 0, before: Optional[int] = None):
|
|
before = before or Vessel.getTimestamp()
|
|
results = self.db._execute(
|
|
f"{QUERY_CUSTOM_CERT_ISSUES} {QUERY_WHERE_TIMESTAMPS % {'column': 'timecreated', 'after': after, 'before': before}}",
|
|
ctype=DictCursor,
|
|
)
|
|
ocerts = self.db._execute(QUERY_CUSTOM_CERT, ctype=DictCursor)
|
|
|
|
certs = dict()
|
|
|
|
for ocert in ocerts:
|
|
certs[ocert["id"]] = ocert
|
|
|
|
for result in results:
|
|
try:
|
|
result["cert"] = certs[result["customcertid"]]
|
|
except KeyError:
|
|
result["cert"] = None
|
|
|
|
result["certtype"] = "customcert"
|
|
|
|
return results
|
|
|
|
def getCerts(self, after: int = 0, before: Optional[int] = None):
|
|
before = before or Vessel.getTimestamp()
|
|
return sorted(
|
|
self.getHTMLCerts(after, before) + self.getCustomCerts(after, before),
|
|
key=lambda d: d["timecreated"],
|
|
)
|
|
|
|
def setPassword(self, username: str, password: str):
|
|
hashed = hashpw(password.encode(), gensalt(prefix=b"2b"))
|
|
query = QUERY_USER_SET_PASSWORD
|
|
self.db._execute(query, (hashed, username))
|
|
|
|
def getEnrols(self, enrol: Optional[str] = None):
|
|
results = list(self.db._execute(QUERY_ENROL, ctype=DictCursor))
|
|
|
|
if enrol:
|
|
return list(filter(lambda x: x["enrol"] == enrol, results))
|
|
|
|
return results
|
|
|
|
def createEnrol(self, courseid: int, enrol: str = "manual"):
|
|
self.db._execute(QUERY_ENROL_INSERT, (enrol, courseid))
|
|
|
|
def createEnrolment(self, userid: int, courseid: int, enrol: str = "manual"):
|
|
enrol = list(filter(lambda x: x["courseid"] == courseid, self.getEnrols(enrol)))
|
|
|
|
if not enrol:
|
|
self.createEnrol(courseid, enrol)
|
|
enrol = list(
|
|
filter(lambda x: x["courseid"] == courseid, self.getEnrols(enrol))
|
|
)
|
|
|
|
assert enrol
|
|
|
|
self.db._execute(
|
|
QUERY_ENROL_USER,
|
|
(enrol[0]["id"], userid, Vessel.getTimestamp(), Vessel.getTimestamp()),
|
|
)
|
|
|
|
def getEnrolments(self):
|
|
results = list(self.db._execute(QUERY_ENROLMENTS, ctype=DictCursor))
|
|
return results
|
|
|
|
def createUser(self, username, password, email, firstname, lastname):
|
|
email = email or f"{username}@pin.seachefsacademy.com"
|
|
self.db._execute(
|
|
QUERY_USER_CREATE,
|
|
(
|
|
username,
|
|
email,
|
|
firstname,
|
|
lastname,
|
|
Vessel.getTimestamp(),
|
|
Vessel.getTimestamp(),
|
|
),
|
|
)
|
|
self.setPassword(username, password)
|
|
|
|
def assignRole(self, userid: int, courseid: int, roleid: int = 5):
|
|
contextid = self.getCourseContext(courseid)[0]["id"]
|
|
self.db._execute(
|
|
QUERY_ASSIGN_ROLE, (roleid, contextid, userid, Vessel.getTimestamp())
|
|
)
|
|
|
|
def getRole(self, userid: int, courseid: int) -> Optional[int]:
|
|
contextid = self.getCourseContext(courseid)[0]["id"]
|
|
results = self.db._execute(
|
|
QUERY_GET_ROLE, (contextid, userid), ctype=DictCursor
|
|
)
|
|
if results:
|
|
return results[0]["roleid"]
|
|
|
|
def getUserIdByName(self, username: str) -> Optional[int]:
|
|
results = self.db._execute(QUERY_GET_USERID, (username,), ctype=DictCursor)
|
|
if results:
|
|
return results[0]["id"]
|
|
|
|
def setEmail(self, userid: int, email: str):
|
|
email = (
|
|
email
|
|
or f"{self.getUsers(id=userid)[userid]['username']}@pin.seachefsacademy.com"
|
|
)
|
|
self.db._execute(QUERY_USER_SET_EMAIL, (email, userid))
|
|
|
|
def setName(self, userid: int, first: str, last: str):
|
|
self.db._execute(QUERY_USER_SET_NAME, (first, last, userid))
|
|
|
|
def getCustomCourseFields(self):
|
|
results = list(self.db._execute(QUERY_COURSE_FIELDS, ctype=DictCursor))
|
|
return results
|
|
|
|
def getCourseByContext(self, contextid: int) -> Optional[int]:
|
|
results = self.db._execute(
|
|
QUERY_COURSE_CONTEXT_REVERSE, (contextid,), ctype=DictCursor
|
|
)
|
|
if results:
|
|
return results[0]["instanceid"]
|
|
|
|
def getCourseModules(self, courseid: int):
|
|
results = list(
|
|
self.db._execute(QUERY_COURSE_MODULES, (courseid,), ctype=DictCursor)
|
|
)
|
|
return results
|
|
|
|
def getCourseModuleCompletion(self, moduleid: int):
|
|
results = list(
|
|
self.db._execute(QUERY_MODULE_COMPLETION, (moduleid,), ctype=DictCursor)
|
|
)
|
|
return results
|
|
|
|
def setCourseModuleCompletion(self, moduleid: int, userid: int):
|
|
try:
|
|
self.db._execute(
|
|
QUERY_INSERT_MODULE_COMPLETION,
|
|
(moduleid, userid, Vessel.getTimestamp()),
|
|
)
|
|
except IntegrityError:
|
|
pass # Module completion record already exists
|
|
self.db._execute(QUERY_UPDATE_MODULE_COMPLETION, (moduleid, userid))
|
|
|
|
def setCourseCompletion(self, courseid: int, userid: int):
|
|
modules = self.getCourseModules(courseid)
|
|
|
|
for module in modules:
|
|
self.setCourseModuleCompletion(module["id"], userid)
|
|
|
|
def writeLog(self, event, data):
|
|
self.db._execute(QUERY_LOG_INSERT, (event, data))
|
|
|
|
def getCourseCompletions(self, courseid: int):
|
|
results = list(
|
|
self.db._execute(QUERY_COURSE_COMPLETION, (courseid,), ctype=DictCursor)
|
|
)
|
|
return results
|