144 lines
4.2 KiB
Python
144 lines
4.2 KiB
Python
import secrets
|
|
from datetime import datetime, timezone
|
|
|
|
from ua_parser import user_agent_parser
|
|
from werkzeug.exceptions import Forbidden, Unauthorized
|
|
|
|
from .. import logger
|
|
from ..database import db
|
|
from ..models import Session
|
|
|
|
lifetime = 1800
|
|
|
|
|
|
def get_user_agent(request_headers):
|
|
return user_agent_parser.Parse(request_headers.get("User-Agent", "") if request_headers else "")
|
|
|
|
|
|
def validate_token(token, request_headers, permission):
|
|
"""Verify session
|
|
|
|
Verify a Session and Roles so if the User has permission or not.
|
|
Retrieves the access token if valid else retrieves False
|
|
|
|
Args:
|
|
token: Token to verify.
|
|
request_headers: Headers to validate user agent of browser
|
|
permission: Permission needed to access restricted routes
|
|
Returns:
|
|
A Session for this given Token
|
|
Raises:
|
|
Unauthorized: If token is invalid or expired
|
|
Forbidden: If permission is insufficient
|
|
"""
|
|
logger.debug("check token {{ {} }} is valid".format(token))
|
|
session = Session.query.filter_by(token=token).one_or_none()
|
|
if session:
|
|
logger.debug("token found, check if expired or invalid user agent differs")
|
|
user_agent = get_user_agent(request_headers)
|
|
platform = user_agent["os"]["family"]
|
|
browser = user_agent["user_agent"]["family"]
|
|
|
|
if session.expires >= datetime.now(timezone.utc) and (
|
|
session.browser == browser and session.platform == platform
|
|
):
|
|
if not permission or session.user_.has_permission(permission):
|
|
session.refresh()
|
|
db.session.commit()
|
|
return session
|
|
else:
|
|
raise Forbidden
|
|
else:
|
|
logger.debug("access token is out of date or invalid client used")
|
|
delete_session(session)
|
|
logger.debug("no valid access token with token: {{ {} }} and permission: {{ {} }}".format(token, permission))
|
|
raise Unauthorized
|
|
|
|
|
|
def create(user, request_headers=None) -> Session:
|
|
"""Create a Session
|
|
|
|
Args:
|
|
user: For which User is to create a Session
|
|
request_headers: Headers to validate user agent of browser
|
|
|
|
Returns:
|
|
Session: A created Token for User
|
|
"""
|
|
logger.debug("create access token")
|
|
token_str = secrets.token_hex(16)
|
|
user_agent = get_user_agent(request_headers)
|
|
logger.debug(f"platform: {user_agent['os']['family']}, browser: {user_agent['user_agent']['family']}")
|
|
session = Session(
|
|
token=token_str,
|
|
_user_id=user.id_,
|
|
lifetime=lifetime,
|
|
platform=user_agent["os"]["family"],
|
|
browser=user_agent["user_agent"]["family"],
|
|
)
|
|
session.refresh()
|
|
db.session.add(session)
|
|
db.session.commit()
|
|
logger.debug("access token is {{ {} }}".format(session.token))
|
|
return session
|
|
|
|
|
|
def get_session(token, owner=None):
|
|
"""Retrieves Session from token string
|
|
|
|
Args:
|
|
token (str): Token string
|
|
owner (User, optional): User owning the token
|
|
|
|
Raises:
|
|
Forbidden: Raised if owner is set but does not match
|
|
Returns:
|
|
Session: Token object identified by given token string
|
|
"""
|
|
session = Session.query.filter(Session.token == token).one_or_none()
|
|
if session and (owner and owner != session.user_):
|
|
raise Forbidden
|
|
return session
|
|
|
|
|
|
def get_users_sessions(user):
|
|
return Session.query.filter(Session.user_ == user)
|
|
|
|
|
|
def delete_sessions(user):
|
|
"""Deletes all active sessions of a user
|
|
|
|
Args:
|
|
user (User): User to delete all sessions for
|
|
"""
|
|
Session.query.filter(Session.user_.id_ == user.id_).delete()
|
|
db.session.commit()
|
|
|
|
|
|
def delete_session(token: Session):
|
|
"""Deletes given Session
|
|
|
|
Args:
|
|
token (Session): Token to delete
|
|
"""
|
|
db.session.delete(token)
|
|
db.session.commit()
|
|
|
|
|
|
def update_session(session):
|
|
session.refresh()
|
|
db.session.commit()
|
|
|
|
|
|
def set_lifetime(session, lifetime):
|
|
session.lifetime = lifetime
|
|
update_session(session)
|
|
|
|
|
|
def clear_expired():
|
|
"""Remove expired tokens from database"""
|
|
logger.debug("Clear expired Sessions")
|
|
deleted = Session.query.filter(Session.expires < datetime.now(timezone.utc)).delete()
|
|
logger.debug("{} sessions have been removed".format(deleted))
|
|
db.session.commit()
|