"""Flask service that records device registrations in MySQL.

Endpoints:
    POST /log/auth             -- store a (userKey, deviceType) registration.
    GET  /log/auth/statistics  -- count registrations for one device type.
    POST /device/register      -- store a (userKey, deviceType) registration.
"""

import os
import re
from contextlib import closing
from urllib.parse import urlparse

import mysql.connector
from flask import Flask, request, jsonify
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

app = Flask(__name__)
limiter = Limiter(get_remote_address, app=app, default_limits=["60 per minute"])

# NOTE(review): the fallback URL embeds a root password. It is kept so existing
# local setups continue to work, but deployments should always set DATABASE_URL.
DATABASE_URL = os.environ.get(
    "DATABASE_URL", "mysql://root:my-secret-pw@localhost:3306/devsecops_db"
)

# Accepted device types. This set includes additional device types beyond the
# ones requested in the assignment PDF, because they seemed relevant as well.
VALID_DEVICE_TYPES = {"iOS", "Android", "Watch", "TV", "Tablet", "Desktop", "IoT"}

USER_KEY_PATTERN = re.compile(r"^[a-zA-Z0-9_\-\.@]{1,255}$")

_INSERT_REGISTRATION_SQL = (
    "INSERT INTO device_registrations (user_key, device_type) VALUES (%s, %s)"
)
_COUNT_BY_DEVICE_TYPE_SQL = (
    "SELECT COUNT(*) FROM device_registrations WHERE device_type = %s"
)


def get_db_connection():
    """Open a new MySQL connection using the settings encoded in DATABASE_URL.

    The port defaults to 3306 when the URL does not specify one. The caller is
    responsible for closing the returned connection.
    """
    parsed = urlparse(DATABASE_URL)
    return mysql.connector.connect(
        host=parsed.hostname,
        port=parsed.port or 3306,
        user=parsed.username,
        password=parsed.password,
        database=parsed.path.lstrip("/"),
    )


def validate_device_type(device_type) -> bool:
    """Return True if *device_type* is one of VALID_DEVICE_TYPES.

    Non-string values (possible in a JSON body) are rejected up front; an
    unhashable value such as a list would otherwise raise TypeError in the
    set lookup.
    """
    return isinstance(device_type, str) and device_type in VALID_DEVICE_TYPES


def validate_user_key(user_key) -> bool:
    """Return True if *user_key* is a string fully matching USER_KEY_PATTERN.

    ``fullmatch`` is used instead of ``match`` because ``$`` also matches just
    before a trailing newline, which would let ``"abc\\n"`` through.
    """
    return (
        isinstance(user_key, str)
        and USER_KEY_PATTERN.fullmatch(user_key) is not None
    )


def _read_registration_payload():
    """Return ``(user_key, device_type)`` from the JSON body, or None if invalid."""
    data = request.get_json(silent=True)
    # A JSON array or scalar has no .get(); treat anything but an object as bad.
    if not isinstance(data, dict):
        return None
    user_key = data.get("userKey", "")
    device_type = data.get("deviceType", "")
    if not validate_user_key(user_key) or not validate_device_type(device_type):
        return None
    return user_key, device_type


def _store_registration(user_key, device_type) -> bool:
    """Insert one registration row; return True on success, False on any error."""
    try:
        # closing() guarantees the cursor and connection are released even
        # when execute()/commit() raises.
        with closing(get_db_connection()) as conn, closing(conn.cursor()) as cursor:
            cursor.execute(_INSERT_REGISTRATION_SQL, (user_key, device_type))
            conn.commit()
    except Exception:
        app.logger.exception("Failed to store device registration")
        return False
    return True


@app.route("/log/auth", methods=["POST"])
@limiter.limit("30 per minute")
def log_auth():
    """Store a device registration from the JSON body.

    Responds 200 with ``message: success`` when the row is stored, and 400 with
    ``message: bad_request`` when the payload is invalid or the insert fails.
    """
    payload = _read_registration_payload()
    if payload is None or not _store_registration(*payload):
        return jsonify({"statusCode": 400, "message": "bad_request"}), 400
    return jsonify({"statusCode": 200, "message": "success"}), 200


@app.route("/log/auth/statistics", methods=["GET"])
@limiter.limit("30 per minute")
def log_auth_statistics():
    """Return the number of registrations for the ``deviceType`` query parameter.

    Responds 400 with ``count: -1`` for an unknown device type and 500 with
    ``count: -1`` when the database query fails.
    """
    device_type = request.args.get("deviceType", "")
    if not validate_device_type(device_type):
        return jsonify({"deviceType": device_type, "count": -1}), 400
    try:
        with closing(get_db_connection()) as conn, closing(conn.cursor()) as cursor:
            cursor.execute(_COUNT_BY_DEVICE_TYPE_SQL, (device_type,))
            count = cursor.fetchone()[0]
    except Exception:
        app.logger.exception("Failed to read device registration statistics")
        return jsonify({"deviceType": device_type, "count": -1}), 500
    return jsonify({"deviceType": device_type, "count": count}), 200


@app.route("/device/register", methods=["POST"])
@limiter.limit("30 per minute")
def register_device():
    """Store a device registration from the JSON body.

    Responds 200 when the row is stored, and 400 when the payload is invalid or
    the insert fails. The response body carries only ``statusCode``.
    """
    payload = _read_registration_payload()
    if payload is None or not _store_registration(*payload):
        return jsonify({"statusCode": 400}), 400
    return jsonify({"statusCode": 200}), 200


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)