dumuzid/api/views/base.py

92 lines
3.1 KiB
Python
Raw Normal View History

2022-09-26 11:48:15 +00:00
from django.views import View
from django.http import HttpRequest, JsonResponse
from django.utils import timezone
from datastore.models import APIToken, APIUser
from typing import List, Optional
import json
class BaseAPIView(View):
    """Base class for JSON API endpoints that are called via POST.

    The request body is parsed as JSON and stored on ``self.data``. Unless
    the view is ``public``, an ``Authentication_Token`` from the payload is
    checked against ``APIToken`` before ``handle_request()`` is invoked.
    Subclasses implement ``handle_request()`` and return a dict, which is
    sent back as a ``JsonResponse``.
    """

    # Only POST requests are dispatched to this view.
    http_method_names: List[str] = ["post"]
    # When true, the payload is read from the "request" key of the JSON body.
    unwrap_request: bool = True
    # When true, responses are nested under a "<view name>Result" key.
    wrap_response: bool = True
    # When true, no authentication token is required.
    public: bool = False
    # Explicit name used for the result key; derived from the URL or the
    # class name when left unset. Defaults are given so that reading these
    # attributes never raises AttributeError on subclasses that omit them.
    view_name: Optional[str] = None
    data: Optional[dict] = None
    token: Optional[APIToken] = None

    def get_view_name(self) -> str:
        """Return the name used to build the ``<name>Result`` response key.

        Preference order: the explicit ``view_name``, the last segment of
        the request path, and finally the class name with its last four
        characters removed (presumably a ``View`` suffix).
        """
        # request.path is used instead of the undocumented get_raw_uri(),
        # which is not available on newer Django versions and would also
        # drag the query string into the name.
        return (
            self.view_name
            or self.request.path.split("/")[-1]
            or self.__class__.__name__[:-4]
        )

    def wrapper(self, indict: dict, force: bool = False) -> dict:
        """Return ``indict`` nested under ``<view name>Result`` if wrapping applies.

        Wrapping happens when ``wrap_response`` is set or ``force`` is true;
        otherwise ``indict`` is returned unchanged so that callers always
        get a dict they can serialise.
        """
        if self.wrap_response or force:
            return {f"{self.get_view_name()}Result": indict}
        return indict

    def authentication_error(self, message: str) -> JsonResponse:
        """Build the response sent when authentication is denied."""
        response_data: dict = {
            "Authentication_Approved": False,
            "Authentication_ReasonDenied": message,
        }
        return JsonResponse(self.wrapper(response_data))

    def error(self, message: str) -> dict:
        """Return an error payload carrying ``message`` as ``ErrorText``."""
        response_data: dict = {
            "ErrorText": message,
        }
        return response_data

    def handle_request(self, request: HttpRequest, *args, **kwargs) -> dict:
        """Process the request and return the response payload.

        Must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError(
            f"Class {self.__class__.__name__} does not implement handle_request()!")

    def post(self, request: HttpRequest, *args, **kwargs) -> JsonResponse:
        """Parse the JSON body, authenticate if required, and dispatch.

        Returns the (optionally wrapped) result of ``handle_request()`` with
        ``Authentication_Approved`` set to true, or an authentication error
        response when a non-public view is called without a valid token.
        """
        self.data = json.loads(request.body)

        if self.unwrap_request:
            self.data = self.data["request"]

        if not self.public:
            # .get() so that a missing key is reported as an authentication
            # error rather than surfacing as an unhandled KeyError.
            token_value = self.data.get("Authentication_Token")
            if not token_value:
                return self.authentication_error("Authentication token required for non-public API endpoint")

            # A token is only valid while its expiry lies in the future.
            token = APIToken.objects.filter(
                value=token_value, expiry__gt=timezone.now()).first()
            if token is None:
                return self.authentication_error("Authentication token does not exist or has expired")

            self.token = token

        response_data = self.handle_request(request, *args, **kwargs)
        response_data["Authentication_Approved"] = True

        return JsonResponse(self.wrapper(response_data))
class BaseAuthenticationView(BaseAPIView):
    """Public endpoint that exchanges a login/password pair for an API token.

    The request payload is read unwrapped, the response is sent unwrapped,
    and no authentication token is required to call it.
    """

    unwrap_request = False
    wrap_response = False
    public = True

    def handle_request(self, request, *args, **kwargs) -> dict:
        """Validate the supplied credentials and issue a new token.

        Expects ``self.data["credentials"]`` to be a mapping with ``Login``
        and ``Password`` and, optionally, ``LifeTime`` in seconds.

        Returns:
            A dict with ``Authentication_Token`` on success, or the payload
            produced by ``error()`` when credentials are missing, wrong, or
            the lifetime is not usable.
        """
        # Local import: avoids relying on django.utils.timezone happening to
        # re-export timedelta.
        from datetime import timedelta

        credentials = self.data.get("credentials") if isinstance(self.data, dict) else None
        if (
            not isinstance(credentials, dict)
            or "Login" not in credentials
            or "Password" not in credentials
        ):
            return self.error("You need to pass credentials including Login and Password")

        # Same message for unknown user and wrong password.
        user = APIUser.objects.filter(username=credentials["Login"]).first()
        if user is None or not user.check_password(credentials["Password"]):
            return self.error("Username or password incorrect")

        # NOTE(review): with the default LifeTime of 0 the stored expiry
        # equals the creation time; confirm that token validation treats
        # such tokens as intended.
        try:
            expiry = timezone.now() + timedelta(seconds=int(credentials.get("LifeTime", 0)))
        except (TypeError, ValueError, OverflowError):
            # Client-supplied value that cannot be turned into a usable
            # duration; report it instead of failing with a server error.
            return self.error("LifeTime must be an integer number of seconds")

        token = APIToken.objects.create(user=user, expiry=expiry)

        return {
            "Authentication_Token": token.value,
        }