Files
Harshavardhan Musanalli d720f43439 Finish remaining code
2026-02-13 09:48:41 +01:00

122 lines
3.6 KiB
Python

import os
import re
from urllib.parse import urlparse
import mysql.connector
from flask import Flask, request, jsonify
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
app = Flask(__name__)
limiter = Limiter(get_remote_address, app=app, default_limits=["60 per minute"])
DATABASE_URL = os.environ.get(
"DATABASE_URL", "mysql://root:my-secret-pw@localhost:3306/devsecops_db"
)
## I added more devices as I felt they are relevant to the ones asked in pdf
VALID_DEVICE_TYPES = {"iOS", "Android", "Watch", "TV", "Tablet", "Desktop", "IoT"}
USER_KEY_PATTERN = re.compile(r"^[a-zA-Z0-9_\-\.@]{1,255}$")
def get_db_connection():
parsed = urlparse(DATABASE_URL)
return mysql.connector.connect(
host=parsed.hostname,
port=parsed.port or 3306,
user=parsed.username,
password=parsed.password,
database=parsed.path.lstrip("/"),
)
def validate_device_type(device_type):
return device_type in VALID_DEVICE_TYPES
def validate_user_key(user_key):
return bool(user_key) and USER_KEY_PATTERN.match(user_key)
@app.route("/log/auth", methods=["POST"])
@limiter.limit("30 per minute")
def log_auth():
data = request.get_json(silent=True)
if not data:
return jsonify({"statusCode": 400, "message": "bad_request"}), 400
user_key = data.get("userKey", "")
device_type = data.get("deviceType", "")
if not validate_user_key(user_key) or not validate_device_type(device_type):
return jsonify({"statusCode": 400, "message": "bad_request"}), 400
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
"INSERT INTO device_registrations (user_key, device_type) VALUES (%s, %s)",
(user_key, device_type),
)
conn.commit()
cursor.close()
conn.close()
return jsonify({"statusCode": 200, "message": "success"}), 200
except Exception:
return jsonify({"statusCode": 400, "message": "bad_request"}), 400
@app.route("/log/auth/statistics", methods=["GET"])
@limiter.limit("30 per minute")
def log_auth_statistics():
device_type = request.args.get("deviceType", "")
if not validate_device_type(device_type):
return jsonify({"deviceType": device_type, "count": -1}), 400
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT COUNT(*) FROM device_registrations WHERE device_type = %s",
(device_type,),
)
count = cursor.fetchone()[0]
cursor.close()
conn.close()
return jsonify({"deviceType": device_type, "count": count}), 200
except Exception:
return jsonify({"deviceType": device_type, "count": -1}), 500
@app.route("/device/register", methods=["POST"])
@limiter.limit("30 per minute")
def register_device():
data = request.get_json(silent=True)
if not data:
return jsonify({"statusCode": 400}), 400
user_key = data.get("userKey", "")
device_type = data.get("deviceType", "")
if not validate_user_key(user_key) or not validate_device_type(device_type):
return jsonify({"statusCode": 400}), 400
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
"INSERT INTO device_registrations (user_key, device_type) VALUES (%s, %s)",
(user_key, device_type),
)
conn.commit()
cursor.close()
conn.close()
return jsonify({"statusCode": 200}), 200
except Exception:
return jsonify({"statusCode": 400}), 400
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)