List Fitment Research
This page documents fitment research for contact lists: business logic, status transitions, and how User, Backend, DB, Airflow, Redis streams, and Docker consumers interact.
Status transitions (when and how)
All fitment behaviour is driven by the status columns on three entities. Transitions are summarized below; the rest of the doc explains which component performs each transition and when.
fitment_runs
| Status | When it is set | Who sets it |
|---|---|---|
| queued | Run is created at start | Backend |
| in_progress | DAG triggered successfully; run is active | Backend (after DAG trigger) |
| completed | All items for the run are done (no pending/processing) | Docker consumer (FitmentWorker) |
| failed | DAG trigger failed, or run marked failed by logic | Backend or Docker consumer |
| cancelled | User stops fitment | Backend |
| paused | User pauses fitment | Backend |
| in_progress (again) | User resumes fitment | Backend |
Only one run per list is “active” at a time (queued, in_progress, or paused). Completed, failed, and cancelled runs are terminal for that run.
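The transition rules and the single-active-run constraint above can be sketched as a small guard. This is a hypothetical illustration, not the actual backend code; the names `ALLOWED_TRANSITIONS`, `ACTIVE_STATUSES`, and `can_transition` are invented for the sketch.

```python
# Illustrative transition map for fitment_runs, per the table above.
ALLOWED_TRANSITIONS = {
    "queued": {"in_progress", "failed", "cancelled"},
    "in_progress": {"completed", "failed", "cancelled", "paused"},
    "paused": {"in_progress", "cancelled"},
    "completed": set(),   # terminal
    "failed": set(),      # terminal
    "cancelled": set(),   # terminal
}

# At most one run per list may be in one of these statuses.
ACTIVE_STATUSES = {"queued", "in_progress", "paused"}

def can_transition(current: str, target: str) -> bool:
    """Return True if a run may move from `current` to `target`."""
    return target in ALLOWED_TRANSITIONS.get(current, set())
```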
fitment_items
| Status | When it is set | Who sets it |
|---|---|---|
| pending | Item created with the run | Backend |
| enqueued | Message pushed to Redis for this item | Airflow (research_fitment_dag) |
| processing | Worker claimed the message and is processing | Docker consumer (FitmentWorker) |
| succeeded | Fitment score and narrative written | Docker consumer (FitmentWorker) |
| failed | AI or DB error, or stuck cleanup (see below) | Docker consumer or Airflow (stuck cleanup DAG) |
| cancelled | User stopped fitment; item was pending/enqueued/processing | Backend |
| paused | User paused fitment | Backend |
| pending (again) | User resumed; item was paused | Backend (then re-enqueued to Redis) |
research_results (fitment type)
| Status | When it is set | Who sets it |
|---|---|---|
| pending | Record created for a member with a prospect URL | Backend |
| prospect_url_not_found | Member has no website URL; no queueing | Backend |
| enqueued | Same moment fitment_items is marked enqueued | Airflow |
| processing | Worker is scoring this prospect | Docker consumer (FitmentWorker) |
| completed | Fitment score and narrative stored | Docker consumer (FitmentWorker) |
| failed | Error or stuck cleanup | Docker consumer or Airflow (stuck cleanup DAG) |
| cancelled / paused | User stop/pause; back to pending on resume | Backend |
list_research_summary and the contact_lists columns (aggregate_fitment_score, fitment_research_type, fitment_last_updated) are updated by a Docker consumer (ListAggregateWorker) when it processes rs:list:aggregate messages. Their "status" is not stored directly: it is derived from counts of research_results rows (e.g. in_progress vs completed).
Stuck-processing cleanup (research_stuck_processing_cleanup_dag)
What it does: Recovers items that never finished because a worker died or Redis was restarted (messages lost from PEL). It does not re-queue work; it only marks stuck rows as failed so the run can complete or be retried at a higher level.
Where: ems_services/airflow/dags/research_stuck_processing_cleanup_dag.py
Schedule: Every 1 minute.
Logic: Any row in research_results, research_items, or fitment_items with status = 'processing' and (processing_started_at is NULL or older than 8 minutes) is updated to status = 'failed'. For research_items and fitment_items, last_error is set to a fixed message indicating stuck processing (worker/Redis restart recovery).
Status transitions:
- research_results: processing → failed
- research_items: processing → failed (and last_error set)
- fitment_items: processing → failed (and last_error set)
Who runs it: Airflow (scheduler runs the DAG every minute; task runs in Airflow’s environment and uses the same tenant DB). No Redis or fitment workers are involved; it only reads/updates the DB.
After cleanup, FitmentWorker’s “check if run is done” logic will count these as failed, so fitment_runs can transition to completed (or stay in progress if there are still pending items).
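The cleanup logic above amounts to one UPDATE per table. A minimal sketch, assuming plain MySQL syntax; the helper name, the constant names, and the exact last_error wording are illustrative, not copied from the DAG:

```python
# Tables swept by the stuck-processing cleanup, per the doc.
STUCK_TABLES = ["research_results", "research_items", "fitment_items"]

# Placeholder for the fixed last_error message (actual wording may differ).
STUCK_ERROR = "stuck processing cleanup (worker/Redis restart recovery)"

def build_cleanup_sql(table: str, threshold_minutes: int = 8) -> str:
    """Build the UPDATE that marks long-running 'processing' rows as failed.

    research_items and fitment_items also get last_error set;
    research_results only gets the status flip.
    """
    set_clause = "status = 'failed'"
    if table in ("research_items", "fitment_items"):
        set_clause += f", last_error = '{STUCK_ERROR}'"
    return (
        f"UPDATE {table} SET {set_clause} "
        f"WHERE status = 'processing' "
        f"AND (processing_started_at IS NULL "
        f"OR processing_started_at < NOW() - INTERVAL {threshold_minutes} MINUTE)"
    )
```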
Component roles
| Component | Role |
|---|---|
| User | Starts/stops/pauses/resumes fitment; views status, results, and summary (via the frontend). |
| Backend | PHP API (api/lists.php). Validates request, talks to DB, creates run/items, triggers Airflow DAG, implements stop/pause/resume (DB + Redis enqueue on resume). |
| DB | Tenant MySQL: fitment_runs, fitment_items, research_results, contact_list_members, contact_lists, list_research_summary, fitment_customize*. |
| Airflow | Runs research_fitment_dag (triggered by Backend) and research_stuck_processing_cleanup_dag (scheduled). DAGs read/write DB and push to Redis. |
| Redis streams | rs:fitment:score (fitment work), rs:list:aggregate (list-level aggregates). No status stored in Redis; status is in DB. |
| Docker consumers | FitmentWorker (consumes rs:fitment:score; scores via AI; updates DB; pushes to rs:list:aggregate). ListAggregateWorker (consumes rs:list:aggregate; updates contact_lists and list_research_summary). |
Feature 1: Start fitment research (full list)
Business goal: Score every list member (company) 0–100 on how well they fit the client’s offering, using the client website and optional B2B/B2C criteria (fitment customization).
Flow overview (by component)
```mermaid
flowchart TD
  subgraph User
    U1[Start fitment for list]
  end
  subgraph Backend
    B1["Receive POST /lists/id/research/fitment"]
    B2[Validate list and members]
    B3[Create run and items in DB]
    B4[Trigger Airflow DAG]
    B5["Return success + dag_run_id"]
  end
  subgraph DB
    D1["fitment_runs: queued"]
    D2["research_results: pending / prospect_url_not_found"]
    D3["fitment_items: pending"]
  end
  subgraph Airflow
    A1["research_fitment_dag: load research data"]
    A2[Push batches to Redis]
    A3[Mark items enqueued in DB]
  end
  subgraph Redis_streams
    R1["rs:fitment:score"]
  end
  subgraph Docker_consumers
    C1["FitmentWorker: consume, score, update DB"]
    C2["Push to rs:list:aggregate"]
    C3["ListAggregateWorker: update list aggregates"]
  end
  U1 --> B1
  B1 --> B2 --> B3 --> D1
  B3 --> D2 --> D3
  B3 --> B4 --> A1
  A1 --> A2 --> R1
  A2 --> A3
  A3 --> D2
  A3 --> D3
  B4 --> B5
  R1 --> C1
  C1 --> C2 --> C3
```
Status flow (start):
- Backend → DB: Create fitment_runs (status queued), research_results (pending or prospect_url_not_found), and fitment_items (pending).
- Backend → Airflow: Trigger DAG. On success, Backend → DB: fitment_runs queued → in_progress.
- Airflow → Redis: Push messages to rs:fitment:score. Airflow → DB: research_results and fitment_items pending → enqueued.
- Docker FitmentWorker: Consume from rs:fitment:score; fitment_items and research_results → processing; after AI and DB write → succeeded (or failed on error); fitment_runs processed/failed counts updated. Push to rs:list:aggregate.
- Docker ListAggregateWorker: Consume rs:list:aggregate; update contact_lists and list_research_summary.
- FitmentWorker: When no pending/processing items remain, fitment_runs in_progress → completed, and push again to rs:list:aggregate.
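The Airflow → Redis step above can be sketched as a pure helper that batches work for rs:fitment:score. The stream name comes from the doc; the message field names, batch size, and helper name are assumptions (the real DAG also flips the DB rows to enqueued):

```python
import json

FITMENT_STREAM = "rs:fitment:score"  # stream name from the doc

def build_score_messages(run_id, research_ids, batch_size=10):
    """Split research ids into batched messages for rs:fitment:score.

    Returns (stream, fields) tuples ready to hand to redis_client.xadd(stream, fields).
    Field names here are illustrative, not the real schema.
    """
    messages = []
    for i in range(0, len(research_ids), batch_size):
        batch = research_ids[i:i + batch_size]
        messages.append((FITMENT_STREAM, {
            "run_id": str(run_id),
            "research_ids": json.dumps(batch),
        }))
    return messages
```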
Endpoints (Backend)
| Action | Method | Path |
|---|---|---|
| Start fitment | POST | /lists/{listId}/research/fitment |
| Stop fitment | POST | /lists/{listId}/research/fitment/stop |
| Pause fitment | POST | /lists/{listId}/research/fitment/pause |
| Resume fitment | POST | /lists/{listId}/research/fitment/resume |
| Research status | GET | /lists/{listId}/research/status?research_type=fitment |
| Research results | GET | /lists/{listId}/research/results?research_type=fitment |
| Research summary | GET | /lists/{listId}/research/summary?research_type=fitment |
| Cancel fitment | DELETE | /lists/{listId}/research/fitment |
| Cost estimates | GET | /lists/{listId}/research/cost-estimates?research_type=fitment |
All are under api/lists.php (JWT + RBAC).
Text-based implementation (start)
- User starts fitment for a list (optional body: client_website, selected_pitch_model, outreach_mode).
- Backend validates the list and org, loads members, and resolves the client website from the request, Knowledge Base, or organization domain.
- Backend creates one fitment_runs row (status queued), then for each member with a prospect URL a research_results row (status pending) and a fitment_items row (status pending). Members without a URL get a research_results row only, with status prospect_url_not_found, and are not queued.
- Backend optionally loads fitment customization (fitment_customize_combine / fitment_customize, custom_value) and passes it in the DAG conf.
- Backend triggers the Airflow DAG research_fitment_dag with list_id, research_ids, client_website, run_id, fitment_customize, etc. On failure, Backend sets fitment_runs and items to failed and returns an error.
- On DAG trigger success, Backend sets fitment_runs to in_progress and returns dag_run_id, research_ids, and total_members to the User.
- Airflow (research_fitment_dag): loads research data, deduplicates by URL, pushes batches to the Redis stream rs:fitment:score, then updates research_results and fitment_items to enqueued in the DB.
- Docker FitmentWorker consumes rs:fitment:score, sets items to processing, calls AI (Perplexity), writes scores/narratives to research_results and contact_list_members, sets fitment_items to succeeded or failed, and updates fitment_runs counts. After each batch it pushes (list_id, research_type) to rs:list:aggregate.
- Docker ListAggregateWorker consumes rs:list:aggregate and updates contact_lists (aggregate_fitment_score, fitment_research_type, fitment_last_updated) and list_research_summary in the DB.
- When FitmentWorker sees no pending/processing fitment_items for the run, it sets fitment_runs to completed and pushes once more to rs:list:aggregate.
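The final "is the run done?" check can be sketched as counting open items. A minimal sketch following the doc's rule (pending/processing block completion); the function name and return shape are illustrative:

```python
# Hypothetical sketch of FitmentWorker's run-completion check.
def summarize_run(item_statuses):
    """Count fitment_item statuses for a run and derive the run status.

    Per the flow above, a run completes once no item is pending or processing.
    """
    counts = {}
    for status in item_statuses:
        counts[status] = counts.get(status, 0) + 1
    open_items = counts.get("pending", 0) + counts.get("processing", 0)
    return {
        "counts": counts,
        "run_status": "completed" if open_items == 0 else "in_progress",
    }
```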
Feature 2: Stop, pause, and resume fitment
Business goal: Let the user cancel an active run (stop), temporarily halt it (pause), or continue it (resume). All state changes are done in the Backend and DB; Redis is only used on resume to re-enqueue work.
Flow overview (by component)
```mermaid
flowchart TD
  subgraph User
    U1[Stop / Pause / Resume]
  end
  subgraph Backend
    B1["POST /lists/id/research/fitment/stop or pause or resume"]
    B2[Find active or paused run in DB]
    B3[Update run and items in DB]
    B4{Resume?}
    B5[Re-enqueue paused items to Redis]
  end
  subgraph DB
    D1[fitment_runs status]
    D2[fitment_items status]
    D3[research_results status]
  end
  subgraph Redis_streams
    R1["rs:fitment:score"]
  end
  U1 --> B1 --> B2 --> B3 --> D1
  B3 --> D2 --> D3
  B3 --> B4
  B4 -->|Yes| B5 --> R1
```
Status transitions:
- Stop: Backend sets fitment_runs → cancelled, fitment_items (pending/enqueued/processing) → cancelled, and research_results (same) → cancelled. Workers that later see the run will skip it.
- Pause: Backend sets fitment_runs → paused, fitment_items (pending/enqueued/processing) → paused, and research_results (same) → paused. No Redis enqueue.
- Resume: Backend finds the run with status paused, sets fitment_runs → in_progress, fitment_items paused → pending, and research_results paused → pending, then pushes batches for those research_ids to the Redis stream rs:fitment:score. Docker FitmentWorker will consume and process them again.
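The resume path above can be sketched as computing the DB updates plus the re-enqueue set in one place. A hypothetical sketch; the function name and return shape are invented for illustration:

```python
# Illustrative sketch of the resume transition described above.
def resume_plan(run_status, items):
    """Given a paused run and its items, compute what resume should do:
    run paused -> in_progress, items paused -> pending, then re-enqueue."""
    if run_status != "paused":
        raise ValueError("resume requires a paused run")
    to_requeue = [item["research_id"] for item in items
                  if item["status"] == "paused"]
    return {
        "run_status": "in_progress",
        "item_updates": {rid: "pending" for rid in to_requeue},
        "requeue_stream": "rs:fitment:score",  # stream name from the doc
        "research_ids": to_requeue,
    }
```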
Feature 3: Get status, results, and summary
Business goal: Let the user (or UI) see run progress, per-member fitment data, and list-level aggregates. All read paths go User → Backend → DB; no Airflow or Redis involved.
Flow overview (by component)
```mermaid
flowchart LR
  subgraph User
    U1[GET status / results / summary]
  end
  subgraph Backend
    B1["GET /lists/id/research/status or results or summary"]
    B2[Query DB]
  end
  subgraph DB
    D1["research_results, list_research_summary, fitment_runs"]
    D2[contact_lists aggregates]
  end
  U1 --> B1 --> B2 --> D1
  B2 --> D2
```
- Status: Backend reads research_results, list_research_summary, and optionally fitment_runs for the list and research_type fitment, and returns counts by status plus paginated member-level state.
- Results: Backend reads research_results (fitment_score, fitment_narrative, rag_status, etc.) with optional pitch and member info from the DB.
- Summary: Backend reads list_research_summary and contact_lists (aggregate_fitment_score, fitment_research_type, fitment_last_updated).
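The status read path boils down to grouping research_results rows by status. A minimal sketch; the response shape and the choice of terminal statuses are assumptions, not the actual API contract:

```python
# Illustrative counts-by-status payload for the status endpoint.
def status_payload(result_rows):
    """Group research_results rows by status and compute simple progress."""
    counts = {}
    for row in result_rows:
        counts[row["status"]] = counts.get(row["status"], 0) + 1
    total = len(result_rows)
    # Statuses that no longer change (per the research_results table above).
    terminal = {"completed", "failed", "cancelled", "prospect_url_not_found"}
    done = sum(n for s, n in counts.items() if s in terminal)
    return {
        "total": total,
        "counts": counts,
        "percent_done": (100 * done // total) if total else 0,
    }
```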
End-to-end sequence (components only)
```mermaid
sequenceDiagram
  participant User
  participant Backend
  participant DB
  participant Airflow
  participant Redis as "Redis streams"
  participant Fitment as "Docker FitmentWorker"
  participant Aggregate as "Docker ListAggregateWorker"
  User->>Backend: POST lists research fitment
  Backend->>DB: Create fitment_runs, research_results, fitment_items
  Backend->>Airflow: Trigger research_fitment_dag
  Airflow->>DB: Mark items enqueued
  Airflow->>Redis: Push to rs:fitment:score
  Backend-->>User: 200 dag_run_id, research_ids
  loop Per batch
    Fitment->>Redis: Consume rs:fitment:score
    Fitment->>DB: processing to succeeded or failed, update counts
    Fitment->>Redis: Push to rs:list:aggregate
    Aggregate->>Redis: Consume rs:list:aggregate
    Aggregate->>DB: Update contact_lists, list_research_summary
  end
  Fitment->>DB: fitment_runs to completed, push rs:list:aggregate
  Aggregate->>DB: Final list aggregate update
  User->>Backend: GET status / results / summary
  Backend->>DB: Read research_results, list_research_summary, contact_lists
  Backend-->>User: JSON response
```
Where stuck cleanup fits
Airflow runs research_stuck_processing_cleanup_dag every minute. It only touches DB: it finds research_results, research_items, and fitment_items in processing for too long (or with NULL processing_started_at) and sets them to failed. FitmentWorker and ListAggregateWorker do not run this; they only react to the updated DB state (e.g. run can move to completed once stuck items are marked failed).
Tables (reference)
| Table | Role |
|---|---|
| fitment_runs | One row per run; status: queued → in_progress → completed/failed/cancelled/paused. |
| fitment_items | One row per (run_id, research_id); status: pending → enqueued → processing → succeeded/failed (or cancelled/paused by user; stuck cleanup: processing → failed). |
| research_results | One row per member per run; status in sync with fitment_items; holds fitment_score, fitment_narrative, rag_status, etc. |
| contact_list_members | Per-member fitment_score, research_type, research_completed_at (updated by FitmentWorker). |
| contact_lists | aggregate_fitment_score, fitment_research_type, fitment_last_updated (updated by ListAggregateWorker). |
| list_research_summary | Per list + research_type: counts and average_fitment_score (updated by ListAggregateWorker). |
| fitment_customize_combine / fitment_customize, custom_value | Optional B2B/B2C criteria used when scoring. |