Database Schema and Migrations

Docsfy uses SQLite for all metadata, auth, ACL, and session state. The schema is managed in application code (not external migration files), and migrations run automatically at startup.

Database location and startup flow

# src/docsfy/storage.py
# Base data directory; configurable via the DATA_DIR environment variable.
# Derive DB_PATH and PROJECTS_DIR from it so there is a single source of
# truth (previously the env var was read twice, once per path).
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
DB_PATH = DATA_DIR / "docsfy.db"
PROJECTS_DIR = DATA_DIR / "projects"
# src/docsfy/config.py
class Settings(BaseSettings):
    """Application configuration loaded from the environment and a `.env` file.

    Unknown environment variables are ignored (extra="ignore").
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    admin_key: str = ""  # Required — validated at startup
    ai_provider: str = "claude"
    ai_model: str = "claude-opus-4-6[1m]"  # [1m] = 1 million token context window
    ai_cli_timeout: int = Field(default=60, gt=0)  # must be > 0 (enforced by Field)
    log_level: str = "INFO"
    data_dir: str = "/data"
    secure_cookies: bool = True  # Set to False for local HTTP dev
# src/docsfy/main.py
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Validate configuration and initialize state before serving requests.

    Exits the process (SystemExit(1)) when ADMIN_KEY is missing or shorter
    than 16 characters, then runs DB init/migrations and prunes expired
    sessions. Yields once startup work is complete.
    """
    settings = get_settings()
    # Fail fast: the server must not start without an admin secret.
    if not settings.admin_key:
        logger.error("ADMIN_KEY environment variable is required")
        raise SystemExit(1)

    if len(settings.admin_key) < 16:
        logger.error("ADMIN_KEY must be at least 16 characters long")
        raise SystemExit(1)

    # NOTE(review): _generating appears to track in-flight generation jobs;
    # cleared here so a restart begins with no stale entries — confirm.
    _generating.clear()
    await init_db(data_dir=settings.data_dir)
    await cleanup_expired_sessions()
    yield
# docker-compose.yaml
services:
  docsfy:
    build: .
    ports:
      - "8000:8000"
    env_file: .env
    volumes:
      - ./data:/data
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

Tip: Persist /data (or your custom DATA_DIR) across restarts. If storage is ephemeral, your DB and generated project metadata are lost.


Schema overview

Docsfy creates and maintains four tables in init_db():

  • projects
  • users
  • project_access
  • sessions

projects

Stores generated documentation variants per project/owner/provider/model combination.

# src/docsfy/storage.py
await db.execute("""
    CREATE TABLE IF NOT EXISTS projects (
        name TEXT NOT NULL,
        ai_provider TEXT NOT NULL DEFAULT '',
        ai_model TEXT NOT NULL DEFAULT '',
        owner TEXT NOT NULL DEFAULT '',
        repo_url TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'generating',
        current_stage TEXT,
        last_commit_sha TEXT,
        last_generated TEXT,
        page_count INTEGER DEFAULT 0,
        error_message TEXT,
        plan_json TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (name, ai_provider, ai_model, owner)
    )
""")

status is constrained in application code; SQLite has no enum type, and the schema deliberately omits a CHECK constraint:

# src/docsfy/storage.py
VALID_STATUSES = frozenset({"generating", "ready", "error", "aborted"})

Writes use an upsert on the composite PK, so re-saving an existing variant updates it in place (refreshing updated_at) rather than failing:

# src/docsfy/storage.py
await db.execute(
    """INSERT INTO projects (name, ai_provider, ai_model, owner, repo_url, status, updated_at)
       VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
       ON CONFLICT(name, ai_provider, ai_model, owner) DO UPDATE SET
       repo_url = excluded.repo_url,
       status = excluded.status,
       error_message = NULL,
       current_stage = NULL,
       updated_at = CURRENT_TIMESTAMP""",
    (name, ai_provider, ai_model, owner, repo_url, status),
)

users

Stores API-key-authenticated users and role-based access.

# src/docsfy/storage.py
await db.execute("""
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE NOT NULL,
        api_key_hash TEXT NOT NULL UNIQUE,
        role TEXT NOT NULL DEFAULT 'user',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

API keys are not stored in plaintext; they are HMAC-hashed with ADMIN_KEY:

# src/docsfy/storage.py
def hash_api_key(key: str, hmac_secret: str = "") -> str:
    """Return the HMAC-SHA256 hex digest of *key* for at-rest storage.

    The HMAC secret defaults to the ADMIN_KEY environment variable, so a
    leaked database alone is not enough to recover or forge API keys.

    Raises:
        RuntimeError: if no secret was supplied and ADMIN_KEY is unset.
    """
    # NOTE: rotating ADMIN_KEY invalidates every stored api_key_hash,
    # forcing all users to regenerate their API keys.
    effective_secret = hmac_secret if hmac_secret else os.getenv("ADMIN_KEY", "")
    if not effective_secret:
        raise RuntimeError(
            "ADMIN_KEY environment variable is required for key hashing"
        )
    digest = hmac.new(effective_secret.encode(), key.encode(), hashlib.sha256)
    return digest.hexdigest()

Note: Username admin is reserved in the DB user model; environment-admin auth is handled separately via ADMIN_KEY.

project_access

Stores explicit ACL grants for sharing projects between users.

# src/docsfy/storage.py
await db.execute("""
    CREATE TABLE IF NOT EXISTS project_access (
        project_name TEXT NOT NULL,
        project_owner TEXT NOT NULL DEFAULT '',
        username TEXT NOT NULL,
        PRIMARY KEY (project_name, project_owner, username)
    )
""")

Grant semantics are project-level for a given owner (all variants under that name/owner):

# src/docsfy/storage.py
async def grant_project_access(
    project_name: str, username: str, project_owner: str = ""
) -> None:
    """Grant *username* access to every variant of a project.

    Raises:
        ValueError: if project_owner is empty — grants are always owner-scoped.
    """
    if not project_owner:
        raise ValueError("project_owner is required for access grants")
    async with aiosqlite.connect(DB_PATH) as db:
        # INSERT OR IGNORE makes repeated grants idempotent under the
        # (project_name, project_owner, username) composite primary key.
        await db.execute(
            "INSERT OR IGNORE INTO project_access (project_name, project_owner, username) VALUES (?, ?, ?)",
            (project_name, project_owner, username),
        )
        await db.commit()

sessions

Stores browser session state (token, user, role flag, expiration).

# src/docsfy/storage.py
await db.execute("""
    CREATE TABLE IF NOT EXISTS sessions (
        token TEXT PRIMARY KEY,
        username TEXT NOT NULL,
        is_admin INTEGER NOT NULL DEFAULT 0,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        expires_at TIMESTAMP NOT NULL
    )
""")

Session tokens are opaque to clients and stored hashed in DB:

# src/docsfy/storage.py
SESSION_TTL_SECONDS = 28800  # 8 hours
SESSION_TTL_HOURS = SESSION_TTL_SECONDS // 3600

def _hash_session_token(token: str) -> str:
    """Return the SHA-256 hex digest under which a session token is stored."""
    digest = hashlib.sha256(token.encode())
    return digest.hexdigest()

async def create_session(
    username: str, is_admin: bool = False, ttl_hours: int = SESSION_TTL_HOURS
) -> str:
    """Mint a random opaque session token, persist its hash, and return it.

    Only the SHA-256 hash of the token is written to the DB; the raw
    token is returned to the caller (and ends up in the client cookie).
    """
    raw_token = secrets.token_urlsafe(32)
    expiry = datetime.now(timezone.utc) + timedelta(hours=ttl_hours)
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            "INSERT INTO sessions (token, username, is_admin, expires_at) VALUES (?, ?, ?, ?)",
            (
                _hash_session_token(raw_token),
                username,
                int(is_admin),
                expiry.strftime("%Y-%m-%d %H:%M:%S"),
            ),
        )
        await db.commit()
    return raw_token

async def get_session(token: str) -> dict[str, str | int | None] | None:
    """Fetch the session row for *token* as a dict.

    Returns None when no row matches the token's hash or the row has
    already passed its expires_at timestamp.
    """
    hashed = _hash_session_token(token)
    async with aiosqlite.connect(DB_PATH) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT * FROM sessions WHERE token = ? AND expires_at > datetime('now')",
            (hashed,),
        )
        row = await cursor.fetchone()
        if row is None:
            return None
        return dict(row)

And cookie max-age is aligned with session TTL:

# src/docsfy/main.py
response.set_cookie(
    "docsfy_session",
    session_token,
    httponly=True,
    samesite="strict",
    secure=settings.secure_cookies,
    max_age=SESSION_TTL_SECONDS,
)

Built-in migration behavior

Docsfy uses in-code, idempotent migrations inside init_db().

Note: There is no migration version table and no separate migration runner. Startup is the migration trigger.

1) projects PK migration (legacy 3-column to 4-column owner-aware PK)

Detection and migration are automatic:

# src/docsfy/storage.py
# Migration: convert old 3-column PK table to 4-column PK (with owner)
cursor = await db.execute("PRAGMA table_info(projects)")
columns = await cursor.fetchall()
col_names = [c[1] for c in columns]

needs_pk_migration = False

# Detect old schema: owner not in columns, or owner is nullable
if "owner" not in col_names:
    needs_pk_migration = True
elif "ai_provider" not in col_names:
    needs_pk_migration = True
else:
    # Check if ai_provider is nullable (old schema)
    for col in columns:
        if col[1] == "ai_provider" and col[3] == 0:  # notnull=0 means nullable
            needs_pk_migration = True
            break
# src/docsfy/storage.py
await db.execute(f"""
    INSERT OR IGNORE INTO projects_new
        (name, ai_provider, ai_model, owner, repo_url, status, current_stage,
         last_commit_sha, last_generated, page_count, error_message,
         plan_json, created_at, updated_at)
    SELECT {", ".join(select_cols)}
    FROM projects
""")
await db.execute("DROP TABLE projects")
await db.execute("ALTER TABLE projects_new RENAME TO projects")

Warning: This migration rewrites the table and drops the original after copy. Back up docsfy.db before major upgrades.

Warning: Data copy uses INSERT OR IGNORE; if legacy rows collide under the new composite key, ignored rows will not be migrated.

2) users.role backfill migration

# src/docsfy/storage.py
# Migration: add role column for existing DBs
try:
    await db.execute(
        "ALTER TABLE users ADD COLUMN role TEXT NOT NULL DEFAULT 'user'"
    )
except sqlite3.OperationalError as exc:
    if "duplicate column name" not in str(exc).lower():
        logger.exception("Migration failed while adding column")
        raise

3) users.api_key_hash uniqueness migration

# src/docsfy/storage.py
cursor = await db.execute("PRAGMA index_list(users)")
indexes = await cursor.fetchall()
has_unique_key_index = False
for idx in indexes:
    if idx[2]:  # unique=1
        cursor2 = await db.execute(f"PRAGMA index_info({idx[1]})")
        idx_cols = await cursor2.fetchall()
        for ic in idx_cols:
            if ic[2] == "api_key_hash":
                has_unique_key_index = True
                break
    if has_unique_key_index:
        break

if not has_unique_key_index:
    try:
        await db.execute(
            "CREATE UNIQUE INDEX IF NOT EXISTS idx_users_api_key_hash ON users (api_key_hash)"
        )
    except sqlite3.OperationalError as exc:
        if "unique" not in str(exc).lower():
            logger.exception("Migration failed while adding unique index")
            raise

4) project_access.project_owner backfill migration

# src/docsfy/storage.py
# Migration: add project_owner column to project_access
try:
    await db.execute(
        "ALTER TABLE project_access ADD COLUMN project_owner TEXT NOT NULL DEFAULT ''"
    )
except sqlite3.OperationalError as exc:
    if "duplicate column name" not in str(exc).lower():
        logger.exception("Migration failed while adding column")
        raise

5) Startup recovery behavior (non-schema but migration-adjacent)

On restart, in-progress generations are marked failed:

# src/docsfy/storage.py
cursor = await db.execute(
    "UPDATE projects SET status = 'error', error_message = 'Server restarted during generation', current_stage = NULL WHERE status = 'generating'"
)

Expired sessions are pruned during app startup:

# src/docsfy/storage.py
async def cleanup_expired_sessions() -> None:
    """Delete every session row whose expires_at is in the past.

    Invoked only from application startup (lifespan). Stale rows that
    accumulate between restarts are harmless since get_session() already
    filters on expires_at; long-running deployments may want to run this
    periodically from a background task.
    TODO: Add periodic cleanup for long-running instances.
    """
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute("DELETE FROM sessions WHERE expires_at <= datetime('now')")
        await db.commit()

Integrity model and relationships

Docsfy intentionally enforces most relationships at application level (not SQLite foreign keys).

  • projects.owner logically maps to users.username
  • project_access.username maps to users.username
  • project_access.(project_name, project_owner) maps to project identity (across variants)
  • sessions.username maps to user identity (including env-admin login path)

Cleanup logic is explicit in code:

# src/docsfy/storage.py
async def delete_user(username: str) -> bool:
    """Remove a user plus every row that references them.

    Deletes, within one transaction: the user's sessions, the projects
    they own, ACL grants on those projects, grants made *to* the user,
    and finally the user row itself.

    Returns:
        True if a user row was actually deleted, False otherwise.
    """
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute("DELETE FROM sessions WHERE username = ?", (username,))
        # Owned projects and the ACL entries pointing at them.
        await db.execute("DELETE FROM projects WHERE owner = ?", (username,))
        await db.execute(
            "DELETE FROM project_access WHERE project_owner = ?", (username,)
        )
        # Grants where this user was the recipient.
        await db.execute("DELETE FROM project_access WHERE username = ?", (username,))
        result = await db.execute("DELETE FROM users WHERE username = ?", (username,))
        await db.commit()
        return result.rowcount > 0
# src/docsfy/storage.py
# Clean up project_access if no more variants remain for this name+owner
if cursor.rowcount > 0 and owner is not None:
    remaining = await db.execute(
        "SELECT COUNT(*) FROM projects WHERE name = ? AND owner = ?",
        (name, owner),
    )
    row = await remaining.fetchone()
    if row and row[0] == 0:
        await db.execute(
            "DELETE FROM project_access WHERE project_name = ? AND project_owner = ?",
            (name, owner),
        )

Warning: Because there are no DB-level foreign keys, direct/manual SQL writes can create orphaned rows that the app does not automatically reconcile unless specific cleanup paths are triggered.


Test and CI coverage for schema behavior

Key migration-adjacent behaviors are covered by tests:

# tests/test_storage.py
async def test_init_db_resets_orphaned_generating(db_path: Path) -> None:
    """A project left in 'generating' is flipped to 'error' by init_db()."""
    from docsfy.storage import get_project, init_db, save_project

    # Seed a project stuck mid-generation, as if the server died.
    await save_project(
        name="stuck-repo",
        repo_url="https://github.com/org/stuck.git",
        status="generating",
        ai_provider="claude",
        ai_model="opus",
        owner="testuser",
    )

    # Re-running init_db stands in for a server restart.
    await init_db()

    recovered = await get_project(
        "stuck-repo", ai_provider="claude", ai_model="opus", owner="testuser"
    )
    assert recovered is not None
    assert recovered["status"] == "error"
    assert "Server restarted" in recovered["error_message"]
# tests/test_storage.py
async def test_cleanup_expired_sessions(db_path: Path) -> None:
    """cleanup_expired_sessions() removes only rows past their expires_at."""
    import aiosqlite

    from docsfy.storage import (
        DB_PATH,
        _hash_session_token,
        cleanup_expired_sessions,
        create_session,
    )

    # Directly insert a session with a past expiration
    # (bypasses create_session, so the token column holds plaintext here).
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            "INSERT INTO sessions (token, username, is_admin, expires_at) VALUES (?, ?, ?, ?)",
            ("expired-token", "expired-user", 0, "2020-01-01T00:00:00"),
        )
        await db.commit()

    # Create a valid session
    valid_token = await create_session("valid-user", ttl_hours=8)

    await cleanup_expired_sessions()

    # Check that only the valid session remains
    async with aiosqlite.connect(DB_PATH) as db:
        cursor = await db.execute("SELECT COUNT(*) FROM sessions")
        row = await cursor.fetchone()
        assert row is not None
        assert row[0] == 1

        # Session tokens are stored as hashes
        # (so we must hash the returned token before looking it up).
        token_hash = _hash_session_token(valid_token)
        cursor = await db.execute(
            "SELECT username FROM sessions WHERE token = ?", (token_hash,)
        )
        row = await cursor.fetchone()
        assert row is not None
        assert row[0] == "valid-user"

CI test entry point:

# tox.toml
skipsdist = true

envlist = ["unittests"]

[env.unittests]
deps = ["uv"]
commands = [["uv", "run", "--extra", "dev", "pytest", "-n", "auto", "tests"]]

Note: Current tests validate startup recovery, session cleanup, owner scoping, and data_dir initialization, but they do not include explicit fixtures for every legacy schema branch in init_db() (for example, a seeded pre-owner projects table).