Skip to content

Backend Python API

main.py

Entry point for the Quill Medical FastAPI application.

This module configures the API, dependencies, authentication, and patient record routes. All endpoints are exposed under the /api prefix. In development, Swagger UI and ReDoc documentation are also served under /api/docs and /api/redoc.

TotpSetupOut

Bases: BaseModel

Output model containing a provisioning URI for authenticator apps.

Source code in app/main.py
318
319
320
321
class TotpSetupOut(BaseModel):
    """Response body of the TOTP setup endpoint."""

    # Provisioning URI for an authenticator app; the frontend is expected to
    # render it as a QR code.
    provision_uri: str

TotpVerifyIn

Bases: BaseModel

Input model for verifying a TOTP code during setup.

Source code in app/main.py
351
352
353
354
class TotpVerifyIn(BaseModel):
    """Request body used to confirm a TOTP code while enrolling two-factor."""

    # One-time code read from the user's authenticator app.
    code: str

set_auth_cookies

set_auth_cookies(response, access, refresh, xsrf)

Set authentication cookies for access, refresh, and CSRF tokens.

Parameters:

Name Type Description Default
response Response

The outgoing FastAPI response object.

required
access str

Encoded access token (short-lived).

required
refresh str

Encoded refresh token (long-lived).

required
xsrf str

Cross-site request forgery token.

required
Source code in app/main.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def set_auth_cookies(response: Response, access: str, refresh: str, xsrf: str):
    """Attach the access, refresh and CSRF cookies to an outgoing response.

    Args:
        response (Response): Response object that will carry the cookies.
        access (str): Encoded access token.
        refresh (str): Encoded refresh token.
        xsrf (str): CSRF token value.
    """
    # The refresh cookie is only sent back to the refresh endpoint.
    refresh_path = f"{settings.API_PREFIX}/auth/refresh"

    # The CSRF cookie does not use COOKIE_KW: it is set with httponly=False
    # so that client-side script can read it.
    xsrf_options = {
        "path": "/",
        "httponly": False,
        "samesite": "lax",
        "secure": settings.SECURE_COOKIES,
        "domain": settings.COOKIE_DOMAIN,
    }

    response.set_cookie("access_token", access, path="/", **COOKIE_KW)
    response.set_cookie(
        "refresh_token", refresh, path=refresh_path, **COOKIE_KW
    )
    response.set_cookie("XSRF-TOKEN", xsrf, **xsrf_options)

clear_auth_cookies

clear_auth_cookies(response)

Clear authentication cookies from the client.

Parameters:

Name Type Description Default
response Response

The outgoing FastAPI response object.

required
Source code in app/main.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def clear_auth_cookies(response: Response):
    """Remove the access, refresh and CSRF cookies from the client.

    Args:
        response (Response): Response object that will carry the deletions.
    """
    # Each cookie is deleted with the same path it was issued under.
    cookie_paths = (
        ("access_token", "/"),
        ("refresh_token", f"{settings.API_PREFIX}/auth/refresh"),
        ("XSRF-TOKEN", "/"),
    )
    for cookie_name, cookie_path in cookie_paths:
        response.delete_cookie(
            cookie_name, path=cookie_path, domain=settings.COOKIE_DOMAIN
        )

current_user

current_user(request, db=DEP_GET_SESSION)

Get the currently authenticated user from cookies.

Parameters:

Name Type Description Default
request Request

Incoming FastAPI request.

required
db Session

Active SQLAlchemy session.

DEP_GET_SESSION

Returns:

Name Type Description
User User

The authenticated and active user.

Raises:

Type Description
HTTPException

If the user is not authenticated or inactive.

Source code in app/main.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def current_user(request: Request, db: Session = DEP_GET_SESSION) -> User:
    """Get the currently authenticated user from cookies.

    Reads the ``access_token`` cookie, decodes it, and loads the matching
    user by username. On success the user's role names are stored on
    ``request.state.roles`` for later role checks.

    Args:
        request (Request): Incoming FastAPI request.
        db (Session): Active SQLAlchemy session.

    Returns:
        User: The authenticated and active user.

    Raises:
        HTTPException: 401 if the cookie is missing, the token cannot be
            decoded, the token is a refresh token or has no subject, or the
            user does not exist or is inactive.
    """
    tok = request.cookies.get("access_token")
    if not tok:
        raise HTTPException(401, "Not authenticated")
    try:
        payload = decode_token(tok)
    except Exception as e:
        raise HTTPException(401, "Invalid token") from e

    # The refresh endpoint recognises refresh tokens by type == "refresh".
    # Such a token must never be accepted in place of an access token.
    if payload.get("type") == "refresh":
        raise HTTPException(401, "Invalid token")

    sub = payload.get("sub")
    if not sub:
        # No subject claim: nothing to look up, so skip the database query.
        raise HTTPException(401, "Invalid token")

    user = db.scalar(select(User).where(User.username == sub))
    if not user or not user.is_active:
        raise HTTPException(401, "Inactive user")
    request.state.roles = [r.name for r in user.roles]
    return user

require_roles

require_roles(*need)

Create a dependency that enforces required roles on a route.

Parameters:

Name Type Description Default
*need str

Role names that the caller must possess.

()

Returns:

Name Type Description
Callable

A dependency for injection into route dependencies=[...].

Raises:

Type Description
HTTPException

403 if the user lacks required roles.

Source code in app/main.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def require_roles(*need: str):
    """Build a route dependency that demands every listed role.

    Args:
        *need (str): Role names the caller must all hold.

    Returns:
        Callable: Dependency suitable for a route's `dependencies=[...]`.

    Raises:
        HTTPException: 403 (raised by the returned dependency) when any
            required role is absent.
    """

    def dep(request: Request, _u: User = DEP_CURRENT_USER):
        # Roles are read from request.state; default to none if unset.
        granted = set(getattr(request.state, "roles", []))
        missing = set(need) - granted
        if missing:
            raise HTTPException(403, "Forbidden")
        return _u

    return dep

require_csrf

require_csrf(request, u=DEP_CURRENT_USER)

Validate CSRF by comparing header and cookie and checking the signature.

Parameters:

Name Type Description Default
request Request

Incoming request carrying the header and cookie.

required
u User

Current authenticated user.

DEP_CURRENT_USER

Returns:

Name Type Description
User

The validated user (pass-through).

Raises:

Type Description
HTTPException

403 on CSRF failure.

Source code in app/main.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def require_csrf(request: Request, u: User = DEP_CURRENT_USER):
    """Enforce the double-submit CSRF check for the current user.

    The ``x-csrf-token`` header must be present and equal to the
    ``XSRF-TOKEN`` cookie, and the cookie value must pass ``verify_csrf``
    for the user's username.

    Args:
        request (Request): Incoming request carrying the header and cookie.
        u (User): Current authenticated user.

    Returns:
        User: The same user, passed through once the check succeeds.

    Raises:
        HTTPException: 403 on CSRF failure.
    """
    header_token = request.headers.get("x-csrf-token")
    cookie_token = request.cookies.get("XSRF-TOKEN")

    tokens_match = (
        bool(header_token)
        and bool(cookie_token)
        and header_token == cookie_token
    )
    # verify_csrf is only consulted once both tokens are present and equal.
    if not (tokens_match and verify_csrf(cookie_token, cast(str, u.username))):
        raise HTTPException(403, "CSRF failed")
    return u

login

login(data, response, db=DEP_GET_SESSION)

Authenticate a user and set auth cookies.

Validates username/password, optionally verifies a time-based one-time code when two-factor is enabled, then issues access/refresh/CSRF cookies.

Parameters:

Name Type Description Default
data LoginIn

Login payload including username, password, and optional TOTP.

required
response Response

Outgoing response used to set cookies.

required
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict

Minimal result and the caller's username/roles.

Raises:

Type Description
HTTPException

400 for invalid credentials, or for a missing or invalid TOTP code when two-factor is enabled.

Source code in app/main.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
@router.post("/auth/login")
def login(
    data: LoginIn, response: Response, db: Session = DEP_GET_SESSION
) -> dict:
    """Check credentials (and TOTP when enabled), then issue auth cookies.

    The username is stripped of surrounding whitespace before lookup. When the
    account has two-factor enabled, a valid one-time code must accompany the
    password. On success, access/refresh/CSRF cookies are set on the response.

    Args:
        data (LoginIn): Username, password, and optional TOTP code.
        response (Response): Outgoing response that receives the cookies.
        db (Session): Database session.

    Returns:
        dict: ``{"detail": "ok", "user": {...}}`` with username and roles.

    Raises:
        HTTPException: 400 for invalid credentials, or for a missing or
            invalid TOTP code when two-factor is enabled.
    """
    username = data.username.strip()
    user = db.scalar(select(User).where(User.username == username))

    # Unknown user and wrong password produce the same error.
    if not (
        user and verify_password(data.password, cast(str, user.password_hash))
    ):
        raise HTTPException(400, "Invalid credentials")

    totp_required = getattr(user, "is_totp_enabled", False)
    if totp_required and not data.totp_code:
        raise HTTPException(
            status_code=400,
            detail={
                "message": "Two-factor required",
                "error_code": "two_factor_required",
            },
        )
    if totp_required and not verify_totp_code(
        cast(str, user.totp_secret or ""), data.totp_code
    ):
        raise HTTPException(
            status_code=400,
            detail={
                "message": "Invalid two-factor code",
                "error_code": "invalid_totp",
            },
        )

    subject = cast(str, user.username)
    role_names = [role.name for role in user.roles]
    set_auth_cookies(
        response,
        create_access_token(subject, role_names),
        create_refresh_token(subject),
        make_csrf(subject),
    )
    return {
        "detail": "ok",
        "user": {"username": user.username, "roles": role_names},
    }

register

register(payload, db=DEP_GET_SESSION)

Register a new user with username, email, and password.

Performs minimal validation and uniqueness checks, then stores a hashed password.

Parameters:

Name Type Description Default
payload RegisterIn

Registration data.

required
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict

Confirmation payload.

Raises:

Type Description
HTTPException

400 if fields are missing/short or username/email already exists.

Source code in app/main.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
@router.post("/auth/register")
def register(payload: RegisterIn, db: Session = DEP_GET_SESSION):
    """Create a user account from a username, email and password.

    Username and email are stripped of surrounding whitespace. The password
    must be at least 6 characters long and is stored only as a hash.

    Args:
        payload (RegisterIn): Registration data.
        db (Session): Database session.

    Returns:
        dict: ``{"detail": "created"}`` on success.

    Raises:
        HTTPException: 400 if a field is empty, the password is too short,
            or the username/email is already taken.
    """
    username, email = payload.username.strip(), payload.email.strip()

    if not (username and email and payload.password):
        raise HTTPException(status_code=400, detail="Missing fields")
    if len(payload.password) < 6:
        raise HTTPException(status_code=400, detail="Password too short")

    # Uniqueness checks: username first, then email.
    if db.scalar(select(User).where(User.username == username)):
        raise HTTPException(status_code=400, detail="Username already exists")
    if db.scalar(select(User).where(User.email == email)):
        raise HTTPException(status_code=400, detail="Email already exists")

    new_user = User(
        username=username,
        email=email,
        password_hash=hash_password(payload.password),
    )
    db.add(new_user)
    db.commit()
    return {"detail": "created"}

totp_setup

totp_setup(u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Create (if missing) a TOTP secret and return a provisioning URI.

The frontend should render the URI as a QR code for an authenticator app.

Parameters:

Name Type Description Default
u User

Current authenticated user (injected).

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
TotpSetupOut

Provisioning URI encoded with issuer and account name.

Source code in app/main.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
@router.post("/auth/totp/setup", response_model=TotpSetupOut)
def totp_setup(u: User = DEP_CURRENT_USER, db: Session = DEP_GET_SESSION):
    """Ensure the user has a TOTP secret and return its provisioning URI.

    A new secret is generated only when the user has none; an existing secret
    is reused. The user row is committed either way.

    Args:
        u (User): Current authenticated user (injected).
        db (Session): Database session.

    Returns:
        TotpSetupOut: Provisioning URI built from the secret, the username,
            and the issuer name.
    """
    has_secret = bool(getattr(u, "totp_secret", None))
    if not has_secret:
        u.totp_secret = generate_totp_secret()

    db.add(u)
    db.commit()

    secret = cast(str, u.totp_secret or "")
    account = cast(str, u.username)
    # Issuer falls back to "Quill" when settings has no PROJECT_NAME.
    provision_uri = totp_provisioning_uri(
        secret,
        account,
        issuer=getattr(settings, "PROJECT_NAME", "Quill"),
    )
    return {"provision_uri": provision_uri}

totp_verify

totp_verify(payload, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Verify a TOTP code and enable two-factor authentication for the user.

Parameters:

Name Type Description Default
payload TotpVerifyIn

The six-digit code from the authenticator app.

required
u User

Current authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict

Confirmation payload.

Raises:

Type Description
HTTPException

400 if no secret exists or code is invalid.

Source code in app/main.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
@router.post("/auth/totp/verify")
def totp_verify(
    payload: TotpVerifyIn,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
):
    """Confirm a TOTP code and switch two-factor on for the user.

    Args:
        payload (TotpVerifyIn): Code from the authenticator app.
        u (User): Current authenticated user.
        db (Session): Database session.

    Returns:
        dict: ``{"detail": "enabled"}`` on success.

    Raises:
        HTTPException: 400 if the user has no secret yet or the code is
            rejected.
    """
    secret = getattr(u, "totp_secret", None)
    if not secret:
        raise HTTPException(
            status_code=400,
            detail={
                "message": "No TOTP secret set",
                "error_code": "no_totp_secret",
            },
        )

    code_ok = verify_totp_code(cast(str, secret), payload.code)
    if not code_ok:
        raise HTTPException(
            status_code=400,
            detail={"message": "Invalid code", "error_code": "invalid_totp"},
        )

    # Two-factor is only flagged as enabled after a code has been accepted.
    u.is_totp_enabled = True
    db.add(u)
    db.commit()
    return {"detail": "enabled"}

totp_disable

totp_disable(u=DEP_REQUIRE_CSRF, db=DEP_GET_SESSION)

Disable TOTP for the current user.

CSRF protection is required.

Parameters:

Name Type Description Default
u User

Current authenticated user (CSRF-checked).

DEP_REQUIRE_CSRF
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict

Confirmation payload.

Source code in app/main.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
@router.post("/auth/totp/disable")
def totp_disable(u: User = DEP_REQUIRE_CSRF, db: Session = DEP_GET_SESSION):
    """Turn two-factor off for the current user and discard the secret.

    The user is injected through the CSRF-checking dependency.

    Args:
        u (User): Current authenticated user (CSRF-checked).
        db (Session): Database session.

    Returns:
        dict: ``{"detail": "disabled"}``.
    """
    # Clear the secret as well as the flag.
    u.totp_secret = None
    u.is_totp_enabled = False

    db.add(u)
    db.commit()
    return {"detail": "disabled"}

logout

logout(response, _u=DEP_CURRENT_USER)

Log out the current user by clearing auth cookies.

Parameters:

Name Type Description Default
response Response

Outgoing response used to clear cookies.

required
_u User

Current authenticated user (unused; enforces auth).

DEP_CURRENT_USER

Returns:

Name Type Description
dict

Confirmation payload.

Source code in app/main.py
418
419
420
421
422
423
424
425
426
427
428
429
430
@router.post("/auth/logout")
def logout(response: Response, _u: User = DEP_CURRENT_USER):
    """End the session by deleting the auth cookies.

    Args:
        response (Response): Outgoing response that carries the deletions.
        _u (User): Injected only so the route requires authentication.

    Returns:
        dict: ``{"detail": "ok"}``.
    """
    clear_auth_cookies(response)
    result = {"detail": "ok"}
    return result

me

me(u=DEP_CURRENT_USER)

Return a minimal profile for the current user.

Parameters:

Name Type Description Default
u User

Current authenticated user.

DEP_CURRENT_USER

Returns:

Name Type Description
dict

Basic identity fields and role names.

Source code in app/main.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
@router.get("/auth/me")
def me(u: User = DEP_CURRENT_USER):
    """Describe the authenticated user.

    Args:
        u (User): Current authenticated user.

    Returns:
        dict: The user's id, username, email and list of role names.
    """
    role_names = [role.name for role in u.roles]
    profile = {
        "id": u.id,
        "username": u.username,
        "email": u.email,
        "roles": role_names,
    }
    return profile

refresh

refresh(response, request, db=DEP_GET_SESSION)

Rotate refresh token and issue a new access token.

Reads the refresh token cookie, validates it, then sets new access/refresh/CSRF cookies.

Parameters:

Name Type Description Default
response Response

Outgoing response used to set cookies.

required
request Request

Incoming request (reads refresh cookie).

required
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict

Confirmation payload.

Raises:

Type Description
HTTPException

401 if the refresh token is missing/invalid or user is inactive.

Source code in app/main.py
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
@router.post("/auth/refresh")
def refresh(
    response: Response, request: Request, db: Session = DEP_GET_SESSION
):
    """Exchange a valid refresh cookie for a fresh set of auth cookies.

    The refresh token is decoded and must carry ``type == "refresh"``. A new
    access token, a new refresh token and a new CSRF token are then issued.

    Args:
        response (Response): Outgoing response that receives the cookies.
        request (Request): Incoming request (source of the refresh cookie).
        db (Session): Database session.

    Returns:
        dict: ``{"detail": "refreshed"}``.

    Raises:
        HTTPException: 401 if the refresh token is missing or invalid, or if
            the user does not exist or is inactive.
    """
    refresh_cookie = request.cookies.get("refresh_token")
    if not refresh_cookie:
        raise HTTPException(401, "No refresh token")

    try:
        claims = decode_token(refresh_cookie)
        # A token of any other type is treated like a decode failure.
        if claims.get("type") != "refresh":
            raise ValueError("not refresh")
    except Exception as e:
        raise HTTPException(401, "Bad refresh token") from e

    user = db.scalar(select(User).where(User.username == claims.get("sub")))
    if not user or not user.is_active:
        raise HTTPException(401, "Inactive user")

    role_names = [role.name for role in user.roles]
    set_auth_cookies(
        response,
        create_access_token(cast(str, user.username), role_names),
        create_refresh_token(cast(str, user.username)),  # rotated
        make_csrf(cast(str, user.username)),
    )
    return {"detail": "refreshed"}

create_patient_repo

create_patient_repo(payload)

Create a new patient repository and seed it with a README.

Parameters:

Name Type Description Default
payload PatientCreate

Patient identifier and any setup metadata.

required

Returns:

Name Type Description
dict

Repo name and initialisation status.

Raises:

Type Description
HTTPException

500 on storage/write errors.

Source code in app/main.py
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
@router.post(
    "/patients",
    dependencies=[DEP_REQUIRE_ROLES_CLINICIAN, DEP_REQUIRE_CSRF],
)
def create_patient_repo(payload: PatientCreate):
    """Create a new patient repository and seed it with a README.

    The repository name is derived from ``payload.patient_id``. The repo is
    created as private if it does not exist, then a ``README.md`` describing
    the folder layout is written with a UTC timestamp.

    Args:
        payload (PatientCreate): Patient identifier and any setup metadata.

    Returns:
        dict: ``{"repo": <name>, "initialized": True}``.

    Raises:
        HTTPException: 500 on storage/write errors.
    """
    repo = patient_repo_name(payload.patient_id)
    try:
        ensure_repo_exists(repo, private=True)
        # Timezone-aware UTC clock, matching write_letter; datetime.utcnow()
        # is deprecated and returns a naive value. The formatted string is
        # unchanged.
        ts = datetime.now(UTC).strftime("%Y-%m-%dT%H%M%SZ")
        readme = (
            f"# Patient Repository: {repo}\n\n"
            "Folders:\n"
            "- `demographics/` – non-clinical demographics JSON\n"
            "- `letters/` – correspondence in Markdown/PDF\n\n"
            f"*Initialized {ts} UTC*\n"
        )
        write_file(
            repo,
            "README.md",
            readme,
            "chore: add patient README",
        )
        return {"repo": repo, "initialized": True}
    except Exception as e:
        # Any failure is reported as a 500 with the error text.
        raise HTTPException(status_code=500, detail=str(e)) from e

list_patients

list_patients(u=DEP_CURRENT_USER)

List patient repositories and any stored demographics.

Parameters:

Name Type Description Default
u User

Current authenticated user.

DEP_CURRENT_USER

Returns:

Name Type Description
dict

Array of patient repo names with optional demographics.

Source code in app/main.py
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
@router.get("/patients")
def list_patients(u: User = DEP_CURRENT_USER):
    """Enumerate patient repositories together with their demographics.

    Every directory under ``PATIENT_DATA_ROOT`` whose name starts with
    ``patient-`` is reported, in sorted order. Demographics are parsed as JSON
    when present; unparseable content is returned under a ``"raw"`` key.

    Args:
        u (User): Current authenticated user.

    Returns:
        dict: ``{"patients": [...]}`` where each item has ``repo`` and
            ``demographics`` (``None`` when no profile is stored).

    Raises:
        HTTPException: 500 if listing or reading fails.
    """
    try:
        if not PATIENT_DATA_ROOT.exists():
            return {"patients": []}

        records = []
        for entry in sorted(PATIENT_DATA_ROOT.iterdir()):
            if not (entry.is_dir() and entry.name.startswith("patient-")):
                continue

            raw_profile = read_file(entry.name, "demographics/profile.json")
            if not raw_profile:
                profile = None
            else:
                try:
                    profile = json.loads(raw_profile)
                except Exception:
                    # Keep the unparseable text rather than dropping it.
                    profile = {"raw": raw_profile}

            records.append({"repo": entry.name, "demographics": profile})
        return {"patients": records}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

upsert_demographics

upsert_demographics(patient_id, demographics)

Create or update demographics for a patient.

Parameters:

Name Type Description Default
patient_id str

Patient identifier.

required
demographics Demographics

Structured demographics payload.

required

Returns:

Name Type Description
dict

Repo name and stored path.

Raises:

Type Description
HTTPException

500 on write errors.

Source code in app/main.py
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
@router.put(
    "/patients/{patient_id}/demographics",
    dependencies=[DEP_REQUIRE_ROLES_CLINICIAN, DEP_REQUIRE_CSRF],
)
def upsert_demographics(patient_id: str, demographics: Demographics):
    """Create or update demographics for a patient.

    The payload is serialised to ``demographics/profile.json`` in the
    patient's repository together with an ``updated_at_utc`` timestamp.

    Args:
        patient_id (str): Patient identifier.
        demographics (Demographics): Structured demographics payload.

    Returns:
        dict: ``{"repo": <name>, "path": "demographics/profile.json"}``.

    Raises:
        HTTPException: 500 on write errors.
    """
    repo = patient_repo_name(patient_id)
    try:
        ensure_repo_exists(repo)
        # Timezone-aware UTC clock, matching write_letter; datetime.utcnow()
        # is deprecated and returns a naive value. The formatted string is
        # unchanged.
        ts = datetime.now(UTC).strftime("%Y-%m-%dT%H%M%SZ")
        path = "demographics/profile.json"
        # Payload fields are unpacked after the timestamp, so a payload field
        # named "updated_at_utc" would override it.
        content = json.dumps(
            {"updated_at_utc": ts, **demographics.model_dump()}, indent=2
        )
        write_file(repo, path, content, "feat: upsert demographics")
        return {"repo": repo, "path": path}
    except Exception as e:
        # Any failure is reported as a 500 with the error text.
        raise HTTPException(status_code=500, detail=str(e)) from e

get_demographics

get_demographics(patient_id, u=DEP_CURRENT_USER)

Fetch demographics JSON for a given patient.

Parameters:

Name Type Description Default
patient_id str

Patient identifier.

required
u User

Current authenticated user.

DEP_CURRENT_USER

Returns:

Name Type Description
dict

Repo name and raw demographics content.

Raises:

Type Description
HTTPException

404 if not found; 500 on read errors.

Source code in app/main.py
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
@router.get("/patients/{patient_id}/demographics")
def get_demographics(patient_id: str, u: User = DEP_CURRENT_USER):
    """Return the stored demographics document for one patient.

    Args:
        patient_id (str): Patient identifier.
        u (User): Current authenticated user.

    Returns:
        dict: Repo name and the unparsed content of
            ``demographics/profile.json``.

    Raises:
        HTTPException: 404 if no profile is stored; 500 on read errors.
    """
    repo = patient_repo_name(patient_id)
    try:
        profile_text = read_file(repo, "demographics/profile.json")
    except HTTPException:
        # Let HTTP errors raised further down pass through untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    if profile_text is None:
        raise HTTPException(status_code=404, detail="No demographics found")
    return {"repo": repo, "content": profile_text}

write_letter

write_letter(patient_id, letter)

Write and store a new letter in the patient’s repository.

Parameters:

Name Type Description Default
patient_id str

Patient identifier.

required
letter LetterIn

Letter metadata and markdown body.

required

Returns:

Name Type Description
dict

Repo and stored file path.

Raises:

Type Description
HTTPException

500 on write errors.

Source code in app/main.py
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
@router.post(
    "/patients/{patient_id}/letters",
    dependencies=[DEP_REQUIRE_ROLES_CLINICIAN, DEP_REQUIRE_CSRF],
)
def write_letter(patient_id: str, letter: LetterIn):
    """Store a new Markdown letter in the patient's repository.

    The file is saved as ``letters/<timestamp>-<slug>.md``, where the slug is
    the lower-cased title with every character other than alphanumerics,
    ``-`` and ``_`` replaced by ``-``.

    Args:
        patient_id (str): Patient identifier.
        letter (LetterIn): Letter title, body and author details.

    Returns:
        dict: Repo name and the path of the stored file.

    Raises:
        HTTPException: 500 on write errors.
    """
    repo = patient_repo_name(patient_id)
    ts = datetime.now(UTC).strftime("%Y-%m-%dT%H%M%SZ")

    safe_chars = [
        c if (c.isalnum() or c in "-_") else "-" for c in letter.title.lower()
    ]
    # Fall back to a generic name when the title leaves nothing usable.
    slug = "".join(safe_chars).strip("-") or "letter"
    filename = f"letters/{ts}-{slug}.md"

    md = f"# {letter.title}\n\n{letter.body}\n\n*Written at {ts} UTC*"
    commit_message = f"feat: add letter '{letter.title}'"
    try:
        ensure_repo_exists(repo)
        write_file(
            repo,
            filename,
            md,
            commit_message,
            letter.author_name,
            letter.author_email,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"repo": repo, "path": filename}

read_letter

read_letter(patient_id, name, u=DEP_CURRENT_USER)

Read a specific letter file from the patient’s repository.

Parameters:

Name Type Description Default
patient_id str

Patient identifier.

required
name str

Letter filename (with or without .md).

required
u User

Current authenticated user.

DEP_CURRENT_USER

Returns:

Name Type Description
dict

Repo, resolved file path, and markdown content.

Raises:

Type Description
HTTPException

404 if not found; 500 on read errors.

Source code in app/main.py
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
@router.get("/patients/{patient_id}/letters/{name}")
def read_letter(patient_id: str, name: str, u: User = DEP_CURRENT_USER):
    """Fetch one letter from the patient's repository.

    Args:
        patient_id (str): Patient identifier.
        name (str): Letter filename; ``.md`` is appended when absent.
        u (User): Current authenticated user.

    Returns:
        dict: Repo name, resolved file path, and the letter's content.

    Raises:
        HTTPException: 404 if the letter does not exist; 500 on read errors.
    """
    repo = patient_repo_name(patient_id)
    filename = name if name.endswith(".md") else f"{name}.md"
    path = f"letters/{filename}"

    try:
        letter_text = read_file(repo, path)
    except HTTPException:
        # Let HTTP errors raised further down pass through untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    if letter_text is None:
        raise HTTPException(status_code=404, detail="Letter not found")
    return {"repo": repo, "path": path, "content": letter_text}