Format code with black (line length: 120)

https://github.com/psf/black
This commit is contained in:
Ferdinand Thiessen 2020-10-15 22:10:50 +02:00
parent 2c55edf6a8
commit 41e60425a9
19 changed files with 284 additions and 306 deletions

View File

@ -11,11 +11,11 @@ from pathlib import Path
from logging.config import dictConfig
from werkzeug.local import LocalProxy
__version__ = pkg_resources.get_distribution('flaschengeist').version
__version__ = pkg_resources.get_distribution("flaschengeist").version
_module_path = Path(__file__).parent
logger = LocalProxy(lambda: logging.getLogger(__name__))
with (_module_path / 'logging.yml').open(mode='rb') as file:
with (_module_path / "logging.yml").open(mode="rb") as file:
config = yaml.safe_load(file.read())
logging.config.dictConfig(config)

View File

@ -32,31 +32,32 @@ class CustomJSONEncoder(JSONEncoder):
def __load_plugins(app):
logger.info('Search for plugins')
app.config['FG_PLUGINS'] = {}
for entry_point in pkg_resources.iter_entry_points('flaschengeist.plugin'):
logger.info("Search for plugins")
app.config["FG_PLUGINS"] = {}
for entry_point in pkg_resources.iter_entry_points("flaschengeist.plugin"):
logger.debug("Found plugin: >{}<".format(entry_point.name))
plugin = None
if config.get(entry_point.name, 'enabled', fallback=False):
if config.get(entry_point.name, "enabled", fallback=False):
plugin = entry_point.load()(config[entry_point.name] if config.has_section(entry_point.name) else {})
if plugin.blueprint:
app.register_blueprint(plugin.blueprint)
logger.info("Load plugin >{}<".format(entry_point.name))
if isinstance(plugin, AuthPlugin):
logger.debug('Found authentication plugin: %s', entry_point.name)
if entry_point.name == config['FLASCHENGEIST']['AUTH']:
app.config['FG_AUTH_BACKEND'] = plugin
logger.debug("Found authentication plugin: %s", entry_point.name)
if entry_point.name == config["FLASCHENGEIST"]["AUTH"]:
app.config["FG_AUTH_BACKEND"] = plugin
else:
del plugin
else:
app.config["FG_PLUGINS"][entry_point.name] = plugin
if 'FG_AUTH_BACKEND' not in app.config:
logger.error('No authentication plugin configured or authentication plugin not found')
if "FG_AUTH_BACKEND" not in app.config:
logger.error("No authentication plugin configured or authentication plugin not found")
def install_all():
from flaschengeist.system.database import db
from flaschengeist.system.models import user, event, accessToken
db.create_all()
db.session.commit()
for name, plugin in current_app.config["FG_PLUGINS"].items():
@ -73,6 +74,7 @@ def create_app():
with app.app_context():
from .system.database import db
configure_app(app)
db.init_app(app)
__load_plugins(app)
@ -80,6 +82,7 @@ def create_app():
@app.route("/", methods=["GET"])
def __get_state():
from . import __version__ as version
return jsonify({"plugins": app.config["FG_PLUGINS"], "version": version})
@app.errorhandler(Exception)

View File

@ -9,7 +9,7 @@ class Plugin:
self.permissions = permissions
def install(self):
""" Installation routine
"""Installation routine
Is always called with Flask application context
"""
pass
@ -17,7 +17,7 @@ class Plugin:
class AuthPlugin(Plugin):
def login(self, user, pw):
""" Login routine, MUST BE IMPLEMENTED!
"""Login routine, MUST BE IMPLEMENTED!
Args:
user: User class containing at least the uid

View File

@ -14,37 +14,37 @@ from flaschengeist.system.decorator import login_required
from flaschengeist.system.controller import accessTokenController, userController
access_controller = LocalProxy(lambda: accessTokenController.AccessTokenController())
auth_bp = Blueprint('auth', __name__)
auth_bp = Blueprint("auth", __name__)
class AuthRoutePlugin(Plugin):
def __init__(self, conf):
super().__init__(blueprint=auth_bp)
#################################################
# Routes #
# #
# /auth POST: login (new token) #
# GET: get all tokens for user #
# /auth/<token> GET: get lifetime of token #
# PUT: set new lifetime #
# DELETE: logout / delete token #
#################################################
#################################################
# Routes #
# #
# /auth POST: login (new token) #
# GET: get all tokens for user #
# /auth/<token> GET: get lifetime of token #
# PUT: set new lifetime #
# DELETE: logout / delete token #
#################################################
@auth_bp.route("/auth", methods=['POST'])
@auth_bp.route("/auth", methods=["POST"])
def _create_token():
""" Login User
"""Login User
Login in User and create an AccessToken for the User.
Requires POST data {'userid': string, 'password': string}
Returns:
A JSON-File with user information and created token or errors
Login in User and create an AccessToken for the User.
Requires POST data {'userid': string, 'password': string}
Returns:
A JSON-File with user information and created token or errors
"""
logger.debug("Start log in.")
data = request.get_json()
try:
userid = data['userid']
password = data['password']
userid = data["userid"]
password = data["password"]
except KeyError:
raise BadRequest("Missing parameter(s)")
@ -61,15 +61,13 @@ class AuthRoutePlugin(Plugin):
access_controller.clear_expired()
return jsonify({"user": user, "token": token, "permissions": user.get_permissions()})
@auth_bp.route("/auth", methods=['GET'])
@auth_bp.route("/auth", methods=["GET"])
@login_required()
def _get_tokens(access_token, **kwargs):
tokens = access_controller.get_users_tokens(access_token.user)
return jsonify(tokens)
@auth_bp.route("/auth/<token>", methods=['DELETE'])
@auth_bp.route("/auth/<token>", methods=["DELETE"])
@login_required()
def _delete_token(token, access_token, **kwargs):
logger.debug("Try to delete access token {{ {} }}".format(token))
@ -83,8 +81,7 @@ class AuthRoutePlugin(Plugin):
access_controller.clear_expired()
return jsonify({"ok": "ok"})
@auth_bp.route("/auth/<token>", methods=['GET'])
@auth_bp.route("/auth/<token>", methods=["GET"])
@login_required()
def _get_token(token, access_token, **kwargs):
logger.debug("get token {{ {} }}".format(token))
@ -95,8 +92,7 @@ class AuthRoutePlugin(Plugin):
raise Forbidden
return jsonify(token)
@auth_bp.route("/auth/<token>", methods=['PUT'])
@auth_bp.route("/auth/<token>", methods=["PUT"])
@login_required()
def _set_lifetime(token, access_token, **kwargs):
token = access_controller.get_token(token, access_token.user)
@ -105,7 +101,7 @@ class AuthRoutePlugin(Plugin):
# Valid tokens from other users and invalid tokens now are looking the same
raise Forbidden
try:
lifetime = request.get_json()['value']
lifetime = request.get_json()["value"]
logger.debug("set lifetime {{ {} }} to access token {{ {} }}".format(lifetime, token))
access_controller.set_lifetime(token, lifetime)
return jsonify({"ok": "ok"})

View File

@ -14,83 +14,89 @@ class AuthLDAP(AuthPlugin):
def __init__(self, config):
super().__init__()
defaults = {
'PORT': '389',
'USE_SSL': 'False'
}
defaults = {"PORT": "389", "USE_SSL": "False"}
for name in defaults:
if name not in config:
config[name] = defaults[name]
app.config.update(
LDAP_SERVER=config['URL'],
LDAP_PORT=config.getint('PORT'),
LDAP_BINDDN=config['BINDDN'],
LDAP_SERVER=config["URL"],
LDAP_PORT=config.getint("PORT"),
LDAP_BINDDN=config["BINDDN"],
LDAP_USE_TLS=False,
LDAP_USE_SSL=config.getboolean('USE_SSL'),
LDAP_USE_SSL=config.getboolean("USE_SSL"),
LDAP_TLS_VERSION=ssl.PROTOCOL_TLSv1_2,
LDAP_REQUIRE_CERT=ssl.CERT_NONE,
FORCE_ATTRIBUTE_VALUE_AS_LIST=True
FORCE_ATTRIBUTE_VALUE_AS_LIST=True,
)
if 'SECRET' in config:
app.config['LDAP_SECRET'] = config['SECRET'],
if "SECRET" in config:
app.config["LDAP_SECRET"] = (config["SECRET"],)
self.ldap = LDAPConn(app)
self.dn = config['BASEDN']
self.dn = config["BASEDN"]
def login(self, user, password):
if not user:
return False
return self.ldap.authenticate(user.uid, password, 'uid', self.dn)
return self.ldap.authenticate(user.uid, password, "uid", self.dn)
def update_user(self, user):
self.ldap.connection.search('ou=user,{}'.format(self.dn), '(uid={})'.format(user.uid), SUBTREE,
attributes=['uid', 'givenName', 'sn', 'mail'])
r = self.ldap.connection.response[0]['attributes']
if r['uid'][0] == user.uid:
user.set_attribute('DN', self.ldap.connection.response[0]['dn'])
user.firstname = r['givenName'][0]
user.lastname = r['sn'][0]
if r['mail']:
user.mail = r['mail'][0]
if 'displayName' in r:
user.display_name = r['displayName'][0]
self.ldap.connection.search(
"ou=user,{}".format(self.dn),
"(uid={})".format(user.uid),
SUBTREE,
attributes=["uid", "givenName", "sn", "mail"],
)
r = self.ldap.connection.response[0]["attributes"]
if r["uid"][0] == user.uid:
user.set_attribute("DN", self.ldap.connection.response[0]["dn"])
user.firstname = r["givenName"][0]
user.lastname = r["sn"][0]
if r["mail"]:
user.mail = r["mail"][0]
if "displayName" in r:
user.display_name = r["displayName"][0]
for group in self._get_groups(user.uid):
user.add_role(group)
def _get_groups(self, uid):
groups = []
self.ldap.connection.search('ou=user,{}'.format(self.dn), '(uid={})'.format(uid), SUBTREE,
attributes=['gidNumber'])
main_group_number = self.ldap.connection.response[0]['attributes']['gidNumber']
self.ldap.connection.search(
"ou=user,{}".format(self.dn), "(uid={})".format(uid), SUBTREE, attributes=["gidNumber"]
)
main_group_number = self.ldap.connection.response[0]["attributes"]["gidNumber"]
if main_group_number:
if type(main_group_number) is list:
main_group_number = main_group_number[0]
self.ldap.connection.search('ou=group,{}'.format(self.dn), '(gidNumber={})'.format(main_group_number),
attributes=['cn'])
groups.append(self.ldap.connection.response[0]['attributes']['cn'][0])
self.ldap.connection.search(
"ou=group,{}".format(self.dn), "(gidNumber={})".format(main_group_number), attributes=["cn"]
)
groups.append(self.ldap.connection.response[0]["attributes"]["cn"][0])
self.ldap.connection.search('ou=group,{}'.format(self.dn), '(memberUID={})'.format(uid), SUBTREE,
attributes=['cn'])
self.ldap.connection.search(
"ou=group,{}".format(self.dn), "(memberUID={})".format(uid), SUBTREE, attributes=["cn"]
)
groups_data = self.ldap.connection.response
for data in groups_data:
groups.append(data['attributes']['cn'][0])
groups.append(data["attributes"]["cn"][0])
return groups
def modify_user(self, user: User, password, new_password=None):
try:
dn = user.attributes['DN'].value
dn = user.attributes["DN"].value
ldap_conn = self.ldap.connect(dn, password)
modifier = {}
for name, ldap_name in [("firstname", "givenName"),
("lastname", "sn"),
("mail", "mail"),
("display_name", "displayName")]:
for name, ldap_name in [
("firstname", "givenName"),
("lastname", "sn"),
("mail", "mail"),
("display_name", "displayName"),
]:
if getattr(user, name):
modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])]
if new_password:
salted_password = hashed(HASHED_SALTED_SHA512, new_password)
modifier['userPassword'] = [(MODIFY_REPLACE, [salted_password])]
modifier["userPassword"] = [(MODIFY_REPLACE, [salted_password])]
ldap_conn.modify(dn, modifier)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest

View File

@ -7,24 +7,24 @@ from flaschengeist.system.models.user import User
def _hash_password(password):
salt = hashlib.sha256(os.urandom(60)).hexdigest().encode('ascii')
pass_hash = hashlib.pbkdf2_hmac('sha3-512', password.encode('utf-8'), salt, 100000)
salt = hashlib.sha256(os.urandom(60)).hexdigest().encode("ascii")
pass_hash = hashlib.pbkdf2_hmac("sha3-512", password.encode("utf-8"), salt, 100000)
pass_hash = binascii.hexlify(pass_hash)
return (salt + pass_hash).decode('ascii')
return (salt + pass_hash).decode("ascii")
def _verify_password(stored_password, provided_password):
salt = stored_password[:64]
stored_password = stored_password[64:]
pass_hash = hashlib.pbkdf2_hmac('sha3-512', provided_password.encode('utf-8'), salt.encode('ascii'), 100000)
pass_hash = binascii.hexlify(pass_hash).decode('ascii')
pass_hash = hashlib.pbkdf2_hmac("sha3-512", provided_password.encode("utf-8"), salt.encode("ascii"), 100000)
pass_hash = binascii.hexlify(pass_hash).decode("ascii")
return pass_hash == stored_password
class AuthPlain(modules.Auth):
def login(self, user: User, password: str):
if user and 'password' in user.attributes:
return _verify_password(user.attributes['password'].value, password)
if user and "password" in user.attributes:
return _verify_password(user.attributes["password"].value, password)
return False
def modify_user(self, user, password, new_password=None):

View File

@ -11,12 +11,12 @@ from . import Plugin, send_message_hook
class MailMessagePlugin(Plugin):
def __init__(self, config):
super().__init__()
self.server = config['SERVER']
self.port = config['PORT']
self.user = config['USER']
self.password = config['PASSWORD']
self.crypt = config['CRYPT']
self.mail = config['MAIL']
self.server = config["SERVER"]
self.port = config["PORT"]
self.user = config["USER"]
self.password = config["PASSWORD"]
self.crypt = config["CRYPT"]
self.mail = config["MAIL"]
@send_message_hook
def send_mail(self, msg: Message):
@ -26,18 +26,18 @@ class MailMessagePlugin(Plugin):
recipients = userController.get_user_by_role(msg.receiver)
mail = MIMEMultipart()
mail['From'] = self.mail
mail['To'] = ", ".join(recipients)
mail['Subject'] = msg.subject
mail["From"] = self.mail
mail["To"] = ", ".join(recipients)
mail["Subject"] = msg.subject
msg.attach(msg.message)
if not self.smtp:
self.__connect()
self.smtp.sendmail(self.mail, recipients, msg.as_string())
def __connect(self):
if self.crypt == 'SSL':
if self.crypt == "SSL":
self.smtp = smtplib.SMTP_SSL(self.server, self.port)
if self.crypt == 'STARTTLS':
if self.crypt == "STARTTLS":
self.smtp = smtplib.SMTP(self.smtpServer, self.port)
self.smtp.starttls()
self.smtp.login(self.user, self.password)

View File

@ -12,18 +12,18 @@ class RolesPlugin(Plugin):
def __init__(self, config):
super().__init__(config, roles_bp)
######################################################
# Routes #
# #
# /roles POST: register new #
# GET: get all roles #
# /roles/permissions GET: get all permissions #
# /roles/<rid> GET: get role with rid #
# PUT: modify role / permission #
# DELETE: remove role #
######################################################
######################################################
# Routes #
# #
# /roles POST: register new #
# GET: get all roles #
# /roles/permissions GET: get all permissions #
# /roles/<rid> GET: get role with rid #
# PUT: modify role / permission #
# DELETE: remove role #
######################################################
@roles_bp.route("/roles", methods=['POST'])
@roles_bp.route("/roles", methods=["POST"])
@login_required()
def add_role(self):
data = request.get_json()
@ -34,31 +34,27 @@ class RolesPlugin(Plugin):
role = roleController.create_role(data["name"], permissions)
return jsonify({"ok": "ok", "id": role.id})
@roles_bp.route("/roles", methods=['GET'])
@roles_bp.route("/roles", methods=["GET"])
@login_required()
def list_roles(self, **kwargs):
roles = roleController.get_roles()
return jsonify(roles)
@roles_bp.route("/roles/permissions", methods=['GET'])
@roles_bp.route("/roles/permissions", methods=["GET"])
@login_required()
def list_permissions(self, **kwargs):
permissions = roleController.get_permissions()
return jsonify(permissions)
@roles_bp.route("/roles/<rid>", methods=['GET'])
@roles_bp.route("/roles/<rid>", methods=["GET"])
@login_required()
def __get_role(self, rid, **kwargs):
role = roleController.get_role(rid)
if role:
return jsonify({
"id": role.id,
"name": role,
"permissions": role.permissions
})
return jsonify({"id": role.id, "name": role, "permissions": role.permissions})
raise NotFound
@roles_bp.route("/roles/<rid>", methods=['PUT'])
@roles_bp.route("/roles/<rid>", methods=["PUT"])
@login_required()
def __edit_role(self, rid, **kwargs):
role = roleController.get_role(rid)
@ -66,14 +62,14 @@ class RolesPlugin(Plugin):
raise NotFound
data = request.get_json()
if 'name' in data:
if "name" in data:
role.name = data["name"]
if "permissions" in data:
roleController.set_permissions(role, data["permissions"])
roleController.update_role(role)
return jsonify({"ok": "ok"})
@roles_bp.route("/roles/<rid>", methods=['DELETE'])
@roles_bp.route("/roles/<rid>", methods=["DELETE"])
@login_required()
def __delete_role(self, rid, **kwargs):
if not roleController.delete_role(rid):

View File

@ -16,28 +16,28 @@ class SchedulePlugin(Plugin):
def __init__(self, config):
super().__init__(blueprint=schedule_bp)
####################################################################################
# Routes #
# #
# /schedule/events POST: create new #
# GET: get all events this month #
# /schedule/events/<year>/<month> GET: get events by month / date #
# /<year>/<month>/<day> #
# /schedule/events/<id> GET: get event by ID #
# DELETE: delete specified event #
# PUT: modify specified event #
# /schedule/events/<id>/slots GET: get EventSlots of Event #
# POST: add new EventSlot to Event #
# /schedule/events/<id>/slots/<sid> PUT: modify EventSlot #
# GET: get specified EventSlot #
# /schedule/events/<id>/slots/<sid>/jobs POST: add User #
# /schedule/eventKinds
# /schedule/eventKinds/<id>
# PUT: modify user #
# DELETE: remove user #
####################################################################################
####################################################################################
# Routes #
# #
# /schedule/events POST: create new #
# GET: get all events this month #
# /schedule/events/<year>/<month> GET: get events by month / date #
# /<year>/<month>/<day> #
# /schedule/events/<id> GET: get event by ID #
# DELETE: delete specified event #
# PUT: modify specified event #
# /schedule/events/<id>/slots GET: get EventSlots of Event #
# POST: add new EventSlot to Event #
# /schedule/events/<id>/slots/<sid> PUT: modify EventSlot #
# GET: get specified EventSlot #
# /schedule/events/<id>/slots/<sid>/jobs POST: add User #
# /schedule/eventKinds
# /schedule/eventKinds/<id>
# PUT: modify user #
# DELETE: remove user #
####################################################################################
@schedule_bp.route("/events/<int:id>", methods=['GET'])
@schedule_bp.route("/events/<int:id>", methods=["GET"])
@login_required() # roles=['schedule_read'])
def __get_event(self, id, **kwargs):
event = eventController.get_event(id)
@ -45,9 +45,9 @@ class SchedulePlugin(Plugin):
raise NotFound
return jsonify(event)
@schedule_bp.route("/events", methods=['GET'])
@schedule_bp.route("/events/<int:year>/<int:month>", methods=['GET'])
@schedule_bp.route("/events/<int:year>/<int:month>/<int:day>", methods=['GET'])
@schedule_bp.route("/events", methods=["GET"])
@schedule_bp.route("/events/<int:year>/<int:month>", methods=["GET"])
@schedule_bp.route("/events/<int:year>/<int:month>/<int:day>", methods=["GET"])
@login_required() # roles=['schedule_read'])
def __get_events(self, year=datetime.now().year, month=datetime.now().month, day=None, **kwrags):
"""Get Event objects for specified date (or month or year),
@ -72,15 +72,14 @@ class SchedulePlugin(Plugin):
if month == 12:
end = datetime(year=year + 1, month=1, day=1)
else:
end = datetime(year=year, month=month+1, day=1)
end = datetime(year=year, month=month + 1, day=1)
events = eventController.get_events(begin, end)
return jsonify(events)
except ValueError:
raise BadRequest("Invalid date given")
@schedule_bp.route("/eventKinds", methods=['POST'])
@schedule_bp.route("/eventKinds", methods=["POST"])
@login_required()
def __new_event_kind(self, **kwargs):
data = request.get_json()
@ -89,7 +88,6 @@ class SchedulePlugin(Plugin):
kind = eventController.create_event_kind(data["name"])
return jsonify({"ok": "ok", "id": kind.id})
@schedule_bp.route("/slotKinds", methods=["POST"])
@login_required()
def __new_slot_kind(self, **kwargs):
@ -99,26 +97,25 @@ class SchedulePlugin(Plugin):
kind = eventController.create_job_kind(data["name"])
return jsonify({"ok": "ok", "id": kind.id})
@schedule_bp.route("/events", methods=['POST'])
@schedule_bp.route("/events", methods=["POST"])
@login_required()
def __new_event(self, **kwargs):
data = request.get_json()
event = eventController.create_event(begin=parser.isoparse(data["begin"]),
end=parser.isoparse(data["end"]),
description=data["description"],
kind=EventKind.query.get(data["kind"]))
event = eventController.create_event(
begin=parser.isoparse(data["begin"]),
end=parser.isoparse(data["end"]),
description=data["description"],
kind=EventKind.query.get(data["kind"]),
)
return jsonify({"ok": "ok", "id": event.id})
@schedule_bp.route("/events/<int:id>", methods=["DELETE"])
@login_required()
def __delete_event(self, id, **kwargs):
if not eventController.delete_event(id):
raise NotFound
db.session.commit()
return jsonify({'ok': 'ok'})
return jsonify({"ok": "ok"})
@schedule_bp.route("/eventKinds/<int:id>", methods=["PUT"])
@login_required()
@ -129,7 +126,6 @@ class SchedulePlugin(Plugin):
eventController.rename_event_kind(id, data["name"])
return jsonify({"ok": "ok"})
@schedule_bp.route("/events/<int:event_id>/slots", methods=["GET"])
@login_required()
def __get_slots(self, event_id, **kwargs):
@ -138,7 +134,6 @@ class SchedulePlugin(Plugin):
raise NotFound
return jsonify({event.slots})
@schedule_bp.route("/events/<int:event_id>/slots/<int:slot_id>", methods=["GET"])
@login_required()
def __get_slot(self, event_id, slot_id, **kwargs):
@ -147,7 +142,6 @@ class SchedulePlugin(Plugin):
return jsonify(slot)
raise NotFound
@schedule_bp.route("/events/<int:event_id>/slots/<int:slot_id>", methods=["DELETE"])
@login_required()
def __delete_slot(self, event_id, slot_id, **kwargs):
@ -155,7 +149,6 @@ class SchedulePlugin(Plugin):
return jsonify({"ok": "ok"})
raise NotFound
@schedule_bp.route("/events/<int:event_id>/slots/<int:slot_id>", methods=["PUT"])
@login_required()
def __update_slot(self, event_id, slot_id, **kwargs):
@ -163,13 +156,12 @@ class SchedulePlugin(Plugin):
if not data:
raise BadRequest
for job in data['jobs']:
for job in data["jobs"]:
eventController.add_job(job.kind, job.user)
if eventController.delete_event_slot(slot_id, event_id):
return jsonify({"ok": "ok"})
raise NotFound
@schedule_bp.route("/events/<int:event_id>/slots", methods=["POST"])
@login_required()
def __add_slot(self, event_id, **kwargs):
@ -190,6 +182,5 @@ class SchedulePlugin(Plugin):
eventController.add_slot(event, **attr)
return jsonify({"ok": "ok"})
def __edit_event(self):
...

View File

@ -7,36 +7,36 @@ from flaschengeist.system.decorator import login_required
from flaschengeist.system.controller import userController
users_bp = Blueprint("users", __name__)
permissions = {'EDIT_USER': 'edit_user'}
permissions = {"EDIT_USER": "edit_user"}
class UsersPlugin(Plugin):
def __init__(self, config):
super().__init__(blueprint=users_bp, permissions=permissions)
#################################################
# Routes #
# #
# /users POST: register new #
# GET: get all users #
# /users/<uid> GET: get user with uid #
# PUT: modify user #
# DELETE: remove user #
#################################################
#################################################
# Routes #
# #
# /users POST: register new #
# GET: get all users #
# /users/<uid> GET: get user with uid #
# PUT: modify user #
# DELETE: remove user #
#################################################
@users_bp.route("/users", methods=['POST'])
@users_bp.route("/users", methods=["POST"])
def __registration(self):
logger.debug("Register new User...")
return jsonify({"ok": "ok... well not implemented"})
@users_bp.route("/users", methods=['GET'])
@users_bp.route("/users", methods=["GET"])
@login_required()
def __list_users(self, **kwargs):
logger.debug("Retrieve list of all users")
users = userController.get_users()
return jsonify(users)
@users_bp.route("/users/<uid>", methods=['GET'])
@users_bp.route("/users/<uid>", methods=["GET"])
@login_required()
def __get_user(self, uid, **kwargs):
logger.debug("Get information of user {{ {} }}".format(uid))
@ -45,7 +45,7 @@ class UsersPlugin(Plugin):
return jsonify(user)
raise NotFound
@users_bp.route("/users/<uid>", methods=['PUT'])
@users_bp.route("/users/<uid>", methods=["PUT"])
@login_required()
def __edit_user(self, uid, **kwargs):
logger.debug("Modify information of user {{ {} }}".format(uid))
@ -53,16 +53,16 @@ class UsersPlugin(Plugin):
if not user:
raise NotFound
if uid != kwargs['access_token'].user.uid and user.has_permissions(permissions['EDIT_USER']):
if uid != kwargs["access_token"].user.uid and user.has_permissions(permissions["EDIT_USER"]):
return Forbidden
data = request.get_json()
if 'password' not in data:
if "password" not in data:
raise BadRequest("Password is missing")
for key in ["firstname", "lastname", "display_name", "mail"]:
if key in data:
setattr(user, key, data[key])
new_password = data['new_password'] if 'new_password' in data else None
userController.modify_user(user, data['password'], new_password)
new_password = data["new_password"] if "new_password" in data else None
userController.modify_user(user, data["password"], new_password)
userController.update_user(user)
return jsonify({"ok": "ok"})

View File

@ -4,51 +4,37 @@ from pathlib import Path
from werkzeug.middleware.proxy_fix import ProxyFix
from .. import _module_path, logger
default = {
'MAIL': {
'CRYPT': 'SSL/STARTLS'
}
}
default = {"MAIL": {"CRYPT": "SSL/STARTLS"}}
config = configparser.ConfigParser()
config.read_dict(default)
paths = [_module_path, Path.home()/".config"]
if 'FLASCHENGEIST_CONF' in os.environ:
paths = [_module_path, Path.home() / ".config"]
if "FLASCHENGEIST_CONF" in os.environ:
paths.append(Path(os.environ.get("FLASCHENGEIST_CONF")))
for loc in paths:
try:
with (loc/"flaschengeist.cfg").open() as source:
with (loc / "flaschengeist.cfg").open() as source:
logger.info("Reading config file from >{}<".format(loc))
config.read_file(source)
except IOError:
pass
# Always enable this builtin plugins!
config.read_dict({
'auth': {
'enabled': True
},
'roles': {
'enabled': True
},
'users': {
'enabled': True
}
})
config.read_dict({"auth": {"enabled": True}, "roles": {"enabled": True}, "users": {"enabled": True}})
def configure_app(app):
if not config.has_option('FLASCHENGEIST', 'SECRET_KEY'):
logger.warn('No secret key was configured, please configure one for production systems!')
app.config['SECRET_KEY'] = config.get('FLASCHENGEIST', 'SECRET_KEY', fallback='0a657b97ef546da90b2db91862ad4e29')
if not config.has_option("FLASCHENGEIST", "SECRET_KEY"):
logger.warn("No secret key was configured, please configure one for production systems!")
app.config["SECRET_KEY"] = config.get("FLASCHENGEIST", "SECRET_KEY", fallback="0a657b97ef546da90b2db91862ad4e29")
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://{user}:{passwd}@{host}/{database}'.format(
user=config['DATABASE']['USER'],
passwd=config['DATABASE']['PASSWORD'],
host=config['DATABASE']['HOST'],
database=config['DATABASE']['DATABASE']
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://{user}:{passwd}@{host}/{database}".format(
user=config["DATABASE"]["USER"],
passwd=config["DATABASE"]["PASSWORD"],
host=config["DATABASE"]["HOST"],
database=config["DATABASE"]["DATABASE"],
)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
if config.has_option("FLASCHENGEIST", "ROOT"):
logger.debug("Setting application root to >{}<".format(config["FLASCHENGEIST"]["ROOT"]))

View File

@ -8,37 +8,37 @@ from . import Singleton
class AccessTokenController(metaclass=Singleton):
""" Control all created AccessToken
"""Control all created AccessToken
This Class create, delete, find and manage AccessToken.
This Class create, delete, find and manage AccessToken.
Attributes:
lifetime: Variable for the Lifetime of one AccessToken in seconds.
Attributes:
lifetime: Variable for the Lifetime of one AccessToken in seconds.
"""
def __init__(self, lifetime=1800):
self.lifetime = lifetime
def validate_token(self, token, user_agent, permissions):
""" Verify access token
"""Verify access token
Verify an AccessToken and Roles so if the User has permission or not.
Retrieves the access token if valid else retrieves False
Verify an AccessToken and Roles so if the User has permission or not.
Retrieves the access token if valid else retrieves False
Args:
token: Token to verify.
user_agent: User agent of browser to check
permissions: Permissions needed to access restricted routes
Returns:
An the AccessToken for this given Token or False.
Args:
token: Token to verify.
user_agent: User agent of browser to check
permissions: Permissions needed to access restricted routes
Returns:
An the AccessToken for this given Token or False.
"""
logger.debug("check token {{ {} }} is valid".format(token))
access_token = AccessToken.query.filter_by(token=token).one_or_none()
if access_token:
logger.debug("token found, check if expired or invalid user agent differs")
if access_token.expires >= datetime.utcnow() and (
access_token.browser == user_agent.browser and
access_token.platform == user_agent.platform):
access_token.browser == user_agent.browser and access_token.platform == user_agent.platform
):
if not permissions or access_token.user.has_permissions(permissions):
access_token.refresh()
db.session.commit()
@ -50,7 +50,7 @@ class AccessTokenController(metaclass=Singleton):
return False
def create(self, user, user_agent=None) -> AccessToken:
""" Create an AccessToken
"""Create an AccessToken
Args:
user: For which User is to create an AccessToken
@ -61,8 +61,9 @@ class AccessTokenController(metaclass=Singleton):
"""
logger.debug("create access token")
token_str = secrets.token_hex(16)
token = AccessToken(token=token_str, user=user, lifetime=self.lifetime,
browser=user_agent.browser, platform=user_agent.platform)
token = AccessToken(
token=token_str, user=user, lifetime=self.lifetime, browser=user_agent.browser, platform=user_agent.platform
)
token.refresh()
db.session.add(token)
db.session.commit()

View File

@ -33,10 +33,7 @@ def delete_event(id):
def create_event(begin, kind, end=None, description=None):
try:
event = Event(begin=begin,
end=end,
description=description,
kind=kind)
event = Event(begin=begin, end=end, description=description, kind=kind)
db.session.add(event)
db.session.commit()
return event

View File

@ -11,14 +11,14 @@ def login_user(username, password):
if user is None:
user = User(uid=username)
db.session.add(user)
if current_app.config['FG_AUTH_BACKEND'].login(user, password):
if current_app.config["FG_AUTH_BACKEND"].login(user, password):
update_user(user)
return user
return None
def update_user(user):
current_app.config['FG_AUTH_BACKEND'].update_user(user)
current_app.config["FG_AUTH_BACKEND"].update_user(user)
db.session.commit()
@ -37,7 +37,7 @@ def modify_user(user, password, new_password=None):
NotImplemented: If backend is not capable of this operation
BadRequest: Password is wrong or other logic issues
"""
current_app.config['FG_AUTH_BACKEND'].modify_user(user, password, new_password)
current_app.config["FG_AUTH_BACKEND"].modify_user(user, password, new_password)
def get_users():

View File

@ -7,6 +7,7 @@ from flaschengeist import logger
def login_required(**kwargs):
from .controller.accessTokenController import AccessTokenController
ac_controller = AccessTokenController()
permissions = None
if "permissions" in kwargs:
@ -15,14 +16,16 @@ def login_required(**kwargs):
def real_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
token = request.headers.get('Token')
token = request.headers.get("Token")
access_token = ac_controller.validate_token(token, request.user_agent, permissions)
if access_token:
kwargs['access_token'] = access_token
kwargs["access_token"] = access_token
logger.debug("token {{ {} }} is valid".format(token))
return func(*args, **kwargs)
else:
logger.info("token {{ {} }} is not valid".format(token))
raise Unauthorized
return wrapper
return real_decorator

View File

@ -5,16 +5,17 @@ from flaschengeist import logger
class AccessToken(db.Model):
""" Model for an AccessToken
"""Model for an AccessToken
Args:
expires: Is a Datetime from current Time.
user: Is an User.
token: String to verify access later.
Args:
expires: Is a Datetime from current Time.
user: Is an User.
token: String to verify access later.
"""
__tablename__ = 'session'
__tablename__ = "session"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
user = db.relationship("User", back_populates="sessions")
expires = db.Column(db.DateTime)
@ -24,25 +25,25 @@ class AccessToken(db.Model):
platform = db.Column(db.String(30))
def refresh(self):
""" Update the Timestamp
"""Update the Timestamp
Update the Timestamp to the current Time.
Update the Timestamp to the current Time.
"""
logger.debug("update timestamp from access token {{ {} }}".format(self))
self.expires = datetime.utcnow() + timedelta(seconds=self.lifetime)
def serialize(self):
""" Create Dic to dump in JSON
"""Create Dic to dump in JSON
Returns:
A Dic with static Attributes.
Returns:
A Dic with static Attributes.
"""
return {
"token": self.token,
"expires": self.expires.replace(tzinfo=timezone.utc),
"lifetime": self.lifetime,
"browser": self.browser,
"platform": self.platform
"platform": self.platform,
}
def __eq__(self, token):
@ -50,4 +51,5 @@ class AccessToken(db.Model):
def __str__(self):
return "AccessToken(user={}, token={}, expires={}, lifetime={})".format(
self.user, self.token, self.expires, self.lifetime)
self.user, self.token, self.expires, self.lifetime
)

View File

@ -3,46 +3,40 @@ from ..database import db
class Event(db.Model):
"""Model for an Event"""
__tablename__ = 'event'
__tablename__ = "event"
id = db.Column(db.Integer, primary_key=True)
begin = db.Column(db.DateTime, nullable=False)
end = db.Column(db.DateTime)
description = db.Column(db.String(240))
kind_id = db.Column(db.Integer, db.ForeignKey('event_kind.id', ondelete="CASCADE"), nullable=False)
kind_id = db.Column(db.Integer, db.ForeignKey("event_kind.id", ondelete="CASCADE"), nullable=False)
kind = db.relationship("EventKind")
slots = db.relationship("EventSlot", back_populates="event", cascade="all, delete")
#notices = db.relationship("EventNotice", back_populates="event")
# notices = db.relationship("EventNotice", back_populates="event")
def serialize(self):
return {
"id": self.id,
"begin": self.begin,
"end": self.end,
"description": self.description,
"kind": self.kind
}
return {"id": self.id, "begin": self.begin, "end": self.end, "description": self.description, "kind": self.kind}
class EventKind(db.Model):
"""Model for an EventKind"""
__tablename__ = "event_kind"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(30), nullable=False, unique=True)
def serialize(self):
return {
"id": self.id,
"name": self.name
}
return {"id": self.id, "name": self.name}
class EventSlot(db.Model):
"""Model for an EventSlot"""
__tablename__ = "event_slot"
id = db.Column(db.Integer, primary_key=True)
start = db.Column(db.DateTime)
end = db.Column(db.DateTime)
event_id = db.Column(db.Integer, db.ForeignKey('event.id'), nullable=False)
event_id = db.Column(db.Integer, db.ForeignKey("event.id"), nullable=False)
event = db.relationship("Event", back_populates="slots")
slots = db.relationship("JobSlot", back_populates="event_slot")
@ -59,9 +53,9 @@ class JobSlot(db.Model):
__tablename__ = "job_slot"
id = db.Column(db.Integer, primary_key=True)
needed_persons = db.Column(db.Numeric(precision=4, scale=2))
event_slot_id = db.Column(db.Integer, db.ForeignKey('event_slot.id'))
event_slot_id = db.Column(db.Integer, db.ForeignKey("event_slot.id"))
event_slot = db.relationship("EventSlot", back_populates="slots")
kind_id = db.Column(db.Integer, db.ForeignKey('job_kind.id'))
kind_id = db.Column(db.Integer, db.ForeignKey("job_kind.id"))
kind = db.relationship("JobKind")
jobs = db.relationship("Job", back_populates="slot")
@ -70,9 +64,9 @@ class Job(db.Model):
__tablename__ = "job"
id = db.Column(db.Integer, primary_key=True)
value = db.Column(db.Numeric(precision=3, scale=2))
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
user = db.relationship("User")
slot_id = db.Column(db.Integer, db.ForeignKey('job_slot.id'))
slot_id = db.Column(db.Integer, db.ForeignKey("job_slot.id"))
slot = db.relationship("JobSlot")

View File

@ -5,26 +5,28 @@ from werkzeug.local import LocalProxy
logger = LocalProxy(lambda: current_app.logger)
association_table = db.Table('user_x_role',
db.Column('user_id', db.Integer, db.ForeignKey('user.id')),
db.Column('role_id', db.Integer, db.ForeignKey('role.id'))
)
association_table = db.Table(
"user_x_role",
db.Column("user_id", db.Integer, db.ForeignKey("user.id")),
db.Column("role_id", db.Integer, db.ForeignKey("role.id")),
)
class User(db.Model):
""" Database Object for User
"""Database Object for User
Table for all saved User
Table for all saved User
Attributes:
id: Id in Database as Primary Key.
uid: User ID used by authentication provider
display_name: Name to show
firstname: Firstname of the User
lastname: Lastname of the User
mail: mail address of the User
Attributes:
id: Id in Database as Primary Key.
uid: User ID used by authentication provider
display_name: Name to show
firstname: Firstname of the User
lastname: Lastname of the User
mail: mail address of the User
"""
__tablename__ = 'user'
__tablename__ = "user"
id = db.Column(db.Integer, primary_key=True)
uid = db.Column(db.String(30))
display_name = db.Column(db.String(30))
@ -33,8 +35,9 @@ class User(db.Model):
mail = db.Column(db.String(30))
roles = db.relationship("Role", secondary=association_table)
sessions = db.relationship("AccessToken", back_populates="user")
attributes = db.relationship("UserAttribute", collection_class=attribute_mapped_collection('name'),
cascade="all, delete")
attributes = db.relationship(
"UserAttribute", collection_class=attribute_mapped_collection("name"), cascade="all, delete"
)
def set_attribute(self, name, value):
if name in self.attributes:
@ -50,16 +53,16 @@ class User(db.Model):
def update_data(self, data):
logger.debug("update data of user")
if 'uid' in data:
self.uid = data['uid']
if 'firstname' in data:
self.firstname = data['firstname']
if 'lastname' in data:
self.lastname = data['lastname']
if 'mail' in data:
self.mail = data['mail']
if 'display_name' in data:
self.display_name = data['display_name']
if "uid" in data:
self.uid = data["uid"]
if "firstname" in data:
self.firstname = data["firstname"]
if "lastname" in data:
self.lastname = data["lastname"]
if "mail" in data:
self.mail = data["mail"]
if "display_name" in data:
self.display_name = data["display_name"]
def get_permissions(self):
return ["user"] + [permission.name for role in self.roles for permission in role.permissions]
@ -78,26 +81,27 @@ class User(db.Model):
"firstname": self.firstname,
"lastname": self.lastname,
"mail": self.mail,
"roles": [r.name for r in self.roles]
"roles": [r.name for r in self.roles],
}
class UserAttribute(db.Model):
__tablename__ = 'user_attribute'
__tablename__ = "user_attribute"
id = db.Column(db.Integer, primary_key=True)
user = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
user = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False)
name = db.Column(db.String(30))
value = db.Column(db.String(192))
role_permission_association_table = db.Table('role_x_permission',
db.Column('role_id', db.Integer, db.ForeignKey('role.id')),
db.Column('permission_id', db.Integer, db.ForeignKey('permission.id'))
)
role_permission_association_table = db.Table(
"role_x_permission",
db.Column("role_id", db.Integer, db.ForeignKey("role.id")),
db.Column("permission_id", db.Integer, db.ForeignKey("permission.id")),
)
class Role(db.Model):
__tablename__ = 'role'
__tablename__ = "role"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(30), unique=True)
permissions = db.relationship("Permission", secondary=role_permission_association_table, cascade="all, delete")
@ -107,7 +111,7 @@ class Role(db.Model):
class Permission(db.Model):
__tablename__ = 'permission'
__tablename__ = "permission"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(30), unique=True)

View File

@ -19,7 +19,7 @@ setup(
"werkzeug",
"bjoern",
"python-dateutil",
"pyhooks"
"pyhooks",
],
extras_require={"ldap": ["flask_ldapconn", "ldap3"]},
entry_points={
@ -29,7 +29,6 @@ setup(
"roles = flaschengeist.modules.roles:RolesPlugin",
"schedule = flaschengeist.modules.schedule:SchedulePlugin",
"mail = flaschengeist.modules.message_mail:MailMessagePlugin",
"auth_plain = flaschengeist.modules.auth_plain:AuthPlain",
"auth_ldap = flaschengeist.modules.auth_ldap:AuthLDAP [ldap]",
],