feat: implement JWT authentication for user management
This commit is contained in:
parent
3d172fb5af
commit
f078c27cb5
@ -1,2 +1,3 @@
|
|||||||
Flask
|
Flask
|
||||||
Flask-SQLAlchemy
|
Flask-SQLAlchemy
|
||||||
|
PyJWT
|
||||||
|
@ -1,23 +1,36 @@
|
|||||||
from flask import Blueprint, request, jsonify, g
|
from flask import Blueprint, request, jsonify, g
|
||||||
from models import db, User
|
from models import db, User
|
||||||
from datetime import datetime
|
from datetime import datetime, timedelta
|
||||||
|
import jwt
|
||||||
from werkzeug.security import generate_password_hash, check_password_hash
|
from werkzeug.security import generate_password_hash, check_password_hash
|
||||||
|
import os
|
||||||
|
|
||||||
user_bp = Blueprint('user', __name__)
|
user_bp = Blueprint('user', __name__)
|
||||||
|
SECRET_KEY = os.environ.get('SECRET_KEY', 'your_secret_key') # Set a secret key for JWT
|
||||||
|
|
||||||
|
def generate_token(user):
|
||||||
|
payload = {
|
||||||
|
'user_id': user.id,
|
||||||
|
'exp': datetime.utcnow() + timedelta(days=1) # Token expires in 1 day
|
||||||
|
}
|
||||||
|
return jwt.encode(payload, SECRET_KEY, algorithm='HS256')
|
||||||
|
|
||||||
def authenticate():
|
def authenticate():
|
||||||
username = request.headers.get('X-Username')
|
token = request.headers.get('Authorization')
|
||||||
password = request.headers.get('X-Password')
|
if token:
|
||||||
if username and password:
|
try:
|
||||||
user = User.query.filter_by(username=username).first()
|
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
|
||||||
if user and check_password_hash(user.password, password):
|
g.user = User.query.get(payload['user_id'])
|
||||||
g.user = user
|
|
||||||
return True
|
return True
|
||||||
|
except jwt.ExpiredSignatureError:
|
||||||
|
return False
|
||||||
|
except jwt.InvalidTokenError:
|
||||||
|
return False
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@user_bp.before_request
|
@user_bp.before_request
|
||||||
def before_request():
|
def before_request():
|
||||||
if request.endpoint not in ['user.login_user']:
|
if request.endpoint not in ['user.login_user', 'user.create_user']:
|
||||||
if not authenticate():
|
if not authenticate():
|
||||||
return jsonify({'message': 'Unauthorized access!'}), 401
|
return jsonify({'message': 'Unauthorized access!'}), 401
|
||||||
|
|
||||||
@ -42,7 +55,8 @@ def login_user():
|
|||||||
if user and check_password_hash(user.password, data['password']):
|
if user and check_password_hash(user.password, data['password']):
|
||||||
user.last_login = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
user.last_login = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return jsonify({'message': 'Login successful!', 'role': user.role}), 200
|
token = generate_token(user)
|
||||||
|
return jsonify({'message': 'Login successful!', 'token': token, 'role': user.role}), 200
|
||||||
return jsonify({'message': 'Invalid credentials!'}), 401
|
return jsonify({'message': 'Invalid credentials!'}), 401
|
||||||
|
|
||||||
@user_bp.route('/users', methods=['GET'])
|
@user_bp.route('/users', methods=['GET'])
|
||||||
|
Loading…
Reference in New Issue
Block a user