Main Application

FastAPI Application Entry Point.

This module is the main entry point for the Quill Medical REST API. It configures the FastAPI application, authentication middleware, and all API routes.

Key Features:

- JWT-based authentication with HTTP-only cookies
- TOTP two-factor authentication support
- Role-based access control (RBAC)
- CSRF protection for state-changing operations
- Integration with FHIR (patient demographics) and OpenEHR (clinical letters)
- Push notification support via Web Push protocol

All API endpoints are exposed under the /api prefix. Development mode enables Swagger UI at /api/docs and ReDoc at /api/redoc for interactive API exploration.

Architecture:

- Auth database: User accounts and roles (PostgreSQL via SQLAlchemy)
- FHIR server: Patient demographics (HAPI FHIR)
- EHRbase: Clinical documents and letters (OpenEHR)
- Push notifications: In-memory subscriptions (production should use a database)

AdminUserCreateIn

Bases: BaseModel

Admin User Creation Request.

Request model for administrators to create new users with full CBAC settings. Requires admin or superadmin system permissions.

Attributes:

- name (str): User's full name (stored as username if username not provided).
- username (str | None): Unique username for login.
- email (str): Unique email address.
- password (str): Plain text password (will be hashed).
- base_profession (str): Base profession template (e.g., "consultant", "patient").
- additional_competencies (list[str]): Extra competencies beyond base profession.
- removed_competencies (list[str]): Competencies to remove from base profession.
- system_permissions (str): System permission level (patient, staff, admin, superadmin).

Source code in app/main.py
class AdminUserCreateIn(BaseModel):
    """Admin User Creation Request.

    Request model for administrators to create new users with full CBAC settings.
    Requires admin or superadmin system permissions.

    Attributes:
        name: User's full name (stored as username if username not provided).
        username: Unique username for login.
        email: Unique email address.
        password: Plain text password (will be hashed).
        base_profession: Base profession template (e.g., "consultant", "patient").
        additional_competencies: Extra competencies beyond base profession.
        removed_competencies: Competencies to remove from base profession.
        system_permissions: System permission level (patient, staff, admin, superadmin).
    """

    name: str
    username: str | None = None
    email: str
    password: str
    base_profession: str = "patient"
    additional_competencies: list[str] = []
    removed_competencies: list[str] = []
    system_permissions: str = "patient"
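
The interaction between base_profession, additional_competencies, and removed_competencies can be read as set arithmetic: start from the profession template, add the extras, then apply the removals. The sketch below illustrates that reading only. The BASE_PROFESSIONS table and every competency name in it are hypothetical, and the application's own resolution logic (get_final_competencies, seen in the login source) is not shown on this page.

```python
# Hypothetical profession templates; the real competency names are not
# given in the source documentation.
BASE_PROFESSIONS: dict[str, set[str]] = {
    "patient": {"view_own_records"},
    "consultant": {"view_patient_records", "write_clinical_letters", "prescribe"},
}


def resolve_competencies(
    base_profession: str,
    additional_competencies: list[str],
    removed_competencies: list[str],
) -> set[str]:
    """Start from the profession template, add extras, then apply removals."""
    base = BASE_PROFESSIONS.get(base_profession, set())
    return (base | set(additional_competencies)) - set(removed_competencies)


final = resolve_competencies(
    "consultant",
    additional_competencies=["manage_users"],
    removed_competencies=["prescribe"],
)
print(sorted(final))
```

Applying removals last means a competency listed in both additional_competencies and removed_competencies ends up removed. Whether the real implementation orders the steps this way is an assumption.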

AdminUserUpdateIn

Bases: BaseModel

Admin User Update Input Schema.

Pydantic model for updating existing user accounts via admin endpoints. All fields are optional - only provided fields will be updated.

Attributes:

- name (str | None): Full name (optional).
- username (str | None): Unique username (optional).
- email (str | None): Email address (optional).
- password (str | None): New password (optional, only if changing).
- base_profession (str | None): Base profession ID (optional).
- additional_competencies (list[str] | None): Competencies to add (optional).
- removed_competencies (list[str] | None): Competencies to remove (optional).
- system_permissions (str | None): System permission level (optional).

Source code in app/main.py
class AdminUserUpdateIn(BaseModel):
    """Admin User Update Input Schema.

    Pydantic model for updating existing user accounts via admin endpoints.
    All fields are optional - only provided fields will be updated.

    Attributes:
        name: Full name (optional).
        username: Unique username (optional).
        email: Email address (optional).
        password: New password (optional, only if changing).
        base_profession: Base profession ID (optional).
        additional_competencies: Competencies to add (optional).
        removed_competencies: Competencies to remove (optional).
        system_permissions: System permission level (optional).
    """

    name: str | None = None
    username: str | None = None
    email: str | None = None
    password: str | None = None
    base_profession: str | None = None
    additional_competencies: list[str] | None = None
    removed_competencies: list[str] | None = None
    system_permissions: str | None = None
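
Because every field defaults to None, an update handler has to distinguish "not provided" from "set to this value". A minimal sketch of that rule, using plain dictionaries in place of the database model; apply_partial_update and the sample record are hypothetical and not part of app/main.py.

```python
from typing import Any


def apply_partial_update(
    record: dict[str, Any], patch: dict[str, Any]
) -> dict[str, Any]:
    """Return a copy of record with only the non-None patch fields applied."""
    updated = dict(record)
    for field, value in patch.items():
        if value is not None:
            updated[field] = value
    return updated


user = {
    "name": "Ada Example",
    "email": "ada@example.org",
    "system_permissions": "staff",
}
patch = {"name": None, "email": "ada@hospital.example", "system_permissions": None}
result = apply_partial_update(user, patch)
print(result)
```

Treating None as "leave unchanged" means a field cannot be cleared through this path. With Pydantic v2 models, model_dump(exclude_unset=True) is a common alternative that tracks which fields the client actually sent.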

TotpSetupOut

Bases: BaseModel

TOTP Setup Response.

Response model for TOTP setup endpoint containing the otpauth:// URI for QR code generation. This URI can be rendered as a QR code by the frontend for scanning with authenticator apps.

Attributes:

- provision_uri (str): otpauth://totp/... URI for authenticator app setup.

Source code in app/main.py
class TotpSetupOut(BaseModel):
    """TOTP Setup Response.

    Response model for TOTP setup endpoint containing the otpauth:// URI
    for QR code generation. This URI can be rendered as a QR code by the
    frontend for scanning with authenticator apps.

    Attributes:
        provision_uri: otpauth://totp/... URI for authenticator app setup.
    """

    provision_uri: str
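
For orientation, an otpauth:// provisioning URI follows the widely used Key URI format: a label made of issuer and account, plus secret and issuer query parameters. The sketch below builds one with the standard library. The function name, the sample secret, and the issuer string are illustrative assumptions, and the application may well generate its URI through a TOTP library instead.

```python
from urllib.parse import quote, urlencode


def build_provision_uri(secret_b32: str, account: str, issuer: str) -> str:
    """Build an otpauth://totp/ URI in the common Key URI layout."""
    # The label is "issuer:account"; keep the colon literal, escape the rest.
    label = quote(f"{issuer}:{account}", safe=":")
    # quote_via=quote encodes spaces as %20 rather than "+".
    query = urlencode({"secret": secret_b32, "issuer": issuer}, quote_via=quote)
    return f"otpauth://totp/{label}?{query}"


uri = build_provision_uri("JBSWY3DPEHPK3PXP", "alice", "Quill Medical")
print(uri)
```

The frontend can pass the resulting string to any QR code renderer. The secret should never be logged or cached beyond the setup step.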

TotpVerifyIn

Bases: BaseModel

TOTP Verification Request.

Request model for verifying a TOTP code during two-factor authentication setup. After scanning the QR code, users enter the 6-digit code from their authenticator app to prove they can generate valid codes.

Attributes:

- code (str): 6-digit numeric code from authenticator app.

Source code in app/main.py
class TotpVerifyIn(BaseModel):
    """TOTP Verification Request.

    Request model for verifying a TOTP code during two-factor authentication
    setup. After scanning the QR code, users enter the 6-digit code from their
    authenticator app to prove they can generate valid codes.

    Attributes:
        code: 6-digit numeric code from authenticator app.
    """

    code: str

FHIRPatientCreateIn

Bases: BaseModel

FHIR Patient Creation Request.

Request model for creating a new FHIR Patient resource with demographics. Optional patient_id allows specifying a custom FHIR resource ID instead of auto-generated ID.

Attributes:

- given_name (str): Patient's first/given name.
- family_name (str): Patient's surname/family name.
- birth_date (str | None): Patient's date of birth (YYYY-MM-DD format).
- gender (str | None): Patient's gender (male, female, other, unknown).
- nhs_number (str | None): NHS number (10 digits, UK national identifier).
- mrn (str | None): Medical Record Number (local hospital identifier).
- patient_id (str | None): Optional custom FHIR resource ID.

Source code in app/main.py
class FHIRPatientCreateIn(BaseModel):
    """FHIR Patient Creation Request.

    Request model for creating a new FHIR Patient resource with demographics.
    Optional patient_id allows specifying a custom FHIR resource ID instead
    of auto-generated ID.

    Attributes:
        given_name: Patient's first/given name.
        family_name: Patient's surname/family name.
        birth_date: Patient's date of birth (YYYY-MM-DD format).
        gender: Patient's gender (male, female, other, unknown).
        nhs_number: NHS number (10 digits, UK national identifier).
        mrn: Medical Record Number (local hospital identifier).
        patient_id: Optional custom FHIR resource ID.
    """

    given_name: str
    family_name: str
    birth_date: str | None = None
    gender: str | None = None
    nhs_number: str | None = None
    mrn: str | None = None
    patient_id: str | None = None
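
The model as shown accepts any string for nhs_number. If stricter input checking were wanted, the standard Modulus 11 check-digit rule for NHS numbers could be layered on top. The helper below is a hypothetical sketch of that rule and is not part of app/main.py.

```python
def is_valid_nhs_number(value: str) -> bool:
    """Validate a 10-digit NHS number using the Modulus 11 check digit."""
    digits = value.replace(" ", "")
    if len(digits) != 10 or not (digits.isascii() and digits.isdigit()):
        return False
    # Weight the first nine digits 10, 9, ..., 2 and sum the products.
    total = sum(int(d) * w for d, w in zip(digits[:9], range(10, 1, -1)))
    check = 11 - (total % 11)
    if check == 11:
        check = 0
    # A computed check digit of 10 can never match, so the number is invalid.
    return check != 10 and check == int(digits[9])


print(is_valid_nhs_number("943 476 5919"))
print(is_valid_nhs_number("9434765918"))
```

A check like this catches most single-digit typos and transpositions before the value reaches the FHIR server.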

CreateOrganizationIn

Bases: BaseModel

Request body for creating a new organisation.

Source code in app/main.py
class CreateOrganizationIn(BaseModel):
    """Request body for creating a new organisation."""

    name: str
    type: str
    location: str | None = None

    model_config = {"extra": "forbid"}

UpdateOrganizationIn

Bases: BaseModel

Request body for updating an organisation.

Source code in app/main.py
class UpdateOrganizationIn(BaseModel):
    """Request body for updating an organisation."""

    name: str | None = None
    type: str | None = None
    location: str | None = None

    model_config = {"extra": "forbid"}

AddStaffIn

Bases: BaseModel

Request body for adding a staff member to an organisation.

Source code in app/main.py
class AddStaffIn(BaseModel):
    """Request body for adding a staff member to an organisation."""

    user_id: int

    model_config = {"extra": "forbid"}

AddPatientIn

Bases: BaseModel

Request body for adding a patient to an organisation.

Source code in app/main.py
class AddPatientIn(BaseModel):
    """Request body for adding a patient to an organisation."""

    patient_id: str

    model_config = {"extra": "forbid"}

log_requests async

log_requests(request, call_next)

Log every request with timing, method, path, and status.

Source code in app/main.py
@app.middleware("http")
async def log_requests(
    request: Request,
    call_next: Callable,  # type: ignore[type-arg]
) -> Response:
    """Log every request with timing, method, path, and status."""
    request_id = uuid4().hex[:12]
    start = time.monotonic()
    response: Response = await call_next(request)
    elapsed_ms = round((time.monotonic() - start) * 1000, 1)

    logger.info(
        "%s %s %s %.1fms",
        request.method,
        request.url.path,
        response.status_code,
        elapsed_ms,
        extra={
            "request_id": request_id,
            "method": request.method,
            "path": request.url.path,
            "status": response.status_code,
            "duration_ms": elapsed_ms,
            "client_ip": request.client.host if request.client else None,
        },
    )
    return response

require_clinical_services

require_clinical_services()

FastAPI dependency: raises 503 when FHIR/EHRbase are disabled.

Source code in app/main.py
def require_clinical_services() -> None:
    """FastAPI dependency: raises 503 when FHIR/EHRbase are disabled."""
    if not settings.CLINICAL_SERVICES_ENABLED:
        raise HTTPException(
            status_code=503,
            detail="Clinical services are not available in this deployment",
        )

check_fhir_health

check_fhir_health()

Check if FHIR server is available and ready to serve data.

Tests actual patient data access rather than just metadata endpoint, since HAPI FHIR can return metadata before it's ready to serve resources.

Safety-critical: this determines whether the frontend shows "Database initialising" or "No patients to show". A false positive could lead clinical staff to believe the database is empty while it is still loading.

Returns:

- dict[str, bool | int | str]: dict with an 'available' boolean, plus 'status_code' when the server responded or 'error' when the check could not be completed.

Source code in app/main.py
def check_fhir_health() -> dict[str, bool | int | str]:
    """Check if FHIR server is available and ready to serve data.

    Tests actual patient data access rather than just metadata endpoint,
    since HAPI FHIR can return metadata before it's ready to serve resources.

    Safety-critical: This determines whether frontend shows "Database initialising"
    vs "No patients to show". False positive could cause clinical staff to think
    database is empty when it's still loading.

    Returns:
        dict with 'available' boolean and optional 'error' message
    """
    if not settings.CLINICAL_SERVICES_ENABLED:
        return {"available": False, "error": "Clinical services disabled"}
    try:
        # Test actual data access, not just metadata
        # This ensures database is ready and indexes are loaded
        response = httpx.get(
            f"{settings.FHIR_SERVER_URL}/Patient?_count=1", timeout=5.0
        )
        # 200 = success (even if 0 patients), means FHIR is truly ready
        # Other codes mean FHIR still loading or has errors
        return {
            "available": response.status_code == 200,
            "status_code": response.status_code,
        }
    except Exception as e:
        return {"available": False, "error": str(e)}

check_ehrbase_health

check_ehrbase_health()

Check if EHRbase server is available.

Returns:

- dict[str, bool | int | str]: dict with an 'available' boolean, plus 'status_code' when the server responded or 'error' when the check could not be completed.

Source code in app/main.py
def check_ehrbase_health() -> dict[str, bool | int | str]:
    """Check if EHRbase server is available.

    Returns:
        dict with 'available' boolean and optional 'error' message
    """
    if not settings.CLINICAL_SERVICES_ENABLED:
        return {"available": False, "error": "Clinical services disabled"}
    try:
        # Use get_secret_value() to extract the actual password string
        api_user = settings.EHRBASE_API_USER
        api_password: str = settings.EHRBASE_API_PASSWORD.get_secret_value()

        response = httpx.get(
            f"{settings.EHRBASE_URL}/rest/openehr/v1/definition/template/adl1.4",
            timeout=5.0,
            auth=(api_user, api_password),
        )
        return {
            "available": response.status_code
            in (200, 404),  # 404 = no templates but server works
            "status_code": response.status_code,
        }
    except Exception as e:
        return {"available": False, "error": str(e)}

startup_event async

startup_event()

Check service availability on startup.

Source code in app/main.py
@app.on_event("startup")
async def startup_event() -> None:
    """Check service availability on startup."""
    print("\n" + "=" * 60)
    print("Quill Medical Backend Starting...")
    print("=" * 60)

    if settings.CLINICAL_SERVICES_ENABLED:
        fhir_status = check_fhir_health()
        if fhir_status["available"]:
            print("✓ FHIR server is available")
        else:
            print(
                f"✗ WARNING: FHIR server not available - {fhir_status.get('error', 'Unknown error')}"
            )
            print("  Patient operations will fail until FHIR server is ready")

        ehrbase_status = check_ehrbase_health()
        if ehrbase_status["available"]:
            print("✓ EHRbase server is available")
        else:
            print(
                f"✗ WARNING: EHRbase not available - {ehrbase_status.get('error', 'Unknown error')}"
            )
            print(
                "  Clinical letter operations will fail until EHRbase is ready"
            )
    else:
        print("- Clinical services disabled (FHIR/EHRbase skipped)")

    print("=" * 60 + "\n")

set_auth_cookies

set_auth_cookies(response, access, refresh, xsrf)

Set Authentication Cookies.

Sets three HTTP cookies for authentication: access token (short-lived), refresh token (long-lived), and CSRF token (for state-changing operations). Access and refresh tokens are HTTP-only for security. CSRF token is readable by JavaScript so it can be included in request headers.

Cookie Configuration:

- access_token: Path=/, HttpOnly, SameSite=Lax, TTL=15min
- refresh_token: Path=/api/auth/refresh, HttpOnly, SameSite=Lax, TTL=7days
- XSRF-TOKEN: Path=/, SameSite=Lax (not HttpOnly), TTL matches access token

Parameters:

- response (Response, required): FastAPI response object to set cookies on.
- access (str, required): Encoded JWT access token.
- refresh (str, required): Encoded JWT refresh token.
- xsrf (str, required): CSRF protection token.

Source code in app/main.py
def set_auth_cookies(
    response: Response, access: str, refresh: str, xsrf: str
) -> None:
    """Set Authentication Cookies.

    Sets three HTTP cookies for authentication: access token (short-lived),
    refresh token (long-lived), and CSRF token (for state-changing operations).
    Access and refresh tokens are HTTP-only for security. CSRF token is readable
    by JavaScript so it can be included in request headers.

    Cookie Configuration:
    - access_token: Path=/, HttpOnly, SameSite=Lax, TTL=15min
    - refresh_token: Path=/api/auth/refresh, HttpOnly, SameSite=Lax, TTL=7days
    - XSRF-TOKEN: Path=/, SameSite=Lax (not HttpOnly), TTL matches access token

    Args:
        response: FastAPI response object to set cookies on.
        access: Encoded JWT access token.
        refresh: Encoded JWT refresh token.
        xsrf: CSRF protection token.
    """
    response.set_cookie("access_token", access, path="/", **COOKIE_KW)
    response.set_cookie(
        "refresh_token",
        refresh,
        path=f"{settings.API_PREFIX}/auth/refresh",
        **COOKIE_KW,
    )
    response.set_cookie(
        "XSRF-TOKEN",
        xsrf,
        path="/",
        httponly=False,
        samesite="lax",
        secure=settings.SECURE_COOKIES,
        domain=settings.COOKIE_DOMAIN,
    )

clear_auth_cookies

clear_auth_cookies(response)

Clear Authentication Cookies.

Removes all authentication cookies from the client browser by setting them with expired dates. Used during logout to end the user's session. Deletes access_token, refresh_token, and XSRF-TOKEN cookies.

Parameters:

- response (Response, required): FastAPI response object to clear cookies from.

Source code in app/main.py
def clear_auth_cookies(response: Response) -> None:
    """Clear Authentication Cookies.

    Removes all authentication cookies from the client browser by setting
    them with expired dates. Used during logout to end the user's session.
    Deletes access_token, refresh_token, and XSRF-TOKEN cookies.

    Args:
        response: FastAPI response object to clear cookies from.
    """
    response.delete_cookie(
        "access_token", path="/", domain=settings.COOKIE_DOMAIN
    )
    response.delete_cookie(
        "refresh_token",
        path=f"{settings.API_PREFIX}/auth/refresh",
        domain=settings.COOKIE_DOMAIN,
    )
    response.delete_cookie(
        "XSRF-TOKEN", path="/", domain=settings.COOKIE_DOMAIN
    )

health_check

health_check()

Health Check Endpoint.

Checks availability of all required services (FHIR, EHRbase). Returns overall status and detailed service availability.

Returns:

- dict[str, Any]: Health status with service availability details

Source code in app/main.py
@router.get("/health")
def health_check() -> dict[str, Any]:
    """Health Check Endpoint.

    Checks availability of all required services (FHIR, EHRbase).
    Returns overall status and detailed service availability.

    Returns:
        dict: Health status with service availability details
    """
    services: dict[str, dict[str, bool | int | str]] = {
        "auth_db": {
            "available": True
        },  # If we can respond, auth DB is working
    }

    # Only check FHIR/EHRbase when clinical services are enabled
    if settings.CLINICAL_SERVICES_ENABLED:
        services["fhir"] = check_fhir_health()
        services["ehrbase"] = check_ehrbase_health()
    else:
        services["fhir"] = {
            "available": False,
            "error": "Not provisioned",
        }
        services["ehrbase"] = {
            "available": False,
            "error": "Not provisioned",
        }

    all_healthy = all(s.get("available", False) for s in services.values())

    return {
        "status": "healthy" if all_healthy else "degraded",
        "services": services,
    }

current_user

current_user(request, db=DEP_GET_SESSION)

Get Currently Authenticated User.

FastAPI dependency that extracts and validates the JWT access token from cookies, then loads the corresponding user from the database. The user's roles are stored in request.state for authorization checks.

Token Validation:

- Checks for access_token cookie presence
- Verifies JWT signature and expiration
- Loads user from database by username
- Verifies user account is active

Parameters:

- request (Request, required): Incoming FastAPI request with cookies.
- db (Session, default DEP_GET_SESSION): Active SQLAlchemy database session.

Returns:

- User: The authenticated and active user with roles loaded.

Raises:

- HTTPException: 401 if token missing, invalid, expired, or user inactive.

Source code in app/main.py
def current_user(request: Request, db: Session = DEP_GET_SESSION) -> User:
    """Get Currently Authenticated User.

    FastAPI dependency that extracts and validates the JWT access token from
    cookies, then loads the corresponding user from the database. The user's
    roles are stored in request.state for authorization checks.

    Token Validation:
    - Checks for access_token cookie presence
    - Verifies JWT signature and expiration
    - Loads user from database by username
    - Verifies user account is active

    Args:
        request: Incoming FastAPI request with cookies.
        db: Active SQLAlchemy database session.

    Returns:
        User: The authenticated and active user with roles loaded.

    Raises:
        HTTPException: 401 if token missing, invalid, expired, or user inactive.
    """
    tok = request.cookies.get("access_token")
    if not tok:
        raise HTTPException(401, "Not authenticated")
    try:
        payload = decode_token(tok)
    except Exception as e:
        raise HTTPException(401, "Invalid token") from e
    sub = payload.get("sub")
    user = db.scalar(select(User).where(User.username == sub))
    if not user or not user.is_active:
        raise HTTPException(401, "Inactive user")
    request.state.roles = [r.name for r in user.roles]
    return user

require_roles

require_roles(*need)

Create Role Authorization Dependency.

Factory function that creates a FastAPI dependency to enforce role-based access control. The returned dependency checks if the authenticated user possesses all required roles. Used in route decorators to protect endpoints.

Usage Example

    @router.get("/patients", dependencies=[Depends(require_roles("Clinician"))])
    def list_patients(): ...

Parameters:

- *need (str, default ()): One or more role names required (e.g., "Clinician", "Administrator").

Returns:

- Callable[[Request, User], User]: FastAPI dependency function that validates roles.

Raises:

- HTTPException: 403 Forbidden if user lacks any required role.

Source code in app/main.py
def require_roles(*need: str) -> Callable[[Request, User], User]:
    """Create Role Authorization Dependency.

    Factory function that creates a FastAPI dependency to enforce role-based
    access control. The returned dependency checks if the authenticated user
    possesses all required roles. Used in route decorators to protect endpoints.

    Usage Example:
        @router.get("/patients", dependencies=[Depends(require_roles("Clinician"))])
        def list_patients(): ...

    Args:
        *need: One or more role names required (e.g., "Clinician", "Administrator").

    Returns:
        Callable: FastAPI dependency function that validates roles.

    Raises:
        HTTPException: 403 Forbidden if user lacks any required role.
    """

    def dep(request: Request, _u: User = DEP_CURRENT_USER) -> User:
        have = set(getattr(request.state, "roles", []))
        if not set(need).issubset(have):
            raise HTTPException(403, "Forbidden")
        return _u

    return dep
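
The check inside dep is a subset test, so listing several roles means the user must hold all of them, not any one of them. A small standalone sketch of that semantics follows. The helper name is hypothetical and the role names are the examples from the docstring.

```python
def has_required_roles(need: tuple[str, ...], have: list[str]) -> bool:
    """True only when every required role is among the user's roles."""
    return set(need).issubset(have)


print(has_required_roles(("Clinician",), ["Clinician", "Administrator"]))
print(has_required_roles(("Clinician", "Administrator"), ["Clinician"]))
print(has_required_roles((), []))
```

An empty requirement is always satisfied, so calling require_roles() with no arguments only enforces authentication.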

require_csrf

require_csrf(request, u=DEP_CURRENT_USER)

Validate CSRF Token.

FastAPI dependency that validates CSRF tokens to protect against cross-site request forgery attacks. Compares the X-CSRF-Token header with the XSRF-TOKEN cookie, verifies they match, and validates the signature against the user's identity. Required for all state-changing operations (POST/PUT/PATCH/DELETE).

CSRF Protection Flow:

1. Extract X-CSRF-Token header and XSRF-TOKEN cookie
2. Verify both exist and match exactly
3. Verify signature is valid for authenticated user
4. Return user if validation passes

Parameters:

- request (Request, required): Incoming request with headers and cookies.
- u (User, default DEP_CURRENT_USER): Current authenticated user from JWT.

Returns:

- User: The validated user (pass-through for chaining).

Raises:

- HTTPException: 403 Forbidden if CSRF validation fails.

Source code in app/main.py
def require_csrf(request: Request, u: User = DEP_CURRENT_USER) -> User:
    """Validate CSRF Token.

    FastAPI dependency that validates CSRF tokens to protect against cross-site
    request forgery attacks. Compares the X-CSRF-Token header with the XSRF-TOKEN
    cookie, verifies they match, and validates the signature against the user's
    identity. Required for all state-changing operations (POST/PUT/PATCH/DELETE).

    CSRF Protection Flow:
    1. Extract X-CSRF-Token header and XSRF-TOKEN cookie
    2. Verify both exist and match exactly
    3. Verify signature is valid for authenticated user
    4. Return user if validation passes

    Args:
        request: Incoming request with headers and cookies.
        u: Current authenticated user from JWT.

    Returns:
        User: The validated user (pass-through for chaining).

    Raises:
        HTTPException: 403 Forbidden if CSRF validation fails.
    """
    header = request.headers.get("x-csrf-token")
    cookie = request.cookies.get("XSRF-TOKEN")
    if (
        not header
        or not cookie
        or header != cookie
        or not verify_csrf(cookie, u.username)
    ):
        raise HTTPException(403, "CSRF failed")
    return u

login

login(request, data, response, db=DEP_GET_SESSION)

User Login with Optional TOTP.

Authenticates a user with username and password, optionally verifying a 6-digit TOTP code if two-factor authentication is enabled. On successful authentication, sets HTTP-only cookies for access token, refresh token, and CSRF token. The access token contains the user's roles for authorization.

Authentication Flow:

1. Verify username exists in database
2. Verify password hash matches using Argon2
3. If TOTP enabled, verify 6-digit code from authenticator app
4. Generate access token (15min), refresh token (7d), CSRF token
5. Set HTTP-only cookies with tokens
6. Return success with username and roles

Parameters:

- data (LoginIn, required): Login credentials (username, password, optional totp_code).
- response (Response, required): FastAPI response object for setting cookies.
- db (Session, default DEP_GET_SESSION): Database session for user lookup.

Returns:

- dict[str, Any]: Success response with keys:
    - detail: "ok"
    - user: {username: str, roles: list[str]}

Raises:

- HTTPException: 400 if credentials invalid, 2FA required, or TOTP code invalid.

Source code in app/main.py
@router.post("/auth/login")
@limiter.limit("5/minute")
def login(
    request: Request,
    data: LoginIn,
    response: Response,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """User Login with Optional TOTP.

    Authenticates a user with username and password, optionally verifying a
    6-digit TOTP code if two-factor authentication is enabled. On successful
    authentication, sets HTTP-only cookies for access token, refresh token,
    and CSRF token. The access token contains the user's roles for authorization.

    Authentication Flow:
    1. Verify username exists in database
    2. Verify password hash matches using Argon2
    3. If TOTP enabled, verify 6-digit code from authenticator app
    4. Generate access token (15min), refresh token (7d), CSRF token
    5. Set HTTP-only cookies with tokens
    6. Return success with username and roles

    Args:
        data: Login credentials (username, password, optional totp_code).
        response: FastAPI response object for setting cookies.
        db: Database session for user lookup.

    Returns:
        dict: Success response with keys:
            - detail: "ok"
            - user: {username: str, roles: list[str]}

    Raises:
        HTTPException: 400 if credentials invalid, 2FA required, or TOTP code invalid.
    """

    user = db.scalar(
        select(User).where(User.username == data.username.strip())
    )

    if not user or not verify_password(data.password, user.password_hash):
        raise HTTPException(400, "Invalid credentials")

    if getattr(user, "is_totp_enabled", False):
        if not data.totp_code:
            raise HTTPException(
                status_code=400,
                detail={
                    "message": "Two-factor required",
                    "error_code": "two_factor_required",
                },
            )
        if not verify_totp_code(user.totp_secret or "", data.totp_code):
            raise HTTPException(
                status_code=400,
                detail={
                    "message": "Invalid two-factor code",
                    "error_code": "invalid_totp",
                },
            )
    roles = [r.name for r in user.roles]
    competencies = user.get_final_competencies()
    access = create_jwt_with_competencies(user.username, roles, competencies)
    refresh = create_refresh_token(user.username)
    xsrf = make_csrf(user.username)
    set_auth_cookies(response, access, refresh, xsrf)
    return {
        "detail": "ok",
        "user": {"username": user.username, "roles": roles},
    }
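
All three failure modes return status 400, so a client has to inspect the body to tell them apart: a plain string detail for bad credentials, and a structured detail with an error_code for the two-factor cases. A sketch of that client-side branching follows. The function name and the returned step labels are hypothetical.

```python
from typing import Any


def next_login_step(status_code: int, body: dict[str, Any]) -> str:
    """Map a login response to the next action a client should take."""
    detail = body.get("detail")
    if status_code == 200 and detail == "ok":
        return "signed_in"
    if status_code == 400 and isinstance(detail, dict):
        code = detail.get("error_code")
        if code == "two_factor_required":
            return "prompt_for_totp"
        if code == "invalid_totp":
            return "retry_totp"
    # Plain-string detail (e.g. "Invalid credentials") or any other status.
    return "show_error"


print(next_login_step(400, {"detail": {"message": "Two-factor required",
                                       "error_code": "two_factor_required"}}))
```

Branching on error_code rather than on the human-readable message keeps the client stable if the wording changes.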

list_organizations_public

list_organizations_public(db=DEP_GET_SESSION)

List organisations for registration.

Public endpoint that returns organisation names and IDs for the registration form dropdown. No authentication required. Only exposes the minimum fields needed (id and name).

Returns:

- dict[str, list[dict[str, Any]]]: dict with key organizations containing a list of {id, name} objects.

Source code in app/main.py
@router.get("/auth/organizations")
def list_organizations_public(
    db: Session = DEP_GET_SESSION,
) -> dict[str, list[dict[str, Any]]]:
    """List organisations for registration.

    Public endpoint that returns organisation names and IDs for the
    registration form dropdown. No authentication required. Only exposes
    the minimum fields needed (id and name).

    Returns:
        dict with key ``organizations`` containing a list of
        ``{id, name}`` objects.
    """
    organizations = db.execute(select(Organization)).scalars().all()
    return {
        "organizations": [
            {"id": org.id, "name": org.name} for org in organizations
        ]
    }

register

register(request, payload, db=DEP_GET_SESSION)

User Registration.

Creates a new user account with username, email, and password. Performs validation checks for required fields, password minimum length, and uniqueness constraints. The password is hashed with Argon2 before storage. Users are created without any roles by default and must be assigned roles by an administrator.

Validation Rules:

- Username and email must not be empty after stripping whitespace
- Password must be at least 6 characters long
- Username must be unique across all users
- Email must be unique across all users

Parameters:

Name Type Description Default
payload RegisterIn

Registration data (username, email, password).

required
db Session

Database session for user creation.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, str]

Success response with {"detail": "created"}.

Raises:

Type Description
HTTPException

400 if validation fails or constraints violated:

- "Missing fields" if username, email, or password empty
- "Password too short" if password < 6 characters
- "Username already exists" if username taken
- "Email already exists" if email taken
- "Organisation not found" if organisation_id matches no organisation

Source code in app/main.py
@router.post("/auth/register")
@limiter.limit("3/minute")
def register(
    request: Request,
    payload: RegisterIn,
    db: Session = DEP_GET_SESSION,
) -> dict[str, str]:
    """User Registration.

    Creates a new user account with username, email, and password. Performs
    validation checks for required fields, password minimum length, and uniqueness
    constraints. The password is hashed with Argon2 before storage. Users are
    created without any roles by default and must be assigned roles by an
    administrator.

    Validation Rules:
    - Username and email must not be empty after stripping whitespace
    - Password must be at least 6 characters long
    - Username must be unique across all users
    - Email must be unique across all users

    Args:
        payload: Registration data (username, email, password).
        db: Database session for user creation.

    Returns:
        dict: Success response with {"detail": "created"}.

    Raises:
        HTTPException: 400 if validation fails or constraints violated:
            - "Missing fields" if username, email, or password empty
            - "Password too short" if password < 6 characters
            - "Username already exists" if username taken
            - "Email already exists" if email taken
            - "Organisation not found" if organisation_id matches no organisation
    """
    username = payload.username.strip()
    email = payload.email.strip()
    if not username or not email or not payload.password:
        raise HTTPException(status_code=400, detail="Missing fields")

    if len(payload.password) < 6:
        raise HTTPException(status_code=400, detail="Password too short")

    existing = db.scalar(select(User).where(User.username == username))

    if existing:
        raise HTTPException(status_code=400, detail="Username already exists")

    existing = db.scalar(select(User).where(User.email == email))

    if existing:
        raise HTTPException(status_code=400, detail="Email already exists")

    user = User(
        username=username,
        full_name=(
            payload.full_name.strip()
            if payload.full_name and payload.full_name.strip()
            else None
        ),
        email=email,
        password_hash=hash_password(payload.password),
    )

    # When clinical services are disabled (teaching-only deployment),
    # auto-assign the "learner" base profession so the user can
    # immediately take teaching assessments.
    if not settings.CLINICAL_SERVICES_ENABLED:
        user.base_profession = "learner"

    db.add(user)
    db.flush()  # Assigns user.id so we can create the org membership

    # Add the user to the selected organisation as a primary member
    if payload.organisation_id is not None:
        org = db.scalar(
            select(Organization).where(
                Organization.id == payload.organisation_id
            )
        )
        if org is None:
            raise HTTPException(
                status_code=400, detail="Organisation not found"
            )
        db.execute(
            organisation_staff_member.insert().values(
                organisation_id=org.id,
                user_id=user.id,
                is_primary=True,
            )
        )

    db.commit()
    return {"detail": "created"}
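
The order of the registration checks determines which error a caller sees first. The sketch below reproduces that order as a pure function so it can be reasoned about without a database; the two sets stand in for the uniqueness queries, and `registration_error` is a hypothetical name, not part of the module.

```python
from typing import Optional

MIN_PASSWORD_LENGTH = 6  # matches the length check in register()


def registration_error(
    username: str,
    email: str,
    password: str,
    taken_usernames: set[str],
    taken_emails: set[str],
) -> Optional[str]:
    """Return the first failing rule's message, or None if all rules pass."""
    username = username.strip()
    email = email.strip()
    if not username or not email or not password:
        return "Missing fields"
    if len(password) < MIN_PASSWORD_LENGTH:
        return "Password too short"
    # The sets below are stand-ins for the database uniqueness lookups.
    if username in taken_usernames:
        return "Username already exists"
    if email in taken_emails:
        return "Email already exists"
    return None
```

Note that whitespace is stripped from the username and email but not from the password, so a password of spaces still counts toward the minimum length.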

forgot_password

forgot_password(request, data, db=DEP_GET_SESSION)

Request a password reset email.

Accepts an email address and, if a matching account exists, sends a password reset link. Always returns a success response regardless of whether the email exists, to prevent account enumeration.

The reset link contains a time-limited signed token (default 30 min).

Parameters:

Name Type Description Default
data ForgotPasswordIn

Payload with the user's email address.

required
db Session

Database session for user lookup.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, str]

Always returns {"detail": "ok"} to prevent enumeration.

Source code in app/main.py
@router.post("/auth/forgot-password")
@limiter.limit("3/minute")
def forgot_password(
    request: Request,
    data: ForgotPasswordIn,
    db: Session = DEP_GET_SESSION,
) -> dict[str, str]:
    """Request a password reset email.

    Accepts an email address and, if a matching account exists, sends a
    password reset link. Always returns a success response regardless of
    whether the email exists, to prevent account enumeration.

    The reset link contains a time-limited signed token (default 30 min).

    Args:
        data: Payload with the user's email address.
        db: Database session for user lookup.

    Returns:
        dict: Always returns ``{"detail": "ok"}`` to prevent enumeration.
    """
    email = data.email.strip().lower()
    user = db.scalar(select(User).where(User.email == email))
    if user:
        token = create_password_reset_token(email)
        reset_url = f"{settings.FRONTEND_URL}/reset-password?token={token}"
        send_email(
            to=email,
            subject="Reset your Quill password",
            html_body=(
                f"<p>You requested a password reset for your Quill account.</p>"
                f'<p><a href="{reset_url}">Reset your password</a></p>'
                f"<p>This link expires in {settings.PASSWORD_RESET_TTL_MIN}"
                f" minutes. If you did not request this, ignore this email.</p>"
            ),
        )
    # Always return ok to prevent account enumeration
    return {"detail": "ok"}
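
The implementation of `create_password_reset_token` is not shown here. As an illustration of what a time-limited signed token can look like, the sketch below signs an email and expiry timestamp with HMAC-SHA256 using only the standard library. The names `make_reset_token`, `read_reset_token`, `SECRET_KEY`, and the token layout are all assumptions; the real module may well use a JWT or another signing library.

```python
import base64
import hashlib
import hmac
import time
from typing import Optional

SECRET_KEY = b"change-me"  # placeholder; a real deployment loads this from config
TTL_SECONDS = 30 * 60      # mirrors the documented 30-minute default


def _sign(payload: str) -> str:
    digest = hmac.new(SECRET_KEY, payload.encode("utf-8"), hashlib.sha256).digest()
    return base64.urlsafe_b64encode(digest).decode("ascii")


def make_reset_token(email: str, now: Optional[float] = None) -> str:
    """Return '<payload>.<signature>', where payload encodes email and expiry."""
    expires = int((time.time() if now is None else now) + TTL_SECONDS)
    raw = f"{email}|{expires}".encode("utf-8")
    payload = base64.urlsafe_b64encode(raw).decode("ascii")
    return f"{payload}.{_sign(payload)}"


def read_reset_token(token: str, now: Optional[float] = None) -> Optional[str]:
    """Return the email if the signature is valid and unexpired, else None."""
    try:
        payload, sig = token.split(".")
        # Constant-time comparison avoids leaking how many characters matched.
        if not hmac.compare_digest(_sign(payload).encode("ascii"), sig.encode("utf-8")):
            return None
        decoded = base64.urlsafe_b64decode(payload).decode("utf-8")
        email, expires = decoded.rsplit("|", 1)
        expires_at = int(expires)
    except ValueError:
        return None
    current = time.time() if now is None else now
    return email if current < expires_at else None
```

Because the expiry is inside the signed payload, the server needs no stored state to reject an old link; the trade-off is that a token cannot be revoked before it expires.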

reset_password

reset_password(request, data, db=DEP_GET_SESSION)

Reset a user's password using a reset token.

Verifies the token from the email link, validates the new password, and updates the user's password hash.

Parameters:

Name Type Description Default
data ResetPasswordIn

Payload with the reset token and new password.

required
db Session

Database session for user update.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, str]

{"detail": "Password reset successfully"} on success.

Raises:

Type Description
HTTPException

400 if the token is invalid or expired, or the new password is shorter than 8 characters.

Source code in app/main.py
@router.post("/auth/reset-password")
@limiter.limit("5/minute")
def reset_password(
    request: Request,
    data: ResetPasswordIn,
    db: Session = DEP_GET_SESSION,
) -> dict[str, str]:
    """Reset a user's password using a reset token.

    Verifies the token from the email link, validates the new password,
    and updates the user's password hash.

    Args:
        data: Payload with the reset token and new password.
        db: Database session for user update.

    Returns:
        dict: ``{"detail": "Password reset successfully"}`` on success.

    Raises:
        HTTPException: 400 if the token is invalid or expired, or
            the new password is shorter than 8 characters.
    """
    email = verify_password_reset_token(data.token)
    if not email:
        raise HTTPException(
            status_code=400,
            detail="Invalid or expired reset link",
        )
    if len(data.new_password) < 8:
        raise HTTPException(
            status_code=400,
            detail="New password must be at least 8 characters",
        )
    user = db.scalar(select(User).where(User.email == email))
    if not user:
        raise HTTPException(
            status_code=400,
            detail="Invalid or expired reset link",
        )
    user.password_hash = hash_password(data.new_password)
    db.add(user)
    db.commit()
    return {"detail": "Password reset successfully"}

create_user_with_cbac

create_user_with_cbac(payload, current_user=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Admin User Creation with CBAC.

Creates a new user account with full CBAC (Competency-Based Access Control) settings including base profession, competencies, and system permissions. Only accessible to users with admin or superadmin system permissions.

Validation Rules:

- Email and password must not be empty
- Password must be at least 8 characters long
- Username must be unique across all users
- Email must be unique across all users
- Requesting user must have admin or superadmin permissions

Parameters:

Name Type Description Default
payload AdminUserCreateIn

User creation data with CBAC settings.

required
current_user User

Currently authenticated user (must be admin/superadmin).

DEP_CURRENT_USER
db Session

Database session for user creation.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Success response with detail, id, username, and email.

Raises:

Type Description
HTTPException

403 if requesting user lacks admin permissions.

HTTPException

400 if validation fails or constraints violated.

Source code in app/main.py
@router.post("/users")
def create_user_with_cbac(
    payload: AdminUserCreateIn,
    current_user: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Admin User Creation with CBAC.

    Creates a new user account with full CBAC (Competency-Based Access Control)
    settings including base profession, competencies, and system permissions.
    Only accessible to users with admin or superadmin system permissions.

    Validation Rules:
    - Email and password must not be empty
    - Password must be at least 8 characters long
    - Username must be unique across all users
    - Email must be unique across all users
    - Requesting user must have admin or superadmin permissions

    Args:
        payload: User creation data with CBAC settings.
        current_user: Currently authenticated user (must be admin/superadmin).
        db: Database session for user creation.

    Returns:
        dict: Success response with detail, id, username, and email.

    Raises:
        HTTPException: 403 if requesting user lacks admin permissions.
        HTTPException: 400 if validation fails or constraints violated.
    """
    # Check authorization
    if current_user.system_permissions not in ["admin", "superadmin"]:
        raise HTTPException(
            status_code=403,
            detail="Admin or superadmin permissions required to create users",
        )

    # Validation
    email = payload.email.strip()
    password = payload.password
    username = (
        payload.username or payload.name.strip().replace(" ", "").lower()
    )

    if not email or not password:
        raise HTTPException(status_code=400, detail="Missing required fields")

    if len(password) < 8:
        raise HTTPException(
            status_code=400, detail="Password must be at least 8 characters"
        )

    # Check uniqueness
    existing = db.scalar(select(User).where(User.username == username))
    if existing:
        raise HTTPException(status_code=400, detail="Username already exists")

    existing = db.scalar(select(User).where(User.email == email))
    if existing:
        raise HTTPException(status_code=400, detail="Email already exists")

    # Create user
    user = User(
        username=username,
        full_name=payload.name.strip(),
        email=email,
        password_hash=hash_password(password),
        base_profession=payload.base_profession,
        additional_competencies=payload.additional_competencies,
        removed_competencies=payload.removed_competencies,
        system_permissions=payload.system_permissions,
    )
    db.add(user)
    db.commit()
    db.refresh(user)

    return {
        "detail": "created",
        "id": user.id,
        "username": user.username,
        "email": user.email,
    }
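
The three CBAC fields (`base_profession`, `additional_competencies`, `removed_competencies`) combine into a single effective competency set. The method that does this, `get_final_competencies`, is not shown in this section, so the following is only a sketch of one plausible resolution rule. The template table, the competency names, and the choice that removals take precedence over additions are all assumptions made for illustration.

```python
# Hypothetical base-profession templates; the real mapping lives elsewhere.
BASE_COMPETENCIES: dict[str, set[str]] = {
    "consultant": {"view_patient", "write_letter", "prescribe"},
    "patient": {"view_own_record"},
}


def final_competencies(
    base_profession: str,
    additional: list[str],
    removed: list[str],
) -> list[str]:
    """Start from the profession template, add extras, then apply removals."""
    base = BASE_COMPETENCIES.get(base_profession, set())
    # Assumption: a competency listed in `removed` is dropped even if it
    # also appears in `additional`.
    return sorted((base | set(additional)) - set(removed))
```

For example, `final_competencies("consultant", ["order_imaging"], ["prescribe"])` yields the consultant template plus `order_imaging` and minus `prescribe`.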

update_user

update_user(user_id, payload, current_user=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Update User Account.

Updates an existing user account with new CBAC settings and/or credentials. Only provided fields are updated; omitted fields remain unchanged. The password is changed only when a non-empty value is provided.

Requires admin or superadmin system permissions.

Validation Rules:

- Only updates fields that are provided (not None)
- Email must be unique if being changed
- Username must be unique if being changed
- Password must be at least 8 characters if being changed
- Requesting user must have admin or superadmin permissions

Parameters:

Name Type Description Default
user_id int

ID of the user to update.

required
payload AdminUserUpdateIn

User update data (all fields optional).

required
current_user User

Currently authenticated user (admin/superadmin only).

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Success response with updated user details.

Raises:

Type Description
HTTPException

403 if requesting user lacks admin permissions.

HTTPException

404 if user not found.

HTTPException

400 if validation fails or constraints violated.

Source code in app/main.py
@router.patch("/users/{user_id}")
def update_user(
    user_id: int,
    payload: AdminUserUpdateIn,
    current_user: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Update User Account.

    Updates an existing user account with new CBAC settings and/or credentials.
    Only provided fields are updated; omitted fields remain unchanged. The
    password is changed only when a non-empty value is provided.

    Requires admin or superadmin system permissions.

    Validation Rules:
    - Only updates fields that are provided (not None)
    - Email must be unique if being changed
    - Username must be unique if being changed
    - Password must be at least 8 characters if being changed
    - Requesting user must have admin or superadmin permissions

    Args:
        user_id: ID of the user to update.
        payload: User update data (all fields optional).
        current_user: Currently authenticated user (admin/superadmin only).
        db: Database session.

    Returns:
        dict: Success response with updated user details.

    Raises:
        HTTPException: 403 if requesting user lacks admin permissions.
        HTTPException: 404 if user not found.
        HTTPException: 400 if validation fails or constraints violated.
    """
    # Check authorization
    if current_user.system_permissions not in ["admin", "superadmin"]:
        raise HTTPException(
            status_code=403,
            detail="Admin or superadmin permissions required to update users",
        )

    # Fetch user
    user = db.scalar(select(User).where(User.id == user_id))
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Validate and update username
    if payload.username is not None:
        username = payload.username.strip()
        if username != user.username:
            existing = db.scalar(select(User).where(User.username == username))
            if existing:
                raise HTTPException(
                    status_code=400, detail="Username already exists"
                )
            user.username = username

    # Update full name
    if payload.name is not None:
        user.full_name = payload.name.strip() or None

    # Validate and update email
    if payload.email is not None:
        email = payload.email.strip()
        if email != user.email:
            if not email:
                raise HTTPException(
                    status_code=400, detail="Email is required"
                )
            existing = db.scalar(select(User).where(User.email == email))
            if existing:
                raise HTTPException(
                    status_code=400, detail="Email already exists"
                )
            user.email = email

    # Update password if a non-empty value was provided
    if payload.password:
        if len(payload.password) < 8:
            raise HTTPException(
                status_code=400,
                detail="Password must be at least 8 characters",
            )
        user.password_hash = hash_password(payload.password)

    # Update CBAC fields if provided
    if payload.base_profession is not None:
        user.base_profession = payload.base_profession

    if payload.additional_competencies is not None:
        user.additional_competencies = payload.additional_competencies

    if payload.removed_competencies is not None:
        user.removed_competencies = payload.removed_competencies

    if payload.system_permissions is not None:
        user.system_permissions = payload.system_permissions

    # Commit changes
    db.commit()
    db.refresh(user)

    return {
        "detail": "updated",
        "id": user.id,
        "username": user.username,
        "email": user.email,
    }
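
The "only provided fields are updated" rule is the usual PATCH pattern: every field in the request model is optional, and `None` means "leave unchanged". The sketch below shows the pattern with plain dataclasses; `UserRecord`, `UserPatch`, and `apply_patch` are hypothetical stand-ins for the ORM model, `AdminUserUpdateIn`, and the handler body, and the uniqueness and length checks are deliberately omitted.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class UserRecord:  # stand-in for the ORM model
    username: str
    email: str
    base_profession: str


@dataclass
class UserPatch:  # stand-in for the update payload; every field is optional
    username: Optional[str] = None
    email: Optional[str] = None
    base_profession: Optional[str] = None


def apply_patch(user: UserRecord, patch: UserPatch) -> list[str]:
    """Copy only the fields the caller supplied; return the names applied."""
    changed = []
    for f in fields(patch):
        value = getattr(patch, f.name)
        if value is not None:  # None means "not provided", so skip it
            setattr(user, f.name, value)
            changed.append(f.name)
    return changed
```

One consequence of this design is that a field cannot be reset to null through the same payload, because `None` is already used to mean "omitted".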

totp_setup

totp_setup(u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

TOTP Two-Factor Setup.

Generates a new TOTP secret for the authenticated user (or reuses existing) and returns a provision URI for QR code display. The frontend renders this URI as a QR code that users scan with their authenticator app (Google Authenticator, Authy, etc.). The secret is saved to the database but TOTP is not enabled until the user verifies a code with /auth/totp/verify.

Setup Flow:

1. Check if user already has TOTP secret
2. Generate new Base32 secret if missing
3. Save secret to database
4. Generate otpauth:// provision URI
5. Return URI for QR code rendering

Parameters:

Name Type Description Default
u User

Currently authenticated user from JWT.

DEP_CURRENT_USER
db Session

Database session for updating user.

DEP_GET_SESSION

Returns:

Name Type Description
TotpSetupOut TotpSetupOut

Object containing provision_uri, the otpauth:// provisioning URI (encoded with issuer and account name) for QR code rendering.

Source code in app/main.py
@router.post("/auth/totp/setup", response_model=TotpSetupOut)
def totp_setup(
    u: User = DEP_CURRENT_USER, db: Session = DEP_GET_SESSION
) -> TotpSetupOut:
    """TOTP Two-Factor Setup.

    Generates a new TOTP secret for the authenticated user (or reuses existing)
    and returns a provision URI for QR code display. The frontend renders this
    URI as a QR code that users scan with their authenticator app (Google
    Authenticator, Authy, etc.). The secret is saved to the database but TOTP
    is not enabled until the user verifies a code with /auth/totp/verify.

    Setup Flow:
    1. Check if user already has TOTP secret
    2. Generate new Base32 secret if missing
    3. Save secret to database
    4. Generate otpauth:// provision URI
    5. Return URI for QR code rendering

    Args:
        u: Currently authenticated user from JWT (injected).
        db: Database session for updating user.

    Returns:
        TotpSetupOut: Object containing provision_uri, the otpauth://
            provisioning URI encoded with issuer and account name.
    """
    if not getattr(u, "totp_secret", None):
        u.totp_secret = generate_totp_secret()

    db.add(u)
    db.commit()
    issuer = getattr(settings, "PROJECT_NAME", "Quill")
    uri = totp_provisioning_uri(
        u.totp_secret or "",
        u.username,
        issuer=issuer,
    )
    return TotpSetupOut(provision_uri=uri)
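
The helpers `generate_totp_secret` and `totp_provisioning_uri` are defined elsewhere and not shown. To make the setup flow concrete, here is a standard-library sketch of what such helpers typically do: generate a random Base32 secret and build an `otpauth://` URI in the commonly used Key URI layout. The function names and the 20-byte secret size are assumptions, not the module's actual implementation.

```python
import base64
import secrets
from urllib.parse import quote, urlencode


def new_base32_secret(num_bytes: int = 20) -> str:
    """Return a random Base32 secret (20 bytes encodes to 32 characters)."""
    return base64.b32encode(secrets.token_bytes(num_bytes)).decode("ascii")


def provisioning_uri(secret: str, account: str, issuer: str) -> str:
    """Build an otpauth:// URI that authenticator apps can import."""
    # The label is "issuer:account", percent-encoded so it is URL-safe.
    label = quote(f"{issuer}:{account}", safe="")
    query = urlencode({"secret": secret, "issuer": issuer})
    return f"otpauth://totp/{label}?{query}"
```

The resulting string is what a frontend would encode as a QR code; the secret itself never needs to be displayed as text unless manual entry is offered.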

totp_verify

totp_verify(payload, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Verify TOTP and Enable Two-Factor.

Verifies the 6-digit TOTP code from the user's authenticator app and enables two-factor authentication on their account. This must be called after /auth/totp/setup to complete 2FA setup. Once enabled, the user will be required to provide a TOTP code on every login.

Verification Flow:

1. Check user has TOTP secret from setup
2. Verify 6-digit code matches current time window
3. Set is_totp_enabled=True in database
4. Return success

Parameters:

Name Type Description Default
payload TotpVerifyIn

Request containing the 6-digit TOTP code.

required
u User

Currently authenticated user from JWT.

DEP_CURRENT_USER
db Session

Database session for updating user.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, str]

Success response with {"detail": "enabled"}.

Raises:

Type Description
HTTPException

400 if:

- No TOTP secret exists (must call /auth/totp/setup first)
- TOTP code is invalid or expired

Source code in app/main.py
@router.post("/auth/totp/verify")
def totp_verify(
    payload: TotpVerifyIn,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, str]:
    """Verify TOTP and Enable Two-Factor.

    Verifies the 6-digit TOTP code from the user's authenticator app and
    enables two-factor authentication on their account. This must be called
    after /auth/totp/setup to complete 2FA setup. Once enabled, the user
    will be required to provide a TOTP code on every login.

    Verification Flow:
    1. Check user has TOTP secret from setup
    2. Verify 6-digit code matches current time window
    3. Set is_totp_enabled=True in database
    4. Return success

    Args:
        payload: Request containing the 6-digit TOTP code.
        u: Currently authenticated user from JWT.
        db: Database session for updating user.

    Returns:
        dict: Success response with {"detail": "enabled"}.

    Raises:
        HTTPException: 400 if:
            - No TOTP secret exists (must call /auth/totp/setup first)
            - TOTP code is invalid or expired
    """
    if not getattr(u, "totp_secret", None):
        raise HTTPException(
            status_code=400,
            detail={
                "message": "No TOTP secret set",
                "error_code": "no_totp_secret",
            },
        )
    if not verify_totp_code(
        u.totp_secret or "",
        payload.code,
    ):
        raise HTTPException(
            status_code=400,
            detail={"message": "Invalid code", "error_code": "invalid_totp"},
        )
    u.is_totp_enabled = True
    db.add(u)
    db.commit()
    return {"detail": "enabled"}
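
`verify_totp_code` is imported from elsewhere, so its exact behaviour is not visible here. For reference, the sketch below implements the standard TOTP algorithm (RFC 6238, HMAC-SHA1, 30-second steps, 6 digits) with the standard library. The names `totp_at` and `verify_code`, and the tolerance of one step either side of the current time, are assumptions rather than the module's confirmed behaviour.

```python
import base64
import hashlib
import hmac
import struct
import time
from typing import Optional


def totp_at(secret_b32: str, timestamp: float, step: int = 30, digits: int = 6) -> str:
    """Compute the TOTP code for a Base32 secret at the given Unix time."""
    key = base64.b32decode(secret_b32, casefold=True)
    counter = struct.pack(">Q", max(int(timestamp), 0) // step)
    digest = hmac.new(key, counter, hashlib.sha1).digest()
    # Dynamic truncation: the low nibble of the last byte selects 4 bytes.
    offset = digest[-1] & 0x0F
    value = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10 ** digits).zfill(digits)


def verify_code(
    secret_b32: str, code: str, now: Optional[float] = None, window: int = 1
) -> bool:
    """Accept the code for the current step or `window` steps either side."""
    current = time.time() if now is None else now
    return any(
        hmac.compare_digest(
            totp_at(secret_b32, current + drift * 30).encode("ascii"),
            code.encode("utf-8"),
        )
        for drift in range(-window, window + 1)
    )
```

Allowing a small window tolerates clock drift between the server and the user's device, at the cost of each code staying valid slightly longer.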

totp_disable

totp_disable(u=DEP_REQUIRE_CSRF, db=DEP_GET_SESSION)

Disable Two-Factor Authentication.

Disables TOTP two-factor authentication for the current user and clears their TOTP secret. Future logins will only require username and password. Requires CSRF token validation since this is a security-sensitive operation.

Security Note

This is a privileged operation that reduces account security. CSRF protection prevents a malicious site from disabling 2FA through a forged cross-site request.

Parameters:

Name Type Description Default
u User

Currently authenticated user (with CSRF validation).

DEP_REQUIRE_CSRF
db Session

Database session for updating user.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, str]

Success response with {"detail": "disabled"}.

Source code in app/main.py
@router.post("/auth/totp/disable")
def totp_disable(
    u: User = DEP_REQUIRE_CSRF, db: Session = DEP_GET_SESSION
) -> dict[str, str]:
    """Disable Two-Factor Authentication.

    Disables TOTP two-factor authentication for the current user and clears
    their TOTP secret. Future logins will only require username and password.
    Requires CSRF token validation since this is a security-sensitive operation.

    Security Note:
        This is a privileged operation that reduces account security. CSRF
        protection prevents a malicious site from disabling 2FA through a
        forged cross-site request.

    Args:
        u: Currently authenticated user (with CSRF validation).
        db: Database session for updating user.

    Returns:
        dict: Success response with {"detail": "disabled"}.
    """
    u.is_totp_enabled = False
    u.totp_secret = None
    db.add(u)
    db.commit()
    return {"detail": "disabled"}
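
The CSRF dependency (`DEP_REQUIRE_CSRF`) and `make_csrf` are defined outside this section. One common way to implement such a check is a signed double-submit token: the server sets a token in a cookie at login, the client echoes it in a request header, and the server verifies both that the two match and that the token was issued for this user. The sketch below illustrates that idea only; `make_csrf_token`, `csrf_ok`, and `CSRF_KEY` are hypothetical names and the real scheme may differ.

```python
import hashlib
import hmac
from typing import Optional

CSRF_KEY = b"change-me"  # placeholder; a real deployment loads this from config


def make_csrf_token(username: str) -> str:
    """Derive a per-user token by signing the username."""
    return hmac.new(CSRF_KEY, username.encode("utf-8"), hashlib.sha256).hexdigest()


def csrf_ok(
    username: str, cookie_token: Optional[str], header_token: Optional[str]
) -> bool:
    """Check that cookie and header agree and that the token belongs to the user."""
    if not cookie_token or not header_token:
        return False
    header = header_token.encode("utf-8")
    expected = make_csrf_token(username).encode("ascii")
    return hmac.compare_digest(cookie_token.encode("utf-8"), header) and hmac.compare_digest(
        expected, header
    )
```

A cross-site attacker can cause the browser to send the cookie but cannot read it to copy the value into a header, which is why the header comparison is the part that blocks forged requests.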

change_password

change_password(data, u=DEP_REQUIRE_CSRF, db=DEP_GET_SESSION)

Change the current user's password.

Verifies the current password, validates the new password meets minimum requirements, then updates the hash. Requires CSRF token.

Parameters:

Name Type Description Default
data ChangePasswordIn

Current and new password payload.

required
u User

Currently authenticated user (with CSRF validation).

DEP_REQUIRE_CSRF
db Session

Database session for updating user.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, str]

Success response with {"detail": "Password changed"}.

Raises:

Type Description
HTTPException

400 if the current password is wrong or the new password is shorter than 8 characters.

Source code in app/main.py
@router.post("/auth/change-password")
def change_password(
    data: ChangePasswordIn,
    u: User = DEP_REQUIRE_CSRF,
    db: Session = DEP_GET_SESSION,
) -> dict[str, str]:
    """Change the current user's password.

    Verifies the current password, validates the new password meets
    minimum requirements, then updates the hash. Requires CSRF token.

    Args:
        data: Current and new password payload.
        u: Currently authenticated user (with CSRF validation).
        db: Database session for updating user.

    Returns:
        dict: Success response with {"detail": "Password changed"}.

    Raises:
        HTTPException: 400 if the current password is wrong or the new
            password is shorter than 8 characters.
    """
    if not verify_password(data.current_password, u.password_hash):
        raise HTTPException(
            status_code=400, detail="Current password is incorrect"
        )
    if len(data.new_password) < 8:
        raise HTTPException(
            status_code=400,
            detail="New password must be at least 8 characters",
        )
    u.password_hash = hash_password(data.new_password)
    db.add(u)
    db.commit()
    return {"detail": "Password changed"}

logout

logout(response, _u=DEP_CURRENT_USER)

User Logout.

Logs out the current user by clearing all authentication cookies (access_token, refresh_token, XSRF-TOKEN). The user must log in again to access protected endpoints. This only clears client-side cookies; the tokens remain valid until they expire because there is no server-side revocation.

Implementation Note

Production systems should implement token blacklisting for immediate revocation. The current implementation relies on a short access-token TTL (15 minutes) to limit the exposure window.

Parameters:

Name Type Description Default
response Response

FastAPI response object for clearing cookies.

required
_u User

Currently authenticated user (validates auth before logout).

DEP_CURRENT_USER

Returns:

Name Type Description
dict dict[str, str]

Success response with {"detail": "ok"}.

Source code in app/main.py
@router.post("/auth/logout")
def logout(response: Response, _u: User = DEP_CURRENT_USER) -> dict[str, str]:
    """User Logout.

    Logs out the current user by clearing all authentication cookies (access_token,
    refresh_token, XSRF-TOKEN). The user must log in again to access protected
    endpoints. This only clears client-side cookies; the tokens remain valid
    until they expire because there is no server-side revocation.

    Implementation Note:
        Production systems should implement token blacklisting for immediate
        revocation. The current implementation relies on a short access-token
        TTL (15 minutes) to limit the exposure window.

    Args:
        response: FastAPI response object for clearing cookies.
        _u: Currently authenticated user (validates auth before logout).

    Returns:
        dict: Success response with {"detail": "ok"}.
    """
    clear_auth_cookies(response)
    return {"detail": "ok"}
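
The implementation note above mentions token blacklisting as the route to immediate revocation. A minimal in-memory sketch of that idea follows. It assumes each token carries a unique identifier (for example a `jti` claim) and a known expiry time; neither is confirmed by this section, and `TokenDenylist` is a hypothetical class. A multi-process deployment would need a shared store rather than a per-process dictionary.

```python
import time
from typing import Optional


class TokenDenylist:
    """In-memory denylist keyed by token ID; entries lapse when the token expires."""

    def __init__(self) -> None:
        self._revoked: dict[str, float] = {}

    def revoke(self, token_id: str, expires_at: float) -> None:
        """Record a token as revoked until its own expiry time."""
        self._revoked[token_id] = expires_at

    def is_revoked(self, token_id: str, now: Optional[float] = None) -> bool:
        current = time.time() if now is None else now
        # Drop entries for tokens that have already expired on their own.
        self._revoked = {t: exp for t, exp in self._revoked.items() if exp > current}
        return token_id in self._revoked
```

Under this scheme, logout would call `revoke` with the current token's ID, and the authentication dependency would reject any token for which `is_revoked` returns true. Storing the expiry alongside the ID keeps the list from growing without bound.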

me

me(u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Get Current User Profile.

Returns the authenticated user's profile information including username, email, assigned roles, system permissions, TOTP status, and enabled features from the user's primary organisation.

Parameters:

Name Type Description Default
u User

Currently authenticated user from JWT.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

User profile with keys:
  • id: User's database ID
  • username: User's username
  • name: User's full name
  • email: User's email address
  • roles: List of assigned role names
  • system_permissions: User's system permission level
  • totp_enabled: Whether 2FA is active
  • enabled_features: Features enabled on user's primary org
  • clinical_services_enabled: Value of the CLINICAL_SERVICES_ENABLED setting

Source code in app/main.py
@router.get("/auth/me")
def me(
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Get Current User Profile.

    Returns the authenticated user's profile information including username,
    email, assigned roles, system permissions, TOTP status, and enabled
    features from the user's primary organisation.

    Args:
        u: Currently authenticated user from JWT.
        db: Database session.

    Returns:
        dict: User profile with keys:
            - id: User's database ID
            - username: User's username
            - name: User's full name
            - email: User's email address
            - roles: List of assigned role names
            - system_permissions: User's system permission level
            - totp_enabled: Whether 2FA is active
            - enabled_features: Features enabled on user's primary org
            - clinical_services_enabled: Value of the
              CLINICAL_SERVICES_ENABLED setting
    """
    # Resolve features from user's primary org
    enabled_features: list[str] = []
    primary_org_row = db.execute(
        select(organisation_staff_member.c.organisation_id).where(
            organisation_staff_member.c.user_id == u.id,
            organisation_staff_member.c.is_primary.is_(True),
        )
    ).first()
    if primary_org_row:
        features = (
            db.execute(
                select(OrganisationFeature.feature_key).where(
                    OrganisationFeature.organisation_id == primary_org_row[0],
                )
            )
            .scalars()
            .all()
        )
        enabled_features = list(features)

    return {
        "id": u.id,
        "username": u.username,
        "name": u.full_name,
        "email": u.email,
        "roles": [r.name for r in u.roles],
        "system_permissions": u.system_permissions,
        "totp_enabled": u.is_totp_enabled,
        "enabled_features": enabled_features,
        "clinical_services_enabled": settings.CLINICAL_SERVICES_ENABLED,
    }

list_users

list_users(patient_id=None, permission_level=None, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

List users, optionally filtered by shared org with a patient.

When patient_id is provided, returns staff who share an org with that patient plus external users with active access grants. This is used by the message participant picker.

Without patient_id, returns all users (admin/superadmin only). Use permission_level to filter by minimum permission level (e.g. staff returns staff, admin, and superadmin users).

Parameters:

Name Type Description Default
patient_id str | None

Optional FHIR patient ID to filter by shared org.

None
permission_level str | None

Optional minimum permission level to filter by.

None
u User

Currently authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Response with users array.

Raises:

Type Description
HTTPException

403 if user lacks permissions.

HTTPException

400 if permission_level is invalid.

Source code in app/main.py
@router.get("/users")
def list_users(
    patient_id: str | None = None,
    permission_level: str | None = None,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """List users, optionally filtered by shared org with a patient.

    When ``patient_id`` is provided, returns staff who share an org with
    that patient plus external users with active access grants. This is
    used by the message participant picker.

    Without ``patient_id``, returns all users (admin/superadmin only).
    Use ``permission_level`` to filter by minimum permission level
    (e.g. ``staff`` returns staff, admin, and superadmin users).

    Args:
        patient_id: Optional FHIR patient ID to filter by shared org.
        permission_level: Optional minimum permission level to filter by.
        u: Currently authenticated user.
        db: Database session.

    Returns:
        dict: Response with users array.

    Raises:
        HTTPException: 403 if user lacks permissions.
        HTTPException: 400 if permission_level is invalid.
    """
    if patient_id:
        # Filtered mode: staff in patient's orgs + external with access
        patient_orgs = get_patient_org_ids(db, patient_id)
        staff_ids = (
            get_org_staff_ids(db, patient_orgs) if patient_orgs else set()
        )

        # Also include external users with active access to this patient
        external_rows = db.execute(
            select(ExternalPatientAccess.user_id).where(
                ExternalPatientAccess.patient_id == patient_id,
                ExternalPatientAccess.revoked_at.is_(None),
            )
        ).all()
        external_ids = {r[0] for r in external_rows}

        all_ids = staff_ids | external_ids
        if not all_ids:
            return {"users": []}

        users = (
            db.execute(select(User).where(User.id.in_(all_ids)))
            .scalars()
            .unique()
            .all()
        )
        return {
            "users": [
                {
                    "id": user.id,
                    "username": user.username,
                    "email": user.email,
                    "system_permissions": user.system_permissions,
                }
                for user in users
            ]
        }

    # Unfiltered mode: admin/superadmin only
    if u.system_permissions not in ["admin", "superadmin"]:
        raise HTTPException(
            status_code=403,
            detail="Requires admin or superadmin permissions",
        )

    if permission_level is not None:
        if permission_level not in PERMISSION_LEVELS:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid permission_level: {permission_level}",
            )
        min_index = PERMISSION_LEVELS.index(permission_level)
        allowed = PERMISSION_LEVELS[min_index:]
        stmt = select(User).where(User.system_permissions.in_(allowed))
    else:
        stmt = select(User)

    try:
        users = db.execute(stmt).scalars().unique().all()
        return {
            "users": [
                {
                    "id": user.id,
                    "username": user.username,
                    "email": user.email,
                    "system_permissions": user.system_permissions,
                }
                for user in users
            ]
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
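
The minimum-level filter in list_users relies on list slicing over an ordered sequence of levels. The sketch below isolates that step. The contents and ordering of `PERMISSION_LEVELS` are an assumption based on the levels named elsewhere in this module (patient, staff, admin, superadmin); the real constant is defined outside the code shown here. `levels_at_or_above` is a hypothetical helper name.

```python
# Assumed ordering, lowest to highest. The real PERMISSION_LEVELS constant
# is defined elsewhere in the application and is not shown in this section.
PERMISSION_LEVELS = ["patient", "staff", "admin", "superadmin"]


def levels_at_or_above(minimum: str) -> list[str]:
    """Return every permission level that is at least `minimum`."""
    if minimum not in PERMISSION_LEVELS:
        raise ValueError(f"Invalid permission_level: {minimum}")
    # Slicing from the index of the minimum keeps it and all higher levels.
    return PERMISSION_LEVELS[PERMISSION_LEVELS.index(minimum):]
```

The endpoint passes the resulting list to an `IN` clause on `User.system_permissions`, so the ordering of the list is what gives "minimum level" its meaning.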

get_user

get_user(user_id, current_user=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Get User Details.

Retrieves detailed information about a specific user including their CBAC settings (base profession, competencies) and system permissions. Used by the admin interface when editing user accounts.

Requires admin or superadmin system permissions.

Parameters:

Name Type Description Default
user_id int

ID of the user to retrieve.

required
current_user User

Currently authenticated user (admin/superadmin only).

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

User details with keys:
  • id: User ID
  • username: Username
  • email: Email address
  • name: Full name (falls back to username when no full name is set)
  • base_profession: Base profession ID
  • additional_competencies: Array of additional competency IDs
  • removed_competencies: Array of removed competency IDs
  • system_permissions: System permission level

Raises:

Type Description
HTTPException

403 if user lacks admin/superadmin permissions.

HTTPException

404 if user not found.

Source code in app/main.py
@router.get("/users/{user_id}")
def get_user(
    user_id: int,
    current_user: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Get User Details.

    Retrieves detailed information about a specific user including their
    CBAC settings (base profession, competencies) and system permissions.
    Used by the admin interface when editing user accounts.

    Requires admin or superadmin system permissions.

    Args:
        user_id: ID of the user to retrieve.
        current_user: Currently authenticated user (admin/superadmin only).
        db: Database session.

    Returns:
        dict: User details with keys:
            - id: User ID
            - username: Username
            - email: Email address
            - name: Full name (falls back to username when unset)
            - base_profession: Base profession ID
            - additional_competencies: Array of additional competency IDs
            - removed_competencies: Array of removed competency IDs
            - system_permissions: System permission level

    Raises:
        HTTPException: 403 if user lacks admin/superadmin permissions.
        HTTPException: 404 if user not found.
    """
    # Check permissions
    if current_user.system_permissions not in ["admin", "superadmin"]:
        raise HTTPException(
            status_code=403,
            detail="Requires admin or superadmin permissions",
        )

    # Fetch user
    user = db.scalar(select(User).where(User.id == user_id))
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    return {
        "id": user.id,
        "username": user.username,
        "email": user.email,
        "name": user.full_name or user.username,
        "base_profession": user.base_profession,
        "additional_competencies": user.additional_competencies or [],
        "removed_competencies": user.removed_competencies or [],
        "system_permissions": user.system_permissions,
    }

refresh

refresh(response, request, db=DEP_GET_SESSION)

Rotate Tokens and Issue New Access Token.

Validates the refresh token from cookies and issues new access, refresh, and CSRF tokens. This extends the user's session without requiring re-login. The frontend calls this endpoint automatically when the access token expires (401 response) so the session continues without interruption.

Token Rotation Flow:
  1. Extract refresh token from HTTP-only cookie
  2. Decode and validate refresh token (check expiry, type="refresh")
  3. Load user from database by username in token
  4. Generate new access token (15min TTL)
  5. Generate new refresh token (7 day TTL)
  6. Generate new CSRF token
  7. Set all three cookies in response

Security Note

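Token rotation limits the window for token replay: each refresh replaces the cookies with newly issued tokens that carry fresh expiry times. Because there is no server-side revocation (see logout), previously issued tokens remain valid until they expire.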

Parameters:

Name Type Description Default
response Response

FastAPI response object for setting new cookies.

required
request Request

FastAPI request object for reading refresh token cookie.

required
db Session

Database session for loading user.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, str]

Success response with {"detail": "refreshed"}.

Raises:

Type Description
HTTPException

401 if any of the following holds:
  • No refresh_token cookie present
  • Token signature invalid or expired
  • Token type is not "refresh"
  • User not found in database or inactive

Source code in app/main.py
@router.post("/auth/refresh")
def refresh(
    response: Response, request: Request, db: Session = DEP_GET_SESSION
) -> dict[str, str]:
    """Rotate Tokens and Issue New Access Token.

    Validates the refresh token from cookies and issues new access, refresh,
    and CSRF tokens. This extends the user's session without requiring re-login.
    Frontend automatically calls this endpoint when the access token expires
    (401 response) to maintain seamless user experience.

    Token Rotation Flow:
    1. Extract refresh token from HTTP-only cookie
    2. Decode and validate refresh token (check expiry, type="refresh")
    3. Load user from database by username in token
    4. Generate new access token (15min TTL)
    5. Generate new refresh token (7 day TTL)
    6. Generate new CSRF token
    7. Set all three cookies in response

    Security Note:
        Token rotation limits the window for token replay: each refresh
        replaces the cookies with newly issued tokens that carry fresh
        expiry times. There is no server-side revocation, so previously
        issued tokens remain valid until they expire.

    Args:
        response: FastAPI response object for setting new cookies.
        request: FastAPI request object for reading refresh token cookie.
        db: Database session for loading user.

    Returns:
        dict: Success response with {"detail": "refreshed"}.

    Raises:
        HTTPException: 401 if:
            - No refresh_token cookie present
            - Token signature invalid or expired
            - Token type is not "refresh"
            - User not found in database or inactive
    """
    tok = request.cookies.get("refresh_token")
    if not tok:
        raise HTTPException(401, "No refresh token")
    try:
        payload = decode_token(tok)
        if payload.get("type") != "refresh":
            raise ValueError("not refresh")
    except Exception as e:
        raise HTTPException(401, "Bad refresh token") from e
    sub = payload.get("sub")
    user = db.scalar(select(User).where(User.username == sub))
    if not user or not user.is_active:
        raise HTTPException(401, "Inactive user")
    roles = [r.name for r in user.roles]
    competencies = user.get_final_competencies()
    new_access = create_jwt_with_competencies(
        user.username, roles, competencies
    )
    new_refresh = create_refresh_token(user.username)  # rotate
    xsrf = make_csrf(user.username)
    set_auth_cookies(response, new_access, new_refresh, xsrf)
    return {"detail": "refreshed"}

create_patient_record

create_patient_record(patient_id)

Verify Patient in FHIR.

Verifies that a patient exists in the FHIR server before allowing clinical operations. This ensures the patient has a valid FHIR Patient resource before creating letters or other clinical documents. Requires Clinician role and CSRF token validation.

Parameters:

Name Type Description Default
patient_id str

FHIR Patient resource ID to verify.

required

Returns:

Name Type Description
dict dict[str, str]

Patient verification response with keys:
  • patient_id: The verified patient ID
  • status: "ready" indicating patient exists

Raises:

Type Description
HTTPException

404 if patient not found in FHIR server.

HTTPException

500 if FHIR communication fails.

Source code in app/main.py
@router.post(
    "/patients/verify",
    dependencies=[
        DEP_REQUIRE_CLINICAL,
        DEP_REQUIRE_ROLES_CLINICIAN,
        DEP_REQUIRE_CSRF,
    ],
)
def create_patient_record(patient_id: str) -> dict[str, str]:
    """Verify Patient in FHIR.

    Verifies that a patient exists in the FHIR server before allowing clinical
    operations. This ensures the patient has a valid FHIR Patient resource
    before creating letters or other clinical documents. Requires Clinician
    role and CSRF token validation.

    Args:
        patient_id: FHIR Patient resource ID to verify.

    Returns:
        dict: Patient verification response with keys:
            - patient_id: The verified patient ID
            - status: "ready" indicating patient exists

    Raises:
        HTTPException: 404 if patient not found in FHIR server.
        HTTPException: 500 if FHIR communication fails.
    """
    try:
        # Check if patient exists in FHIR
        patient = read_fhir_patient(patient_id)
        if not patient:
            raise HTTPException(
                status_code=404, detail="Patient not found in FHIR server"
            )
        return {"patient_id": patient_id, "status": "ready"}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
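
Several endpoints in this module share the same error-handling shape: deliberate HTTP errors are re-raised untouched, and anything unexpected is wrapped as a 500. The sketch below shows why the `except HTTPException: raise` clause matters, using a stand-in exception class so it runs without FastAPI. `HTTPError`, `verify_patient`, and the `lookup` callable are hypothetical names introduced for illustration.

```python
class HTTPError(Exception):
    """Stand-in for FastAPI's HTTPException (hypothetical, for illustration)."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


def verify_patient(patient_id: str, lookup) -> dict[str, str]:
    """Return a 'ready' status, mapping failures to HTTP-style errors."""
    try:
        if not lookup(patient_id):
            raise HTTPError(404, "Patient not found in FHIR server")
        return {"patient_id": patient_id, "status": "ready"}
    except HTTPError:
        # Deliberate errors (such as the 404 above) pass through unchanged.
        raise
    except Exception as e:
        # Anything unexpected, e.g. a connection failure, becomes a 500.
        raise HTTPError(500, str(e)) from e
```

Without the first `except` clause, the 404 raised inside the `try` block would be caught by the generic handler and reported as a 500.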

list_patients

list_patients(include_inactive=False, scope=None, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

List patients from FHIR, filtered by organisation membership.

By default, staff see only patients in their organisation(s). Admin/superadmin can pass scope=admin to see all patients. External users see only patients they have ExternalPatientAccess for.

Parameters:

Name Type Description Default
include_inactive bool

If true, include deactivated patients (admin only).

False
scope str | None

Pass "admin" to bypass org filtering (admin/superadmin only).

None
u User

Currently authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Response with patients array and fhir_ready flag.

Source code in app/main.py
@router.get("/patients", dependencies=[DEP_REQUIRE_CLINICAL])
def list_patients(
    include_inactive: bool = False,
    scope: str | None = None,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """List patients from FHIR, filtered by organisation membership.

    By default, staff see only patients in their organisation(s).
    Admin/superadmin can pass ``scope=admin`` to see all patients.
    External users see only patients they have ExternalPatientAccess for.

    Args:
        include_inactive: If true, include deactivated patients (admin only).
        scope: Pass "admin" to bypass org filtering (admin/superadmin only).
        u: Currently authenticated user.
        db: Database session.

    Returns:
        dict: Response with patients array and fhir_ready flag.
    """
    try:
        patients = list_fhir_patients()

        # Fetch all patient metadata from database
        stmt = select(PatientMetadata)
        metadata_records = db.execute(stmt).scalars().all()
        metadata_map = {m.patient_id: m.is_active for m in metadata_records}

        # Determine which patients are accessible
        is_admin = u.system_permissions in ["admin", "superadmin"]
        admin_scope = scope == "admin" and is_admin

        accessible_ids: set[str] | None = None
        if admin_scope:
            accessible_ids = None  # no filtering
        else:
            accessible_ids = get_accessible_patient_ids(db, u)

        # Enrich patients with activation status and filter
        enriched_patients = []
        for patient in patients:
            patient_id = patient.get("id")
            if patient_id is None:
                continue

            # Org-based filtering (skip for admin scope)
            if accessible_ids is not None and patient_id not in accessible_ids:
                continue

            is_active = metadata_map.get(patient_id, True)

            # Filter based on activation status
            if is_active or (include_inactive and is_admin):
                patient["is_active"] = is_active
                enriched_patients.append(patient)

        return {"patients": enriched_patients, "fhir_ready": True}
    except Exception:
        return {"patients": [], "fhir_ready": False}

upsert_demographics

upsert_demographics(patient_id, demographics, u=DEP_CURRENT_USER)

Update Patient Demographics in FHIR.

Updates patient demographic information in the FHIR server. Accepts a dictionary of FHIR-compatible demographic fields (name, address, telecom, birthDate, gender, etc.). Requires Clinician role and CSRF token validation since this modifies patient data.

Parameters:

Name Type Description Default
patient_id str

FHIR Patient resource ID to update.

required
demographics dict[str, Any]

Dictionary of FHIR Patient fields to update.

required
u User

Currently authenticated user (unused but validates auth).

DEP_CURRENT_USER

Returns:

Name Type Description
dict dict[str, str | Any]

Update response with keys:
  • patient_id: The updated patient ID
  • updated: True indicating success
  • data: Complete updated FHIR Patient resource

Raises:

Type Description
HTTPException

404 if patient not found in FHIR server.

HTTPException

500 if FHIR update operation fails.

Source code in app/main.py
@router.put(
    "/patients/{patient_id}/demographics",
    dependencies=[
        DEP_REQUIRE_CLINICAL,
        DEP_REQUIRE_ROLES_CLINICIAN,
        DEP_REQUIRE_CSRF,
    ],
)
def upsert_demographics(
    patient_id: str, demographics: dict[str, Any], u: User = DEP_CURRENT_USER
) -> dict[str, str | Any]:
    """Update Patient Demographics in FHIR.

    Updates patient demographic information in the FHIR server. Accepts a
    dictionary of FHIR-compatible demographic fields (name, address, telecom,
    birthDate, gender, etc.). Requires Clinician role and CSRF token validation
    since this modifies patient data.

    Args:
        patient_id: FHIR Patient resource ID to update.
        demographics: Dictionary of FHIR Patient fields to update.
        u: Currently authenticated user (unused but validates auth).

    Returns:
        dict: Update response with keys:
            - patient_id: The updated patient ID
            - updated: True indicating success
            - data: Complete updated FHIR Patient resource

    Raises:
        HTTPException: 404 if patient not found in FHIR server.
        HTTPException: 500 if FHIR update operation fails.
    """
    try:
        result = update_fhir_patient(patient_id, demographics)
        if result is None:
            raise HTTPException(status_code=404, detail="Patient not found")
        return {"patient_id": patient_id, "updated": True, "data": result}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
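
For orientation, a demographics payload using the FHIR R4 Patient fields named in the description might look like the sketch below. The values are invented, and whether `update_fhir_patient` merges these fields into the existing resource or replaces them is not shown in this section, so treat the payload as an illustration of field shapes only.

```python
import json

# Illustrative values only; field shapes follow the FHIR R4 Patient resource.
demographics = {
    "name": [{"use": "official", "family": "Example", "given": ["Alex"]}],
    "gender": "other",           # one of: male, female, other, unknown
    "birthDate": "1980-01-31",   # FHIR date format, YYYY-MM-DD
    "telecom": [
        {"system": "phone", "value": "+44 20 7946 0000", "use": "home"}
    ],
    "address": [
        {"line": ["1 Example Street"], "city": "London", "country": "GB"}
    ],
}

# JSON body as it would be sent in the PUT request.
body = json.dumps(demographics)
```

Because the endpoint requires CSRF validation, a real request would also need the authentication cookies and the CSRF token, which are omitted here.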

get_demographics

get_demographics(patient_id, u=DEP_CURRENT_USER)

Get Patient Demographics from FHIR.

Retrieves complete demographic information for a specific patient from the FHIR server. Returns the full FHIR R4 Patient resource including name, date of birth, gender, identifiers, contact information, and address.

Parameters:

Name Type Description Default
patient_id str

FHIR Patient resource ID to retrieve.

required
u User

Currently authenticated user (any role can read demographics).

DEP_CURRENT_USER

Returns:

Name Type Description
dict dict[str, str | Any]

Patient demographics response with keys:
  • patient_id: The requested patient ID
  • data: Complete FHIR Patient resource

Raises:

Type Description
HTTPException

404 if patient not found in FHIR server.

HTTPException

500 if FHIR read operation fails.

Source code in app/main.py
@router.get(
    "/patients/{patient_id}/demographics",
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def get_demographics(
    patient_id: str, u: User = DEP_CURRENT_USER
) -> dict[str, str | Any]:
    """Get Patient Demographics from FHIR.

    Retrieves complete demographic information for a specific patient from the
    FHIR server. Returns the full FHIR R4 Patient resource including name,
    date of birth, gender, identifiers, contact information, and address.

    Args:
        patient_id: FHIR Patient resource ID to retrieve.
        u: Currently authenticated user (any role can read demographics).

    Returns:
        dict: Patient demographics response with keys:
            - patient_id: The requested patient ID
            - data: Complete FHIR Patient resource

    Raises:
        HTTPException: 404 if patient not found in FHIR server.
        HTTPException: 500 if FHIR read operation fails.
    """
    try:
        patient = read_fhir_patient(patient_id)
        if patient is None:
            raise HTTPException(status_code=404, detail="Patient not found")
        return {"patient_id": patient_id, "data": patient}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

write_letter

write_letter(patient_id, letter)

Create Clinical Letter in OpenEHR.

Creates a new clinical letter composition in EHRbase for the specified patient. Automatically ensures the patient has an EHR in EHRbase (creates if missing). Stores the letter title, markdown body, and author information. Requires Clinician role and CSRF token validation.

Letter Storage
  • Letters stored as OpenEHR Compositions in EHRbase
  • Each patient has corresponding EHR linked by FHIR patient ID
  • Markdown body preserved for future rendering
  • Author metadata includes name and email

Parameters:

Name Type Description Default
patient_id str

FHIR Patient ID to associate letter with.

required
letter LetterIn

Letter content (title, body, author metadata).

required

Returns:

Name Type Description
dict dict[str, str]

Created letter response with keys:
  • patient_id: The patient ID
  • composition_uid: OpenEHR composition UID for retrieval
  • title: Letter title

Raises:

Type Description
HTTPException

500 if EHR creation or composition write fails.

Source code in app/main.py
@router.post(
    "/patients/{patient_id}/letters",
    dependencies=[
        DEP_REQUIRE_CLINICAL,
        DEP_REQUIRE_ROLES_CLINICIAN,
        DEP_REQUIRE_CSRF,
    ],
)
def write_letter(patient_id: str, letter: LetterIn) -> dict[str, str]:
    """Create Clinical Letter in OpenEHR.

    Creates a new clinical letter composition in EHRbase for the specified patient.
    Automatically ensures the patient has an EHR in EHRbase (creates if missing).
    Stores the letter title, markdown body, and author information. Requires
    Clinician role and CSRF token validation.

    Letter Storage:
        - Letters stored as OpenEHR Compositions in EHRbase
        - Each patient has corresponding EHR linked by FHIR patient ID
        - Markdown body preserved for future rendering
        - Author metadata includes name and email

    Args:
        patient_id: FHIR Patient ID to associate letter with.
        letter: Letter content (title, body, author metadata).

    Returns:
        dict: Created letter response with keys:
            - patient_id: The patient ID
            - composition_uid: OpenEHR composition UID for retrieval
            - title: Letter title

    Raises:
        HTTPException: 500 if EHR creation or composition write fails.
    """
    try:
        result = create_letter_composition(
            patient_id=patient_id,
            title=letter.title,
            body=letter.body,
            author_name=letter.author_name,
        )
        return {
            "patient_id": patient_id,
            "composition_uid": result.get("uid", {}).get("value"),
            "title": letter.title,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
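
In write_letter, `result.get("uid", {}).get("value")` yields `None` when the EHRbase response lacks a UID, even though the return type is annotated as `dict[str, str]`. A stricter extraction could look like the sketch below. `extract_composition_uid` is a hypothetical helper, and the response shape `{"uid": {"value": ...}}` is inferred from the access pattern in the source rather than confirmed against EHRbase.

```python
def extract_composition_uid(result: dict) -> str:
    """Return the composition UID, or raise if the response lacks one."""
    # `or {}` also covers a response where "uid" is present but null.
    uid = (result.get("uid") or {}).get("value")
    if not uid:
        raise ValueError("response did not include uid.value")
    return uid
```

Raising inside the endpoint's `try` block would surface the problem as a 500 instead of returning a response with a null `composition_uid`.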

read_letter

read_letter(patient_id, composition_uid, u=DEP_CURRENT_USER)

Read Specific Clinical Letter from OpenEHR.

Retrieves a specific clinical letter composition from EHRbase by its composition UID. Returns the complete composition including title, body, author information, and OpenEHR metadata.

Parameters:

Name Type Description Default
patient_id str

FHIR Patient ID the letter belongs to.

required
composition_uid str

OpenEHR composition UID from letter creation.

required
u User

Currently authenticated user (any role can read letters).

DEP_CURRENT_USER

Returns:

Name Type Description
dict dict[str, Any]

Letter retrieval response with keys:
  • patient_id: The patient ID
  • composition_uid: The composition UID
  • data: Complete OpenEHR Composition structure

Raises:

Type Description
HTTPException

404 if letter not found in EHRbase.

HTTPException

500 if EHRbase read operation fails.

Source code in app/main.py
@router.get(
    "/patients/{patient_id}/letters/{composition_uid}",
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def read_letter(
    patient_id: str, composition_uid: str, u: User = DEP_CURRENT_USER
) -> dict[str, Any]:
    """Read Specific Clinical Letter from OpenEHR.

    Retrieves a specific clinical letter composition from EHRbase by its
    composition UID. Returns the complete composition including title, body,
    author information, and OpenEHR metadata.

    Args:
        patient_id: FHIR Patient ID the letter belongs to.
        composition_uid: OpenEHR composition UID from letter creation.
        u: Currently authenticated user (any role can read letters).

    Returns:
        dict: Letter retrieval response with keys:
            - patient_id: The patient ID
            - composition_uid: The composition UID
            - data: Complete OpenEHR Composition structure

    Raises:
        HTTPException: 404 if letter not found in EHRbase.
        HTTPException: 500 if EHRbase read operation fails.
    """
    try:
        composition = get_letter_composition(patient_id, composition_uid)
        if composition is None:
            raise HTTPException(status_code=404, detail="Letter not found")
        return {
            "patient_id": patient_id,
            "composition_uid": composition_uid,
            "data": composition,
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
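
The error handling in read_letter can be read as a small pattern: deliberate HTTP errors pass through, anything unexpected becomes a 500. The sketch below mirrors that control flow using only the standard library. `HTTPError` and the injected `fetch` callable are hypothetical stand-ins for `HTTPException` and `get_letter_composition`, not the application's real objects.

```python
from typing import Any, Callable, Optional


class HTTPError(Exception):
    """Hypothetical stand-in for fastapi.HTTPException (stdlib only)."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


def read_letter_sketch(
    patient_id: str,
    composition_uid: str,
    fetch: Callable[[str, str], Optional[dict[str, Any]]],
) -> dict[str, Any]:
    """Mirror the handler's control flow with an injected fetch function."""
    try:
        composition = fetch(patient_id, composition_uid)
        if composition is None:
            raise HTTPError(404, "Letter not found")
        return {
            "patient_id": patient_id,
            "composition_uid": composition_uid,
            "data": composition,
        }
    except HTTPError:
        # Deliberate HTTP errors (the 404 above) pass through unchanged.
        raise
    except Exception as e:
        # Any other failure is reported as a 500 carrying the error text.
        raise HTTPError(500, str(e)) from e
```

Without the first `except` clause, the 404 raised inside the `try` block would be caught by the generic handler and reported as a 500.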

list_letters

list_letters(patient_id, u=DEP_CURRENT_USER)

List All Clinical Letters for Patient.

Retrieves all clinical letter compositions for a specific patient from EHRbase. Returns a list of letter metadata (UID, title, creation date) without fetching the full content of each letter. Use the individual letter endpoint to retrieve complete letter content.

Parameters:

Name Type Description Default
patient_id str

FHIR Patient ID to retrieve letters for.

required
u User

Currently authenticated user. Access is further gated by the route's DEP_REQUIRE_CLINICAL dependency.

DEP_CURRENT_USER

Returns:

Name Type Description
dict dict[str, Any]

Letter list response with keys:
  • patient_id: The patient ID
  • letters: Array of letter metadata (UID, title, created date)

Raises:

Type Description
HTTPException

500 if EHRbase query fails.

Source code in app/main.py
@router.get(
    "/patients/{patient_id}/letters",
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def list_letters(
    patient_id: str, u: User = DEP_CURRENT_USER
) -> dict[str, Any]:
    """List All Clinical Letters for Patient.

    Retrieves all clinical letter compositions for a specific patient from
    EHRbase. Returns a list of letter metadata (UID, title, creation date)
    without fetching the full content of each letter. Use the individual
    letter endpoint to retrieve complete letter content.

    Args:
        patient_id: FHIR Patient ID to retrieve letters for.
        u: Currently authenticated user. Access is further gated by the
            route's DEP_REQUIRE_CLINICAL dependency.

    Returns:
        dict: Letter list response with keys:
            - patient_id: The patient ID
            - letters: Array of letter metadata (UID, title, created date)

    Raises:
        HTTPException: 500 if EHRbase query fails.
    """
    try:
        letters = list_letters_for_patient(patient_id)
        return {"patient_id": patient_id, "letters": letters}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
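
A client typically calls the list endpoint first and then fetches individual letters by UID. The sketch below only builds the two request paths. It assumes the router is mounted under the /api prefix mentioned in the module description, and it percent-encodes both path parameters as a conservative choice, since OpenEHR composition UIDs usually contain `::` separators. The helper names are hypothetical.

```python
from urllib.parse import quote

API_PREFIX = "/api"  # assumption: taken from the module-level description


def letters_url(patient_id: str) -> str:
    """Path of the list endpoint for one patient."""
    return f"{API_PREFIX}/patients/{quote(patient_id, safe='')}/letters"


def letter_url(patient_id: str, composition_uid: str) -> str:
    """Path of the single-letter endpoint; the UID is percent-encoded."""
    return f"{letters_url(patient_id)}/{quote(composition_uid, safe='')}"
```

For example, `letter_url("p1", "abc::local.ehrbase.org::1")` yields `/api/patients/p1/letters/abc%3A%3Alocal.ehrbase.org%3A%3A1`.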

create_patient_in_fhir

create_patient_in_fhir(data, u=DEP_CURRENT_USER)

Create New Patient in FHIR Server.

Creates a new FHIR R4 Patient resource from the provided demographics (name, birth date, gender, NHS number, MRN). The patient is assigned a FHIR resource ID, either auto-generated or custom if patient_id is provided.

Parameters:

Name Type Description Default
data FHIRPatientCreateIn

Patient demographics and optional custom ID.

required
u User

Currently authenticated user. Access is further gated by the route's DEP_REQUIRE_CLINICAL dependency.

DEP_CURRENT_USER

Returns:

Name Type Description
dict dict[str, Any]

Complete FHIR Patient resource with assigned ID.

Raises:

Type Description
HTTPException

500 if FHIR patient creation fails.

Source code in app/main.py
@router.post("/patients", dependencies=[DEP_REQUIRE_CLINICAL])
def create_patient_in_fhir(
    data: FHIRPatientCreateIn, u: User = DEP_CURRENT_USER
) -> dict[str, Any]:
    """Create New Patient in FHIR Server.

    Creates a new FHIR R4 Patient resource from the provided demographics
    (name, birth date, gender, NHS number, MRN). The patient is assigned a
    FHIR resource ID, either auto-generated or custom if patient_id is
    provided.

    Args:
        data: Patient demographics and optional custom ID.
        u: Currently authenticated user. Access is further gated by the
            route's DEP_REQUIRE_CLINICAL dependency.

    Returns:
        dict: Complete FHIR Patient resource with assigned ID.

    Raises:
        HTTPException: 500 if FHIR patient creation fails.
    """
    try:
        patient = create_fhir_patient(
            given_name=data.given_name,
            family_name=data.family_name,
            birth_date=data.birth_date,
            gender=data.gender,
            nhs_number=data.nhs_number,
            mrn=data.mrn,
            patient_id=data.patient_id,
        )
        return patient
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to create FHIR patient: {e}"
        ) from e
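
To make the request body concrete, the sketch below builds a JSON payload using the field names that the handler reads from `data`. The `PatientCreatePayload` class is a hypothetical client-side mirror. Which fields `FHIRPatientCreateIn` actually treats as optional is not shown in this section, so the defaults here are assumptions.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class PatientCreatePayload:
    """Hypothetical client-side mirror of the fields read by the handler."""

    given_name: str
    family_name: str
    birth_date: Optional[str] = None  # assumed ISO date string
    gender: Optional[str] = None
    nhs_number: Optional[str] = None
    mrn: Optional[str] = None
    patient_id: Optional[str] = None  # custom FHIR ID; omitted if None

    def to_json(self) -> str:
        """Serialize, leaving out fields that were not provided."""
        body = {k: v for k, v in asdict(self).items() if v is not None}
        return json.dumps(body)
```

A payload built as `PatientCreatePayload("Ada", "Lovelace", birth_date="1815-12-10")` serializes only the three fields that were set.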

get_patient

get_patient(patient_id, u=DEP_CURRENT_USER)

Get Single Patient from FHIR.

Retrieves a specific patient's demographics from the FHIR server by ID. Returns the complete FHIR R4 Patient resource including name, birth date, gender, and identifiers. Used by the admin interface when editing patient records.

Parameters:

Name Type Description Default
patient_id str

FHIR Patient resource ID to retrieve.

required
u User

Currently authenticated user. Access is further gated by the route's DEP_REQUIRE_CLINICAL dependency.

DEP_CURRENT_USER

Returns:

Name Type Description
dict dict[str, Any]

Complete FHIR Patient resource.

Raises:

Type Description
HTTPException

404 if patient not found in FHIR server.

HTTPException

500 if FHIR server communication fails.

Source code in app/main.py
@router.get(
    "/patients/{patient_id}",
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def get_patient(patient_id: str, u: User = DEP_CURRENT_USER) -> dict[str, Any]:
    """Get Single Patient from FHIR.

    Retrieves a specific patient's demographics from the FHIR server by ID.
    Returns the complete FHIR R4 Patient resource including name, birth date,
    gender, and identifiers. Used by the admin interface when editing patient
    records.

    Args:
        patient_id: FHIR Patient resource ID to retrieve.
        u: Currently authenticated user. Access is further gated by the
            route's DEP_REQUIRE_CLINICAL dependency.

    Returns:
        dict: Complete FHIR Patient resource.

    Raises:
        HTTPException: 404 if patient not found in FHIR server.
        HTTPException: 500 if FHIR server communication fails.
    """
    try:
        patient = read_fhir_patient(patient_id)
        if not patient:
            raise HTTPException(status_code=404, detail="Patient not found")
        # Mypy type narrowing: patient is now guaranteed to be dict[str, Any]
        assert patient is not None
        return patient
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to retrieve patient: {e}"
        ) from e

update_patient

update_patient(patient_id, data, u=DEP_CURRENT_USER)

Update Patient in FHIR.

Updates an existing patient's demographics in the FHIR server. Accepts the same fields as patient creation (name, birth_date, gender, identifiers). Used by the admin interface when editing patient records.

Parameters:

Name Type Description Default
patient_id str

FHIR Patient resource ID to update.

required
data FHIRPatientCreateIn

Updated patient demographics.

required
u User

Currently authenticated user. Access is further gated by the route's DEP_REQUIRE_CLINICAL dependency.

DEP_CURRENT_USER

Returns:

Name Type Description
dict dict[str, Any]

Complete updated FHIR Patient resource.

Raises:

Type Description
HTTPException

404 if patient not found in FHIR server.

HTTPException

500 if FHIR update operation fails.

Source code in app/main.py
@router.patch(
    "/patients/{patient_id}",
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def update_patient(
    patient_id: str,
    data: FHIRPatientCreateIn,
    u: User = DEP_CURRENT_USER,
) -> dict[str, Any]:
    """Update Patient in FHIR.

    Updates an existing patient's demographics in the FHIR server. Accepts
    the same fields as patient creation (name, birth_date, gender, identifiers).
    Used by the admin interface when editing patient records.

    Args:
        patient_id: FHIR Patient resource ID to update.
        data: Updated patient demographics.
        u: Currently authenticated user. Access is further gated by the
            route's DEP_REQUIRE_CLINICAL dependency.

    Returns:
        dict: Complete updated FHIR Patient resource.

    Raises:
        HTTPException: 404 if patient not found in FHIR server.
        HTTPException: 500 if FHIR update operation fails.
    """
    try:
        # Read existing patient to verify it exists
        existing = read_fhir_patient(patient_id)
        if not existing:
            raise HTTPException(status_code=404, detail="Patient not found")

        # Build update dict with provided fields
        updates: dict[str, Any] = {}

        # Update name
        if data.given_name or data.family_name:
            updates["name"] = [
                {
                    "use": "official",
                    "given": [data.given_name] if data.given_name else [],
                    "family": data.family_name if data.family_name else "",
                }
            ]

        # Update birth date
        if data.birth_date:
            updates["birthDate"] = data.birth_date

        # Update gender
        if data.gender:
            updates["gender"] = data.gender

        # Update identifiers (NHS number, MRN)
        identifiers = []
        if data.nhs_number:
            identifiers.append(
                {
                    "system": "https://fhir.nhs.uk/Id/nhs-number",
                    "value": data.nhs_number,
                }
            )
        if data.mrn:
            identifiers.append(
                {
                    "system": "http://hospital.example.org/mrn",
                    "value": data.mrn,
                }
            )
        if identifiers:
            updates["identifier"] = identifiers

        # Perform update
        updated_patient = update_fhir_patient(patient_id, updates)
        if not updated_patient:
            raise HTTPException(
                status_code=500, detail="Failed to update patient"
            )
        # Mypy type narrowing: updated_patient is now guaranteed to be dict[str, Any]
        assert updated_patient is not None
        return updated_patient
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to update patient: {e}"
        ) from e
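
The way update_patient assembles its `updates` dict can be isolated as a pure function, which makes the field mapping easier to see and to test. The sketch below reproduces that mapping with the same FHIR keys and identifier systems as the handler. The function name `build_patient_updates` is hypothetical and does not exist in the application.

```python
from typing import Any, Optional

NHS_NUMBER_SYSTEM = "https://fhir.nhs.uk/Id/nhs-number"
MRN_SYSTEM = "http://hospital.example.org/mrn"


def build_patient_updates(
    given_name: Optional[str] = None,
    family_name: Optional[str] = None,
    birth_date: Optional[str] = None,
    gender: Optional[str] = None,
    nhs_number: Optional[str] = None,
    mrn: Optional[str] = None,
) -> dict[str, Any]:
    """Map request fields to FHIR Patient keys, skipping empty values."""
    updates: dict[str, Any] = {}
    if given_name or family_name:
        updates["name"] = [
            {
                "use": "official",
                "given": [given_name] if given_name else [],
                "family": family_name if family_name else "",
            }
        ]
    if birth_date:
        updates["birthDate"] = birth_date
    if gender:
        updates["gender"] = gender
    identifiers: list[dict[str, str]] = []
    if nhs_number:
        identifiers.append({"system": NHS_NUMBER_SYSTEM, "value": nhs_number})
    if mrn:
        identifiers.append({"system": MRN_SYSTEM, "value": mrn})
    if identifiers:
        updates["identifier"] = identifiers
    return updates
```

Note that supplying only a family name produces an empty `given` list, and supplying only one identifier produces a one-element `identifier` list. Whether existing values survive such an update depends on how `update_fhir_patient` merges the dict, which is not shown here.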

get_patient_metadata

get_patient_metadata(patient_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Get Patient Metadata.

Returns application-specific metadata for a patient, including activation status. If no metadata record exists, returns default values (is_active=True).

Parameters:

Name Type Description Default
patient_id str

FHIR Patient resource ID.

required
u User

Currently authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Patient metadata with keys:
  • patient_id: FHIR Patient resource ID
  • is_active: Whether patient is active in the system

Source code in app/main.py
@router.get(
    "/patients/{patient_id}/metadata",
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def get_patient_metadata(
    patient_id: str,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Get Patient Metadata.

    Returns application-specific metadata for a patient, including activation
    status. If no metadata record exists, returns default values (is_active=True).

    Args:
        patient_id: FHIR Patient resource ID.
        u: Currently authenticated user.
        db: Database session.

    Returns:
        dict: Patient metadata with keys:
            - patient_id: FHIR Patient resource ID
            - is_active: Whether patient is active in the system
    """
    stmt = select(PatientMetadata).where(
        PatientMetadata.patient_id == patient_id
    )
    metadata = db.execute(stmt).scalar_one_or_none()

    if metadata:
        return {
            "patient_id": metadata.patient_id,
            "is_active": metadata.is_active,
        }
    else:
        # No metadata record means patient is active by default
        return {
            "patient_id": patient_id,
            "is_active": True,
        }
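
The default-active rule can be summarised in a few lines: a missing record means the patient is active. The sketch below uses a plain mapping in place of the `PatientMetadata` table. The function name and the mapping are hypothetical illustrations.

```python
from typing import Any, Mapping


def metadata_response(
    patient_id: str, records: Mapping[str, bool]
) -> dict[str, Any]:
    """Return stored activation status, defaulting to active when absent."""
    # `records` maps patient_id -> is_active and stands in for the table.
    return {
        "patient_id": patient_id,
        "is_active": records.get(patient_id, True),
    }
```

One consequence of this rule is that the endpoint reports `is_active=True` for any ID without a metadata record, including IDs that do not exist in FHIR, because the handler does not consult the FHIR server.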

deactivate_patient

deactivate_patient(patient_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Deactivate Patient Record.

Marks a patient as inactive in the system. Deactivated patients are hidden from clinical views but remain visible in admin pages with a deactivated flag. Requires admin or superadmin system permissions.

Parameters:

Name Type Description Default
patient_id str

FHIR Patient resource ID to deactivate.

required
u User

Currently authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Confirmation with keys:
  • patient_id: The deactivated patient ID
  • is_active: False
  • message: Success message

Raises:

Type Description
HTTPException

403 if user lacks admin permissions.

HTTPException

404 if patient not found in FHIR.

Source code in app/main.py
@router.post(
    "/patients/{patient_id}/deactivate",
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def deactivate_patient(
    patient_id: str,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Deactivate Patient Record.

    Marks a patient as inactive in the system. Deactivated patients are hidden
    from clinical views but remain visible in admin pages with a deactivated flag.
    Requires admin or superadmin system permissions.

    Args:
        patient_id: FHIR Patient resource ID to deactivate.
        u: Currently authenticated user.
        db: Database session.

    Returns:
        dict: Confirmation with keys:
            - patient_id: The deactivated patient ID
            - is_active: False
            - message: Success message

    Raises:
        HTTPException: 403 if user lacks admin permissions.
        HTTPException: 404 if patient not found in FHIR.
    """
    # Check permissions
    if u.system_permissions not in ["admin", "superadmin"]:
        raise HTTPException(
            status_code=403,
            detail="Admin or superadmin permission required to deactivate patients",
        )

    # Verify patient exists in FHIR
    patient = read_fhir_patient(patient_id)
    if not patient:
        raise HTTPException(status_code=404, detail="Patient not found")

    # Get or create metadata record
    stmt = select(PatientMetadata).where(
        PatientMetadata.patient_id == patient_id
    )
    metadata = db.execute(stmt).scalar_one_or_none()

    if metadata:
        # Update existing record
        metadata.is_active = False
    else:
        # Create new metadata record
        metadata = PatientMetadata(
            patient_id=patient_id,
            is_active=False,
        )
        db.add(metadata)

    db.commit()

    return {
        "patient_id": patient_id,
        "is_active": False,
        "message": "Patient deactivated successfully",
    }

activate_patient

activate_patient(patient_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Activate Patient Record.

Reactivates a previously deactivated patient, making them visible in all views again. Requires admin or superadmin system permissions.

Parameters:

Name Type Description Default
patient_id str

FHIR Patient resource ID to activate.

required
u User

Currently authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Confirmation with keys:
  • patient_id: The activated patient ID
  • is_active: True
  • message: Success message

Raises:

Type Description
HTTPException

403 if user lacks admin permissions.

HTTPException

404 if patient not found in FHIR.

Source code in app/main.py
@router.post(
    "/patients/{patient_id}/activate",
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def activate_patient(
    patient_id: str,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Activate Patient Record.

    Reactivates a previously deactivated patient, making them visible in all
    views again. Requires admin or superadmin system permissions.

    Args:
        patient_id: FHIR Patient resource ID to activate.
        u: Currently authenticated user.
        db: Database session.

    Returns:
        dict: Confirmation with keys:
            - patient_id: The activated patient ID
            - is_active: True
            - message: Success message

    Raises:
        HTTPException: 403 if user lacks admin permissions.
        HTTPException: 404 if patient not found in FHIR.
    """
    # Check permissions
    if u.system_permissions not in ["admin", "superadmin"]:
        raise HTTPException(
            status_code=403,
            detail="Admin or superadmin permission required to activate patients",
        )

    # Verify patient exists in FHIR
    patient = read_fhir_patient(patient_id)
    if not patient:
        raise HTTPException(status_code=404, detail="Patient not found")

    # Get or create metadata record
    stmt = select(PatientMetadata).where(
        PatientMetadata.patient_id == patient_id
    )
    metadata = db.execute(stmt).scalar_one_or_none()

    if metadata:
        # Update existing record
        metadata.is_active = True
    else:
        # Create new metadata record (already active by default)
        metadata = PatientMetadata(
            patient_id=patient_id,
            is_active=True,
        )
        db.add(metadata)

    db.commit()

    return {
        "patient_id": patient_id,
        "is_active": True,
        "message": "Patient activated successfully",
    }
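
The deactivate and activate handlers differ only in the flag they write, so their shared logic (permission check, then update-or-create) can be sketched as one helper. The version below uses an in-memory dict instead of the database and omits the FHIR existence check. `set_patient_active` and `Forbidden` are hypothetical names, not part of the application.

```python
from typing import Any

ADMIN_LEVELS = ("admin", "superadmin")


class Forbidden(Exception):
    """Hypothetical stand-in for the handlers' 403 HTTPException."""


def set_patient_active(
    store: dict[str, bool],
    system_permissions: str,
    patient_id: str,
    is_active: bool,
) -> dict[str, Any]:
    """Apply the admin check, then upsert the activation flag."""
    if system_permissions not in ADMIN_LEVELS:
        raise Forbidden("Admin or superadmin permission required")
    # Assignment covers both branches: update an existing record or
    # create a new one when none exists yet.
    store[patient_id] = is_active
    verb = "activated" if is_active else "deactivated"
    return {
        "patient_id": patient_id,
        "is_active": is_active,
        "message": f"Patient {verb} successfully",
    }
```

Routing both endpoints through one helper of this kind would keep the permission rule and response shape from drifting apart, though that refactoring is a suggestion rather than something the source does.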

shared_organisations_endpoint

shared_organisations_endpoint(patient_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Return organisations shared between the current user and a patient.

For external users this returns an empty list (they use per-patient access grants, not org membership).

Parameters:

Name Type Description Default
patient_id str

FHIR Patient resource ID.

required
u User

Authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

organisations list with id/name/type for each shared org.

Source code in app/main.py
@router.get(
    "/patients/{patient_id}/shared-organisations",
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def shared_organisations_endpoint(
    patient_id: str,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Return organisations shared between the current user and a patient.

    For external users this returns an empty list (they use
    per-patient access grants, not org membership).

    Args:
        patient_id: FHIR Patient resource ID.
        u: Authenticated user.
        db: Database session.

    Returns:
        dict: ``organisations`` list with id/name/type for each shared org.
    """
    if is_external_user(u.system_permissions):
        return {"organisations": []}

    shared_ids = get_shared_org_ids(db, u.id, patient_id)
    if not shared_ids:
        return {"organisations": []}

    orgs = (
        db.execute(select(Organization).where(Organization.id.in_(shared_ids)))
        .scalars()
        .all()
    )
    return {
        "organisations": [
            {"id": o.id, "name": o.name, "type": o.type} for o in orgs
        ]
    }

get_my_competencies async

get_my_competencies(user=DEP_CURRENT_USER)

Get current user's resolved competencies.

Returns the authenticated user's base profession and final competencies after resolving base profession + additional - removed competencies.

Returns:

Name Type Description
UserCompetenciesResponse UserCompetenciesResponse

User's competency information

Source code in app/main.py
@router.get(
    "/cbac/my-competencies",
    response_model=UserCompetenciesResponse,
    tags=["cbac"],
)
async def get_my_competencies(
    user: User = DEP_CURRENT_USER,
) -> UserCompetenciesResponse:
    """Get current user's resolved competencies.

    Returns the authenticated user's base profession and final competencies
    after resolving base profession + additional - removed competencies.

    Returns:
        UserCompetenciesResponse: User's competency information
    """
    return UserCompetenciesResponse(
        user_id=user.id,
        username=user.username,
        base_profession=user.base_profession,
        additional_competencies=user.additional_competencies or [],
        removed_competencies=user.removed_competencies or [],
        final_competencies=user.get_final_competencies(),
    )
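
The resolution rule described above (base profession + additional - removed) can be sketched with set operations. This is an illustration of the stated rule, not the body of `User.get_final_competencies`, which is not shown here. The function name and the sorted output order are assumptions.

```python
from typing import Iterable


def resolve_competencies(
    base: Iterable[str],
    additional: Iterable[str],
    removed: Iterable[str],
) -> list[str]:
    """Combine base and additional competencies, then drop removed ones."""
    resolved = set(base) | set(additional)
    # Removal is applied last, so a competency listed as both additional
    # and removed ends up excluded under this reading of the rule.
    resolved -= set(removed)
    return sorted(resolved)
```

For a base of `["a", "b"]`, additional `["c"]` and removed `["b"]`, the function returns `["a", "c"]`.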

prescribe_controlled async

prescribe_controlled(prescription, user=Depends(has_competency('prescribe_controlled_schedule_2')), db=DEP_GET_SESSION)

Prescribe controlled substance (Schedule 2).

Example endpoint demonstrating CBAC protection. Only users with 'prescribe_controlled_schedule_2' competency can access this endpoint.

Requires
  • Competency: prescribe_controlled_schedule_2
  • Authentication: JWT cookie

Parameters:

Name Type Description Default
prescription PrescriptionRequest

Prescription details

required
user User

Authenticated user (injected by dependency)

Depends(has_competency('prescribe_controlled_schedule_2'))
db Session

Database session

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Prescription confirmation

Raises:

Type Description
HTTPException

403 if user lacks competency

Source code in app/main.py
@router.post(
    "/prescriptions/controlled",
    tags=["cbac", "prescriptions"],
    status_code=201,
)
async def prescribe_controlled(
    prescription: PrescriptionRequest,
    user: User = Depends(has_competency("prescribe_controlled_schedule_2")),
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Prescribe controlled substance (Schedule 2).

    Example endpoint demonstrating CBAC protection. Only users with
    'prescribe_controlled_schedule_2' competency can access this endpoint.

    Requires:
        - Competency: prescribe_controlled_schedule_2
        - Authentication: JWT cookie

    Args:
        prescription: Prescription details
        user: Authenticated user (injected by dependency)
        db: Database session

    Returns:
        dict: Prescription confirmation

    Raises:
        HTTPException: 403 if user lacks competency
    """
    # In real implementation, this would create prescription in database
    return {
        "status": "success",
        "prescription_id": "RX001",
        "prescriber": user.username,
        "patient_id": prescription.patient_id,
        "medication": prescription.medication,
        "dose": prescription.dose,
        "duration_days": prescription.duration_days,
    }
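
The `has_competency(...)` call in the signature suggests a dependency factory: a function that takes the required competency and returns a checker. The sketch below shows that closure pattern with the standard library only. The real dependency is not shown in this section, so `has_competency_sketch`, `SketchUser` and `Forbidden` are hypothetical stand-ins.

```python
from dataclasses import dataclass, field
from typing import Callable


class Forbidden(Exception):
    """Hypothetical stand-in for a 403 HTTPException."""


@dataclass
class SketchUser:
    """Minimal user with an already-resolved competency list."""

    username: str
    competencies: list[str] = field(default_factory=list)


def has_competency_sketch(
    required: str,
) -> Callable[[SketchUser], SketchUser]:
    """Build a checker bound to one required competency."""

    def checker(user: SketchUser) -> SketchUser:
        if required not in user.competencies:
            raise Forbidden(f"Missing competency: {required}")
        # Returning the user lets the handler receive it as a parameter.
        return user

    return checker
```

Because the checker returns the user, a handler can both enforce the competency and obtain the authenticated user from a single parameter, which matches how `prescribe_controlled` declares `user`.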

update_my_competencies async

update_my_competencies(data, user=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Update user's additional/removed competencies.

Updates the authenticated user's own additional and removed competencies, which adjust the base profession template. The handler shown performs no administrator check, so any authenticated user can call it as written.

NOTE: In production, this should require an administrator role and a CSRF token.

Parameters:

Name Type Description Default
data UpdateCompetenciesRequest

Additional and removed competencies to update

required
user User

Authenticated user

DEP_CURRENT_USER
db Session

Database session

DEP_GET_SESSION

Returns:

Name Type Description
UserCompetenciesResponse UserCompetenciesResponse

Updated user competency information

Source code in app/main.py
@router.patch(
    "/cbac/my-competencies",
    response_model=UserCompetenciesResponse,
    tags=["cbac"],
)
async def update_my_competencies(
    data: UpdateCompetenciesRequest,
    user: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> UserCompetenciesResponse:
    """Update user's additional/removed competencies.

    Updates the authenticated user's own additional and removed competencies,
    which adjust the base profession template. The handler shown performs no
    administrator check, so any authenticated user can call it as written.

    NOTE: In production, this should require Administrator role and CSRF token.

    Args:
        data: Additional and removed competencies to update
        user: Authenticated user
        db: Database session

    Returns:
        UserCompetenciesResponse: Updated user competency information
    """
    # Update user's competencies
    if data.additional_competencies is not None:
        user.additional_competencies = data.additional_competencies
    if data.removed_competencies is not None:
        user.removed_competencies = data.removed_competencies

    db.commit()
    db.refresh(user)

    return UserCompetenciesResponse(
        user_id=user.id,
        username=user.username,
        base_profession=user.base_profession,
        additional_competencies=user.additional_competencies or [],
        removed_competencies=user.removed_competencies or [],
        final_competencies=user.get_final_competencies(),
    )
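
The production note could be addressed with the same `system_permissions` check that other handlers in this module use. The sketch below combines that check with the handler's update semantics on a plain dict. It is one possible approach under that assumption, and `apply_competency_update` is a hypothetical name.

```python
from typing import Optional

ADMIN_LEVELS = ("admin", "superadmin")


def apply_competency_update(
    actor_permissions: str,
    current: dict[str, list[str]],
    additional: Optional[list[str]] = None,
    removed: Optional[list[str]] = None,
) -> dict[str, list[str]]:
    """Return an updated copy of `current`, allowed for admins only."""
    if actor_permissions not in ADMIN_LEVELS:
        raise PermissionError("Requires admin or superadmin permissions")
    updated = dict(current)
    # As in the handler, None means "leave unchanged" while an empty
    # list replaces the stored value and so clears it.
    if additional is not None:
        updated["additional_competencies"] = additional
    if removed is not None:
        updated["removed_competencies"] = removed
    return updated
```

The distinction between `None` and `[]` matters to callers: omitting a field preserves the stored list, whereas sending an empty list clears it.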

list_organizations

list_organizations(u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

List All Organizations.

Retrieves all organizations from the database. Returns basic information for each organization. Used by admin interface to display organization list and management options.

Requires admin or superadmin system permissions.

Parameters:

Name Type Description Default
u User

Currently authenticated user (admin/superadmin only).

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Response with key:
  • organizations: Array of organization objects

Raises:

Type Description
HTTPException

403 if user lacks admin/superadmin permissions.

HTTPException

500 if database query fails.

Source code in app/main.py
@router.get("/organizations")
def list_organizations(
    u: User = DEP_CURRENT_USER, db: Session = DEP_GET_SESSION
) -> dict[str, Any]:
    """List All Organizations.

    Retrieves all organizations from the database. Returns basic information
    for each organization. Used by admin interface to display organization
    list and management options.

    Requires admin or superadmin system permissions.

    Args:
        u: Currently authenticated user (admin/superadmin only).
        db: Database session.

    Returns:
        dict: Response with key:
            - organizations: Array of organization objects

    Raises:
        HTTPException: 403 if user lacks admin/superadmin permissions.
        HTTPException: 500 if database query fails.
    """
    # Check permissions
    if u.system_permissions not in ["admin", "superadmin"]:
        raise HTTPException(
            status_code=403,
            detail="Requires admin or superadmin permissions",
        )

    try:
        organizations = db.execute(select(Organization)).scalars().all()
        return {
            "organizations": [
                {
                    "id": org.id,
                    "name": org.name,
                    "type": org.type,
                    "location": org.location,
                    "created_at": org.created_at.isoformat(),
                    "updated_at": org.updated_at.isoformat(),
                }
                for org in organizations
            ]
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
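
Each organization object in the response carries the same six fields, with timestamps rendered through `datetime.isoformat()`. The sketch below shows that serialization on a small dataclass. `OrgRow` is a hypothetical stand-in for the `Organization` model, and `serialize_org` is not a function in the application.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass
class OrgRow:
    """Hypothetical stand-in for the Organization ORM model."""

    id: int
    name: str
    type: str
    location: str
    created_at: datetime
    updated_at: datetime


def serialize_org(org: OrgRow) -> dict[str, Any]:
    """Convert one organization row into a JSON-friendly dict."""
    return {
        "id": org.id,
        "name": org.name,
        "type": org.type,
        "location": org.location,
        # isoformat() includes the UTC offset when the datetime is aware.
        "created_at": org.created_at.isoformat(),
        "updated_at": org.updated_at.isoformat(),
    }
```

A helper like this could serve both the list and the detail endpoints, since they emit the same base fields.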

get_organization

get_organization(org_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Get Organization Details.

Retrieves detailed information about a specific organization including staff members and patient count.

Requires admin or superadmin system permissions.

Parameters:

Name Type Description Default
org_id int

ID of the organization to retrieve.

required
u User

Currently authenticated user (admin/superadmin only).

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Organization details with staff and patient information.

Raises:

Type Description
HTTPException

403 if user lacks admin/superadmin permissions.

HTTPException

404 if organization not found.

Source code in app/main.py
@router.get("/organizations/{org_id}")
def get_organization(
    org_id: int,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Get Organization Details.

    Retrieves detailed information about a specific organization including
    staff members and patient count.

    Requires admin or superadmin system permissions.

    Args:
        org_id: ID of the organization to retrieve.
        u: Currently authenticated user (admin/superadmin only).
        db: Database session.

    Returns:
        dict: Organization details with staff and patient information.

    Raises:
        HTTPException: 403 if user lacks admin/superadmin permissions.
        HTTPException: 404 if organization not found.
    """
    # Check permissions
    if u.system_permissions not in ["admin", "superadmin"]:
        raise HTTPException(
            status_code=403,
            detail="Requires admin or superadmin permissions",
        )

    # Fetch organization
    org = db.scalar(select(Organization).where(Organization.id == org_id))
    if not org:
        raise HTTPException(status_code=404, detail="Organization not found")

    # Get staff members with primary status
    staff_query = (
        select(
            User.id,
            User.username,
            User.email,
            organisation_staff_member.c.is_primary,
        )
        .join(
            organisation_staff_member,
            organisation_staff_member.c.user_id == User.id,
        )
        .where(organisation_staff_member.c.organisation_id == org_id)
    )

    staff_members = db.execute(staff_query).all()

    # Get patient members
    patient_query = select(
        organisation_patient_member.c.patient_id,
        organisation_patient_member.c.is_primary,
    ).where(organisation_patient_member.c.organisation_id == org_id)

    patient_members = db.execute(patient_query).all()

    return {
        "id": org.id,
        "name": org.name,
        "type": org.type,
        "location": org.location,
        "created_at": org.created_at.isoformat(),
        "updated_at": org.updated_at.isoformat(),
        "staff_count": len(staff_members),
        "patient_count": len(patient_members),
        "staff_members": [
            {
                "id": sm.id,
                "username": sm.username,
                "email": sm.email,
                "is_primary": sm.is_primary or False,
            }
            for sm in staff_members
        ],
        "patient_members": [
            {
                "patient_id": pm.patient_id,
                "is_primary": pm.is_primary or False,
            }
            for pm in patient_members
        ],
    }
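
The response assembled above can be sketched without a database. This is a minimal illustration, assuming the two membership queries yield simple row tuples; `StaffRow`, `PatientRow`, and `build_org_summary` are hypothetical names that do not exist in app/main.py.

```python
from typing import Any, NamedTuple, Optional


class StaffRow(NamedTuple):
    """Hypothetical shape of one row from the staff membership query."""

    id: int
    username: str
    email: str
    is_primary: Optional[bool]


class PatientRow(NamedTuple):
    """Hypothetical shape of one row from the patient membership query."""

    patient_id: str
    is_primary: Optional[bool]


def build_org_summary(
    staff: list[StaffRow], patients: list[PatientRow]
) -> dict[str, Any]:
    """Mirror the counts and member lists returned by get_organization."""
    return {
        "staff_count": len(staff),
        "patient_count": len(patients),
        "staff_members": [
            {
                "id": s.id,
                "username": s.username,
                "email": s.email,
                # A NULL is_primary is reported as False, as in the endpoint.
                "is_primary": s.is_primary or False,
            }
            for s in staff
        ],
        "patient_members": [
            {"patient_id": p.patient_id, "is_primary": p.is_primary or False}
            for p in patients
        ],
    }


summary = build_org_summary(
    [StaffRow(1, "asmith", "asmith@example.org", None)],
    [PatientRow("pat-1", True)],
)
```

Both counts are derived from the fetched rows, so they always agree with the lists in the same response.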

update_organization

update_organization(org_id, body, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Update Organisation.

Updates an existing organisation's details.

Requires admin or superadmin system permissions.

Parameters:

Name Type Description Default
org_id int

ID of the organisation to update.

required
body UpdateOrganizationIn

Fields to update (name, type, location). Only provided fields are updated.

required
u User

Currently authenticated user (admin/superadmin only).

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Updated organisation details.

Raises:

Type Description
HTTPException

400 if type is invalid.

HTTPException

403 if user lacks admin/superadmin permissions.

HTTPException

404 if organisation not found.

Source code in app/main.py
@router.put("/organizations/{org_id}")
def update_organization(
    org_id: int,
    body: UpdateOrganizationIn,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Update Organisation.

    Updates an existing organisation's details.

    Requires admin or superadmin system permissions.

    Args:
        org_id: ID of the organisation to update.
        body: Fields to update (name, type, location). Only provided fields are updated.
        u: Currently authenticated user (admin/superadmin only).
        db: Database session.

    Returns:
        dict: Updated organisation details.

    Raises:
        HTTPException: 400 if type is invalid.
        HTTPException: 403 if user lacks admin/superadmin permissions.
        HTTPException: 404 if organisation not found.
    """
    if u.system_permissions not in ["admin", "superadmin"]:
        raise HTTPException(
            status_code=403,
            detail="Requires admin or superadmin permissions",
        )
    org = db.get(Organization, org_id)
    if not org:
        raise HTTPException(status_code=404, detail="Organisation not found")

    valid_types = [
        "hospital_team",
        "gp_practice",
        "private_clinic",
        "department",
        "teaching_establishment",
    ]

    if body.name is not None:
        org.name = body.name.strip()
    if body.type is not None:
        if body.type not in valid_types:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid organisation type. Must be one of: {', '.join(valid_types)}",
            )
        org.type = body.type
    if body.location is not None:
        org.location = body.location.strip() or None

    db.commit()
    db.refresh(org)

    return {
        "id": org.id,
        "name": org.name,
        "type": org.type,
        "location": org.location,
        "created_at": org.created_at.isoformat(),
        "updated_at": org.updated_at.isoformat(),
    }
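
The "only provided fields are updated" rule can be shown in isolation. The sketch below assumes a plain dataclass in place of the ORM model; `OrgRecord` and `apply_update` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Optional

# Same allowed values as the endpoint's valid_types list.
VALID_TYPES = (
    "hospital_team",
    "gp_practice",
    "private_clinic",
    "department",
    "teaching_establishment",
)


@dataclass
class OrgRecord:  # hypothetical stand-in for the Organization model
    name: str
    type: str
    location: Optional[str] = None


def apply_update(
    org: OrgRecord,
    name: Optional[str] = None,
    type_: Optional[str] = None,
    location: Optional[str] = None,
) -> OrgRecord:
    """Apply only the fields that were provided (None means leave unchanged)."""
    if name is not None:
        org.name = name.strip()
    if type_ is not None:
        if type_ not in VALID_TYPES:
            raise ValueError(f"Invalid organisation type: {type_!r}")
        org.type = type_
    if location is not None:
        # Whitespace-only input clears the location instead of storing "".
        org.location = location.strip() or None
    return org


org = apply_update(OrgRecord("Ward 7", "department", "Floor 2"), location="   ")
```

Here only `location` is supplied, so `name` and `type` keep their previous values while the blank location is normalised to `None`.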

create_organization

create_organization(body, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Create Organisation.

Creates a new organisation in the database.

Requires admin or superadmin system permissions.

Parameters:

Name Type Description Default
body CreateOrganizationIn

Organisation details (name, type, optional location).

required
u User

Currently authenticated user (admin/superadmin only).

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Created organisation details.

Raises:

Type Description
HTTPException

400 if type is invalid.

HTTPException

403 if user lacks admin/superadmin permissions.

Source code in app/main.py
@router.post("/organizations")
def create_organization(
    body: CreateOrganizationIn,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Create Organisation.

    Creates a new organisation in the database.

    Requires admin or superadmin system permissions.

    Args:
        body: Organisation details (name, type, optional location).
        u: Currently authenticated user (admin/superadmin only).
        db: Database session.

    Returns:
        dict: Created organisation details.

    Raises:
        HTTPException: 400 if type is invalid.
        HTTPException: 403 if user lacks admin/superadmin permissions.
    """
    if u.system_permissions not in ["admin", "superadmin"]:
        raise HTTPException(
            status_code=403,
            detail="Requires admin or superadmin permissions",
        )

    valid_types = [
        "hospital_team",
        "gp_practice",
        "private_clinic",
        "department",
        "teaching_establishment",
    ]
    if body.type not in valid_types:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid organisation type. Must be one of: {', '.join(valid_types)}",
        )

    org = Organization(
        name=body.name.strip(),
        type=body.type,
        location=body.location.strip() if body.location else None,
    )
    db.add(org)
    db.commit()
    db.refresh(org)

    return {
        "id": org.id,
        "name": org.name,
        "type": org.type,
        "location": org.location,
        "created_at": org.created_at.isoformat(),
        "updated_at": org.updated_at.isoformat(),
    }
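
A client-side request for this endpoint might be built as follows. This is a sketch only: the host in `BASE_URL` is hypothetical, the `/api` prefix is taken from the module overview, and authentication cookies and any CSRF header are omitted because their names are not shown here. `build_create_org_request` is an illustrative helper, not part of the API.

```python
import json
from typing import Optional
from urllib.request import Request

# Hypothetical host; the /api prefix follows the module overview.
BASE_URL = "https://quill.example.org/api"


def build_create_org_request(
    name: str, org_type: str, location: Optional[str] = None
) -> Request:
    """Build (but do not send) a POST request carrying the organisation body."""
    body = {"name": name, "type": org_type}
    if location is not None:
        body["location"] = location
    return Request(
        f"{BASE_URL}/organizations",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_create_org_request("Cardiology", "department", "Block C")
```

Sending the request (for example with `urllib.request.urlopen`) would additionally require a session authenticated as an admin or superadmin.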

add_staff_to_organization

add_staff_to_organization(org_id, body, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Add Staff Member to Organisation.

Adds an existing user as a staff member of an organisation.

Requires admin or superadmin system permissions.

Parameters:

Name Type Description Default
org_id int

ID of the organisation.

required
body AddStaffIn

Staff member details (user_id).

required
u User

Currently authenticated user (admin/superadmin only).

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Confirmation with organisation and user IDs.

Raises:

Type Description
HTTPException

400 if the user being added lacks staff-level permissions.

HTTPException

403 if user lacks admin/superadmin permissions.

HTTPException

404 if organisation or user not found.

HTTPException

409 if user is already a staff member.

Source code in app/main.py
@router.post("/organizations/{org_id}/staff")
def add_staff_to_organization(
    org_id: int,
    body: AddStaffIn,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Add Staff Member to Organisation.

    Adds an existing user as a staff member of an organisation.

    Requires admin or superadmin system permissions.

    Args:
        org_id: ID of the organisation.
        body: Staff member details (user_id).
        u: Currently authenticated user (admin/superadmin only).
        db: Database session.

    Returns:
        dict: Confirmation with organisation and user IDs.

    Raises:
        HTTPException: 400 if the user being added lacks staff-level permissions.
        HTTPException: 403 if user lacks admin/superadmin permissions.
        HTTPException: 404 if organisation or user not found.
        HTTPException: 409 if user is already a staff member.
    """
    if u.system_permissions not in ["admin", "superadmin"]:
        raise HTTPException(
            status_code=403,
            detail="Requires admin or superadmin permissions",
        )

    org = db.scalar(select(Organization).where(Organization.id == org_id))
    if not org:
        raise HTTPException(status_code=404, detail="Organization not found")

    user = db.scalar(select(User).where(User.id == body.user_id))
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    if not check_permission_level(user.system_permissions, PERMISSION_STAFF):
        raise HTTPException(
            status_code=400,
            detail="User must have staff-level permissions or above",
        )

    # Check if already a member
    existing = db.scalar(
        select(organisation_staff_member).where(
            organisation_staff_member.c.organisation_id == org_id,
            organisation_staff_member.c.user_id == body.user_id,
        )
    )
    if existing:
        raise HTTPException(
            status_code=409,
            detail="User is already a staff member of this organisation",
        )

    # Auto-set as primary if user has no existing primary org
    has_primary = db.scalar(
        select(organisation_staff_member.c.organisation_id).where(
            organisation_staff_member.c.user_id == body.user_id,
            organisation_staff_member.c.is_primary.is_(True),
        )
    )

    db.execute(
        organisation_staff_member.insert().values(
            organisation_id=org_id,
            user_id=body.user_id,
            is_primary=has_primary is None,
        )
    )
    db.commit()

    return {
        "organisation_id": org_id,
        "user_id": body.user_id,
        "username": user.username,
    }
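
The auto-primary rule (the first organisation a user joins becomes their primary one) can be illustrated with an in-memory list. This is a minimal sketch; `Membership` and `add_staff` are hypothetical names standing in for the `organisation_staff_member` table and the endpoint logic.

```python
from dataclasses import dataclass


@dataclass
class Membership:  # hypothetical stand-in for one organisation_staff_member row
    organisation_id: int
    user_id: int
    is_primary: bool


def add_staff(memberships: list[Membership], org_id: int, user_id: int) -> Membership:
    """Append a membership, marking it primary only if the user has none yet."""
    if any(
        m.organisation_id == org_id and m.user_id == user_id for m in memberships
    ):
        raise ValueError("User is already a staff member of this organisation")
    has_primary = any(m.user_id == user_id and m.is_primary for m in memberships)
    new_row = Membership(org_id, user_id, is_primary=not has_primary)
    memberships.append(new_row)
    return new_row


table: list[Membership] = []
first = add_staff(table, org_id=1, user_id=42)
second = add_staff(table, org_id=2, user_id=42)
```

The first membership for user 42 is marked primary and the second is not, which matches the `is_primary=has_primary is None` expression in the endpoint.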

add_patient_to_organization

add_patient_to_organization(org_id, body, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Add Patient to Organisation.

Adds a patient to an organisation by their FHIR patient ID.

Requires admin or superadmin system permissions. The route also declares the DEP_REQUIRE_CLINICAL dependency.

Parameters:

Name Type Description Default
org_id int

ID of the organisation.

required
body AddPatientIn

Patient details (patient_id).

required
u User

Currently authenticated user (admin/superadmin only).

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Confirmation with organisation and patient IDs.

Raises:

Type Description
HTTPException

403 if user lacks admin/superadmin permissions.

HTTPException

404 if organisation not found.

HTTPException

409 if patient is already a member.

Source code in app/main.py
@router.post(
    "/organizations/{org_id}/patients",
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def add_patient_to_organization(
    org_id: int,
    body: AddPatientIn,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Add Patient to Organisation.

    Adds a patient to an organisation by their FHIR patient ID.

    Requires admin or superadmin system permissions. The route also declares
    the ``DEP_REQUIRE_CLINICAL`` dependency.

    Args:
        org_id: ID of the organisation.
        body: Patient details (patient_id).
        u: Currently authenticated user (admin/superadmin only).
        db: Database session.

    Returns:
        dict: Confirmation with organisation and patient IDs.

    Raises:
        HTTPException: 403 if user lacks admin/superadmin permissions.
        HTTPException: 404 if organisation not found.
        HTTPException: 409 if patient is already a member.
    """
    if u.system_permissions not in ["admin", "superadmin"]:
        raise HTTPException(
            status_code=403,
            detail="Requires admin or superadmin permissions",
        )

    org = db.scalar(select(Organization).where(Organization.id == org_id))
    if not org:
        raise HTTPException(status_code=404, detail="Organization not found")

    # Check if already a member
    existing = db.scalar(
        select(organisation_patient_member).where(
            organisation_patient_member.c.organisation_id == org_id,
            organisation_patient_member.c.patient_id == body.patient_id,
        )
    )
    if existing:
        raise HTTPException(
            status_code=409,
            detail="Patient is already a member of this organisation",
        )

    db.execute(
        organisation_patient_member.insert().values(
            organisation_id=org_id,
            patient_id=body.patient_id,
            is_primary=False,
        )
    )
    db.commit()

    return {
        "organisation_id": org_id,
        "patient_id": body.patient_id,
    }

remove_staff_from_organization

remove_staff_from_organization(org_id, user_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Remove a staff member from an organisation.

Admin/superadmin only.

Parameters:

Name Type Description Default
org_id int

Organisation ID.

required
user_id int

User ID to remove.

required
u User

Authenticated admin user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, str]

Confirmation.

Source code in app/main.py
@router.delete(
    "/organizations/{org_id}/staff/{user_id}",
    dependencies=[DEP_REQUIRE_CSRF],
)
def remove_staff_from_organization(
    org_id: int,
    user_id: int,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, str]:
    """Remove a staff member from an organisation.

    Admin/superadmin only.

    Args:
        org_id: Organisation ID.
        user_id: User ID to remove.
        u: Authenticated admin user.
        db: Database session.

    Returns:
        dict: Confirmation.
    """
    if u.system_permissions not in ("admin", "superadmin"):
        raise HTTPException(status_code=403, detail="Admin only")

    existing = db.scalar(
        select(organisation_staff_member).where(
            organisation_staff_member.c.organisation_id == org_id,
            organisation_staff_member.c.user_id == user_id,
        )
    )
    if not existing:
        raise HTTPException(status_code=404, detail="Membership not found")

    db.execute(
        organisation_staff_member.delete().where(
            organisation_staff_member.c.organisation_id == org_id,
            organisation_staff_member.c.user_id == user_id,
        )
    )
    db.commit()
    return {"status": "removed"}

remove_patient_from_organization

remove_patient_from_organization(org_id, patient_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Remove a patient from an organisation.

Admin/superadmin only.

Parameters:

Name Type Description Default
org_id int

Organisation ID.

required
patient_id str

FHIR Patient resource ID.

required
u User

Authenticated admin user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, str]

Confirmation.

Source code in app/main.py
@router.delete(
    "/organizations/{org_id}/patients/{patient_id}",
    dependencies=[DEP_REQUIRE_CSRF],
)
def remove_patient_from_organization(
    org_id: int,
    patient_id: str,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, str]:
    """Remove a patient from an organisation.

    Admin/superadmin only.

    Args:
        org_id: Organisation ID.
        patient_id: FHIR Patient resource ID.
        u: Authenticated admin user.
        db: Database session.

    Returns:
        dict: Confirmation.
    """
    if u.system_permissions not in ("admin", "superadmin"):
        raise HTTPException(status_code=403, detail="Admin only")

    existing = db.scalar(
        select(organisation_patient_member).where(
            organisation_patient_member.c.organisation_id == org_id,
            organisation_patient_member.c.patient_id == patient_id,
        )
    )
    if not existing:
        raise HTTPException(status_code=404, detail="Membership not found")

    db.execute(
        organisation_patient_member.delete().where(
            organisation_patient_member.c.organisation_id == org_id,
            organisation_patient_member.c.patient_id == patient_id,
        )
    )
    db.commit()
    return {"status": "removed"}

list_org_features

list_org_features(org_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

List enabled features for an organisation.

Admin/superadmin only.

Source code in app/main.py
@router.get("/organizations/{org_id}/features")
def list_org_features(
    org_id: int,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, list[dict[str, Any]]]:
    """List enabled features for an organisation.

    Admin/superadmin only.
    """
    if u.system_permissions not in ("admin", "superadmin"):
        raise HTTPException(status_code=403, detail="Admin only")

    org = db.get(Organization, org_id)
    if not org:
        raise HTTPException(status_code=404, detail="Organisation not found")

    return {
        "features": [
            FeatureOut(
                feature_key=f.feature_key,
                enabled_at=f.enabled_at,
                enabled_by=f.enabled_by,
            ).model_dump()
            for f in org.features
        ]
    }

toggle_org_feature

toggle_org_feature(org_id, feature_key, body, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Enable or disable a feature on an organisation.

Admin/superadmin only. When enabled=true a row is created; when enabled=false the row is deleted.

Source code in app/main.py
@router.put(
    "/organizations/{org_id}/features/{feature_key}",
    dependencies=[DEP_REQUIRE_CSRF],
)
def toggle_org_feature(
    org_id: int,
    feature_key: str,
    body: FeatureToggleIn,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, str]:
    """Enable or disable a feature on an organisation.

    Admin/superadmin only. When ``enabled=true`` a row is created;
    when ``enabled=false`` the row is deleted.
    """
    if u.system_permissions not in ("admin", "superadmin"):
        raise HTTPException(status_code=403, detail="Admin only")

    org = db.get(Organization, org_id)
    if not org:
        raise HTTPException(status_code=404, detail="Organisation not found")

    existing = db.scalar(
        select(OrganisationFeature).where(
            OrganisationFeature.organisation_id == org_id,
            OrganisationFeature.feature_key == feature_key,
        )
    )

    if body.enabled:
        if existing:
            return {"status": "already_enabled"}
        feature = OrganisationFeature(
            organisation_id=org_id,
            feature_key=feature_key,
            enabled_by=u.id,
        )
        db.add(feature)
        db.commit()
        return {"status": "enabled"}
    else:
        if not existing:
            return {"status": "already_disabled"}
        db.delete(existing)
        db.commit()
        return {"status": "disabled"}
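
The four possible status values follow from an idempotent toggle. The sketch below models the feature table as a set of `(org_id, feature_key)` pairs; `toggle_feature` and the `"messaging"` key are hypothetical and chosen only for illustration.

```python
def toggle_feature(
    enabled_features: set[tuple[int, str]],
    org_id: int,
    feature_key: str,
    enabled: bool,
) -> str:
    """Add or remove a feature row and report which transition occurred."""
    key = (org_id, feature_key)
    if enabled:
        if key in enabled_features:
            return "already_enabled"
        enabled_features.add(key)
        return "enabled"
    if key not in enabled_features:
        return "already_disabled"
    enabled_features.remove(key)
    return "disabled"


features: set[tuple[int, str]] = set()
statuses = [
    toggle_feature(features, 1, "messaging", True),
    toggle_feature(features, 1, "messaging", True),
    toggle_feature(features, 1, "messaging", False),
    toggle_feature(features, 1, "messaging", False),
]
```

Repeating a request never fails; it simply reports that the feature was already in the requested state.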

link_patient_to_user

link_patient_to_user(user_id, body, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Link a user account to a FHIR patient record.

Admin/superadmin only. Sets fhir_patient_id on the user.

Parameters:

Name Type Description Default
user_id int

User ID.

required
body dict[str, str]

Must contain fhir_patient_id.

required
u User

Authenticated admin user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Confirmation with user and patient IDs.

Source code in app/main.py
@router.patch(
    "/users/{user_id}/link-patient",
    dependencies=[DEP_REQUIRE_CSRF],
)
def link_patient_to_user(
    user_id: int,
    body: dict[str, str],
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Link a user account to a FHIR patient record.

    Admin/superadmin only. Sets ``fhir_patient_id`` on the user.

    Args:
        user_id: User ID.
        body: Must contain ``fhir_patient_id``.
        u: Authenticated admin user.
        db: Database session.

    Returns:
        dict: Confirmation with user and patient IDs.
    """
    if u.system_permissions not in ("admin", "superadmin"):
        raise HTTPException(status_code=403, detail="Admin only")

    fhir_patient_id = body.get("fhir_patient_id")
    if not fhir_patient_id:
        raise HTTPException(
            status_code=422, detail="fhir_patient_id is required"
        )

    target = db.get(User, user_id)
    if target is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Check not already linked to another user
    clash = db.scalar(
        select(User).where(
            User.fhir_patient_id == fhir_patient_id,
            User.id != user_id,
        )
    )
    if clash is not None:
        raise HTTPException(
            status_code=409,
            detail="FHIR patient already linked to another user",
        )

    target.fhir_patient_id = fhir_patient_id
    db.commit()

    return {
        "user_id": user_id,
        "fhir_patient_id": fhir_patient_id,
    }
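
The one-patient-per-user constraint enforced above can be sketched with a dictionary that maps user IDs to their linked FHIR patient ID. `link_patient` and the exception types are assumptions made for this illustration; the endpoint itself raises `HTTPException` with status codes 422, 404, and 409.

```python
from typing import Optional


def link_patient(
    links: dict[int, Optional[str]], user_id: int, fhir_patient_id: str
) -> None:
    """Link a user to a FHIR patient unless another user already holds it."""
    if not fhir_patient_id:
        raise ValueError("fhir_patient_id is required")  # endpoint: 422
    if user_id not in links:
        raise LookupError("User not found")  # endpoint: 404
    clash = any(
        other_id != user_id and linked == fhir_patient_id
        for other_id, linked in links.items()
    )
    if clash:
        raise ValueError("FHIR patient already linked to another user")  # endpoint: 409
    links[user_id] = fhir_patient_id


links: dict[int, Optional[str]] = {1: None, 2: None}
link_patient(links, 1, "pat-1")
link_patient(links, 1, "pat-1")  # re-linking the same user is a no-op
```

Because the clash check excludes the target user, repeating the same link is harmless, while linking a second user to `pat-1` would be rejected.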

invite_external_user

invite_external_user(patient_id, body, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Generate an invite link for an external user.

Only the patient themselves (via fhir_patient_id) or an admin/superadmin may issue invites.

Parameters:

Name Type Description Default
patient_id str

FHIR Patient resource ID.

required
body InviteExternalIn

Email and user type.

required
u User

Authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

invite_url containing the signed JWT, plus the raw token.

Source code in app/main.py
@router.post(
    "/patients/{patient_id}/invite-external",
    dependencies=[DEP_REQUIRE_CLINICAL, DEP_REQUIRE_CSRF],
)
def invite_external_user(
    patient_id: str,
    body: InviteExternalIn,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Generate an invite link for an external user.

    Only the patient themselves (via ``fhir_patient_id``) or an
    admin/superadmin may issue invites.

    Args:
        patient_id: FHIR Patient resource ID.
        body: Email and user type.
        u: Authenticated user.
        db: Database session.

    Returns:
        dict: ``invite_url`` containing the signed JWT, plus the raw ``token``.
    """
    # Only patient-self or admin can invite
    is_own = u.fhir_patient_id is not None and u.fhir_patient_id == patient_id
    is_admin = u.system_permissions in ("admin", "superadmin")
    if not (is_own or is_admin):
        raise HTTPException(
            status_code=403,
            detail="Only the patient or an admin can invite external users",
        )

    token = create_invite_token(
        patient_id=patient_id,
        email=body.email,
        user_type=body.user_type,
    )

    # In production the base URL would come from settings
    invite_url = f"/app/accept-invite?token={token}"

    return {"invite_url": invite_url, "token": token}

accept_invite

accept_invite(body, db=DEP_GET_SESSION)

Accept an invite: register a new user or grant access to an existing one.

If the email matches an existing user, access is granted immediately. Otherwise a new user is created with the fields supplied in the request body.

Parameters:

Name Type Description Default
body AcceptInviteIn

Invite token and optional registration fields.

required
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

Status and redirect information.

Source code in app/main.py
@router.post("/accept-invite")
def accept_invite(
    body: AcceptInviteIn,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """Accept an invite: register a new user or grant access to an existing one.

    If the email matches an existing user, access is granted
    immediately. Otherwise a new user is created with the fields
    supplied in the request body.

    Args:
        body: Invite token and optional registration fields.
        db: Database session.

    Returns:
        dict: Status and redirect information.
    """
    from jose import JWTError

    try:
        payload = decode_invite_token(body.token)
    except JWTError as err:
        raise HTTPException(
            status_code=400, detail="Invalid or expired invite token"
        ) from err

    patient_id: str = payload["patient_id"]
    email: str = payload["email"]
    user_type: str = payload["user_type"]

    # Check if user already exists
    existing = db.scalar(select(User).where(User.email == email))

    if existing is not None:
        # Grant access to existing user (idempotent)
        grant = db.scalar(
            select(ExternalPatientAccess).where(
                ExternalPatientAccess.user_id == existing.id,
                ExternalPatientAccess.patient_id == patient_id,
            )
        )
        if grant is None:
            db.add(
                ExternalPatientAccess(
                    user_id=existing.id,
                    patient_id=patient_id,
                    granted_by_user_id=existing.id,
                )
            )
            db.commit()
        elif grant.revoked_at is not None:
            grant.revoked_at = None
            db.commit()
        return {"status": "access_granted", "user_id": existing.id}

    # New user registration
    if not body.username or not body.password:
        raise HTTPException(
            status_code=422,
            detail="username and password required for new registration",
        )

    # Validate uniqueness
    if db.scalar(select(User).where(User.username == body.username)):
        raise HTTPException(status_code=409, detail="Username already taken")

    new_user = User(
        username=body.username,
        email=email,
        password_hash=hash_password(body.password),
        system_permissions=user_type,
        base_profession="patient",
    )
    db.add(new_user)
    db.flush()

    db.add(
        ExternalPatientAccess(
            user_id=new_user.id,
            patient_id=patient_id,
            granted_by_user_id=new_user.id,
        )
    )
    db.commit()

    return {"status": "registered", "user_id": new_user.id}
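
The two branches of this endpoint (existing user versus new registration) can be sketched with in-memory state. `InviteStore`, `Grant`, and `accept_invite_sketch` are hypothetical names; token decoding, password hashing, and the username uniqueness check are left out to keep the flow visible.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Grant:  # hypothetical stand-in for ExternalPatientAccess
    user_id: int
    patient_id: str
    revoked_at: Optional[datetime] = None


@dataclass
class InviteStore:  # hypothetical in-memory replacement for the database
    user_ids_by_email: dict[str, int] = field(default_factory=dict)
    grants: list[Grant] = field(default_factory=list)


def accept_invite_sketch(
    store: InviteStore,
    email: str,
    patient_id: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> str:
    """Grant access to a known email, or register a new user and grant access."""
    user_id = store.user_ids_by_email.get(email)
    if user_id is None:
        if not username or not password:
            raise ValueError("username and password required for new registration")
        user_id = len(store.user_ids_by_email) + 1
        store.user_ids_by_email[email] = user_id
        store.grants.append(Grant(user_id, patient_id))
        return "registered"
    grant = next(
        (g for g in store.grants if g.user_id == user_id and g.patient_id == patient_id),
        None,
    )
    if grant is None:
        store.grants.append(Grant(user_id, patient_id))
    elif grant.revoked_at is not None:
        grant.revoked_at = None  # re-activate a previously revoked grant
    return "access_granted"


store = InviteStore()
first = accept_invite_sketch(store, "carer@example.org", "pat-1", "carer", "s3cret")
second = accept_invite_sketch(store, "carer@example.org", "pat-1")
```

Accepting the same invite twice leaves a single grant in place, and accepting it after a revocation clears `revoked_at` rather than creating a duplicate row.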

revoke_external_access

revoke_external_access(patient_id, user_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Revoke an external user's access to a patient.

Admin/superadmin only. Soft-deletes by setting revoked_at.

Parameters:

Name Type Description Default
patient_id str

FHIR Patient resource ID.

required
user_id int

ID of the external user.

required
u User

Authenticated admin user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, str]

Confirmation message.

Source code in app/main.py
@router.delete(
    "/patients/{patient_id}/external-access/{user_id}",
    dependencies=[DEP_REQUIRE_CLINICAL, DEP_REQUIRE_CSRF],
)
def revoke_external_access(
    patient_id: str,
    user_id: int,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, str]:
    """Revoke an external user's access to a patient.

    Admin/superadmin only. Soft-deletes by setting ``revoked_at``.

    Args:
        patient_id: FHIR Patient resource ID.
        user_id: ID of the external user.
        u: Authenticated admin user.
        db: Database session.

    Returns:
        dict: Confirmation message.
    """
    if u.system_permissions not in ("admin", "superadmin"):
        raise HTTPException(status_code=403, detail="Admin only")

    grant = db.scalar(
        select(ExternalPatientAccess).where(
            ExternalPatientAccess.user_id == user_id,
            ExternalPatientAccess.patient_id == patient_id,
            ExternalPatientAccess.revoked_at.is_(None),
        )
    )
    if grant is None:
        raise HTTPException(
            status_code=404, detail="Active access grant not found"
        )

    grant.revoked_at = datetime.now(UTC)
    db.commit()

    return {"status": "revoked"}
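
Soft deletion keeps the grant row and records when it was revoked, so listings only need to filter on `revoked_at`. The following sketch assumes an in-memory list; `AccessGrant`, `revoke`, and `active_grants` are hypothetical names for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class AccessGrant:  # hypothetical stand-in for ExternalPatientAccess
    user_id: int
    patient_id: str
    revoked_at: Optional[datetime] = None


def revoke(grants: list[AccessGrant], patient_id: str, user_id: int) -> None:
    """Mark the active grant as revoked instead of deleting it."""
    for g in grants:
        if g.user_id == user_id and g.patient_id == patient_id and g.revoked_at is None:
            g.revoked_at = datetime.now(timezone.utc)
            return
    raise LookupError("Active access grant not found")  # endpoint: 404


def active_grants(grants: list[AccessGrant], patient_id: str) -> list[AccessGrant]:
    """Return only the non-revoked grants for a patient."""
    return [g for g in grants if g.patient_id == patient_id and g.revoked_at is None]


grants = [AccessGrant(7, "pat-1"), AccessGrant(8, "pat-1")]
revoke(grants, "pat-1", 7)
remaining = [g.user_id for g in active_grants(grants, "pat-1")]
```

The revoked row stays in the list with a timestamp, which preserves an audit trail and lets a later invite acceptance re-activate it.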

list_external_access

list_external_access(patient_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

List external users with access to a patient.

Returns active (non-revoked) external access grants.

Parameters:

Name Type Description Default
patient_id str

FHIR Patient resource ID.

required
u User

Authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, Any]

grants list with user info and access details.

Source code in app/main.py
@router.get(
    "/patients/{patient_id}/external-access",
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def list_external_access(
    patient_id: str,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, Any]:
    """List external users with access to a patient.

    Returns active (non-revoked) external access grants.

    Args:
        patient_id: FHIR Patient resource ID.
        u: Authenticated user.
        db: Database session.

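    Returns:
        dict: ``grants`` list with user info and access details.

    Raises:
        HTTPException: 403 if the caller is neither an admin nor the patient.
    """
    # Only an admin or the patient themselves may view the grants.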
    is_own = u.fhir_patient_id is not None and u.fhir_patient_id == patient_id
    is_admin = u.system_permissions in ("admin", "superadmin")
    if not (is_own or is_admin):
        raise HTTPException(status_code=403, detail="Access denied")

    grants = (
        db.execute(
            select(ExternalPatientAccess).where(
                ExternalPatientAccess.patient_id == patient_id,
                ExternalPatientAccess.revoked_at.is_(None),
            )
        )
        .scalars()
        .all()
    )

    return {
        "grants": [
            {
                "user_id": g.user_id,
                "username": g.user.username,
                "email": g.user.email,
                "user_type": g.user.system_permissions,
                "granted_at": g.granted_at.isoformat(),
                "access_level": g.access_level,
            }
            for g in grants
        ]
    }
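The access rule in this endpoint (an admin, or the patient whose record it is) can be isolated as a small predicate. The sketch below is illustrative only: `Viewer` is a hypothetical stand-in for the `User` model, carrying just the two attributes the check reads.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Viewer:
    """Hypothetical stand-in for the User model."""

    system_permissions: str
    fhir_patient_id: str | None = None


def can_list_grants(viewer: Viewer, patient_id: str) -> bool:
    """Mirror the endpoint's check: own record or admin-level permissions."""
    is_own = (
        viewer.fhir_patient_id is not None
        and viewer.fhir_patient_id == patient_id
    )
    is_admin = viewer.system_permissions in ("admin", "superadmin")
    return is_own or is_admin
```

The explicit `is not None` guard means a user with no linked FHIR patient never matches by accident, whatever `patient_id` is supplied.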

create_conversation_endpoint

create_conversation_endpoint(body, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Create a new messaging conversation.

Creates a FHIR Communication resource as the source of truth, then projects the data into SQL for fast reads.

Parameters:

Name Type Description Default
body ConversationCreateIn

Conversation details including first message.

required
u User

Authenticated user (conversation creator).

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
ConversationDetailOut ConversationDetailOut

The newly created conversation.

Source code in app/main.py
@router.post(
    "/conversations",
    response_model=ConversationDetailOut,
    dependencies=[DEP_REQUIRE_CLINICAL, DEP_REQUIRE_CSRF],
)
def create_conversation_endpoint(
    body: ConversationCreateIn,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> ConversationDetailOut:
    """Create a new messaging conversation.

    Creates a FHIR Communication resource as the source of truth,
    then projects the data into SQL for fast reads.

    Args:
        body: Conversation details including first message.
        u: Authenticated user (conversation creator).
        db: Database session.

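    Returns:
        ConversationDetailOut: The newly created conversation.

    Raises:
        HTTPException: 403 if the creator lacks permission.
        HTTPException: 502 if the FHIR write fails.
    """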
    try:
        return create_conversation(
            db=db,
            creator=u,
            patient_id=body.patient_id,
            initial_message=body.initial_message,
            subject=body.subject,
            participant_ids=body.participant_ids,
            include_patient_as_participant=(
                body.include_patient_as_participant
            ),
        )
    except PermissionError as exc:
        raise HTTPException(status_code=403, detail=str(exc)) from exc
    except FhirCommunicationError as exc:
        raise HTTPException(
            status_code=502,
            detail="Failed to create message in clinical store",
        ) from exc
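The "FHIR first, then SQL projection" ordering described in the docstring can be sketched as follows. This is a simplified illustration under stated assumptions, not the application's `create_conversation`: `InMemoryFhir`, `create_and_project`, and the locally defined `FhirCommunicationError` are hypothetical, and a plain list stands in for the SQL tables.

```python
from __future__ import annotations


class FhirCommunicationError(Exception):
    """Hypothetical stand-in for the application's FHIR error type."""


class InMemoryFhir:
    """Toy FHIR store; set fail=True to simulate an unavailable server."""

    def __init__(self, fail: bool = False) -> None:
        self.resources: dict[str, dict] = {}
        self.fail = fail

    def create_communication(self, payload: dict) -> str:
        if self.fail:
            raise FhirCommunicationError("store unavailable")
        resource_id = f"comm-{len(self.resources) + 1}"
        self.resources[resource_id] = payload
        return resource_id


def create_and_project(fhir: InMemoryFhir, sql_rows: list[dict], payload: dict) -> dict:
    """Write to the source of truth first, then project into the read model."""
    # If this raises, no SQL row is written, so the projection never
    # contains a record that is missing from FHIR.
    fhir_id = fhir.create_communication(payload)
    row = {"fhir_id": fhir_id, **payload}
    sql_rows.append(row)
    return row
```

With this ordering a FHIR failure surfaces before anything is projected, which is the situation the endpoint reports as a 502.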

list_conversations_endpoint

list_conversations_endpoint(status=None, patient_id=None, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

List conversations for the current user.

Returns conversations the user participates in, optionally filtered by status or patient.

Parameters:

Name Type Description Default
status str | None

Optional filter by conversation status.

None
patient_id str | None

Optional filter by FHIR patient ID.

None
u User

Authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
ConversationListOut ConversationListOut

List of conversations with metadata.

Source code in app/main.py
@router.get(
    "/conversations",
    response_model=ConversationListOut,
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def list_conversations_endpoint(
    status: str | None = None,
    patient_id: str | None = None,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> ConversationListOut:
    """List conversations for the current user.

    Returns conversations the user participates in, optionally
    filtered by status or patient.

    Args:
        status: Optional filter by conversation status.
        patient_id: Optional filter by FHIR patient ID.
        u: Authenticated user.
        db: Database session.

    Returns:
        ConversationListOut: List of conversations with metadata.
    """
    items = list_conversations(
        db=db,
        user=u,
        status=status,
        patient_id=patient_id,
    )
    return ConversationListOut(conversations=items)

list_patient_conversations_endpoint

list_patient_conversations_endpoint(patient_id, status=None, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

List all conversations about a patient.

Returns all conversations for the patient regardless of whether the current user is a participant.

Parameters:

Name Type Description Default
patient_id str

FHIR patient ID.

required
status str | None

Optional filter by conversation status.

None
u User

Authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
ConversationListOut ConversationListOut

All conversations for this patient.

Source code in app/main.py
@router.get(
    "/patients/{patient_id}/conversations",
    response_model=ConversationListOut,
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def list_patient_conversations_endpoint(
    patient_id: str,
    status: str | None = None,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> ConversationListOut:
    """List all conversations about a patient.

    Returns all conversations for the patient regardless of
    whether the current user is a participant.

    Args:
        patient_id: FHIR patient ID.
        status: Optional filter by conversation status.
        u: Authenticated user.
        db: Database session.

    Returns:
        ConversationListOut: All conversations for this patient.
    """
    items = list_patient_conversations(
        db=db,
        patient_id=patient_id,
        user=u,
        status=status,
    )
    return ConversationListOut(conversations=items)

get_conversation_endpoint

get_conversation_endpoint(conversation_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Get a single conversation with all messages.

Also marks the conversation as read for the current user.

Parameters:

Name Type Description Default
conversation_id int

ID of the conversation.

required
u User

Authenticated user (must be a participant).

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
ConversationDetailOut ConversationDetailOut

Full conversation with messages.

Raises:

Type Description
HTTPException

404 if not found or user is not a participant.

Source code in app/main.py
@router.get(
    "/conversations/{conversation_id}",
    response_model=ConversationDetailOut,
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def get_conversation_endpoint(
    conversation_id: int,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> ConversationDetailOut:
    """Get a single conversation with all messages.

    Also marks the conversation as read for the current user.

    Args:
        conversation_id: ID of the conversation.
        u: Authenticated user (must be a participant).
        db: Database session.

    Returns:
        ConversationDetailOut: Full conversation with messages.

    Raises:
        HTTPException: 404 if not found or user is not a participant.
    """
    result = get_conversation_detail(
        db=db, conversation_id=conversation_id, user=u
    )
    if result is None:
        raise HTTPException(status_code=404, detail="Conversation not found")
    return result

update_conversation_status_endpoint

update_conversation_status_endpoint(conversation_id, body, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Update conversation status (e.g. close, archive).

Only participants can update a conversation's status.

Parameters:

Name Type Description Default
conversation_id int

ID of the conversation.

required
body ConversationStatusUpdateIn

New status value.

required
u User

Authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
ConversationOut ConversationOut

Updated conversation.

Raises:

Type Description
HTTPException

404 if not found or user is not a participant.

Source code in app/main.py
@router.patch(
    "/conversations/{conversation_id}",
    response_model=ConversationOut,
    dependencies=[DEP_REQUIRE_CLINICAL, DEP_REQUIRE_CSRF],
)
def update_conversation_status_endpoint(
    conversation_id: int,
    body: ConversationStatusUpdateIn,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> ConversationOut:
    """Update conversation status (e.g. close, archive).

    Only participants can update a conversation's status.

    Args:
        conversation_id: ID of the conversation.
        body: New status value.
        u: Authenticated user.
        db: Database session.

    Returns:
        ConversationOut: Updated conversation.

    Raises:
        HTTPException: 404 if not found or user is not a participant.
    """
    conv = db.get(Conversation, conversation_id)
    if conv is None:
        raise HTTPException(status_code=404, detail="Conversation not found")
    cp = next((p for p in conv.participants if p.user_id == u.id), None)
    if cp is None:
        raise HTTPException(status_code=404, detail="Conversation not found")

    conv.status = body.status
    db.commit()
    db.refresh(conv)

    # Calculate unread
    if cp.last_read_at:
        unread = sum(
            1
            for m in conv.messages
            if m.created_at > cp.last_read_at and m.sender_id != u.id
        )
    else:
        unread = sum(1 for m in conv.messages if m.sender_id != u.id)

    last_msg = (
        max(conv.messages, key=lambda m: m.created_at)
        if conv.messages
        else None
    )
    return ConversationOut(
        id=conv.id,
        fhir_conversation_id=conv.fhir_conversation_id,
        patient_id=conv.patient_id,
        subject=conv.subject,
        status=conv.status,
        created_at=conv.created_at,
        updated_at=conv.updated_at,
        participants=[
            ParticipantOut(
                user_id=p.user_id,
                username=p.user.username,
                display_name=p.user.username,
                role=p.role,
                joined_at=p.joined_at,
            )
            for p in conv.participants
        ],
        last_message_preview=last_msg.body[:200] if last_msg else None,
        last_message_time=last_msg.created_at if last_msg else None,
        unread_count=unread,
    )
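The unread calculation in this endpoint can be expressed as a single standalone function. The sketch below assumes a hypothetical `Msg` dataclass in place of the ORM message model; the counting rule itself follows the code above.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass
class Msg:
    """Hypothetical stand-in for a message row."""

    sender_id: int
    created_at: datetime


def unread_count(
    messages: list[Msg], reader_id: int, last_read_at: datetime | None
) -> int:
    """Count messages from other senders that arrived after last_read_at.

    When last_read_at is None the reader has never opened the conversation,
    so every message from someone else counts as unread.
    """
    return sum(
        1
        for m in messages
        if m.sender_id != reader_id
        and (last_read_at is None or m.created_at > last_read_at)
    )
```

A user's own messages are never counted, and the comparison is strict, so a message stamped exactly at `last_read_at` is treated as already read.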

send_message_endpoint

send_message_endpoint(conversation_id, body, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Send a message in a conversation.

Writes to FHIR first, then projects to SQL.

Parameters:

Name Type Description Default
conversation_id int

ID of the conversation.

required
body MessageCreateIn

Message body (and optional amendment reference).

required
u User

Authenticated user (must be a participant).

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
MessageOut MessageOut

The newly created message.

Raises:

Type Description
HTTPException

404 if conversation not found.

HTTPException

403 if user is not a participant.

HTTPException

502 if FHIR write fails.

Source code in app/main.py
@router.post(
    "/conversations/{conversation_id}/messages",
    response_model=MessageOut,
    dependencies=[DEP_REQUIRE_CLINICAL, DEP_REQUIRE_CSRF],
)
def send_message_endpoint(
    conversation_id: int,
    body: MessageCreateIn,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> MessageOut:
    """Send a message in a conversation.

    Writes to FHIR first, then projects to SQL.

    Args:
        conversation_id: ID of the conversation.
        body: Message body (and optional amendment reference).
        u: Authenticated user (must be a participant).
        db: Database session.

    Returns:
        MessageOut: The newly created message.

    Raises:
        HTTPException: 404 if conversation not found.
        HTTPException: 403 if user is not a participant.
        HTTPException: 502 if FHIR write fails.
    """
    try:
        return send_message(
            db=db,
            conversation_id=conversation_id,
            sender=u,
            body=body.body,
            amends_id=body.amends_id,
        )
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PermissionError as exc:
        raise HTTPException(status_code=403, detail=str(exc)) from exc
    except FhirCommunicationError as exc:
        raise HTTPException(
            status_code=502,
            detail="Failed to create message in clinical store",
        ) from exc
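The exception-to-status translation used by this endpoint can be summarised as a small mapping function. This is a sketch only: `to_http_error` is a hypothetical helper, and `FhirCommunicationError` is redefined locally so the example runs without the application.

```python
from __future__ import annotations


class FhirCommunicationError(Exception):
    """Hypothetical stand-in for the application's FHIR error type."""


def to_http_error(exc: Exception) -> tuple[int, str]:
    """Map a service-layer exception to an (HTTP status, detail) pair."""
    if isinstance(exc, ValueError):
        return 404, str(exc)  # conversation not found
    if isinstance(exc, PermissionError):
        return 403, str(exc)  # caller is not a participant
    if isinstance(exc, FhirCommunicationError):
        # Fixed detail text: the upstream error message is not echoed back.
        return 502, "Failed to create message in clinical store"
    raise exc  # anything else is unexpected and propagates
```

Only the 502 case replaces the exception text with a fixed message, so details from the clinical store are not exposed to the client.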

add_participant_endpoint

add_participant_endpoint(conversation_id, body, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Add a participant to a conversation.

Only existing participants can add others.

Parameters:

Name Type Description Default
conversation_id int

ID of the conversation.

required
body AddParticipantIn

User to add and their role.

required
u User

Authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
ParticipantOut ParticipantOut

The added participant.

Raises:

Type Description
HTTPException

404 if conversation not found.

HTTPException

403 if requesting user is not a participant.

Source code in app/main.py
@router.post(
    "/conversations/{conversation_id}/participants",
    response_model=ParticipantOut,
    dependencies=[DEP_REQUIRE_CLINICAL, DEP_REQUIRE_CSRF],
)
def add_participant_endpoint(
    conversation_id: int,
    body: AddParticipantIn,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> ParticipantOut:
    """Add a participant to a conversation.

    Only existing participants can add others.

    Args:
        conversation_id: ID of the conversation.
        body: User to add and their role.
        u: Authenticated user.
        db: Database session.

    Returns:
        ParticipantOut: The added participant.

    Raises:
        HTTPException: 404 if conversation not found.
        HTTPException: 403 if requesting user is not a participant.
    """
    conv = db.get(Conversation, conversation_id)
    if conv is None:
        raise HTTPException(status_code=404, detail="Conversation not found")
    cp = next((p for p in conv.participants if p.user_id == u.id), None)
    if cp is None:
        raise HTTPException(
            status_code=403,
            detail="Only participants can add others",
        )
    try:
        return add_participant(
            db=db,
            conversation_id=conversation_id,
            user_id=body.user_id,
            role=body.role,
        )
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc

list_participants_endpoint

list_participants_endpoint(conversation_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

List participants in a conversation.

Parameters:

Name Type Description Default
conversation_id int

ID of the conversation.

required
u User

Authenticated user (must be a participant).

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Type Description
list[ParticipantOut]

Participants in the conversation.

Raises:

Type Description
HTTPException

404 if not found or user is not a participant.

Source code in app/main.py
@router.get(
    "/conversations/{conversation_id}/participants",
    response_model=list[ParticipantOut],
    dependencies=[DEP_REQUIRE_CLINICAL],
)
def list_participants_endpoint(
    conversation_id: int,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> list[ParticipantOut]:
    """List participants in a conversation.

    Args:
        conversation_id: ID of the conversation.
        u: Authenticated user (must be a participant).
        db: Database session.

    Returns:
        list[ParticipantOut]: Participants in the conversation.

    Raises:
        HTTPException: 404 if not found or user is not a participant.
    """
    conv = db.get(Conversation, conversation_id)
    if conv is None:
        raise HTTPException(status_code=404, detail="Conversation not found")
    cp = next((p for p in conv.participants if p.user_id == u.id), None)
    if cp is None:
        raise HTTPException(status_code=404, detail="Conversation not found")

    return [
        ParticipantOut(
            user_id=p.user_id,
            username=p.user.username,
            display_name=p.user.username,
            role=p.role,
            joined_at=p.joined_at,
        )
        for p in conv.participants
    ]

join_conversation_endpoint

join_conversation_endpoint(conversation_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Join a conversation as a staff member.

Staff can self-join any conversation they can see. Patients cannot self-join; they must be added by a participant.

Parameters:

Name Type Description Default
conversation_id int

ID of the conversation.

required
u User

Authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
ParticipantOut ParticipantOut

The new or existing participant record.

Raises:

Type Description
HTTPException

404 if conversation not found.

HTTPException

403 if user is a patient.

Source code in app/main.py
@router.post(
    "/conversations/{conversation_id}/join",
    response_model=ParticipantOut,
    dependencies=[DEP_REQUIRE_CLINICAL, DEP_REQUIRE_CSRF],
)
def join_conversation_endpoint(
    conversation_id: int,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> ParticipantOut:
    """Join a conversation as a staff member.

    Staff can self-join any conversation they can see.
    Patients cannot self-join; they must be added by a participant.

    Args:
        conversation_id: ID of the conversation.
        u: Authenticated user.
        db: Database session.

    Returns:
        ParticipantOut: The new or existing participant record.

    Raises:
        HTTPException: 404 if conversation not found.
        HTTPException: 403 if user is a patient.
    """
    try:
        return join_conversation(
            db=db,
            conversation_id=conversation_id,
            user=u,
        )
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PermissionError as exc:
        raise HTTPException(status_code=403, detail=str(exc)) from exc

mark_read_endpoint

mark_read_endpoint(conversation_id, u=DEP_CURRENT_USER, db=DEP_GET_SESSION)

Mark a conversation as read for the current user.

Parameters:

Name Type Description Default
conversation_id int

ID of the conversation.

required
u User

Authenticated user.

DEP_CURRENT_USER
db Session

Database session.

DEP_GET_SESSION

Returns:

Name Type Description
dict dict[str, bool]

Success status.

Raises:

Type Description
HTTPException

404 if not found or user is not a participant.

Source code in app/main.py
@router.post(
    "/conversations/{conversation_id}/read",
    dependencies=[DEP_REQUIRE_CLINICAL, DEP_REQUIRE_CSRF],
)
def mark_read_endpoint(
    conversation_id: int,
    u: User = DEP_CURRENT_USER,
    db: Session = DEP_GET_SESSION,
) -> dict[str, bool]:
    """Mark a conversation as read for the current user.

    Args:
        conversation_id: ID of the conversation.
        u: Authenticated user.
        db: Database session.

    Returns:
        dict: Success status.

    Raises:
        HTTPException: 404 if not found or user is not a participant.
    """
    ok = mark_conversation_read(
        db=db, conversation_id=conversation_id, user_id=u.id
    )
    if not ok:
        raise HTTPException(status_code=404, detail="Conversation not found")
    return {"ok": True}