Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
5b301b7
feat(assessment): Implement L1 pipeline with topic relevance and dupl…
vprashrex May 27, 2026
b412829
feat(export): Expand output columns to include topic relevance and du…
vprashrex May 28, 2026
c12ac18
feat(post-processing): Implement post-processing configuration for as…
vprashrex May 31, 2026
c1791d5
feat(assessment): Enhance attachment handling in L1 pipeline with mix…
vprashrex Jun 2, 2026
97651d2
Merge branch 'main' into feat/assessment-pipeline-l1
vprashrex Jun 2, 2026
98acf86
feat(tests): update assessment run status to 'l2_processing' and refa…
vprashrex Jun 2, 2026
0addb71
refactor(tests): streamline patching of run_assessment_run in TestSta…
vprashrex Jun 2, 2026
ad8e29f
feat(tests): add comprehensive tests for L1 duplicate detection and p…
vprashrex Jun 2, 2026
e020717
feat: implement prefilter pipeline with topic relevance and duplicate…
vprashrex Jun 2, 2026
4a4e4f8
Refactor assessment tests and add new functionality
vprashrex Jun 4, 2026
bb30f88
feat: refactor assessment prefilter configuration and enhance pipelin…
vprashrex Jun 4, 2026
e89f1f2
feat: enhance error handling in assessment pipeline and improve attac…
vprashrex Jun 4, 2026
87ee6a5
feat: add error handling for deterministic failures in assessment eva…
vprashrex Jun 4, 2026
61798b6
feat: update assessment prefilter constants for provider and model co…
vprashrex Jun 4, 2026
d4d88a2
feat: enhance assessment tests with additional attachment handling an…
vprashrex Jun 4, 2026
827547d
feat: improve test readability by formatting patch calls in assessmen…
vprashrex Jun 4, 2026
8bd54a7
feat: add commented alternative model and duplicate store configurati…
vprashrex Jun 4, 2026
26c4230
Merge branch 'main' into feat/assessment-pipeline-l1
vprashrex Jun 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""Add prefilter columns and pipeline stage-machine columns to assessment_run
Revision ID: 064
Revises: 063
Create Date: 2026-05-27 00:00:00.000000
"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

revision = "064"
down_revision = "063"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column(
"assessment_run",
sa.Column(
"prefilter_object_store_url",
sa.String(),
nullable=True,
comment="S3 URL of prefilter results JSON",
),
)
op.add_column(
"assessment_run",
sa.Column(
"prefilter_total_rows",
sa.Integer(),
nullable=True,
comment="Total rows fed into the prefilter stages",
),
)
op.add_column(
"assessment_run",
sa.Column(
"prefilter_total_passed",
sa.Integer(),
nullable=True,
comment="Rows that passed the go/no-go gates and went to L2",
),
)
op.add_column(
"assessment_run",
sa.Column(
"prefilter_total_rejected",
sa.Integer(),
nullable=True,
comment="Rows rejected by a go/no-go gate",
),
)
op.add_column(
"assessment_run",
sa.Column(
"stage",
sa.String(),
nullable=True,
comment=(
"Current pipeline stage: PRE_FILTER_TOPIC_RELEVANCE, "
"PRE_FILTER_DUPLICATE_DETECTION, L2_ASSESSMENT, COMPLETED, FAILED"
),
),
)
op.add_column(
"assessment_run",
sa.Column(
"stage_status",
sa.String(),
nullable=True,
comment="Status of stage: PENDING, PROCESSING, COMPLETED, FAILED",
),
)
op.add_column(
"assessment_run",
sa.Column(
"pipeline",
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
comment="Ordered stage config driving execution: {'stages': [...]}",
),
)
op.add_column(
"assessment_run",
sa.Column(
"stage_batches",
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
comment="Map of stage name -> batch_job id, for per-stage result lookup",
),
)


def downgrade() -> None:
op.drop_column("assessment_run", "stage_batches")
op.drop_column("assessment_run", "pipeline")
op.drop_column("assessment_run", "stage_status")
op.drop_column("assessment_run", "stage")
op.drop_column("assessment_run", "prefilter_total_rejected")
op.drop_column("assessment_run", "prefilter_total_passed")
op.drop_column("assessment_run", "prefilter_total_rows")
op.drop_column("assessment_run", "prefilter_object_store_url")
5 changes: 5 additions & 0 deletions backend/app/api/docs/assessment/resume_run.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Resume a failed assessment run from its failed stage.

Re-runs the same child run in place, starting at the stage that failed.
Stages that already completed are reused (their batch results are not
recomputed). Only valid when the run is in a failed state.
15 changes: 15 additions & 0 deletions backend/app/api/docs/assessment/update_post_processing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Save post-processing config for a single assessment run.

Stores the config inside the run's `input` JSON blob (key
`post_processing_config`). It is applied at export/preview time and never
re-runs the LLM, so it can be edited after the run completes.

The config has three optional sections:

- `computed_columns`: derived columns from formulas, e.g.
`{"name": "Total_Score", "formula": "@Novelty_score + @Usefulness_score"}`.
Formulas reference columns with `@` and support `+ - * /` and parentheses.
- `filter`: row filters combined with AND logic.
- `sort`: sort rules applied in priority order.

Pass `null` (or an empty body) to clear post-processing for the run.
80 changes: 78 additions & 2 deletions backend/app/api/routes/assessment/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
import logging
from typing import Any, Literal

from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Body, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse

from app.api.deps import AuthContextDep, SessionDep
from app.api.permissions import Permission, require_permission
from app.crud.assessment import (
get_assessment_by_id,
update_run_post_processing_config,
)
from app.crud.assessment import (
get_assessment_run_by_id as get_run_by_id,
)
from app.crud.assessment import (
list_assessment_runs as list_runs,
)
from app.models.assessment import (
Expand All @@ -21,6 +26,9 @@
AssessmentRunPublic,
)
from app.models.evaluation import EvaluationDataset
from app.services.assessment.service import (
resume_assessment_run as resume_run,
)
from app.services.assessment.service import (
retry_assessment_run as retry_run,
)
Expand All @@ -33,6 +41,7 @@
load_export_rows_for_run,
sort_export_rows,
)
from app.services.assessment.utils.post_processing import apply_post_processing
from app.utils import APIResponse, load_description

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -65,6 +74,13 @@ def _build_run_public(
total_items=run.total_items,
error_message=run.error_message,
input=run.input,
prefilter_total_rows=run.prefilter_total_rows,
prefilter_total_passed=run.prefilter_total_passed,
prefilter_total_rejected=run.prefilter_total_rejected,
stage=run.stage,
stage_status=run.stage_status,
pipeline=run.pipeline,
post_processing_config=(run.input or {}).get("post_processing_config"),
inserted_at=run.inserted_at,
updated_at=run.updated_at,
)
Expand Down Expand Up @@ -127,6 +143,34 @@ def retry_assessment_run(
return APIResponse.success_response(data=result)


@router.post(
"/runs/{run_id}/resume",
description=load_description("assessment/resume_run.md"),
response_model=APIResponse[AssessmentResponse],
dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))],
)
def resume_assessment_run(
run_id: int,
session: SessionDep,
auth_context: AuthContextDep,
) -> APIResponse[AssessmentResponse]:
"""Resume a failed child run from its failed stage, reusing completed stages."""
run = get_run_by_id(
session=session,
run_id=run_id,
organization_id=auth_context.organization_.id,
project_id=auth_context.project_.id,
)

result = resume_run(
session=session,
run=run,
organization_id=auth_context.organization_.id,
project_id=auth_context.project_.id,
)
return APIResponse.success_response(data=result)


@router.get(
"/runs",
description=load_description("assessment/list_runs.md"),
Expand Down Expand Up @@ -212,12 +256,44 @@ def export_assessment_run_results(
)
)

post_processing_config = (run.input or {}).get("post_processing_config") or None
base_label = assessment.experiment_name if assessment else f"run_{run.id}"

if export_format != "json":
return build_export_response(
export_rows=export_rows,
export_format=export_format,
base_name=f"{base_label}_run_{run.id}_results",
post_processing_config=post_processing_config,
)

return APIResponse.success_response(data=build_json_export_rows(export_rows))
rows = build_json_export_rows(export_rows)
rows = apply_post_processing(rows, post_processing_config)
return APIResponse.success_response(data=rows)


@router.patch(
"/runs/{run_id}/post-processing",
description=load_description("assessment/update_post_processing.md"),
response_model=APIResponse[AssessmentRunPublic],
dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))],
)
def update_post_processing(
run_id: int,
session: SessionDep,
auth_context: AuthContextDep,
config: dict[str, Any] | None = Body(default=None),
) -> APIResponse[AssessmentRunPublic]:
"""Save post-processing config (computed columns, sort, filter) for a run."""
run = get_run_by_id(
session=session,
run_id=run_id,
organization_id=auth_context.organization_.id,
project_id=auth_context.project_.id,
)
if run is None:
raise HTTPException(status_code=404, detail="Run not found")

run = update_run_post_processing_config(session=session, run=run, config=config)

return APIResponse.success_response(data=_build_run_public(session, run))
23 changes: 23 additions & 0 deletions backend/app/celery/tasks/job_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,29 @@ def run_tts_batch_submission(
)


@celery_app.task(bind=True, queue="low_priority", priority=1)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_assessment_pipeline")
def run_assessment_pipeline(
self,
run_id: int,
organization_id: int,
project_id: int,
trace_id: str,
**kwargs,
):
Comment on lines +237 to +244
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Don't silently swallow extra task kwargs here.

run_assessment_pipeline never forwards **kwargs, so unexpected task arguments are accepted and dropped instead of failing fast. This segment also misses the required type annotations on the new function signature.

Suggested fix
 def run_assessment_pipeline(
-    self,
+    self: celery.Task,
     run_id: int,
     organization_id: int,
     project_id: int,
     trace_id: str,
-    **kwargs,
-):
+) -> None:

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def run_assessment_pipeline(
self,
run_id: int,
organization_id: int,
project_id: int,
trace_id: str,
**kwargs,
):
def run_assessment_pipeline(
self: celery.Task,
run_id: int,
organization_id: int,
project_id: int,
trace_id: str,
) -> None:
🧰 Tools
🪛 Ruff (0.15.15)

[warning] 243-243: Unused function argument: kwargs

(ARG001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/celery/tasks/job_execution.py` around lines 237 - 244, The
function run_assessment_pipeline currently accepts but silently drops **kwargs
and lacks type annotations; update its signature to include proper type hints
(e.g., run_id: int, organization_id: int, project_id: int, trace_id: str,
**kwargs: Any) and add a return type (likely -> None). Then either forward
**kwargs into the downstream pipeline invocation inside run_assessment_pipeline
(so extra task args are preserved) or explicitly validate/raise (TypeError) for
unexpected keys if they must not be accepted; ensure you import Any from typing
and apply the change where run_assessment_pipeline is defined.

from app.services.assessment.tasks import execute_assessment_pipeline

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_assessment_pipeline(
run_id=run_id,
organization_id=organization_id,
project_id=project_id,
),
)


@celery_app.task(bind=True, queue="low_priority", priority=1)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_tts_result_processing")
def run_tts_result_processing(
Expand Down
4 changes: 4 additions & 0 deletions backend/app/crud/assessment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
list_assessment_runs,
list_assessments,
recompute_assessment_status,
update_assessment_run_prefilter_stats,
update_assessment_run_status,
update_run_post_processing_config,
)
from app.crud.assessment.dataset import (
create_assessment_dataset,
Expand Down Expand Up @@ -42,5 +44,7 @@
"list_assessment_datasets",
"list_assessments",
"recompute_assessment_status",
"update_assessment_run_prefilter_stats",
"update_assessment_run_status",
"update_run_post_processing_config",
]
Loading
Loading