import http
import logging
from collections import defaultdict
from datetime import date, timedelta
from fastapi import APIRouter, status
from fastapi.responses import JSONResponse
from beats.db_helpers import serialize_from_document, serialize_to_document
from beats.domain import Beat, BeatRepository, Project, ProjectRepository
from beats.exceptions import ProjectWasNotStarted, TwoProjectInProgess
from beats.validation_models import RecordTimeValidator
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/projects",
tags=["Projects"],
responses={404: {"description": "Not found"}},
)
@router.get("/")
async def list_projects(archived: bool = False):
data = [
serialize_from_document(p)
for p in ProjectRepository.list({"archived": archived})
]
return data
@router.post("/", status_code=http.HTTPStatus.CREATED)
async def create_project(project: Project) -> dict:
project_data = ProjectRepository.create(project.model_dump(exclude_none=True))
return serialize_from_document(project_data)
@router.put("/")
async def update_project(project: Project) -> dict:
updated_project = ProjectRepository.update(
serialize_to_document(project.model_dump(exclude_none=True))
)
return serialize_from_document(updated_project)
@router.post("/{project_id}/archive")
async def archive_project(project_id: str):
ProjectRepository.update({"_id": project_id, "archived": True})
return {"status": "success"}
@router.get("/{project_id}/today/")
async def today_time_for_project(project_id: str):
logs = list(BeatRepository.list({"project_id": project_id}))
beats = [Beat(**serialize_from_document(log)) for log in logs]
today = date.today()
today_logs = [b for b in beats if b.start.date() == today]
return {"duration": str(sum([log.duration for log in today_logs], timedelta()))}
@router.get("/{project_id}/week/")
async def current_week_time_for_project(project_id: str):
logs = list(BeatRepository.list({"project_id": project_id}))
today = date.today()
start_of_week = today - timedelta(days=today.weekday()) end_of_week = start_of_week + timedelta(days=6)
beats = [Beat(**serialize_from_document(log)) for log in logs]
week_logs = [b for b in beats if start_of_week <= b.start.date() <= end_of_week]
per_day = defaultdict(timedelta)
for log in week_logs:
per_day[log.start.strftime("%A")] += log.duration
result = {}
total_duration = timedelta()
for i in range(7):
day_date = start_of_week + timedelta(days=i)
day_name = day_date.strftime("%A")
duration = per_day.get(day_name, timedelta())
result[day_name] = str(duration)
total_duration += duration
result["total_hours"] = round(total_duration.total_seconds() / 3600, 2)
return result
@router.get("/{project_id}/total/", response_model=None)
async def total_work_time_per_month_on_project(project_id: str):
logs = list(BeatRepository.list({"project_id": project_id}))
logs_since_start = []
warnings = []
for log in logs:
log = serialize_from_document(log)
if "end" not in log:
continue
if "start" not in log:
return JSONResponse(
content={"error": f"Invalid log data - {log}"},
status_code=status.HTTP_400_BAD_REQUEST,
)
try:
beat = Beat(**log)
if beat.start.date():
if beat.duration > timedelta(hours=24):
warnings.append(
f"Warning: Log {beat} has duration longer than 24 hours ({beat.duration})."
)
logs_since_start.append(beat)
except Exception as e:
logger.error(f"Error processing log {log}: {e}")
return JSONResponse(
content={"error": f"Invalid log data - {log}"},
status_code=status.HTTP_400_BAD_REQUEST,
)
durations_per_month = defaultdict(timedelta)
for log in logs_since_start:
month_key = log.start.strftime("%Y-%m")
durations_per_month[month_key] += log.duration
result = {
month: round(duration.total_seconds() / 3600, 2)
for month, duration in sorted(durations_per_month.items())
}
return {
"durations_per_month": result,
"warnings": warnings,
}
@router.get("/{project_id}/summary/")
async def get_project_summary(project_id: str):
logs = list(BeatRepository.list({"project_id": project_id}))
logs = [Beat(**serialize_from_document(log)) for log in logs]
statistical = {}
for log in logs:
if log.day not in statistical:
statistical[log.day] = []
statistical[log.day].append(log.duration)
from datetime import timedelta
statistical = {key: str(sum(statistical[key], timedelta())) for key in statistical}
return statistical
@router.post("/{project_id}/start", response_model=None)
async def start_project_timer(project_id: str, time_validator: RecordTimeValidator):
available_project_ids = [str(p["_id"]) for p in ProjectRepository.list()]
if project_id not in available_project_ids:
return {"project_id": "This project id does not exist"}
active_logs = list(BeatRepository.list({"end": None}))
if active_logs:
log = Beat(**serialize_from_document(active_logs[0]))
return JSONResponse(
content={
"error": "another beat already in progress",
"beat": log.model_dump_json(exclude_none=True),
},
status_code=status.HTTP_400_BAD_REQUEST,
)
new_log = Beat(project_id=project_id, start=time_validator.time)
created_log = Beat(
**serialize_from_document(
BeatRepository.create(new_log.model_dump(exclude_none=True))
)
)
return created_log
@router.post("/stop", response_model=None)
async def end_project_timer(time_validator: RecordTimeValidator):
active_logs = list(BeatRepository.list({"end": None}))
if not active_logs:
raise ProjectWasNotStarted
if len(active_logs) > 1:
raise TwoProjectInProgess
log_data = serialize_from_document(active_logs[0])
logger.info(f"We got log {log_data}")
log = Beat(**log_data)
logger.info(f"Validated log: {log.model_dump()}")
log.stop_timer(time=time_validator.time)
BeatRepository.update(serialize_to_document(log.model_dump(exclude_none=True)))
return log