flaschengeist/flaschengeist/modules/auth/__init__.py

110 lines
3.9 KiB
Python

#############################################
# Plugin: Auth #
# Functionality: Allow management of #
# authentication, login, logout, etc #
#############################################
from flask import Blueprint, request, jsonify
from werkzeug.exceptions import Forbidden, BadRequest, Unauthorized
from werkzeug.local import LocalProxy
from flaschengeist import logger
from flaschengeist.system.decorator import login_required
from flaschengeist.system.controller import accessTokenController, userController
# Proxy to the access token controller. The lambda is only evaluated when the
# proxy is actually used, so no AccessTokenController is built at import time.
access_controller = LocalProxy(lambda: accessTokenController.AccessTokenController())
# Blueprint all routes of this module are attached to; handed out by register().
auth_bp = Blueprint('auth', __name__)
def register():
    """Return the blueprint of the auth plugin.

    Returns:
        The module-level ``auth_bp`` blueprint carrying all ``/auth`` routes.
    """
    return auth_bp
#################################################
# Routes #
# #
# /auth POST: login (new token) #
# GET: get all tokens for user #
# /auth/<token> GET: get lifetime of token #
# PUT: set new lifetime #
# DELETE: logout / delete token #
#################################################
@auth_bp.route("/auth", methods=['POST'])
def _create_token():
    """Login User

    Log in the user and create an AccessToken for the user.

    The request body is read as JSON and must be an object containing the
    keys ``username`` and ``password``.

    Returns:
        A JSON response with the user information and the created token.

    Raises:
        BadRequest: If the body is missing, is not a JSON object, or lacks
            ``username`` or ``password``.
        Unauthorized: If ``userController.login_user`` returns a falsy value.
    """
    logger.debug("Start log in.")
    data = request.get_json()
    try:
        username = data['username']
        password = data['password']
    except (KeyError, TypeError):
        # A missing body (None), a non-object body or a missing key is a
        # client error; answer with 400 instead of crashing with a 500.
        raise BadRequest
    logger.debug("search user {{ {} }} in database".format(username))
    user = userController.login_user(username, password)
    if not user:
        raise Unauthorized
    logger.debug("user is {{ {} }}".format(user))
    token = access_controller.create(user, user_agent=request.user_agent)
    logger.debug("access token is {{ {} }}".format(token))
    logger.info("User {{ {} }} success login.".format(username))
    # Use the login as an opportunity to clean expired tokens out of the DB
    access_controller.clear_expired()
    return jsonify({"user": user, "token": token.token})
@auth_bp.route("/auth", methods=['GET'])
@login_required()
def _get_tokens(access_token, **kwargs):
    """Get all access tokens of the requesting user.

    Args:
        access_token: Token object of the current request (presumably
            injected by ``login_required``); its ``user`` is looked up.
        **kwargs: Further keyword arguments, unused here.

    Returns:
        A JSON response built from the tokens found for that user.
    """
    owner = access_token.user
    return jsonify(access_controller.get_users_tokens(owner))
@auth_bp.route("/auth/<token>", methods=['DELETE'])
@login_required()
def _delete_token(token, access_token, **kwargs):
    """Logout: delete one access token of the requesting user.

    Args:
        token: Token identifier taken from the URL.
        access_token: Token object of the current request; its ``user`` limits
            the lookup.
        **kwargs: Further keyword arguments, unused here.

    Returns:
        A JSON response ``{"ok": "ok"}`` after the token was deleted.

    Raises:
        Forbidden: If the lookup for ``token`` and that user yields nothing.
    """
    logger.debug("Try to delete access token {{ {} }}".format(token))
    found = access_controller.get_token(token, access_token.user)
    if found:
        access_controller.delete_token(found)
        # Also drop expired tokens while we are modifying the token store
        access_controller.clear_expired()
        return jsonify({"ok": "ok"})
    logger.debug("Token not found in database!")
    # Answer with 403 so tokens can not be brute-forced: a valid token of
    # another user and an invalid token look the same to the client.
    raise Forbidden
@auth_bp.route("/auth/<token>", methods=['GET'])
@login_required()
def _get_token(token, access_token, **kwargs):
    """Return one access token of the requesting user as JSON.

    Args:
        token: Token identifier taken from the URL.
        access_token: Token object of the current request; its ``user`` limits
            the lookup.
        **kwargs: Further keyword arguments, unused here.

    Returns:
        A JSON response built from the token that was found.

    Raises:
        Forbidden: If the lookup for ``token`` and that user yields nothing.
    """
    logger.debug("get token {{ {} }}".format(token))
    found = access_controller.get_token(token, access_token.user)
    if found:
        return jsonify(found)
    # Answer with 403 so tokens can not be brute-forced: a valid token of
    # another user and an invalid token look the same to the client.
    raise Forbidden
@auth_bp.route("/auth/<token>", methods=['PUT'])
@login_required()
def _set_lifetime(token, access_token, **kwargs):
    """Set a new lifetime for one access token of the requesting user.

    The new lifetime is read from the key ``value`` of the JSON request body.

    Args:
        token: Token identifier taken from the URL.
        access_token: Token object of the current request; its ``user`` limits
            the lookup.
        **kwargs: Further keyword arguments, unused here.

    Returns:
        A JSON response ``{"ok": "ok"}`` after the lifetime was set.

    Raises:
        Forbidden: If the lookup for ``token`` and that user yields nothing.
        BadRequest: If a ``KeyError`` or ``TypeError`` occurs while reading
            ``value`` from the body or while applying it.
    """
    found = access_controller.get_token(token, access_token.user)
    if not found:
        # Answer with 403 so tokens can not be brute-forced: a valid token of
        # another user and an invalid token look the same to the client.
        raise Forbidden
    try:
        payload = request.get_json()
        new_lifetime = payload['value']
        logger.debug("set lifetime {{ {} }} to access token {{ {} }}".format(new_lifetime, found))
        access_controller.set_lifetime(found, new_lifetime)
        response = jsonify({"ok": "ok"})
    except (KeyError, TypeError):
        raise BadRequest
    return response