"""User management blueprint: JWT login plus admin-only user CRUD routes."""

from datetime import datetime, timedelta, timezone
import os

from flask import Blueprint, request, jsonify, g
import jwt
from werkzeug.security import generate_password_hash, check_password_hash

from models import db, User

user_bp = Blueprint('user', __name__)

# NOTE(review): the literal fallback means tokens are signed with a publicly
# known key whenever SECRET_KEY is missing from the environment. Kept for
# backward compatibility; deployments should always set SECRET_KEY.
SECRET_KEY = os.environ.get('SECRET_KEY', 'your_secret_key')

_JWT_ALGORITHM = 'HS256'
_TOKEN_LIFETIME = timedelta(days=1)
_BEARER_PREFIX = 'bearer '

# Endpoints skipped by the blueprint-wide auth hook. `create_user` is listed
# here but still performs its own admin check inside the view.
_PUBLIC_ENDPOINTS = ('user.login_user', 'user.create_user')


def generate_token(user):
    """Return a signed JWT for *user* that expires one day from now.

    The payload carries ``user_id`` (taken from ``user.id``) and ``exp``.
    """
    payload = {
        'user_id': user.id,
        'exp': datetime.now(timezone.utc) + _TOKEN_LIFETIME,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=_JWT_ALGORITHM)


def authenticate():
    """Validate the request's ``Authorization`` header and load the user.

    Accepts either the raw token or the ``Bearer <token>`` form. On success
    the matching ``User`` is stored on ``g.user`` and True is returned.
    Returns False when the header is missing, the token is invalid or
    expired, the ``user_id`` claim is absent, or the user no longer exists.
    """
    header = request.headers.get('Authorization')
    if not header:
        return False

    token = header.strip()
    if token.lower().startswith(_BEARER_PREFIX):
        token = token[len(_BEARER_PREFIX):].strip()

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[_JWT_ALGORITHM])
    except jwt.InvalidTokenError:
        # ExpiredSignatureError is a subclass, so this covers expiry too.
        return False

    user_id = payload.get('user_id')
    if user_id is None:
        return False

    user = User.query.get(user_id)
    if user is None:
        # Token is well-formed but refers to a user that was deleted.
        return False

    g.user = user
    return True


def _admin_denied(action):
    """Return a 401 response unless the caller is an authenticated admin.

    *action* is the verb inserted into the error message ("create", "edit",
    "delete"). Returns None when the caller may proceed.
    """
    if authenticate() and g.user.role == 'admin':
        return None
    message = 'Unauthorized access! Only admins can {} users.'.format(action)
    return jsonify({'message': message}), 401


def _json_body():
    """Return the request's JSON object, or {} if it is absent or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _is_nonempty_str(value):
    """Return True when *value* is a non-empty string."""
    return isinstance(value, str) and bool(value)


@user_bp.before_request
def before_request():
    """Reject unauthenticated requests to every non-public endpoint."""
    if request.endpoint in _PUBLIC_ENDPOINTS:
        return None
    if not authenticate():
        return jsonify({'message': 'Unauthorized access!'}), 401
    return None


@user_bp.route('/users', methods=['POST'])
def create_user():
    """Create a user from the JSON body (admin only).

    Expects ``username`` and ``password``; ``role`` defaults to ``'user'``.
    Responds 400 when a required field is missing, 201 on success.
    """
    denied = _admin_denied('create')
    if denied is not None:
        return denied

    data = _json_body()
    username = data.get('username')
    password = data.get('password')
    if not (_is_nonempty_str(username) and _is_nonempty_str(password)):
        return jsonify({'message': 'Username and password are required!'}), 400

    new_user = User(
        username=username,
        password=generate_password_hash(password),
        role=data.get('role', 'user'),
        last_login=None,
    )
    db.session.add(new_user)
    db.session.commit()
    return jsonify({'message': 'User created!'}), 201


@user_bp.route('/users/login', methods=['POST'])
def login_user():
    """Check credentials and return a fresh token plus the user's role.

    Responds 400 when ``username`` or ``password`` is missing, 401 when the
    credentials do not match, 200 with ``token`` and ``role`` on success.
    """
    data = _json_body()
    username = data.get('username')
    password = data.get('password')
    if not (_is_nonempty_str(username) and _is_nonempty_str(password)):
        return jsonify({'message': 'Username and password are required!'}), 400

    user = User.query.filter_by(username=username).first()
    if user is None or not check_password_hash(user.password, password):
        return jsonify({'message': 'Invalid credentials!'}), 401

    # Stored as a formatted local-time string, as before.
    user.last_login = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    db.session.commit()
    return jsonify({
        'message': 'Login successful!',
        'token': generate_token(user),
        'role': user.role,
    }), 200


@user_bp.route('/users', methods=['GET'])
def get_users():
    """Return id, username, role and last_login for every user."""
    return jsonify([
        {
            'id': user.id,
            'username': user.username,
            'role': user.role,
            'last_login': user.last_login,
        }
        for user in User.query.all()
    ])


# NOTE(review): the original rule was '/users/' with no URL variable, so the
# view could never receive `user_id`. The int converter assumes integer
# primary keys -- confirm against the User model.
@user_bp.route('/users/<int:user_id>', methods=['PUT'])
def edit_user(user_id):
    """Update a user's username and/or role (admin only).

    Fields absent from the JSON body keep their current value. Responds 404
    when no user has *user_id*, 200 on success.
    """
    denied = _admin_denied('edit')
    if denied is not None:
        return denied

    user = User.query.get(user_id)
    if not user:
        return jsonify({'message': 'User not found!'}), 404

    data = _json_body()
    user.username = data.get('username', user.username)
    user.role = data.get('role', user.role)
    db.session.commit()
    return jsonify({'message': 'User updated!'}), 200


# NOTE(review): same route repair as edit_user; integer ids are assumed.
@user_bp.route('/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Delete the user with *user_id* (admin only).

    Responds 404 when no such user exists, 200 on success.
    """
    denied = _admin_denied('delete')
    if denied is not None:
        return denied

    user = User.query.get(user_id)
    if not user:
        return jsonify({'message': 'User not found!'}), 404

    db.session.delete(user)
    db.session.commit()
    return jsonify({'message': 'User deleted!'}), 200