oebb_py/main.py
Klaus-Uwe Mitterer 87f77a769a Fix path encodings
2017-10-26 22:38:55 +02:00

208 lines
5.8 KiB
Python

import cgi
import datetime
import pytz
import workers.conn
import workers.val
import workers.closest
import workers.radar
def application(env, re):
if env["REQUEST_METHOD"] == "POST":
args = cgi.parse_qs(env['wsgi.input'].readline().decode(), True)
elif env["REQUEST_METHOD"] == "GET":
args = cgi.parse_qs(env['QUERY_STRING'], True)
else:
re("405 Method Not Allowed", [])
return
conn = False
val = False
cfrm = None
cto = None
split = env["PATH_INFO"].split("/")
split = [i.strip() for i in split]
while "" in split:
split.remove("")
if len(split) > 2:
re("400 Bad Request", [])
yield "<h1>400 Bad Request</h1>".encode()
yield "Only one (validate) or two (conn) arguments may be passed as path.".encode()
return
if len(split) > 0:
if len(split) == 1:
val = True
else:
conn = True
cto = split[1].encode("latin-1").decode("utf-8")
cfrm = split[0].encode("latin-1").decode("utf-8")
try:
rtype = "conn" if conn else "val" if val else args["type"][0]
except:
re("400 Bad Request", [])
yield "<h1>400 Bad Request</h1>".encode()
yield "A request type must be provided.".encode()
return
json = "json" in args
if rtype.lower() in ["conn", "connection"]:
try:
try:
frm = (cfrm or args["from"][0]).encode("latin-1").decode("utf-8")
to = (cto or args["to"][0]).encode("latin-1").decode("utf-8")
except UnicodeDecodeError:
frm = cfrm or args["from"][0]
to = cto or args["to"][0]
if not frm or not to:
raise ValueError()
except:
re("400 Bad Request", [])
yield "<h1>400 Bad Request</h1>".encode()
yield "\"from\" and \"to\" values are required for this type of request.".encode()
return
count = args["count"][0] if "count" in args and args["count"] else 6
date = args["date"][0] if "date" in args and args["date"] else datetime.datetime.strftime(datetime.datetime.now(pytz.timezone("Europe/Vienna")),"%d.%m.%Y")
time = args["time"][0] if "time" in args and args["time"] else datetime.datetime.strftime(datetime.datetime.now(pytz.timezone("Europe/Vienna")),"%H:%M")
mode = True if "mode" in args and args["mode"] and args["mode"][0].lower() == "arr" else False
details = True if "details" in args else False
try:
count = int(count)
if count < 0 or count > 10:
raise ValueError()
except:
re("400 Bad Request", [])
yield "<h1>400 Bad Request</h1>".encode()
yield "The \"count\" value must be a value between 0 and 10.".encode()
return
try:
outtime = datetime.datetime.strptime("%s %s" % (date, time), "%d.%m.%Y %H:%M")
except:
re("400 Bad Request", [])
yield "<h1>400 Bad Request</h1>".encode()
yield "The \"date\" value must be in DD.MM.YYYY format, the \"time\" value must be in HH:MM format.".encode()
return
via = list(args["via"]) if "via" in args else None
if via and len(via) > 3:
re("400 Bad Request", [])
yield "<h1>400 Bad Request</h1>".encode()
yield "It is not possible to route through more than three \"via\" stations.".encode()
return
try:
output = workers.conn.worker(frm, to, count, outtime, mode, details, json, via)
except Exception as e:
re("500 Internal Server Error", [])
yield "<h1>500 Internal Server Error</h1>".encode()
if "debug" in args:
yield str(e).encode()
return
re("200 OK", [("Content-Type", "application/json" if json else "text/xml")])
yield output.encode()
return
elif rtype.lower() in ["val", "validate"]:
try:
try:
name = (cfrm or args["from"][0]).encode("latin-1").decode("utf-8")
except UnicodeDecodeError:
name = cfrm or args["from"][0]
if not name:
raise ValueError()
except:
re("400 Bad Request", [])
yield "<h1>400 Bad Request</h1>".encode()
yield "A \"name\" value is required for this type of request.".encode()
return
try:
output = workers.val.worker(name, json)
except Exception as e:
re("500 Internal Server Error", [])
yield "<h1>500 Internal Server Error</h1>".encode()
if "debug" in args:
yield str(e).encode()
return
re("200 OK", [("Content-Type", "application/json" if json else "text/xml")])
yield output.encode()
return
elif rtype.lower() in ["closest", "close", "near", "nearby"]:
try:
lat = float(args["lat"][0].replace(",", "."))
lon = float(args["lon"][0].replace(",", "."))
if (not lat and not lat == float(0)) or (not lon and not lon == float(0)):
raise ValueError()
except:
re("400 Bad Request", [])
yield "<h1>400 Bad Request</h1>".encode()
yield "\"lat\" and \"lon\" values are required for this type of request.".encode()
return
distance = args["distance"][0] if "distance" in args and args["distance"] else 1000
try:
distance = int(distance)
if distance < 0 or distance > 10000:
raise ValueError()
except:
re("400 Bad Request", [])
yield "<h1>400 Bad Request</h1>"
yield "\"distance\" must be a value between 0 and 10000."
return
try:
output = workers.closest.worker(lat, lon, distance, json)
except Exception as e:
re("500 Internal Server Error", [])
yield "<h1>500 Internal Server Error</h1>".encode()
if "debug" in args:
yield e.encode()
return
re("200 OK", [("Content-Type", "application/json" if json else "text/xml")])
yield output.encode()
return
elif rtype.lower() in ["radar", "live"]:
trains = args["train"] if "train" in args else None
try:
output = workers.radar.worker(trains, json)
except Exception as e:
re("500 Internal Server Error", [])
yield "<h1>500 Internal Server Error</h1>".encode()
if "debug" in args:
yield e.encode()
return
re("200 OK", [("Content-Type", "application/json" if json else "text/xml")])
yield output.encode()
return
else:
re("400 Bad Request", [])
yield "<h1>400 Bad Request</h1>".encode()
yield "The request type you submitted is invalid.".encode()
return