flaschengeist/flaschengeist/controller/sessionController.py

134 lines
3.7 KiB
Python

import secrets
from datetime import datetime, timezone
from werkzeug.exceptions import Forbidden, Unauthorized
from .. import logger
from ..models import Session
from ..database import db
# Default lifetime handed to every Session built by create().
# NOTE(review): unit is presumably seconds (1800 s = 30 min) -- confirm against the Session model.
lifetime = 1800
def validate_token(token, user_agent, permission):
    """Validate a session token for the requesting client.

    Looks up the session stored for ``token``, checks that it has not
    expired and that it is used from the same browser/platform it is
    recorded with, and finally checks the requested permission.
    On success the session is refreshed and committed.

    Args:
        token: Token string to look up.
        user_agent: Object exposing ``browser`` and ``platform`` of the client.
        permission: Permission required for the route; a falsy value skips
            the permission check.

    Returns:
        The Session belonging to the given token.

    Raises:
        Unauthorized: If no session exists for the token, or the session is
            expired / used from a different client (the session is deleted
            in that case).
        Forbidden: If the session is valid but its user lacks ``permission``.
    """
    logger.debug("check token {{ {} }} is valid".format(token))
    session = Session.query.filter_by(token=token).one_or_none()

    if session:
        logger.debug("token found, check if expired or invalid user agent differs")
        not_expired = session.expires >= datetime.now(timezone.utc)
        # Short-circuit on purpose: the client is only inspected for sessions
        # that are still within their lifetime.
        acceptable = not_expired and (
            session.browser == user_agent.browser and session.platform == user_agent.platform
        )
        if acceptable:
            if permission and not session.user_.has_permission(permission):
                raise Forbidden
            session.refresh()
            db.session.commit()
            return session

        # Expired or foreign client: drop the session, then fall through to Unauthorized.
        logger.debug("access token is out of date or invalid client used")
        delete_session(session)

    logger.debug("no valid access token with token: {{ {} }} and permission: {{ {} }}".format(token, permission))
    raise Unauthorized
def create(user, user_agent=None) -> Session:
    """Create and persist a new Session for a user.

    A random 32-character hex token is generated, the session is refreshed,
    added to the database session and committed.

    Args:
        user: User the session is created for.
        user_agent: Optional object exposing ``browser`` and ``platform``.
            When omitted (``None``), both values are stored as ``None``
            instead of failing on attribute access.

    Returns:
        Session: The newly created and committed session.
    """
    logger.debug("create access token")
    token_str = secrets.token_hex(16)
    session = Session(
        token=token_str,
        user_=user,
        lifetime=lifetime,
        # ``user_agent`` defaults to None, so read the attributes defensively.
        browser=getattr(user_agent, "browser", None),
        platform=getattr(user_agent, "platform", None),
    )
    session.refresh()
    db.session.add(session)
    db.session.commit()
    # NOTE(review): the token itself is written to the debug log -- confirm this is acceptable.
    logger.debug("access token is {{ {} }}".format(session.token))
    return session
def get_session(token, owner=None):
    """Look up the Session stored for a token string.

    Args:
        token (str): Token string to search for.
        owner (User, optional): If given, the session must belong to this user.

    Raises:
        Forbidden: If ``owner`` is set and differs from the session's user.

    Returns:
        Session: The matching session, or ``None`` if no session has this token.
    """
    session = Session.query.filter(Session.token == token).one_or_none()
    if not session:
        # Nothing found: hand the lookup result back unchanged.
        return session
    if owner and owner != session.user_:
        raise Forbidden
    return session
def get_users_sessions(user):
    """Select all sessions belonging to a user.

    Args:
        user (User): User whose sessions are requested.

    Returns:
        The filtered ``Session.query`` (returned as-is, not evaluated here).
    """
    belongs_to_user = Session.user_ == user
    return Session.query.filter(belongs_to_user)
def delete_sessions(user):
    """Deletes all active sessions of a user

    Args:
        user (User): User to delete all sessions for
    """
    # ``Session.user_`` is compared against user objects elsewhere in this
    # module, i.e. it is used as a relationship attribute; attribute traversal
    # such as ``Session.user_.id_`` is not available on those at class level.
    # Filter on the relationship itself and delete through the unit of work.
    for session in Session.query.filter(Session.user_ == user).all():
        db.session.delete(session)
    db.session.commit()
def delete_session(token: Session):
    """Remove a single Session from the database and commit.

    Args:
        token (Session): Session object to delete.
    """
    orm_session = db.session
    orm_session.delete(token)
    orm_session.commit()
def update_session(session):
    """Refresh the given session and commit the change.

    Args:
        session (Session): Session to refresh.
    """
    session.refresh()
    orm_session = db.session
    orm_session.commit()
def set_lifetime(session, lifetime):
    """Set a new lifetime on a session, then refresh and commit it.

    Args:
        session (Session): Session to modify.
        lifetime: New lifetime value stored on the session.
    """
    session.lifetime = lifetime
    # Same steps update_session() performs: refresh, then commit.
    session.refresh()
    db.session.commit()
def clear_expired():
    """Delete every session whose expiry lies before the current UTC time."""
    logger.debug("Clear expired Sessions")
    cutoff = datetime.now(timezone.utc)
    expired_sessions = Session.query.filter(Session.expires < cutoff)
    removed = expired_sessions.delete()
    logger.debug("{} sessions have been removed".format(removed))
    db.session.commit()