From f078c27cb55244e0ce0e88561bb72c2eba9c5175 Mon Sep 17 00:00:00 2001
From: "Manuel Weiser (aider)"
Date: Mon, 2 Sep 2024 10:51:50 +0200
Subject: [PATCH] feat: implement JWT authentication for user management
---
 game_collection/requirements.txt | 1 +
 game_collection/user_management.py | 32 +++++++++++++++++++++---------
 2 files changed, 24 insertions(+), 9 deletions(-)
diff --git a/game_collection/requirements.txt b/game_collection/requirements.txt
index fb675a9..ca9f7d6 100644
--- a/game_collection/requirements.txt
+++ b/game_collection/requirements.txt
@@ -1,2 +1,3 @@
 Flask
 Flask-SQLAlchemy
+PyJWT
diff --git a/game_collection/user_management.py b/game_collection/user_management.py
index 5329314..e00cf13 100644
--- a/game_collection/user_management.py
+++ b/game_collection/user_management.py
@@ -1,23 +1,36 @@
 from flask import Blueprint, request, jsonify, g
 from models import db, User
-from datetime import datetime
+from datetime import datetime, timedelta
+import jwt
 from werkzeug.security import generate_password_hash, check_password_hash
+import os
 
 user_bp = Blueprint('user', __name__)
+SECRET_KEY = os.environ.get('SECRET_KEY', 'your_secret_key') # Set a secret key for JWT
+
+def generate_token(user):
+    payload = {
+        'user_id': user.id,
+        'exp': datetime.utcnow() + timedelta(days=1) # Token expires in 1 day
+    }
+    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')
 
 def authenticate():
-    username = request.headers.get('X-Username')
-    password = request.headers.get('X-Password')
-    if username and password:
-        user = User.query.filter_by(username=username).first()
-        if user and check_password_hash(user.password, password):
-            g.user = user
+    token = request.headers.get('Authorization')
+    if token:
+        try:
+            payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
+            g.user = User.query.get(payload['user_id'])
             return True
+        except jwt.ExpiredSignatureError:
+            return False
+        except jwt.InvalidTokenError:
+            return False
     return False
 
 @user_bp.before_request
 def before_request():
-    if request.endpoint not in ['user.login_user']:
+    if request.endpoint not in ['user.login_user', 'user.create_user']:
         if not authenticate():
             return jsonify({'message': 'Unauthorized access!'}), 401
 
@@ -42,7 +55,8 @@ def login_user():
     if user and check_password_hash(user.password, data['password']):
         user.last_login = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
         db.session.commit()
-        return jsonify({'message': 'Login successful!', 'role': user.role}), 200
+        token = generate_token(user)
+        return jsonify({'message': 'Login successful!', 'token': token, 'role': user.role}), 200
     return jsonify({'message': 'Invalid credentials!'}), 401
 
 @user_bp.route('/users', methods=['GET'])
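Review bot: `SECRET_KEY = os.environ.get('SECRET_KEY', 'your_secret_key')` silently falls back to a fixed string that is checked into the repository, so any deployment that forgets to set the variable signs and verifies every token with a key anyone reading the source already knows; drop the fallback and fail at startup instead, e.g. `SECRET_KEY = os.environ['SECRET_KEY']` (or raise `RuntimeError('SECRET_KEY is not set')` when it is missing). In `authenticate()`, `g.user = User.query.get(payload['user_id'])` returns `True` even when the lookup yields `None` (account deleted after the token was issued), so later handlers see an authenticated request with no user; add `if g.user is None: return False` before `return True`, and since `payload['user_id']` is an unchecked claim, catch `KeyError` alongside `jwt.InvalidTokenError` rather than letting it surface as a 500. The separate `except jwt.ExpiredSignatureError:` branch is redundant, as that class is a subclass of `jwt.InvalidTokenError` and both branches return `False`; a single `except (jwt.InvalidTokenError, KeyError): return False` covers all three cases. The new exemption of `user.create_user` in `before_request` makes account creation reachable without a token, which is presumably intended but deserves a comment on that line (`# registration and login are the only unauthenticated endpoints`) and a mention in the commit message.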
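Review bot: `token = generate_token(user)` is passed straight to `jsonify(...)`, which only works if `jwt.encode` returns `str`; that holds for PyJWT 2.x, but PyJWT 1.x returns `bytes` and `jsonify` would raise `TypeError` on it, and the accompanying `requirements.txt` hunk adds `PyJWT` with no version constraint. Either pin the dependency (`PyJWT>=2.0`) or normalise the return value inside `generate_token` so the login response does not depend on which major version gets installed. The enclosing `login_user` function starts outside the hunk.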