Database Schema and Migrations
Docsfy uses SQLite for all metadata, auth, ACL, and session state. The schema is managed in application code (not external migration files), and migrations run automatically at startup.
Database location and startup flow
# src/docsfy/storage.py — filesystem layout.
# Read the DATA_DIR env var exactly once and derive every path from it,
# so the DB and project output can never disagree about the data root.
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
# SQLite database holding all metadata, auth, ACL, and session state.
DB_PATH = DATA_DIR / "docsfy.db"
# Generated documentation output lives alongside the DB.
PROJECTS_DIR = DATA_DIR / "projects"
# src/docsfy/config.py
class Settings(BaseSettings):
    """Application configuration, loaded from environment variables / .env."""

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",  # unknown env vars are silently dropped, not errors
    )

    admin_key: str = ""  # Required — validated at startup
    ai_provider: str = "claude"
    ai_model: str = "claude-opus-4-6[1m]"  # [1m] = 1 million token context window
    ai_cli_timeout: int = Field(default=60, gt=0)  # must be positive; presumably seconds — TODO confirm
    log_level: str = "INFO"
    data_dir: str = "/data"  # passed to init_db() at startup
    secure_cookies: bool = True  # Set to False for local HTTP dev
# src/docsfy/main.py
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Startup/shutdown hook: validate config, run DB init/migrations, prune sessions.

    Raises SystemExit(1) when ADMIN_KEY is missing or shorter than 16
    characters, so the server never starts with a weak or absent admin secret.
    """
    settings = get_settings()
    # Fail fast: ADMIN_KEY is mandatory (it also serves as the HMAC secret
    # for API-key hashing in storage.hash_api_key).
    if not settings.admin_key:
        logger.error("ADMIN_KEY environment variable is required")
        raise SystemExit(1)
    if len(settings.admin_key) < 16:
        logger.error("ADMIN_KEY must be at least 16 characters long")
        raise SystemExit(1)
    # NOTE(review): presumably resets the in-memory set of in-flight
    # generations after a restart — confirm against its definition.
    _generating.clear()
    # Startup is the migration trigger: init_db() creates tables and runs
    # the in-code, idempotent migrations.
    await init_db(data_dir=settings.data_dir)
    # One-shot prune; expired sessions otherwise accumulate until next restart.
    await cleanup_expired_sessions()
    yield
# docker-compose.yaml
services:
  docsfy:
    build: .
    ports:
      - "8000:8000"  # host:container — app serves HTTP on 8000
    env_file: .env   # supplies ADMIN_KEY and other Settings values
    volumes:
      - ./data:/data # persist SQLite DB + generated projects across restarts
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
Tip: Persist `/data` (or your custom `DATA_DIR`) across restarts. If storage is ephemeral, your DB and generated project metadata are lost.
Schema overview
Docsfy creates and maintains four tables in init_db():
projects, users, project_access, sessions
projects
Stores generated documentation variants per project/owner/provider/model combination.
# src/docsfy/storage.py
await db.execute("""
CREATE TABLE IF NOT EXISTS projects (
name TEXT NOT NULL,
ai_provider TEXT NOT NULL DEFAULT '',
ai_model TEXT NOT NULL DEFAULT '',
owner TEXT NOT NULL DEFAULT '',
repo_url TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'generating',
current_stage TEXT,
last_commit_sha TEXT,
last_generated TEXT,
page_count INTEGER DEFAULT 0,
error_message TEXT,
plan_json TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (name, ai_provider, ai_model, owner)
)
""")
status is constrained in code (not DB enum):
# src/docsfy/storage.py
# Allowed values for projects.status; enforced in application code only —
# there is no DB-level CHECK constraint or enum.
VALID_STATUSES = frozenset({"generating", "ready", "error", "aborted"})
Writes are idempotent via upsert on the composite PK:
# src/docsfy/storage.py
await db.execute(
"""INSERT INTO projects (name, ai_provider, ai_model, owner, repo_url, status, updated_at)
VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(name, ai_provider, ai_model, owner) DO UPDATE SET
repo_url = excluded.repo_url,
status = excluded.status,
error_message = NULL,
current_stage = NULL,
updated_at = CURRENT_TIMESTAMP""",
(name, ai_provider, ai_model, owner, repo_url, status),
)
users
Stores API-key-authenticated users and role-based access.
# src/docsfy/storage.py
await db.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
api_key_hash TEXT NOT NULL UNIQUE,
role TEXT NOT NULL DEFAULT 'user',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
API keys are not stored in plaintext; they are HMAC-hashed with ADMIN_KEY:
# src/docsfy/storage.py
def hash_api_key(key: str, hmac_secret: str = "") -> str:
    """Hash an API key with HMAC-SHA256 for storage.

    The HMAC secret is ADMIN_KEY (or an explicit override), so a leaked
    database alone is not enough to brute-force the stored keys.

    NOTE: Rotating ADMIN_KEY invalidates every existing api_key_hash,
    forcing all users to regenerate their API keys.

    Raises:
        RuntimeError: when no secret is supplied and ADMIN_KEY is unset.
    """
    secret = hmac_secret if hmac_secret else os.getenv("ADMIN_KEY", "")
    if not secret:
        msg = "ADMIN_KEY environment variable is required for key hashing"
        raise RuntimeError(msg)
    digest = hmac.new(secret.encode(), key.encode(), hashlib.sha256)
    return digest.hexdigest()
Note: The username `admin` is reserved in the DB user model; environment-admin auth is handled separately via `ADMIN_KEY`.
project_access
Stores explicit ACL grants for sharing projects between users.
# src/docsfy/storage.py
await db.execute("""
CREATE TABLE IF NOT EXISTS project_access (
project_name TEXT NOT NULL,
project_owner TEXT NOT NULL DEFAULT '',
username TEXT NOT NULL,
PRIMARY KEY (project_name, project_owner, username)
)
""")
Grant semantics are project-level for a given owner (all variants under that name/owner):
# src/docsfy/storage.py
async def grant_project_access(
    project_name: str, username: str, project_owner: str = ""
) -> None:
    """Grant a user access to all variants of a project."""
    # An empty owner would create an unscoped grant — reject it up front.
    if not project_owner:
        raise ValueError("project_owner is required for access grants")
    row = (project_name, project_owner, username)
    async with aiosqlite.connect(DB_PATH) as db:
        # INSERT OR IGNORE makes re-granting idempotent (composite PK).
        await db.execute(
            "INSERT OR IGNORE INTO project_access (project_name, project_owner, username) VALUES (?, ?, ?)",
            row,
        )
        await db.commit()
sessions
Stores browser session state (token, user, role flag, expiration).
# src/docsfy/storage.py
await db.execute("""
CREATE TABLE IF NOT EXISTS sessions (
token TEXT PRIMARY KEY,
username TEXT NOT NULL,
is_admin INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL
)
""")
Session tokens are opaque to clients and stored hashed in DB:
# src/docsfy/storage.py
SESSION_TTL_SECONDS = 28800  # 8 hours
# Derived hour count; default ttl_hours for create_session().
SESSION_TTL_HOURS = SESSION_TTL_SECONDS // 3600


def _hash_session_token(token: str) -> str:
    """Hash a session token for storage."""
    # DB stores only the SHA-256 digest, so a leaked DB does not yield
    # usable session tokens.
    return hashlib.sha256(token.encode()).hexdigest()
async def create_session(
    username: str, is_admin: bool = False, ttl_hours: int = SESSION_TTL_HOURS
) -> str:
    """Create an opaque session token.

    Returns the raw token for the client; only its hash is persisted.
    """
    raw_token = secrets.token_urlsafe(32)
    # UTC expiry, formatted to compare cleanly with SQLite's datetime('now').
    expiry = datetime.now(timezone.utc) + timedelta(hours=ttl_hours)
    record = (
        _hash_session_token(raw_token),
        username,
        1 if is_admin else 0,
        expiry.strftime("%Y-%m-%d %H:%M:%S"),
    )
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            "INSERT INTO sessions (token, username, is_admin, expires_at) VALUES (?, ?, ?, ?)",
            record,
        )
        await db.commit()
    return raw_token
async def get_session(token: str) -> dict[str, str | int | None] | None:
    """Look up a session. Returns None if expired or not found."""
    hashed = _hash_session_token(token)
    async with aiosqlite.connect(DB_PATH) as db:
        db.row_factory = aiosqlite.Row
        # Expiry is filtered in SQL, so stale rows behave as "not found"
        # even before cleanup_expired_sessions() removes them.
        cursor = await db.execute(
            "SELECT * FROM sessions WHERE token = ? AND expires_at > datetime('now')",
            (hashed,),
        )
        match = await cursor.fetchone()
    if match is None:
        return None
    return dict(match)
And cookie max-age is aligned with session TTL:
# src/docsfy/main.py
response.set_cookie(
"docsfy_session",
session_token,
httponly=True,
samesite="strict",
secure=settings.secure_cookies,
max_age=SESSION_TTL_SECONDS,
)
Built-in migration behavior
Docsfy uses in-code, idempotent migrations inside init_db().
Note: There is no migration version table and no separate migration runner. Startup is the migration trigger.
1) projects PK migration (legacy 3-column to 4-column owner-aware PK)
Detection and migration are automatic:
# src/docsfy/storage.py
# Migration: convert old 3-column PK table to 4-column PK (with owner)
cursor = await db.execute("PRAGMA table_info(projects)")
columns = await cursor.fetchall()
col_names = [c[1] for c in columns]
needs_pk_migration = False
# Detect old schema: owner not in columns, or owner is nullable
if "owner" not in col_names:
needs_pk_migration = True
elif "ai_provider" not in col_names:
needs_pk_migration = True
else:
# Check if ai_provider is nullable (old schema)
for col in columns:
if col[1] == "ai_provider" and col[3] == 0: # notnull=0 means nullable
needs_pk_migration = True
break
# src/docsfy/storage.py
await db.execute(f"""
INSERT OR IGNORE INTO projects_new
(name, ai_provider, ai_model, owner, repo_url, status, current_stage,
last_commit_sha, last_generated, page_count, error_message,
plan_json, created_at, updated_at)
SELECT {", ".join(select_cols)}
FROM projects
""")
await db.execute("DROP TABLE projects")
await db.execute("ALTER TABLE projects_new RENAME TO projects")
Warning: This migration rewrites the table and drops the original after copy. Back up `docsfy.db` before major upgrades.
Warning: Data copy uses `INSERT OR IGNORE`; if legacy rows collide under the new composite key, the ignored rows will not be migrated.
2) users.role backfill migration
# src/docsfy/storage.py
# Migration: add role column for existing DBs
try:
await db.execute(
"ALTER TABLE users ADD COLUMN role TEXT NOT NULL DEFAULT 'user'"
)
except sqlite3.OperationalError as exc:
if "duplicate column name" not in str(exc).lower():
logger.exception("Migration failed while adding column")
raise
3) users.api_key_hash uniqueness migration
# src/docsfy/storage.py
cursor = await db.execute("PRAGMA index_list(users)")
indexes = await cursor.fetchall()
has_unique_key_index = False
for idx in indexes:
if idx[2]: # unique=1
cursor2 = await db.execute(f"PRAGMA index_info({idx[1]})")
idx_cols = await cursor2.fetchall()
for ic in idx_cols:
if ic[2] == "api_key_hash":
has_unique_key_index = True
break
if has_unique_key_index:
break
if not has_unique_key_index:
try:
await db.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_users_api_key_hash ON users (api_key_hash)"
)
except sqlite3.OperationalError as exc:
if "unique" not in str(exc).lower():
logger.exception("Migration failed while adding unique index")
raise
4) project_access.project_owner backfill migration
# src/docsfy/storage.py
# Migration: add project_owner column to project_access
try:
await db.execute(
"ALTER TABLE project_access ADD COLUMN project_owner TEXT NOT NULL DEFAULT ''"
)
except sqlite3.OperationalError as exc:
if "duplicate column name" not in str(exc).lower():
logger.exception("Migration failed while adding column")
raise
5) Startup recovery behavior (non-schema but migration-adjacent)
On restart, in-progress generations are marked failed:
# src/docsfy/storage.py
cursor = await db.execute(
"UPDATE projects SET status = 'error', error_message = 'Server restarted during generation', current_stage = NULL WHERE status = 'generating'"
)
Expired sessions are pruned during app startup:
# src/docsfy/storage.py
async def cleanup_expired_sessions() -> None:
    """Remove expired sessions.

    NOTE: This is called during application startup (lifespan) only.
    Expired sessions accumulate between restarts but are harmless since
    get_session() filters by expires_at. For long-running deployments,
    consider calling this periodically (e.g., via a background task).

    TODO: Add periodic cleanup for long-running instances.
    """
    async with aiosqlite.connect(DB_PATH) as db:
        # Same datetime('now') comparison used by get_session(), so both
        # paths agree on what "expired" means.
        await db.execute("DELETE FROM sessions WHERE expires_at <= datetime('now')")
        await db.commit()
Integrity model and relationships
Docsfy intentionally enforces most relationships at application level (not SQLite foreign keys).
- `projects.owner` logically maps to `users.username`
- `project_access.username` maps to `users.username`
- `project_access.(project_name, project_owner)` maps to project identity (across variants)
- `sessions.username` maps to user identity (including the env-admin login path)
Cleanup logic is explicit in code:
# src/docsfy/storage.py
async def delete_user(username: str) -> bool:
    """Delete a user by username, invalidating all their sessions and cleaning up ACLs.

    Returns True when a user row was actually deleted.
    """
    # No DB-level foreign keys: dependent rows are removed explicitly,
    # then the user row itself.
    cleanup_statements = (
        "DELETE FROM sessions WHERE username = ?",
        # Owned projects and the grants attached to them
        "DELETE FROM projects WHERE owner = ?",
        "DELETE FROM project_access WHERE project_owner = ?",
        # Grants where this user was the grantee
        "DELETE FROM project_access WHERE username = ?",
    )
    async with aiosqlite.connect(DB_PATH) as db:
        for statement in cleanup_statements:
            await db.execute(statement, (username,))
        cursor = await db.execute("DELETE FROM users WHERE username = ?", (username,))
        await db.commit()
        return cursor.rowcount > 0
# src/docsfy/storage.py
# Clean up project_access if no more variants remain for this name+owner
if cursor.rowcount > 0 and owner is not None:
remaining = await db.execute(
"SELECT COUNT(*) FROM projects WHERE name = ? AND owner = ?",
(name, owner),
)
row = await remaining.fetchone()
if row and row[0] == 0:
await db.execute(
"DELETE FROM project_access WHERE project_name = ? AND project_owner = ?",
(name, owner),
)
Warning: Because there are no DB-level foreign keys, direct/manual SQL writes can create orphaned rows that the app does not automatically reconcile unless specific cleanup paths are triggered.
Test and CI coverage for schema behavior
Key migration-adjacent behaviors are covered by tests:
# tests/test_storage.py
async def test_init_db_resets_orphaned_generating(db_path: Path) -> None:
    """Re-running init_db() marks projects stuck in 'generating' as errored."""
    from docsfy.storage import get_project, init_db, save_project

    # Seed a project that looks like a generation interrupted mid-flight.
    await save_project(
        name="stuck-repo",
        repo_url="https://github.com/org/stuck.git",
        status="generating",
        ai_provider="claude",
        ai_model="opus",
        owner="testuser",
    )
    # Simulate server restart by re-running init_db
    await init_db()
    project = await get_project(
        "stuck-repo", ai_provider="claude", ai_model="opus", owner="testuser"
    )
    assert project is not None
    # Startup recovery flips the status and records a human-readable reason.
    assert project["status"] == "error"
    assert "Server restarted" in project["error_message"]
# tests/test_storage.py
async def test_cleanup_expired_sessions(db_path: Path) -> None:
    """cleanup_expired_sessions() removes expired rows and keeps valid ones."""
    import aiosqlite

    from docsfy.storage import (
        DB_PATH,
        _hash_session_token,
        cleanup_expired_sessions,
        create_session,
    )

    # Directly insert a session with a past expiration
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            "INSERT INTO sessions (token, username, is_admin, expires_at) VALUES (?, ?, ?, ?)",
            ("expired-token", "expired-user", 0, "2020-01-01T00:00:00"),
        )
        await db.commit()

    # Create a valid session
    valid_token = await create_session("valid-user", ttl_hours=8)

    await cleanup_expired_sessions()

    # Check that only the valid session remains
    async with aiosqlite.connect(DB_PATH) as db:
        cursor = await db.execute("SELECT COUNT(*) FROM sessions")
        row = await cursor.fetchone()
        assert row is not None
        assert row[0] == 1

        # Session tokens are stored as hashes
        token_hash = _hash_session_token(valid_token)
        cursor = await db.execute(
            "SELECT username FROM sessions WHERE token = ?", (token_hash,)
        )
        row = await cursor.fetchone()
        assert row is not None
        assert row[0] == "valid-user"
CI test entry point:
# tox.toml
skipsdist = true
envlist = ["unittests"]

# Single CI env: uv resolves dev extras, pytest runs the suite in parallel.
[env.unittests]
deps = ["uv"]
commands = [["uv", "run", "--extra", "dev", "pytest", "-n", "auto", "tests"]]
Note: Current tests validate startup recovery, session cleanup, owner scoping, and `data_dir` initialization, but they do not include explicit fixtures for every legacy schema branch in `init_db()` (for example, a seeded pre-owner `projects` table).