From 488aea5d3b399e69094e2ae22a6d6f1cb88f1dc2 Mon Sep 17 00:00:00 2001 From: Kumi Date: Wed, 15 Mar 2023 08:42:50 +0000 Subject: [PATCH] Quite a few changes: Use single gauge Added a README Added a lot of comments Get more specific message for HTTP errors on startup Add command line arguments Get rid of default metrics Add error messages to assertions Make importable by adding main function --- README.md | 42 ++++++++ allkeyshop.py | 280 +++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 263 insertions(+), 59 deletions(-) create mode 100644 README.md mode change 100644 => 100755 allkeyshop.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..3daf6b2 --- /dev/null +++ b/README.md @@ -0,0 +1,42 @@ +# Prometheus Exporter for allkeyshop.com + +This is a simple exporter for allkeyshop.com. It exports the lowest price for a +given game. + +## Prerequisites + +- Python >= 3.8 +- prometheus-client (pip install prometheus-client) + +## Configuration + +The exporter is configured using settings.ini. The provided settings.dist.ini +is a template for the configuration file. + +To add a new game/product, add a new section to the configuration file. The +section name can be either the product ID from allkeyshop.com or the product +name. + +## Usage + +To run the exporter, simply execute the allkeyshop.py script. The exporter will +listen on port 8090 by default. + +To get a list of all available command line options, run the following command: + +```bash +./allkeyshop.py --help +``` + +A sample output of the exporter looks like this: + +``` +# HELP allkeyshop_best_price Best price for a product on allkeyshop.com +# TYPE allkeyshop_best_price gauge +allkeyshop_best_price{currency="eur",product_name="Persona 5 Royal"} 49.99 +allkeyshop_best_price{currency="eur",product_name="Cyberpunk 2077"} 49.48 +``` + +## License + +This project is licensed under the MIT License - see the [LICENSE](LICENSE) file \ No newline at end of file diff --git a/allkeyshop.py b/allkeyshop.py old mode 100644 new mode 100755 index 5f2f6e9..12bc59a --- a/allkeyshop.py +++ b/allkeyshop.py @@ -1,8 +1,20 @@ -from prometheus_client import start_http_server, Gauge +#!/usr/bin/env python3 + +################################################################### +# allkeyshop.py - Prometheus exporter for allkeyshop.com +# 2022 - 2023 kumitterer (https://kumig.it/kumitterer) +# +# This program is free software under the terms of the MIT License, +# except where otherwise noted. +################################################################### + +from prometheus_client import start_http_server, Gauge, CollectorRegistry from configparser import ConfigParser +from argparse import ArgumentParser from pathlib import Path from urllib.request import urlopen, Request +from urllib.error import HTTPError from html.parser import HTMLParser from typing import List, Tuple, Optional @@ -12,6 +24,11 @@ import time class AllKeyShop: + """A class abstracting interaction with allkeyshop.com + """ + + # Define AllKeyShop's internal names for platforms as they are used in URLs + PLATFORM_PC = "cd-key" PLATFORM_PS5 = "ps5" PLATFORM_PS4 = "ps4" @@ -20,31 +37,58 @@ class AllKeyShop: PLATFORM_SWITCH = "nintendo-switch" class ProductParser(HTMLParser): + """A parser for the product page of allkeyshop.com + Yields the product ID of the product in its result attribute + """ + def __init__(self): super().__init__() self.reset() self.result: int def handle_starttag(self, tag: str, attrs: List[Tuple[str, str]]): + # Basically, we're looking for a tag with the "data-product-id" + # attribute and parse the value of that attribute as an integer + for attr in attrs: if attr[0] == "data-product-id": try: self.result = int(attr[1]) - except: + except (ValueError, IndexError): + # Not sure if this can even happen, + # but better safe than sorry + pass + except Exception as e: + # If this happens, something is seriously wrong + + print(f"Error while parsing product ID: {e}") class HTTPRequest(Request): + """Custom HTTP request class with a custom user agent + """ + def __init__(self, url: str, *args, **kwargs): super().__init__(url, *args, **kwargs) self.headers["user-agent"] = "allkeyshop.com prometheus exporter (https://kumig.it/kumitterer/prometheus-allkeyshop)" class ProductPageRequest(HTTPRequest): + """Class for generating requests to the product page of allkeyshop.com + """ @staticmethod def to_slug(string: str) -> str: + """Helper function for generating slugs from strings - # Shamelessly stolen from https://www.30secondsofcode.org/python/s/slugify - # Website, name & logo © 2017-2022 30 seconds of code (https://github.com/30-seconds) - # Individual snippets licensed under CC-BY-4.0 (https://creativecommons.org/licenses/by/4.0/) + Shamelessly stolen from https://www.30secondsofcode.org/python/s/slugify + Website, name & logo © 2017-2022 30 seconds of code (https://github.com/30-seconds) + Individual snippets licensed under CC-BY-4.0 (https://creativecommons.org/licenses/by/4.0/) + + Args: + string (str): The string to generate a slug from + + Returns: + str: The generated slug + """ string = string.lower().strip() string = re.sub(r'[^\w\s-]', '', string) @@ -54,17 +98,35 @@ class AllKeyShop: return string def __init__(self, product: str, platform: str, *args, **kwargs): + + # Get slug to use in URL + slug = self.__class__.to_slug(product) + # Allow "pc" as shorthand platform name for PCs + if platform == "pc": platform = AllKeyShop.PLATFORM_PC - url = f"https://www.allkeyshop.com/blog/buy-{slug}-{platform}-compare-prices/" + # Set request URL + url = f"https://www.allkeyshop.com/blog/buy-{slug}-{platform}-compare-prices/" super().__init__(url, *args, **kwargs) class OffersRequest(HTTPRequest): + """Class for generating requests to the offers API of allkeyshop.com + """ + def __init__(self, product: int, *args, currency: str, **kwargs): + """Initializes the request + + Args: + product (int): Product ID of the product to get offers for + currency (str): Currency to get offers in (e.g. "eur") + region (str, optional): Region to get offers for (e.g. "eu"). Defaults to "". + edition (str, optional): Edition to get offers for (e.g. "standard"). Defaults to "". + moreq (str, optional): Additional query parameters. Defaults to "". + """ region: str = kwargs.pop("region", "") edition: str = kwargs.pop("edition", "") moreq: str = kwargs.pop("moreq", "") @@ -74,99 +136,199 @@ class AllKeyShop: super().__init__(url, *args, **kwargs) def __init__(self, product: int | str, platform: Optional[str] = None, **kwargs): + """Initializes the AllKeyShop object + + Args: + product (int | str): Product ID or name of the product to get offers for + platform (Optional[str], optional): Platform to get offers for, if a product name is passed. Defaults to None. + """ self.product: int self.kwargs: dict = kwargs if isinstance(product, int): + # Product ID is already known - no need to resolve it self.product = product else: - assert platform + # Resolve product ID from product name and platform + assert platform, "Platform must be specified if product name is passed" self.product = self.__class__.resolve_product(product, platform) @classmethod def resolve_product(cls, product: str, platform: str) -> int: + """Resolves a product ID from a product name and platform + + Args: + product (str): Name of the product to resolve + platform (str): Platform to get the product ID for + + Returns: + int: Product ID matching the given product name and platform + """ + + # Get product page + content = urlopen(cls.ProductPageRequest(product, platform)) html = content.read().decode() + # Pass the content to the custom HTML parser + parser = cls.ProductParser() parser.feed(html) - assert parser.result + # Return the result, or raise an exception if no result was found + + assert parser.result, f"Could not resolve product ID for product {product} on platform {platform}" return parser.result def get_offers(self) -> dict: + """Gets all offers for the product + + Returns: + dict: Offers for the product + """ + + # Get offers + content = urlopen(self.__class__.OffersRequest( self.product, **self.kwargs)) raw = content.read() content = json.loads(raw) - assert content["success"] + + # Return the offers, or raise an exception if the request failed + + assert content["success"], "Something went wrong while getting offers" return content["offers"] -config = ConfigParser() -config.read(Path(__file__).parent / "settings.ini") +def main(): + # Parse command line arguments -gauges: List[Tuple[Gauge, AllKeyShop]] = list() + parser = ArgumentParser( + description="Prometheus exporter for allkeyshop.com") + parser.add_argument("-c", "--config", type=Path, default=Path(__file__).parent / + "settings.ini", help="Path to config file (default: settings.ini in script directory)") + parser.add_argument("-p", "--port", type=int, default=8090, + help="Port to listen on (default: 8090)") + parser.add_argument("-a", "--address", type=str, default="0.0.0.0", + help="Address to listen on (default: 0.0.0.0)") + args = parser.parse_args() -if config.has_section("DEFAULT"): - defaults = config["DEFAULT"] -else: - defaults = dict() + # Read configuration file -for section, settings in filter(lambda x: x[0] != "DEFAULT", config.items()): - try: - currency: int - assert (currency := settings.get( - "Currency", fallback=defaults.get("Currency")).lower()) + config = ConfigParser() + config.read(args.config) - region: str = settings.get( - "Region", fallback=defaults.get("Region", "")) - edition: str = settings.get("Edition", fallback="") + if config.has_section("DEFAULT"): + defaults = config["DEFAULT"] + else: + defaults = dict() - product: str | int - platform: Optional[str] - name: str + # Initialize a custom CollectorRegistry so we don't get the default metrics + registry = CollectorRegistry() + + # Initialize Gauge + + gauge = Gauge( + f"allkeyshop_best_price", f"Best price for a product on allkeyshop.com", + ["product_name", "currency"], registry=registry) + + # Initialize products + + products: List[Tuple[AllKeyShop, str, str]] = list() + + for section, settings in filter(lambda x: x[0] != "DEFAULT", config.items()): try: - product = int(section) - name = settings.get("Name", fallback=section) - aks: AllKeyShop = AllKeyShop( - product, currency=currency, region=region, edition=edition) - except ValueError: - product = name = section - platform = settings.get( - "Platform", fallback=defaults.get("Platform")) - assert platform - aks: AllKeyShop = AllKeyShop( - product, platform, currency=currency, region=region, edition=edition) + # Assert that we know the currency we want to use - gauge = Gauge( - f"allkeyshop_{AllKeyShop.ProductPageRequest.to_slug(name).replace('-', '_')}_{currency}", f"Best price for {name}") - gauges.append((gauge, aks)) + currency: str + assert (currency := settings.get( + "Currency", fallback=defaults.get("Currency"))), "Currency not set for section {section}" + currency = currency.lower() - except Exception as e: - print(f"Error setting up gauge for section {section}: {e}") + # Check if we need a specific region or edition + region: str = settings.get( + "Region", fallback=defaults.get("Region", "")) + edition: str = settings.get("Edition", fallback="") -start_http_server(8090) + product: str | int + platform: Optional[str] + name: str -while True: - for gauge, aks in gauges: - try: - offers: List[dict] = aks.get_offers() - available_offers: List[dict] = filter( - lambda x: x["stock"] == "InStock", offers) - store: Optional[str] + # Initialize AllKeyShop object - if (store := settings.get("Store", fallback=defaults.get("Store", ""))): - available_offers: List[dict] = filter( - lambda x: x["platform"] == store, available_offers) + try: + product = int(section) + name = settings.get("Name", fallback=section) + aks: AllKeyShop = AllKeyShop( + product, currency=currency, region=region, edition=edition) - best_offer: dict = min( - available_offers, key=lambda x: x["price"][currency]["price"]) - gauge.set(best_offer["price"][currency]["price"]) + except ValueError: + product = name = section + platform = settings.get( + "Platform", fallback=defaults.get("Platform")) + assert platform + aks: AllKeyShop = AllKeyShop( + product, platform, currency=currency, + region=region, edition=edition) + + # Finally, add the product to the list + + products.append((aks, name, currency)) + + # If something goes wrong at this point, we assume that there is a + # problem with the configuration file and exit + + except HTTPError as e: + print(f"Error calling URL {e.url} for section {section}: {e}") + exit(1) except Exception as e: - print(f"Error updating gauge value for {gauge._name}: {e}") + print(f"Error setting up gauge for section {section}: {e}") + exit(1) - time.sleep(60) + # Self-explanatory line, no? + + start_http_server(args.port, args.address, registry) + + # Start updating the prices + + while True: + for aks, name, currency in products: + try: + # Get all offers and filter out the ones that are not in stock + + offers: List[dict] = aks.get_offers() + available_offers: List[dict] = filter( + lambda x: x["stock"] == "InStock", offers) + + # If we have a store preference, filter out the offers that are + # not from that store + + store: Optional[str] + + if (store := settings.get("Store", fallback=defaults.get("Store", ""))): + available_offers: List[dict] = filter( + lambda x: x["platform"] == store, available_offers) + + # Get the best offer and update the gauge + + best_offer: dict = min( + available_offers, key=lambda x: x["price"][currency]["price"]) + gauge.labels(product_name=name, currency=currency).set( + best_offer["price"][currency]["price"]) + + # If something goes wrong at this stage, we assume that there is just + # a problem with our connectivity or the website itself and continue + + except Exception as e: + print(f"Error updating gauge value for {gauge._name}: {e}") + + # Finally, wait for a minute before updating the prices again + + time.sleep(60) + + +if __name__ == "__main__": + main()