flaschengeist/flaschengeist/plugins/events/event_controller.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

205 lines
5.0 KiB
Python
Raw Normal View History

2021-03-20 23:55:50 +00:00
from datetime import datetime
from typing import Optional
from sqlalchemy import or_, and_
2020-09-07 14:13:18 +00:00
from werkzeug.exceptions import BadRequest, NotFound
2020-09-05 20:26:00 +00:00
from sqlalchemy.exc import IntegrityError
from flaschengeist import logger
from flaschengeist.database import db
2021-03-20 23:55:50 +00:00
from flaschengeist.plugins.events.models import EventType, Event, Job, JobType, Service
from flaschengeist.utils.datetime import from_iso_format
def update():
    """Commit the pending changes of the current database session.

    Note:
        A second ``update`` is defined further down in this module; it rebinds
        the name once the module is fully loaded, so callers end up using that
        later definition.
    """
    session = db.session
    session.commit()
def get_event_types():
    """Fetch all stored event types.

    Returns:
        The result of ``EventType.query.all()``
    """
    event_types = EventType.query.all()
    return event_types
def get_event_type(identifier):
    """Look up a single EventType by its ID (int) or its name (str)

    Args:
        identifier: primary key (int) or name (str) of the EventType
    Returns:
        The matching EventType
    Raises:
        BadRequest if identifier is neither int nor str
        NotFound if no matching EventType exists
    """
    # Reject unsupported identifier types before touching the database
    if not isinstance(identifier, (int, str)):
        logger.debug("Invalid identifier type for EventType")
        raise BadRequest

    if isinstance(identifier, int):
        event_type = EventType.query.get(identifier)
    else:
        by_name = EventType.query.filter(EventType.name == identifier)
        event_type = by_name.one_or_none()

    if event_type:
        return event_type
    raise NotFound
def create_event_type(name):
    """Create and persist a new EventType

    Args:
        name: name of the new EventType
    Returns:
        The newly created EventType
    Raises:
        BadRequest if the commit fails with an IntegrityError
    """
    new_type = EventType(name=name)
    try:
        db.session.add(new_type)
        db.session.commit()
    except IntegrityError:
        raise BadRequest("Name already exists")
    else:
        return new_type
def rename_event_type(identifier, new_name):
    """Give an existing EventType a new name

    Args:
        identifier: ID (int) or current name (str), resolved via get_event_type
        new_name: name to assign
    Raises:
        BadRequest if the commit fails with an IntegrityError
        (plus whatever get_event_type raises for an unknown identifier)
    """
    target = get_event_type(identifier)
    target.name = new_name
    session = db.session
    try:
        session.commit()
    except IntegrityError:
        raise BadRequest("Name already exists")
def delete_event_type(name):
    """Remove an EventType

    Args:
        name: identifier handed to get_event_type (which accepts an int ID or a str name)
    Raises:
        BadRequest if the commit fails with an IntegrityError
        (plus whatever get_event_type raises for an unknown identifier)
    """
    session = db.session
    doomed = get_event_type(name)
    session.delete(doomed)
    try:
        session.commit()
    except IntegrityError:
        raise BadRequest("Type still in use")
2020-09-05 20:26:00 +00:00
def get_job_types():
    """Fetch all stored job types.

    Returns:
        The result of ``JobType.query.all()``
    """
    job_types = JobType.query.all()
    return job_types
def get_job_type(type_id):
    """Get a JobType by its primary key

    Args:
        type_id: primary key of the JobType
    Returns:
        The matching JobType
    Raises:
        NotFound if no JobType with that key exists
    """
    job_type = JobType.query.get(type_id)
    # Debug output goes through the application logger, not stdout
    logger.debug("JobType lookup for %r returned %r", type_id, job_type)
    if job_type is None:
        raise NotFound
    return job_type
def create_job_type(name):
    """Create and persist a new JobType

    Args:
        name: name of the new JobType
    Returns:
        The newly created JobType
    Raises:
        BadRequest if the commit fails with an IntegrityError
    """
    new_type = JobType(name=name)
    try:
        db.session.add(new_type)
        db.session.commit()
    except IntegrityError:
        raise BadRequest("Name already exists")
    else:
        return new_type
def rename_job_type(name, new_name):
    """Give an existing JobType a new name

    Args:
        name: handed unchanged to get_job_type, which looks the JobType up
            by primary key (so despite the parameter name this is the key)
        new_name: name to assign
    Raises:
        BadRequest if the commit fails with an IntegrityError
        (plus whatever get_job_type raises for an unknown key)
    """
    target = get_job_type(name)
    target.name = new_name
    session = db.session
    try:
        session.commit()
    except IntegrityError:
        raise BadRequest("Name already exists")
def delete_job_type(name):
    """Remove a JobType

    Args:
        name: handed unchanged to get_job_type, which looks the JobType up
            by primary key (so despite the parameter name this is the key)
    Raises:
        BadRequest if the commit fails with an IntegrityError
        (plus whatever get_job_type raises for an unknown key)
    """
    session = db.session
    doomed = get_job_type(name)
    session.delete(doomed)
    try:
        session.commit()
    except IntegrityError:
        raise BadRequest("Type still in use")
def get_event(event_id) -> Event:
    """Get a single Event by its primary key

    Args:
        event_id: primary key of the Event
    Returns:
        The matching Event
    Raises:
        NotFound if no Event with that key exists
    """
    found = Event.query.get(event_id)
    if found is not None:
        return found
    raise NotFound
2020-09-07 14:13:18 +00:00
2021-03-20 23:55:50 +00:00
def get_templates():
    """Fetch all events that are flagged as templates.

    Returns:
        List of Event objects whose ``is_template`` column equals True
    """
    only_templates = Event.is_template.__eq__(True)
    return Event.query.filter(only_templates).all()
def get_events(start: Optional[datetime] = None, end=None):
    """Query non-template events, optionally limited by their start time

    Args:
        start (datetime): Earliest start (inclusive); no lower bound if None
        end (datetime): Latest start (exclusive); no upper bound if None
    Returns: collection of Event objects
    """
    # Templates are never part of the result
    criteria = [Event.is_template.__eq__(False)]
    if start is not None:
        criteria.append(Event.start >= start)
    if end is not None:
        criteria.append(Event.start < end)

    query = Event.query
    for criterion in criteria:
        query = query.filter(criterion)
    return query.all()
2020-09-07 14:13:18 +00:00
def delete_event(event_id):
    """Remove the event with the given ID and commit

    Args:
        event_id: id of the Event to delete
    Raises:
        NotFound if no such event exists (raised by get_event)
    """
    session = db.session
    session.delete(get_event(event_id))
    session.commit()
2020-09-07 14:13:18 +00:00
2021-03-20 23:55:50 +00:00
def create_event(event_type, start, end=None, jobs=None, is_template=None, name=None, description=None):
    """Create a new Event and commit it

    Args:
        event_type: passed to the Event as ``type``
        start: start of the event
        end: end of the event (optional)
        jobs: jobs to attach to the event; a new empty list is used if omitted
        is_template: template flag stored on the event (optional)
        name: name of the event (optional)
        description: description of the event (optional)
    Returns:
        The newly created Event
    Raises:
        BadRequest if the commit fails with an IntegrityError
    """
    # Build a fresh list per call: a literal ``[]`` default is created once at
    # definition time and would be shared between all calls.
    if jobs is None:
        jobs = []
    try:
        logger.debug(event_type)
        event = Event(
            start=start,
            end=end,
            name=name,
            description=description,
            type=event_type,
            is_template=is_template,
            jobs=jobs,
        )
        db.session.add(event)
        db.session.commit()
        return event
    except IntegrityError:
        logger.debug("Database error when creating new event", exc_info=True)
        raise BadRequest
def get_job(job_slot_id, event_id):
    """Get the Job with the given ID that belongs to the given event

    Args:
        job_slot_id: id of the Job
        event_id: id of the event the Job has to belong to
    Returns:
        The matching Job
    Raises:
        NotFound if no Job matches both criteria
    """
    matching = Job.query.filter(Job.id == job_slot_id, Job.event_id_ == event_id)
    job = matching.one_or_none()
    if job is not None:
        return job
    raise NotFound
2020-09-07 14:13:18 +00:00
def add_job(event, job_type, required_services, start, end=None, comment=None):
    """Create a Job, append it to the event's jobs and persist via update()

    Args:
        event: Event the new Job is appended to
        job_type: stored on the Job as ``type``
        required_services: stored on the Job as ``required_services``
        start: start of the Job
        end: end of the Job (optional)
        comment: comment for the Job (optional)
    Returns:
        The newly created Job
    """
    new_job = Job(
        required_services=required_services,
        type=job_type,
        start=start,
        end=end,
        comment=comment,
    )
    event.jobs.append(new_job)
    update()
    return new_job
def update():
    """Commit the current session, turning integrity errors into BadRequest

    Raises:
        BadRequest if the commit fails with an IntegrityError
    """
    session = db.session
    try:
        session.commit()
    except IntegrityError:
        logger.debug(
            "Error, looks like a Job with that type already exists on an event",
            exc_info=True,
        )
        raise BadRequest()
def delete_job(job: Job):
    """Remove the given Job and commit

    Args:
        job: Job to delete
    """
    session = db.session
    session.delete(job)
    session.commit()
def assign_to_job(job: Job, user, value):
    """Set, create or remove the Service of a user on a job and commit

    A negative value removes the existing Service; any other value is stored
    on the existing Service, or on a newly created one if none exists yet.

    Args:
        job: Job the Service belongs to
        user: user the Service belongs to
        value: new value of the Service, negative to remove it
    Raises:
        BadRequest if value is negative and the user has no Service on the job
    """
    # Service is looked up by the composite key (job id, user id)
    existing = Service.query.get((job.id, user.id_))
    if value < 0:
        if not existing:
            raise BadRequest
        db.session.delete(existing)
    elif existing:
        existing.value = value
    else:
        created = Service(user_=user, value=value, job_=job)
        db.session.add(created)
    db.session.commit()