flaschengeist/flaschengeist/system/controller/accessTokenController.py

116 lines
4.1 KiB
Python

from ..models.accessToken import AccessToken
from flaschengeist.system.database import db
from datetime import datetime, timedelta
import secrets
from . import Singleton
import logging
# Module-level logger shared by everything in this file; messages are emitted
# under the "flaschenpost" logger name.
logger = logging.getLogger("flaschenpost")
class AccessTokenController(metaclass=Singleton):
    """Create, validate, refresh and delete persisted access tokens.

    All tokens are stored through ``db.session``; this controller keeps no
    token state of its own besides the default lifetime.

    Attributes:
        lifetime: Lifetime in seconds given to newly created access tokens.
    """

    # Neither attribute is read or written anywhere in this class; both are
    # kept so that external code referring to them keeps working.
    instance = None
    tokenList = None

    def __init__(self, lifetime=1800):
        """Initialize the controller.

        Args:
            lifetime: Lifetime in seconds for tokens created by ``create``.
        """
        logger.debug("init access token controller")
        self.lifetime = lifetime

    def validate_token(self, token, roles):
        """Verify an access token and, optionally, the roles of its user.

        Every stored token matching ``token`` is checked. A token that is
        still within its lifetime and whose user satisfies ``roles`` gets its
        timestamp refreshed and is returned. Tokens found to be out of date
        are deleted from the database on the way.

        Args:
            token: Token string to verify.
            roles: Role names of which the user needs at least one. A falsy
                value (``None``, empty list) disables the role check.

        Returns:
            The matching AccessToken if it is valid, otherwise ``False``.
        """
        logger.debug("check token {{ {} }} is valid".format(token))
        for access_token in AccessToken.query.filter_by(token=token):
            time_end = access_token.timestamp + timedelta(seconds=access_token.lifetime)
            now = datetime.utcnow()
            logger.debug("now is {{ {} }}, endtime is {{ {} }}".format(now, time_end))
            if now > time_end:
                # Expired tokens are removed eagerly so they cannot be reused.
                logger.debug("access token is {{ {} }} out of date".format(access_token))
                db.session.delete(access_token)
                db.session.commit()
                continue

            logger.debug("check if token {{ {} }} is same as {{ {} }}".format(token, access_token))
            if not roles or self.userHasRole(access_token.user, roles):
                # NOTE(review): update_token() below calls update_timestamp()
                # while this call uses updateTimestamp(); confirm which name
                # the AccessToken model actually defines.
                access_token.updateTimestamp()
                db.session.commit()
                return access_token

        logger.debug("no valid access token with token: {{ {} }} and group: {{ {} }}".format(token, roles))
        return False

    def userHasRole(self, user, roles):
        """Return True if any role of any of the user's groups is in ``roles``.

        Args:
            user: Object exposing ``groups``; each group exposes ``roles`` and
                each role a ``name``.
            roles: Container of accepted role names.

        Returns:
            True if at least one role name matches, otherwise False.
        """
        return any(
            role.name in roles
            for group in user.groups
            for role in group.roles
        )

    def create(self, user, user_agent=None) -> AccessToken:
        """Create and persist a new access token for a user.

        Args:
            user: User the token is issued for.
            user_agent: Optional object exposing ``browser`` and ``platform``
                attributes used to identify the session. When omitted, both
                fields are stored as ``None``.

        Returns:
            AccessToken: The newly created and committed token.
        """
        logger.debug("create access token")
        token_str = secrets.token_hex(16)
        # The parameter defaults to None, so it must not be dereferenced
        # unconditionally.
        browser = user_agent.browser if user_agent is not None else None
        platform = user_agent.platform if user_agent is not None else None
        token = AccessToken(token=token_str, user=user, lifetime=self.lifetime,
                            browser=browser, platform=platform)
        db.session.add(token)
        db.session.commit()
        logger.debug("access token is {{ {} }}".format(token))
        return token

    def getAccessTokensFromUser(self, user):
        """Return the (unevaluated) query for all access tokens of ``user``."""
        return AccessToken.query.filter(AccessToken.user == user)

    @staticmethod
    def delete_token(token):
        """Delete an access token and commit.

        Args:
            token: Either an AccessToken instance or the token string of the
                token(s) to delete.
        """
        if isinstance(token, AccessToken):
            db.session.delete(token)
        else:
            AccessToken.query.filter_by(token=token).delete()
        db.session.commit()

    @staticmethod
    def update_token(token):
        """Refresh the timestamp of ``token`` and commit.

        Args:
            token: AccessToken instance to refresh.
        """
        # NOTE(review): validate_token() calls updateTimestamp() instead;
        # confirm which name the AccessToken model actually defines.
        token.update_timestamp()
        db.session.commit()

    def clear_expired(self):
        """Delete stored access tokens whose own lifetime has elapsed."""
        logger.debug("Clear expired AccessToken")
        now = datetime.utcnow()
        # The query pre-selects candidates using the controller's default
        # lifetime; each candidate is then checked against its own lifetime.
        # NOTE(review): tokens with a lifetime shorter than self.lifetime may
        # be expired without being part of this candidate set.
        might_expired = now - timedelta(seconds=self.lifetime)
        tokens = AccessToken.query.filter(AccessToken.timestamp < might_expired)
        logger.debug(tokens)
        for token in tokens:
            if token.timestamp < now - timedelta(seconds=token.lifetime):
                logger.debug("Delete token %s", token.token)
                db.session.delete(token)
        db.session.commit()