133 lines
4.9 KiB
Python
133 lines
4.9 KiB
Python
#############################################
|
|
# Plugin: Auth #
|
|
# Functionality: Allow management of #
|
|
# authentication, login, logout, etc #
|
|
#############################################
|
|
|
|
from flask import Blueprint, current_app, request, jsonify
|
|
from werkzeug.local import LocalProxy
|
|
|
|
from flaschengeist.system.decorator import login_required
|
|
from flaschengeist.system.exceptions import PermissionDenied
|
|
from flaschengeist.system.controller import mainController as mc
|
|
import flaschengeist.system.controller.accessTokenController as ac
|
|
|
|
from flaschengeist.system.models.accessToken import AccessToken
|
|
|
|
def _resolve_logger():
    """Return the logger of the current Flask application."""
    return current_app.logger


def _new_access_controller():
    """Return a freshly constructed AccessTokenController."""
    return ac.AccessTokenController()


# The proxies postpone both lookups until the names are actually used, so
# current_app is not touched while this module is being imported.
logger = LocalProxy(_resolve_logger)
access_controller = LocalProxy(_new_access_controller)

# Blueprint that collects every route defined in this module.
auth_bp = Blueprint('auth', __name__)
|
|
|
|
|
|
def register():
    """Hand out the blueprint of the auth plugin.

    Returns:
        The module-level ``auth_bp`` blueprint; the routes of this module
        are attached to it through ``@auth_bp.route``.
    """
    blueprint = auth_bp
    return blueprint
|
|
|
|
############################################
|
|
# Routes #
|
|
############################################
|
|
|
|
|
|
@auth_bp.route("/login", methods=['POST'])
def _login():
    """ Login User

    Login in User and create an AccessToken for the User.

    The JSON request body has to be an object containing the keys
    ``username`` and ``password``.

    Returns:
        A JSON-File with user information and created token or errors.
        Status 400 is returned if the body is not a JSON object or one of
        the credentials is missing, status 401 if the login is denied or
        fails with any other exception.
    """
    logger.debug("Start log in.")
    data = request.get_json()
    logger.info(request)
    # Validate before indexing: a missing body or key would otherwise raise
    # outside the try block below and end up as an unhandled server error.
    if (not isinstance(data, dict)
            or 'username' not in data
            or 'password' not in data):
        logger.debug("login request without username or password")
        return jsonify({"error": "username and password are required"}), 400
    username = data['username']
    password = data['password']
    logger.debug("username is {{ {} }}".format(username))
    try:
        logger.debug("search {{ {} }} in database".format(username))
        main_controller = mc.MainController()
        user = main_controller.login_user(username, password)
        logger.debug("user is {{ {} }}".format(user))
        token = access_controller.create(user, user_agent=request.user_agent)
        logger.debug("access token is {{ {} }}".format(token))
        logger.debug("validate access token")
        # The response is the user's default representation plus the token.
        dic = user.default()
        dic["accessToken"] = token.token
        logger.info("User {{ {} }} success login.".format(username))
        logger.debug("return login {{ {} }}".format(dic))
        return jsonify(dic)
    except PermissionDenied as err:
        logger.debug("permission denied exception in login", exc_info=True)
        return jsonify({"error": str(err)}), 401
    except Exception:
        # The client only receives a generic message; the details go to the log.
        logger.error("exception in login.", exc_info=True)
        return jsonify({"error": "permission denied"}), 401
|
|
|
|
|
|
@auth_bp.route("/logout", methods=['GET'])
@login_required()
def _logout(**kwargs):
    """ Logout User

    Delete the access token of the current request and afterwards let the
    access controller clear expired tokens.

    Args:
        **kwargs: has to contain ``accToken``; presumably supplied by
            ``login_required`` (verify against the decorator).

    Returns:
        JSON ``{"ok": "ok"}`` on success, otherwise JSON with an ``error``
        entry and status 500.
    """
    try:
        logger.debug("logout user")
        current_token = kwargs['accToken']
        logger.debug("access token is {{ {} }}".format(current_token))
        logger.debug("delete access token")
        # NOTE(review): _getAccessTokens calls access_controller.delete_token
        # instead; confirm that both method names exist on the controller.
        access_controller.deleteAccessToken(current_token)
        access_controller.clearExpired()
        logger.info("return ok logout user")
        confirmation = {"ok": "ok"}
        return jsonify(confirmation)
    except Exception as exc:
        logger.warning("exception in logout user.", exc_info=True)
        failure = {"error": str(exc)}
        return jsonify(failure), 500
|
|
|
|
|
|
@auth_bp.route("/user/getAccessTokens", methods=['GET', 'POST'])
# @auth_bp.route("/accessTokens", methods=['GET', 'POST'])
@login_required()
def _getAccessTokens(**kwargs):
    """ List the access tokens of the current user

    On POST the token whose ``id`` is given in the JSON body is deleted for
    the user of the current access token first. In both cases the tokens of
    that user are returned afterwards.

    Args:
        **kwargs: has to contain ``accToken``; presumably supplied by
            ``login_required`` (verify against the decorator).

    Returns:
        A JSON list with the ``toJSON()`` representation of each token.
        Status 400 is returned if a POST body is not a JSON object with an
        ``id``, status 500 with an ``error`` entry on any other failure.
    """
    try:
        if request.method == 'POST':
            data = request.get_json()
            # A missing body or id is a client error, not a server failure.
            if not isinstance(data, dict) or 'id' not in data:
                logger.debug("delete access token request without id")
                return jsonify({"error": "id is required"}), 400
            # Only id and user are filled in; the other fields stay None.
            token = AccessToken(data['id'], kwargs['accToken'].user, None, None, None)
            # NOTE(review): _logout calls access_controller.deleteAccessToken
            # instead; confirm that both method names exist on the controller.
            access_controller.delete_token(token)
        tokens = access_controller.getAccessTokensFromUser(kwargs['accToken'].user)
        r = [t.toJSON() for t in tokens]
        logger.debug("return {{ {} }}".format(r))
        return jsonify(r)
    except Exception as err:
        # Logged as warning like the other routes, so that failures answered
        # with status 500 are visible without debug logging.
        logger.warning("exception", exc_info=True)
        return jsonify({"error": str(err)}), 500
|
|
|
|
|
|
@auth_bp.route("/getLifetime", methods=['GET'])
@login_required()
def _getLifeTime(**kwargs):
    """ Get lifetime of the current access token

    Args:
        **kwargs: has to contain ``accToken``; presumably supplied by
            ``login_required`` (verify against the decorator).

    Returns:
        JSON ``{"value": <lifetime>}`` taken from the token, otherwise JSON
        with an ``error`` entry and status 500.
    """
    try:
        logger.debug("get lifetime of access token")
        current_token = kwargs['accToken']
        logger.debug("accessToken is {{ {} }}".format(current_token))
        payload = {"value": current_token.lifetime}
        return jsonify(payload)
    except Exception as exc:
        logger.warning("exception in get lifetime of access token.", exc_info=True)
        failure = {"error": str(exc)}
        return jsonify(failure), 500
|
|
|
|
|
|
@auth_bp.route("/setLifetime", methods=['POST'])
@login_required()
def _saveLifeTime(**kwargs):
    """ Set lifetime of the current access token

    Reads ``value`` from the JSON request body, assigns it as lifetime of
    the current access token and passes the token to the access controller
    for updating.

    Args:
        **kwargs: has to contain ``accToken``; presumably supplied by
            ``login_required`` (verify against the decorator).

    Returns:
        JSON ``{"value": <lifetime>}`` read from the updated token.
        Status 400 is returned if the body is not a JSON object with a
        ``value``, status 500 with an ``error`` entry on any other failure.
    """
    try:
        token = kwargs['accToken']
        logger.debug("save lifetime for access token {{ {} }}".format(token))
        data = request.get_json()
        # A missing body or value is a client error; reject it before the
        # token is modified.
        if not isinstance(data, dict) or 'value' not in data:
            logger.debug("set lifetime request without value")
            return jsonify({"error": "value is required"}), 400
        lifetime = data['value']
        logger.debug("lifetime is {{ {} }}".format(lifetime))
        logger.info("set lifetime {{ {} }} to access token {{ {} }}".format(
            lifetime, token))
        token.lifetime = lifetime
        logger.info("update access token timestamp")
        # The controller returns the token that is reported back to the client.
        token = access_controller.update(token)
        return jsonify({"value": token.lifetime})
    except Exception as err:
        logger.warning(
            "exception in save lifetime for access token.", exc_info=True)
        return jsonify({"error": str(err)}), 500
|