# Generation Lifecycle
docsfy runs generation as a background task per variant (owner/project/provider/model).
A variant starts in generating, moves through internal stages, and finishes as ready, error, or aborted.
## 1) Request Intake and Variant Locking
Generation starts at `POST /api/generate`. The request model enforces the source rule (`repo_url` XOR `repo_path`) and derives `project_name`.
```10:64:src/docsfy/models.py
class GenerateRequest(BaseModel):
    repo_url: str | None = Field(
        default=None, description="Git repository URL (HTTPS or SSH)"
    )
    repo_path: str | None = Field(default=None, description="Local git repository path")
    ai_provider: Literal["claude", "gemini", "cursor"] | None = None
    ai_model: str | None = None
    ai_cli_timeout: int | None = Field(default=None, gt=0)
    force: bool = Field(
        default=False, description="Force full regeneration, ignoring cache"
    )

    @model_validator(mode="after")
    def validate_source(self) -> GenerateRequest:
        if not self.repo_url and not self.repo_path:
            msg = "Either 'repo_url' or 'repo_path' must be provided"
            raise ValueError(msg)
        if self.repo_url and self.repo_path:
            msg = "Provide either 'repo_url' or 'repo_path', not both"
            raise ValueError(msg)
        return self
```
The API path enforces permissions, prevents duplicate in-flight generation for the same variant key, persists `status="generating"`, then starts `_run_generation()` as an async task.
```422:505:src/docsfy/main.py
@app.post("/api/generate", status_code=202)
async def generate(request: Request, gen_request: GenerateRequest) -> dict[str, str]:
_require_write_access(request)
# Fix 9: Local repo path access requires admin privileges
if gen_request.repo_path and not request.state.is_admin:
raise HTTPException(
status_code=403,
detail="Local repo path access requires admin privileges",
)
# ... snip ...
# Fix 6: Use lock to prevent race condition between check and add
gen_key = f"{owner}/{project_name}/{ai_provider}/{ai_model}"
async with _gen_lock:
if gen_key in _generating:
raise HTTPException(
status_code=409,
detail=f"Variant '{project_name}/{ai_provider}/{ai_model}' is already being generated",
)
await save_project(
name=project_name,
repo_url=gen_request.repo_url or gen_request.repo_path or "",
status="generating",
ai_provider=ai_provider,
ai_model=ai_model,
owner=owner,
)
try:
task = asyncio.create_task(
_run_generation(
repo_url=gen_request.repo_url,
repo_path=gen_request.repo_path,
project_name=project_name,
ai_provider=ai_provider,
ai_model=ai_model,
ai_cli_timeout=gen_request.ai_cli_timeout
or settings.ai_cli_timeout,
force=gen_request.force,
owner=owner,
)
)
_generating[gen_key] = task
except Exception:
_generating.pop(gen_key, None)
raise
return {"project": project_name, "status": "generating"}
```
> **Note:** `repo_path` is admin-only and must point to an absolute path containing `.git`.
## 2) Clone (or Local SHA Resolution)
`_run_generation()` always enters `current_stage="cloning"` first.
For remote sources, docsfy performs a shallow clone (`--depth 1`) and resolves the HEAD SHA.
For local sources, it skips cloning and reads the local HEAD SHA directly.
```720:789:src/docsfy/main.py
async def _run_generation(
    repo_url: str | None,
    repo_path: str | None,
    project_name: str,
    ai_provider: str,
    ai_model: str,
    ai_cli_timeout: int,
    force: bool = False,
    owner: str = "",
) -> None:
    gen_key = f"{owner}/{project_name}/{ai_provider}/{ai_model}"
    try:
        # ... snip ...
        await update_project_status(
            project_name,
            ai_provider,
            ai_model,
            status="generating",
            owner=owner,
            current_stage="cloning",
        )
if repo_path:
# Local repository - use directly, no cloning needed
local_path, commit_sha = get_local_repo_info(Path(repo_path))
await _generate_from_path(
local_path,
commit_sha,
repo_url or repo_path,
project_name,
ai_provider,
ai_model,
ai_cli_timeout,
force,
owner,
)
else:
# Remote repository - clone to temp dir
if repo_url is None:
msg = "repo_url must be provided for remote repositories"
raise ValueError(msg)
with tempfile.TemporaryDirectory() as tmp_dir:
repo_dir, commit_sha = await asyncio.to_thread(
clone_repo, repo_url, Path(tmp_dir)
)
await _generate_from_path(
repo_dir,
commit_sha,
repo_url or "",
project_name,
ai_provider,
ai_model,
ai_cli_timeout,
force,
owner,
)
```
```21:45:src/docsfy/repository.py
def clone_repo(repo_url: str, base_dir: Path) -> tuple[Path, str]:
repo_name = extract_repo_name(repo_url)
repo_path = base_dir / repo_name
logger.info(f"Cloning {repo_name} to {repo_path}")
result = subprocess.run(
["git", "clone", "--depth", "1", "--", repo_url, str(repo_path)],
capture_output=True,
text=True,
timeout=300,
)
if result.returncode != 0:
msg = f"Clone failed: {result.stderr or result.stdout}"
raise RuntimeError(msg)
sha_result = subprocess.run(
["git", "rev-parse", "HEAD"],
cwd=repo_path,
capture_output=True,
text=True,
)
if sha_result.returncode != 0:
msg = f"Failed to get commit SHA: {sha_result.stderr or sha_result.stdout}"
raise RuntimeError(msg)
commit_sha = sha_result.stdout.strip()
logger.info(f"Cloned {repo_name} at commit {commit_sha[:8]}")
    return repo_path, commit_sha
```
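`extract_repo_name()` is called by `clone_repo()` but not shown; a plausible sketch of its job, assuming it derives the checkout directory name from the URL's last segment (the real implementation may differ):

```python
def extract_repo_name(repo_url: str) -> str:
    """Derive a checkout directory name from an HTTPS or SSH repo URL (sketch)."""
    tail = repo_url.rstrip("/").split("/")[-1]
    # SSH form git@host:repo.git has no slash after the colon
    tail = tail.split(":")[-1]
    return tail.removesuffix(".git")
```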
## 3) Planning
After source resolution, docsfy sets `current_stage="planning"` and invokes the planner prompt.
The prompt explicitly tells the model to inspect source, config, tests, and CI, and to output strict JSON.
```24:42:src/docsfy/prompts.py
def build_planner_prompt(project_name: str) -> str:
    return f"""You are a technical documentation planner. Explore this repository thoroughly.
Explore the source code, configuration files, tests, CI/CD pipelines, and project structure.
Do NOT rely on the README — understand the project from its code and configuration.

Then create a documentation plan as a JSON object. The plan should cover:
- Introduction and overview
- Installation / getting started
- Configuration (if applicable)
- Usage guides for key features
- API reference (if the project has an API)
- Any other sections that would help users understand and use this project
Project name: {project_name}
CRITICAL: Your response must be ONLY a valid JSON object. No text before or after. No markdown code blocks.
Output format: {PLAN_SCHEMA}"""
```
The parsed plan is stored in the DB (`plan_json`) before page generation so UI clients can show structure and progress.
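Even with the "ONLY a valid JSON object" instruction, models sometimes wrap output in markdown fences, so a lenient parse step along these lines is a common pattern (illustrative; docsfy's actual parsing may be stricter):

```python
import json
import re

def parse_plan(raw: str) -> dict:
    """Parse planner output, tolerating stray ```json fences (sketch)."""
    text = raw.strip()
    fenced = re.match(r"^```(?:json)?\s*(.*?)\s*```$", text, re.DOTALL)
    if fenced:
        text = fenced.group(1)
    return json.loads(text)  # raises ValueError on anything non-JSON
```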
## 4) Incremental Planning and Cache Decisions
When `force=true`, docsfy clears cached pages and resets `page_count` to `0`.
Without force, it can short-circuit to `ready`/`up_to_date` if the commit SHA did not change.
```832:867:src/docsfy/main.py
if force:
cache_dir = get_project_cache_dir(project_name, ai_provider, ai_model, owner)
if cache_dir.exists():
shutil.rmtree(cache_dir)
logger.info(f"[{project_name}] Cleared cache (force=True)")
# Reset page count so API shows 0 during regeneration
await update_project_status(
project_name,
ai_provider,
ai_model,
status="generating",
owner=owner,
page_count=0,
)
else:
existing = await get_project(
project_name, ai_provider=ai_provider, ai_model=ai_model, owner=owner
)
if existing and existing.get("last_generated"):
old_sha = (
str(existing["last_commit_sha"])
if existing.get("last_commit_sha")
else None
)
if old_sha == commit_sha:
logger.info(
f"[{project_name}] Project is up to date at {commit_sha[:8]}"
)
await update_project_status(
project_name,
ai_provider,
ai_model,
status="ready",
owner=owner,
current_stage="up_to_date",
)
return
```
If the SHA changed and a prior plan exists, docsfy runs incremental planning (`current_stage="incremental_planning"`) and removes only the cached markdown files for the affected slugs.
```913:955:src/docsfy/main.py
await update_project_status(
    project_name,
    ai_provider,
    ai_model,
    status="generating",
    owner=owner,
    current_stage="incremental_planning",
)
pages_to_regen = await run_incremental_planner(
    repo_dir,
    project_name,
    ai_provider,
    ai_model,
    changed_files,
    existing_plan,
    ai_cli_timeout,
)
if pages_to_regen != ["all"]:
    # Delete only the cached pages that need regeneration
    for slug in pages_to_regen:
        # Validate slug to prevent path traversal
        if (
            "/" in slug
            or "\\" in slug
            or ".." in slug
            or slug.startswith(".")
        ):
            logger.warning(
                f"[{project_name}] Skipping invalid slug from incremental planner: {slug}"
            )
            continue
        cache_file = cache_dir / f"{slug}.md"
        # Extra safety: ensure the resolved path is inside cache_dir
        try:
            cache_file.resolve().relative_to(cache_dir.resolve())
        except ValueError:
            logger.warning(
                f"[{project_name}] Path traversal attempt in slug: {slug}"
            )
            continue
        if cache_file.exists():
            cache_file.unlink()
use_cache = True
```
> **Tip:** Use `force: true` for a guaranteed clean rebuild when changing model/provider behavior.
## 5) Page Generation
docsfy sets `current_stage="generating_pages"` and calls `generate_all_pages()` with concurrency cap `MAX_CONCURRENT_PAGES = 5`.
Each page:
- Validates slug safety
- Uses cache if enabled
- Calls AI for markdown
- Writes cache file
- Updates `page_count` during generation
```66:131:src/docsfy/generator.py
async def generate_page(
repo_path: Path,
slug: str,
title: str,
description: str,
cache_dir: Path,
ai_provider: str,
ai_model: str,
ai_cli_timeout: int | None = None,
use_cache: bool = False,
project_name: str = "",
owner: str = "",
) -> str:
# Validate slug to prevent path traversal
if "/" in slug or "\\" in slug or slug.startswith(".") or ".." in slug:
msg = f"Invalid page slug: '{slug}'"
raise ValueError(msg)
cache_file = cache_dir / f"{slug}.md"
if use_cache and cache_file.exists():
logger.debug(f"[{_label}] Using cached page: {slug}")
return cache_file.read_text(encoding="utf-8")
# ... AI call snip ...
output = _strip_ai_preamble(output)
cache_dir.mkdir(parents=True, exist_ok=True)
cache_file.write_text(output, encoding="utf-8")
# Update page count in DB if project_name provided
if project_name:
existing_pages = len(list(cache_dir.glob("*.md")))
await update_project_status(
project_name,
ai_provider,
ai_model,
owner=owner,
status="generating",
page_count=existing_pages,
)
```
```168:201:src/docsfy/generator.py
coroutines = [
    generate_page(
        repo_path=repo_path,
        slug=p["slug"],
        title=p["title"],
        description=p["description"],
        cache_dir=cache_dir,
        ai_provider=ai_provider,
        ai_model=ai_model,
        ai_cli_timeout=ai_cli_timeout,
        use_cache=use_cache,
        project_name=project_name,
        owner=owner,
    )
    for p in all_pages
]
results = await run_parallel_with_limit(
    coroutines, max_concurrency=MAX_CONCURRENT_PAGES
)
pages: dict[str, str] = {}
for page_info, result in zip(all_pages, results):
    if isinstance(result, Exception):
        logger.warning(
            f"[{_label}] Page generation failed for '{page_info['slug']}': {result}"
        )
        pages[page_info["slug"]] = (
            f"# {page_info['title']}\n\nDocumentation generation failed."
        )
    else:
        pages[page_info["slug"]] = result
```
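`run_parallel_with_limit` itself is not shown above; a plausible implementation uses an `asyncio.Semaphore` plus `gather(return_exceptions=True)`, which would match the caller's `isinstance(result, Exception)` check (an assumption about the helper, not its actual source):

```python
import asyncio
from collections.abc import Coroutine
from typing import Any

async def run_parallel_with_limit(
    coroutines: list[Coroutine[Any, Any, Any]],
    max_concurrency: int,
) -> list[Any]:
    """Run coroutines concurrently, at most max_concurrency at a time (sketch)."""
    sem = asyncio.Semaphore(max_concurrency)

    async def _bounded(coro: Coroutine[Any, Any, Any]) -> Any:
        async with sem:
            return await coro

    # Exceptions come back in-place, so one failed page cannot sink the batch
    return await asyncio.gather(
        *(_bounded(c) for c in coroutines), return_exceptions=True
    )
```

With `return_exceptions=True`, results stay positionally aligned with `all_pages`, which is why the caller can safely `zip` them.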
## 6) Rendering and Publish
After markdown generation, docsfy sets `current_stage="rendering"` and renders the final static output.
`render_site()` recreates the output directory, copies assets, and writes HTML and markdown pages, a search index, and `llms` files.
```215:292:src/docsfy/renderer.py
def render_site(plan: dict[str, Any], pages: dict[str, str], output_dir: Path) -> None:
if output_dir.exists():
shutil.rmtree(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
assets_dir = output_dir / "assets"
assets_dir.mkdir(exist_ok=True)
# Prevent GitHub Pages from running Jekyll
(output_dir / ".nojekyll").touch()
# ... snip ...
for idx, slug_info in enumerate(valid_slug_order):
# ... snip ...
(output_dir / f"{slug}.html").write_text(page_html, encoding="utf-8")
(output_dir / f"{slug}.md").write_text(md_content, encoding="utf-8")
search_index = _build_search_index(valid_pages, plan)
(output_dir / "search-index.json").write_text(
json.dumps(search_index), encoding="utf-8"
)
# Generate llms.txt files
llms_txt = _build_llms_txt(plan)
(output_dir / "llms.txt").write_text(llms_txt, encoding="utf-8")
llms_full_txt = _build_llms_full_txt(plan, valid_pages)
    (output_dir / "llms-full.txt").write_text(llms_full_txt, encoding="utf-8")
```
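`_build_search_index()` is not shown; for intuition, a minimal sketch of what such a client-side index typically contains (field names here are assumptions, not docsfy's actual schema):

```python
import re

def build_search_index(pages: dict[str, str]) -> list[dict[str, str]]:
    """One entry per page: slug, first-heading title, rough plain text (sketch)."""
    entries = []
    for slug, markdown in pages.items():
        heading = re.search(r"^#\s+(.+)$", markdown, re.MULTILINE)
        entries.append({
            "slug": slug,
            "title": heading.group(1) if heading else slug,
            # Crude markdown stripping; enough for substring search in the browser
            "text": re.sub(r"[#*`_\[\]()>]", "", markdown),
        })
    return entries
```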
Final publish state:
```988:1015:src/docsfy/main.py
await update_project_status(
    project_name,
    ai_provider,
    ai_model,
    status="generating",
    owner=owner,
    current_stage="rendering",
    page_count=len(pages),
)
site_dir = get_project_site_dir(project_name, ai_provider, ai_model, owner)
render_site(plan=plan, pages=pages, output_dir=site_dir)
# ... snip ...
await update_project_status(
    project_name,
    ai_provider,
    ai_model,
    status="ready",
    owner=owner,
    current_stage=None,
    last_commit_sha=commit_sha,
    page_count=page_count,
    plan_json=json.dumps(plan),
)
```
## Statuses and Stages
### Statuses
`storage.py` defines canonical lifecycle statuses:
```17:17:src/docsfy/storage.py
VALID_STATUSES = frozenset({"generating", "ready", "error", "aborted"})
```
| Status | Meaning | Terminal |
|---|---|---|
| `generating` | Task is active | No |
| `ready` | Docs published (or no-op `up_to_date`) | Yes |
| `error` | Generation failed | Yes |
| `aborted` | Generation canceled by user/task | Yes |
### `current_stage` values used in lifecycle
- `cloning`
- `planning`
- `incremental_planning`
- `generating_pages`
- `rendering`
- `up_to_date` (ready/no-op)
- `null` (done/aborted)
> **Note:** The status page timeline UI is hardcoded to `cloning`, `planning`, `generating_pages`, and `rendering`; `incremental_planning` is a backend stage but not in the stage-order array.
## 7) Monitoring in UI and API
`/status/{name}/{provider}/{model}` computes the total planned pages from `plan_json`, then the page's JS polls variant details every 3 seconds.
```369:401:src/docsfy/main.py
@app.get("/status/{name}/{provider}/{model}", response_class=HTMLResponse)
async def project_status_page(
    request: Request, name: str, provider: str, model: str
) -> HTMLResponse:
    # ... snip ...
    if project.get("plan_json"):
        try:
            plan_json = json.loads(str(project["plan_json"]))
            for group in plan_json.get("navigation", []):
                total_pages += len(group.get("pages", []))
        except (json.JSONDecodeError, TypeError):
            plan_json = None
```
```948:1063:src/docsfy/templates/status.html
var PROJECT_NAME = {{ project.name | tojson }};
var PROJECT_PROVIDER = {{ project.ai_provider | tojson }};
var PROJECT_MODEL = {{ project.ai_model | tojson }};
var POLL_INTERVAL_MS = 3000;
var previousPageCount = {{ (project.page_count or 0) | tojson }};
var currentStatus = {{ project.status | tojson }};
var currentStage = {{ (project.current_stage or '') | tojson }} || null;
var STAGES = ['cloning', 'planning', 'generating_pages', 'rendering'];
```
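The same loop can be driven from a script against the status API; a sketch of the polling logic, with the HTTP fetch left abstract since exact response fields may vary:

```python
import time
from typing import Callable

# Terminal statuses per VALID_STATUSES in storage.py
TERMINAL = {"ready", "error", "aborted"}

def poll_until_done(
    fetch_status: Callable[[], str],
    interval_s: float = 3.0,   # matches POLL_INTERVAL_MS in status.html
    max_polls: int = 1000,
) -> str:
    """Poll a status callback until a terminal status appears (sketch).

    In a real client, fetch_status would wrap an HTTP GET of the status API.
    """
    for _ in range(max_polls):
        status = fetch_status()
        if status in TERMINAL:
            return status
        time.sleep(interval_s)
    raise TimeoutError("generation did not reach a terminal status")
```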
## 8) Ready, Error, and Aborted End States
### Ready
- Final state after successful render
- Also used for no-op updates with `current_stage="up_to_date"`
- Download endpoint requires `ready`
```1086:1091:src/docsfy/main.py
if project["status"] != "ready":
    raise HTTPException(status_code=400, detail="Variant not ready")
project_owner = str(project.get("owner", ""))
site_dir = get_project_site_dir(name, provider, model, project_owner)
if not site_dir.exists():
    raise HTTPException(status_code=404, detail="Site not found")
```
### Error
- Set when CLI availability fails or any unhandled exception occurs
- Carries `error_message`
- UI shows retry controls for `error` and `aborted`
### Aborted
- Variant abort endpoint cancels task, waits up to 5s, then marks `aborted`
```642:717:src/docsfy/main.py
@app.post("/api/projects/{name}/{provider}/{model}/abort")
async def abort_variant(
request: Request, name: str, provider: str, model: str
) -> dict[str, str]:
# ... snip ...
task.cancel()
try:
await asyncio.wait_for(task, timeout=5.0)
except asyncio.CancelledError:
pass
except asyncio.TimeoutError as exc:
raise HTTPException(
status_code=409,
detail=f"Abort still in progress for '{gen_key}'. Please retry shortly.",
) from exc
await update_project_status(
name,
provider,
model,
status="aborted",
owner=key_owner,
error_message="Generation aborted by user",
current_stage=None,
)
```
> **Warning:** On server startup, any orphaned `generating` rows are automatically converted to `error` with `"Server restarted during generation"`.
```182:185:src/docsfy/storage.py
# Reset orphaned "generating" projects from previous server run
cursor = await db.execute(
    "UPDATE projects SET status = 'error', error_message = "
    "'Server restarted during generation', current_stage = NULL "
    "WHERE status = 'generating'"
)
```
## 9) Storage Layout and Runtime Configuration
Variant artifacts are stored under owner/project/provider/model paths:
```501:530:src/docsfy/storage.py
def get_project_dir(
name: str, ai_provider: str = "", ai_model: str = "", owner: str = ""
) -> Path:
# ... snip ...
safe_owner = _validate_owner(owner)
return PROJECTS_DIR / safe_owner / _validate_name(name) / ai_provider / ai_model
def get_project_site_dir(
name: str, ai_provider: str = "", ai_model: str = "", owner: str = ""
) -> Path:
return get_project_dir(name, ai_provider, ai_model, owner) / "site"
def get_project_cache_dir(
name: str, ai_provider: str = "", ai_model: str = "", owner: str = ""
) -> Path:
    return get_project_dir(name, ai_provider, ai_model, owner) / "cache" / "pages"
```
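Concretely, the layout resolves to paths like the following sketch (the root directory here is an assumption for illustration; docsfy's actual `PROJECTS_DIR` is configured elsewhere):

```python
from pathlib import Path

PROJECTS_DIR = Path("/data/projects")  # assumed root, for illustration only

def variant_site_dir(owner: str, name: str, provider: str, model: str) -> Path:
    # Mirrors get_project_site_dir: owner/project/provider/model/site
    return PROJECTS_DIR / owner / name / provider / model / "site"

site = variant_site_dir("admin", "test-repo", "claude", "opus")
# site.as_posix() == "/data/projects/admin/test-repo/claude/opus/site"
```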
Relevant runtime config:
```1:8:.env.example
# REQUIRED - Admin key for user management (minimum 16 characters)
ADMIN_KEY=your-secure-admin-key-here-min-16-chars

# AI Configuration
AI_PROVIDER=claude
# [1m] = 1 million token context window
AI_MODEL=claude-opus-4-6[1m]
AI_CLI_TIMEOUT=60
```
```1:13:docker-compose.yaml
services:
docsfy:
build: .
ports:
- "8000:8000"
env_file: .env
volumes:
- ./data:/data
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
```
## 10) How Lifecycle Behavior Is Validated
Integration tests verify the full mocked flow (generate -> ready -> serve -> download), and storage tests verify restart recovery behavior.
```52:109:tests/test_integration.py
async def test_full_flow_mock(client: AsyncClient, tmp_path: Path) -> None:
    """Test the full generate -> status -> download flow with mocked AI."""
    # ... snip ...
    await _run_generation(
        repo_url="https://github.com/org/test-repo.git",
        repo_path=None,
        project_name="test-repo",
        ai_provider="claude",
        ai_model="opus",
        ai_cli_timeout=60,
        owner="admin",
    )
# Check status
response = await client.get("/api/status")
assert response.status_code == 200
projects = response.json()["projects"]
assert len(projects) == 1
assert projects[0]["name"] == "test-repo"
assert projects[0]["status"] == "ready"
```
```1:7:tox.toml
skipsdist = true
envlist = ["unittests"]
[env.unittests]
deps = ["uv"]
commands = [["uv", "run", "--extra", "dev", "pytest", "-n", "auto", "tests"]]
```
> **Note:** This repository does not include a checked-in `.github/workflows` directory; in-repo automation is defined via `tox` and `.pre-commit-config.yaml`.