# Flask service that stores device registrations in MySQL and reports
# registration counts per device type.
import os
import re
from urllib.parse import unquote, urlparse

import mysql.connector
from flask import Flask, request, jsonify
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
|
|
|
|
# Flask application object that the route decorators in this file attach to.
app = Flask(__name__)

# Rate limiter keyed on the client's remote address, with a default limit of
# 60 requests per minute.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["60 per minute"],
)

# MySQL connection URL, read from the environment with a fallback value.
# NOTE(review): the fallback embeds a root password; presumably meant for
# local development only -- confirm it is never relied on in production.
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "mysql://root:my-secret-pw@localhost:3306/devsecops_db",
)

# Accepted device types (matched case-sensitively). The set lists more types
# than the assignment PDF asked for; the extra ones were added because they
# seemed relevant alongside the requested ones.
VALID_DEVICE_TYPES = {
    "iOS",
    "Android",
    "Watch",
    "TV",
    "Tablet",
    "Desktop",
    "IoT",
}

# A user key is 1-255 characters drawn from ASCII letters, digits, "_", "-",
# "." and "@".
USER_KEY_PATTERN = re.compile(r"^[a-zA-Z0-9_\-\.@]{1,255}$")
|
|
|
|
|
|
def get_db_connection():
    """Open a new MySQL connection described by ``DATABASE_URL``.

    The URL is split with ``urlparse``. The user name, password and database
    name are percent-decoded before use, because ``urlparse`` returns them
    exactly as written in the URL; without decoding, a password containing
    reserved characters (written as ``%40``, ``%3A``, ...) would be sent to
    the server in its encoded form.

    Returns:
        The connection object returned by ``mysql.connector.connect``. This
        function does not close it; the caller is responsible for that.

    Raises:
        Any exception raised while parsing the URL's port or by
        ``mysql.connector.connect`` propagates unchanged.
    """
    parsed = urlparse(DATABASE_URL)

    def _decode(component):
        # Leave None / empty components untouched so a URL without
        # credentials behaves as it did before decoding was added.
        return unquote(component) if component else component

    return mysql.connector.connect(
        host=parsed.hostname,
        # Fall back to the standard MySQL port when the URL omits one.
        port=parsed.port or 3306,
        user=_decode(parsed.username),
        password=_decode(parsed.password),
        database=_decode(parsed.path.lstrip("/")),
    )
|
|
|
|
|
|
def validate_device_type(device_type):
    """Return ``True`` when *device_type* is a member of ``VALID_DEVICE_TYPES``.

    The comparison is an exact, case-sensitive string match. Non-string
    values are rejected before the membership test: an unhashable value such
    as a list or dict (both possible in a JSON body) would otherwise make the
    set lookup raise ``TypeError`` instead of returning ``False``.
    """
    return isinstance(device_type, str) and device_type in VALID_DEVICE_TYPES
|
|
|
|
|
|
def validate_user_key(user_key):
    """Return ``True`` when *user_key* is a string matching ``USER_KEY_PATTERN``.

    Always returns a real ``bool``. Non-string values (numbers, lists, dicts,
    ``None``) and the empty string yield ``False`` rather than raising.

    ``fullmatch`` is used instead of ``match`` because the pattern's ``$``
    also matches just before a trailing newline, so ``match`` would accept a
    key such as ``"abc\\n"``.
    """
    if not isinstance(user_key, str):
        return False
    return USER_KEY_PATTERN.fullmatch(user_key) is not None
|
|
|
|
|
|
@app.route("/log/auth", methods=["POST"])
@limiter.limit("30 per minute")
def log_auth():
    """Insert one ``device_registrations`` row from a JSON request body.

    The body must be a non-empty JSON object whose ``userKey`` and
    ``deviceType`` values are strings accepted by ``validate_user_key`` and
    ``validate_device_type``.

    Returns:
        ``{"statusCode": 200, "message": "success"}`` with HTTP 200 once the
        insert has been committed. ``{"statusCode": 400, "message":
        "bad_request"}`` with HTTP 400 when the body is missing, is not a
        JSON object, fails validation, or the database operation raises.
    """
    bad_request = {"statusCode": 400, "message": "bad_request"}

    data = request.get_json(silent=True)
    # A JSON array or scalar parses successfully but has no ``.get``; only a
    # non-empty object is acceptable here.
    if not isinstance(data, dict) or not data:
        return jsonify(bad_request), 400

    user_key = data.get("userKey", "")
    device_type = data.get("deviceType", "")

    # JSON values can be of any type; reject non-strings before they reach
    # the validators so malformed input is answered with 400, not a crash.
    if not isinstance(user_key, str) or not isinstance(device_type, str):
        return jsonify(bad_request), 400
    if not validate_user_key(user_key) or not validate_device_type(device_type):
        return jsonify(bad_request), 400

    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(
                    "INSERT INTO device_registrations (user_key, device_type) VALUES (%s, %s)",
                    (user_key, device_type),
                )
                conn.commit()
            finally:
                # Release the cursor even when the insert or commit fails.
                cursor.close()
        finally:
            # Release the connection on every path, success or failure.
            conn.close()
    except Exception:
        # This endpoint answers 400 for every failure, database errors
        # included; log the traceback so the failure is not silent.
        app.logger.exception("log_auth: database write failed")
        return jsonify(bad_request), 400

    return jsonify({"statusCode": 200, "message": "success"}), 200
|
|
|
|
|
|
@app.route("/log/auth/statistics", methods=["GET"])
@limiter.limit("30 per minute")
def log_auth_statistics():
    """Report how many ``device_registrations`` rows have a given device type.

    Reads the ``deviceType`` query parameter (empty string when absent).

    Returns:
        ``{"deviceType": <value>, "count": <n>}`` with HTTP 200 on success.
        The same shape with ``count`` set to ``-1`` and HTTP 400 when the
        device type fails ``validate_device_type``, or HTTP 500 when the
        database query raises.
    """
    device_type = request.args.get("deviceType", "")

    if not validate_device_type(device_type):
        return jsonify({"deviceType": device_type, "count": -1}), 400

    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(
                    "SELECT COUNT(*) FROM device_registrations WHERE device_type = %s",
                    (device_type,),
                )
                count = cursor.fetchone()[0]
            finally:
                # Release the cursor even when the query fails.
                cursor.close()
        finally:
            # Release the connection on every path, success or failure.
            conn.close()
    except Exception:
        # Keep the existing 500 / count -1 response, but record the
        # traceback so the failure is not silent.
        app.logger.exception("log_auth_statistics: database query failed")
        return jsonify({"deviceType": device_type, "count": -1}), 500

    return jsonify({"deviceType": device_type, "count": count}), 200
|
|
|
|
|
|
@app.route("/device/register", methods=["POST"])
@limiter.limit("30 per minute")
def register_device():
    """Insert one ``device_registrations`` row from a JSON request body.

    The body must be a non-empty JSON object whose ``userKey`` and
    ``deviceType`` values are strings accepted by ``validate_user_key`` and
    ``validate_device_type``.

    Returns:
        ``{"statusCode": 200}`` with HTTP 200 once the insert has been
        committed. ``{"statusCode": 400}`` with HTTP 400 when the body is
        missing, is not a JSON object, fails validation, or the database
        operation raises.
    """
    bad_request = {"statusCode": 400}

    data = request.get_json(silent=True)
    # A JSON array or scalar parses successfully but has no ``.get``; only a
    # non-empty object is acceptable here.
    if not isinstance(data, dict) or not data:
        return jsonify(bad_request), 400

    user_key = data.get("userKey", "")
    device_type = data.get("deviceType", "")

    # JSON values can be of any type; reject non-strings before they reach
    # the validators so malformed input is answered with 400, not a crash.
    if not isinstance(user_key, str) or not isinstance(device_type, str):
        return jsonify(bad_request), 400
    if not validate_user_key(user_key) or not validate_device_type(device_type):
        return jsonify(bad_request), 400

    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(
                    "INSERT INTO device_registrations (user_key, device_type) VALUES (%s, %s)",
                    (user_key, device_type),
                )
                conn.commit()
            finally:
                # Release the cursor even when the insert or commit fails.
                cursor.close()
        finally:
            # Release the connection on every path, success or failure.
            conn.close()
    except Exception:
        # This endpoint answers 400 for every failure, database errors
        # included; log the traceback so the failure is not silent.
        app.logger.exception("register_device: database write failed")
        return jsonify(bad_request), 400

    return jsonify({"statusCode": 200}), 200
|
|
|
|
|
|
if __name__ == "__main__":
    # When executed directly as a script, serve on every network interface
    # at port 5000.
    listen_host, listen_port = "0.0.0.0", 5000
    app.run(host=listen_host, port=listen_port)
|