flaschengeist/flaschengeist/plugins/schedule/event_controller.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

178 lines
4.3 KiB
Python
Raw Normal View History

2020-09-07 14:13:18 +00:00
from werkzeug.exceptions import BadRequest, NotFound
2020-09-05 20:26:00 +00:00
from sqlalchemy.exc import IntegrityError
from flaschengeist import logger
from flaschengeist.database import db
from flaschengeist.plugins.schedule.models import EventType, Event, Job, JobType, Service
def update():
db.session.commit()
def get_event_types():
return EventType.query.all()
def get_event_type(identifier):
"""Get EventType by ID (int) or name (string)"""
if isinstance(identifier, int):
et = EventType.query.get(identifier)
elif isinstance(identifier, str):
et = EventType.query.filter(EventType.name == identifier).one_or_none()
else:
logger.debug("Invalid identifier type for EventType")
raise BadRequest
if not et:
raise NotFound
return et
def create_event_type(name):
try:
event = EventType(name=name)
db.session.add(event)
db.session.commit()
return event
except IntegrityError:
raise BadRequest("Name already exists")
def rename_event_type(identifier, new_name):
event_type = get_event_type(identifier)
event_type.name = new_name
try:
db.session.commit()
except IntegrityError:
raise BadRequest("Name already exists")
def delete_event_type(name):
event_type = get_event_type(name)
db.session.delete(event_type)
try:
db.session.commit()
except IntegrityError:
raise BadRequest("Type still in use")
2020-09-05 20:26:00 +00:00
def get_job_types():
return JobType.query.all()
def get_job_type(type_id):
job_type = JobType.query.get(type_id)
print(job_type)
if not job_type:
raise NotFound
return job_type
def create_job_type(name):
try:
job_type = JobType(name=name)
db.session.add(job_type)
db.session.commit()
return job_type
except IntegrityError:
raise BadRequest("Name already exists")
def rename_job_type(name, new_name):
job_type = get_job_type(name)
job_type.name = new_name
try:
db.session.commit()
except IntegrityError:
raise BadRequest("Name already exists")
def delete_job_type(name):
job_type = get_job_type(name)
db.session.delete(job_type)
try:
db.session.commit()
except IntegrityError:
raise BadRequest("Type still in use")
def get_event(event_id):
event = Event.query.get(event_id)
if event is None:
raise NotFound
return event
2020-09-07 14:13:18 +00:00
def get_events(start, end):
2020-09-07 14:13:18 +00:00
"""Query events which start from begin until end
Args:
start (datetime): Earliest start
2020-09-07 14:13:18 +00:00
end (datetime): Latest start
Returns: collection of Event objects
"""
return Event.query.filter((start <= Event.start), (Event.start < end)).all()
2020-09-07 14:13:18 +00:00
def delete_event(event_id):
2020-09-07 14:13:18 +00:00
"""Delete event with given ID
Args:
event_id: id of Event to delete
2020-09-07 14:13:18 +00:00
Raises:
NotFound if not found
2020-09-07 14:13:18 +00:00
"""
event = get_event(event_id)
db.session.delete(event)
db.session.commit()
2020-09-07 14:13:18 +00:00
def create_event(event_type, start, end=None, jobs=[], description=None):
2020-09-05 20:26:00 +00:00
try:
logger.debug(event_type)
event = Event(start=start, end=end, description=description, type=event_type, jobs=jobs)
2020-09-05 20:26:00 +00:00
db.session.add(event)
db.session.commit()
return event
except IntegrityError:
logger.debug("Database error when creating new event", exc_info=True)
raise BadRequest
def get_job(job_slot_id, event_id):
js = Job.query.filter(Job.id == job_slot_id).filter(Job._event_id == event_id).one_or_none()
if js is None:
raise NotFound
return js
2020-09-07 14:13:18 +00:00
def add_job(event, job_type, required_services, start, end=None, comment=None):
job = Job(required_services=required_services, type=job_type, start=start, end=end, comment=comment)
event.jobs.append(job)
update()
return job
def update():
try:
db.session.commit()
except IntegrityError:
logger.debug("Error, looks like a Job with that type already exists on an event", exc_info=True)
raise BadRequest()
def delete_job(job: Job):
db.session.delete(job)
db.session.commit()
def assign_to_job(job: Job, user, value):
service = Service.query.get((job.id, user._id))
if service:
service.value = value
else:
service = Service(user_=user, value=value, job_=job)
db.session.add(service)
db.session.commit()