flaschengeist/flaschengeist/controller/sessionController.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

121 lines
3.5 KiB
Python
Raw Normal View History

import secrets
from flaschengeist.models.session import Session
from flaschengeist.database import db
from flaschengeist import logger
from werkzeug.exceptions import Forbidden, Unauthorized
2020-10-15 00:19:51 +00:00
from datetime import datetime, timezone
2020-01-18 22:31:49 +00:00
lifetime = 1800
2019-05-02 16:50:59 +00:00
def validate_token(token, user_agent, permission):
"""Verify session
2019-04-17 12:46:46 +00:00
Verify a Session and Roles so if the User has permission or not.
Retrieves the access token if valid else retrieves False
Args:
token: Token to verify.
user_agent: User agent of browser to check
permission: Permission needed to access restricted routes
Returns:
A Session for this given Token
Raises:
Unauthorized: If token is invalid or expired
Forbidden: If permission is insufficient
"""
logger.debug("check token {{ {} }} is valid".format(token))
session = Session.query.filter_by(token=token).one_or_none()
if session:
logger.debug("token found, check if expired or invalid user agent differs")
if session.expires >= datetime.now(timezone.utc) and (
session.browser == user_agent.browser and session.platform == user_agent.platform
):
if not permission or session._user.has_permission(permission):
session.refresh()
db.session.commit()
return session
else:
raise Forbidden
else:
logger.debug("access token is out of date or invalid client used")
delete_session(session)
2020-11-13 07:24:25 +00:00
logger.debug("no valid access token with token: {{ {} }} and permission: {{ {} }}".format(token, permission))
raise Unauthorized
def create(user, user_agent=None) -> Session:
"""Create a Session
Args:
user: For which User is to create a Session
user_agent: User agent to identify session
Returns:
Session: A created Token for User
"""
logger.debug("create access token")
token_str = secrets.token_hex(16)
session = Session(
token=token_str,
_user=user,
lifetime=lifetime,
browser=user_agent.browser,
platform=user_agent.platform,
)
session.refresh()
db.session.add(session)
db.session.commit()
logger.debug("access token is {{ {} }}".format(session.token))
return session
def get_session(token, owner=None):
"""Retrieves Session from token string
Args:
token (str): Token string
owner (User, optional): User owning the token
Raises:
Forbidden: Raised if owner is set but does not match
Returns:
Session: Token object identified by given token string
"""
session = Session.query.filter(Session.token == token).one_or_none()
if session and (owner and owner != session._user):
raise Forbidden
return session
def get_users_sessions(user):
return Session.query.filter(Session._user == user)
def delete_session(token: Session):
"""Deletes given Session
Args:
token (Session): Token to delete
2019-04-17 12:46:46 +00:00
"""
db.session.delete(token)
db.session.commit()
def update_session(session):
session.refresh()
db.session.commit()
def set_lifetime(session, lifetime):
session.lifetime = lifetime
update_session(session)
2019-04-11 21:56:55 +00:00
def clear_expired():
"""Remove expired tokens from database"""
logger.debug("Clear expired Sessions")
deleted = Session.query.filter(Session.expires < datetime.now(timezone.utc)).delete()
logger.debug("{} sessions have been removed".format(deleted))
db.session.commit()