Finish remaining code
This commit is contained in:
121
app/server.py
Normal file
121
app/server.py
Normal file
@@ -0,0 +1,121 @@
|
||||
import os
|
||||
import re
|
||||
from urllib.parse import urlparse
|
||||
import mysql.connector
|
||||
from flask import Flask, request, jsonify
|
||||
from flask_limiter import Limiter
|
||||
from flask_limiter.util import get_remote_address
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
limiter = Limiter(get_remote_address, app=app, default_limits=["60 per minute"])
|
||||
|
||||
DATABASE_URL = os.environ.get(
|
||||
"DATABASE_URL", "mysql://root:my-secret-pw@localhost:3306/devsecops_db"
|
||||
)
|
||||
|
||||
## I added more devices as I felt they are relevant to the ones asked in pdf
|
||||
VALID_DEVICE_TYPES = {"iOS", "Android", "Watch", "TV", "Tablet", "Desktop", "IoT"}
|
||||
USER_KEY_PATTERN = re.compile(r"^[a-zA-Z0-9_\-\.@]{1,255}$")
|
||||
|
||||
|
||||
def get_db_connection():
|
||||
parsed = urlparse(DATABASE_URL)
|
||||
return mysql.connector.connect(
|
||||
host=parsed.hostname,
|
||||
port=parsed.port or 3306,
|
||||
user=parsed.username,
|
||||
password=parsed.password,
|
||||
database=parsed.path.lstrip("/"),
|
||||
)
|
||||
|
||||
|
||||
def validate_device_type(device_type):
|
||||
return device_type in VALID_DEVICE_TYPES
|
||||
|
||||
|
||||
def validate_user_key(user_key):
|
||||
return bool(user_key) and USER_KEY_PATTERN.match(user_key)
|
||||
|
||||
|
||||
@app.route("/log/auth", methods=["POST"])
|
||||
@limiter.limit("30 per minute")
|
||||
def log_auth():
|
||||
data = request.get_json(silent=True)
|
||||
if not data:
|
||||
return jsonify({"statusCode": 400, "message": "bad_request"}), 400
|
||||
|
||||
user_key = data.get("userKey", "")
|
||||
device_type = data.get("deviceType", "")
|
||||
|
||||
if not validate_user_key(user_key) or not validate_device_type(device_type):
|
||||
return jsonify({"statusCode": 400, "message": "bad_request"}), 400
|
||||
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"INSERT INTO device_registrations (user_key, device_type) VALUES (%s, %s)",
|
||||
(user_key, device_type),
|
||||
)
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return jsonify({"statusCode": 200, "message": "success"}), 200
|
||||
except Exception:
|
||||
return jsonify({"statusCode": 400, "message": "bad_request"}), 400
|
||||
|
||||
|
||||
@app.route("/log/auth/statistics", methods=["GET"])
|
||||
@limiter.limit("30 per minute")
|
||||
def log_auth_statistics():
|
||||
device_type = request.args.get("deviceType", "")
|
||||
|
||||
if not validate_device_type(device_type):
|
||||
return jsonify({"deviceType": device_type, "count": -1}), 400
|
||||
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"SELECT COUNT(*) FROM device_registrations WHERE device_type = %s",
|
||||
(device_type,),
|
||||
)
|
||||
count = cursor.fetchone()[0]
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return jsonify({"deviceType": device_type, "count": count}), 200
|
||||
except Exception:
|
||||
return jsonify({"deviceType": device_type, "count": -1}), 500
|
||||
|
||||
|
||||
@app.route("/device/register", methods=["POST"])
|
||||
@limiter.limit("30 per minute")
|
||||
def register_device():
|
||||
data = request.get_json(silent=True)
|
||||
if not data:
|
||||
return jsonify({"statusCode": 400}), 400
|
||||
|
||||
user_key = data.get("userKey", "")
|
||||
device_type = data.get("deviceType", "")
|
||||
|
||||
if not validate_user_key(user_key) or not validate_device_type(device_type):
|
||||
return jsonify({"statusCode": 400}), 400
|
||||
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"INSERT INTO device_registrations (user_key, device_type) VALUES (%s, %s)",
|
||||
(user_key, device_type),
|
||||
)
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return jsonify({"statusCode": 200}), 200
|
||||
except Exception:
|
||||
return jsonify({"statusCode": 400}), 400
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(host="0.0.0.0", port=5000)
|
||||
Reference in New Issue
Block a user