List Fitment Research

This page documents fitment research for contact lists: business logic, status transitions, and how User, Backend, DB, Airflow, Redis streams, and Docker consumers interact.


Status transitions (when and how)

All fitment behaviour is driven by status on three entities: fitment_runs, fitment_items, and research_results. The tables below summarize each transition and the component that performs it; later sections describe the flows in detail.

fitment_runs

Status | When it is set | Who sets it
queued | Run is created at start | Backend
in_progress | DAG triggered successfully; run is active | Backend (after DAG trigger)
completed | All items for the run are done (no pending/processing) | Docker consumer (FitmentWorker)
failed | DAG trigger failed, or run marked failed by logic | Backend or Docker consumer
cancelled | User stops fitment | Backend
paused | User pauses fitment | Backend
in_progress (again) | User resumes fitment | Backend

Only one run per list is “active” at a time (queued, in_progress, or paused). Completed, failed, and cancelled runs are terminal for that run.
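The run-level rules above can be captured as a small transition table. The block below is a minimal Python sketch, not the Backend's PHP code. RUN_TRANSITIONS, can_transition, and can_start_run are hypothetical names. The sketch assumes that stop is accepted from any active status and that failed is only reachable from queued (DAG trigger failure) or in_progress.

```python
# Hypothetical sketch: allowed fitment_runs transitions, read off the table above.
# Assumptions: stop (-> cancelled) is accepted from any active status, and
# "failed" is only reachable from queued (DAG trigger failure) or in_progress.
RUN_TRANSITIONS: dict[str, frozenset[str]] = {
    "queued": frozenset({"in_progress", "failed", "cancelled"}),
    "in_progress": frozenset({"completed", "failed", "cancelled", "paused"}),
    "paused": frozenset({"in_progress", "cancelled"}),
    # Terminal statuses: no outgoing transitions.
    "completed": frozenset(),
    "failed": frozenset(),
    "cancelled": frozenset(),
}

ACTIVE_RUN_STATUSES = frozenset({"queued", "in_progress", "paused"})


def can_transition(current: str, new: str) -> bool:
    """True if a fitment_runs row may move from `current` to `new`."""
    return new in RUN_TRANSITIONS.get(current, frozenset())


def can_start_run(existing_run_statuses: list[str]) -> bool:
    """A new run may start only if no existing run for the list is active."""
    return not any(s in ACTIVE_RUN_STATUSES for s in existing_run_statuses)
```

Keeping the rules in data rather than in scattered if-statements makes the "only one active run per list" check and the allowed moves easy to audit against the table.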

fitment_items

Status | When it is set | Who sets it
pending | Item created with the run | Backend
enqueued | Message pushed to Redis for this item | Airflow (research_fitment_dag)
processing | Worker claimed the message and is processing | Docker consumer (FitmentWorker)
succeeded | Fitment score and narrative written | Docker consumer (FitmentWorker)
failed | AI or DB error, or stuck cleanup (see below) | Docker consumer or Airflow (stuck cleanup DAG)
cancelled | User stopped fitment; item was pending/enqueued/processing | Backend
paused | User paused fitment | Backend
pending | User resumed; item was paused | Backend (then re-enqueued to Redis)

research_results (fitment type)

Status | When it is set | Who sets it
pending | Record created for a member with a prospect URL | Backend
prospect_url_not_found | Member has no website URL; not queued | Backend
enqueued | At the same time the matching fitment_items row is marked enqueued | Airflow
processing | Worker is scoring this prospect | Docker consumer (FitmentWorker)
completed | Fitment score and narrative stored | Docker consumer (FitmentWorker)
failed | Error or stuck cleanup | Docker consumer or Airflow (stuck cleanup DAG)
cancelled / paused | User stops or pauses fitment; paused records return to pending on resume | Backend
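research_results mirrors fitment_items: succeeded on the item corresponds to completed on the result, and the other statuses match one to one. That makes a consistency check straightforward. Below is a minimal Python sketch with hypothetical helper names. prospect_url_not_found has no fitment_items counterpart, so it is deliberately absent from the mapping.

```python
# Hypothetical helper: the research_results status that accompanies each
# fitment_items status, read off the two tables above.
ITEM_TO_RESULT_STATUS = {
    "pending": "pending",
    "enqueued": "enqueued",
    "processing": "processing",
    "succeeded": "completed",  # the only name that differs between the tables
    "failed": "failed",
    "cancelled": "cancelled",
    "paused": "paused",
}


def result_status_for(item_status: str) -> str:
    """Map a fitment_items status to the expected research_results status."""
    try:
        return ITEM_TO_RESULT_STATUS[item_status]
    except KeyError:
        raise ValueError(f"unknown fitment_items status: {item_status!r}") from None


def find_out_of_sync(rows: list[tuple[int, str, str]]) -> list[int]:
    """rows: (research_id, item_status, result_status). Return ids whose statuses disagree."""
    return [rid for rid, item, result in rows if result_status_for(item) != result]
```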

list_research_summary and contact_lists (aggregate_fitment_score, fitment_research_type, fitment_last_updated) are updated by a Docker consumer (ListAggregateWorker) when it processes rs:list:aggregate messages; their “status” is derived from counts of research_results (e.g. in_progress vs completed).
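The block below sketches, in Python, one way ListAggregateWorker could derive list-level values from research_results. It is an illustration under stated assumptions, not the worker's actual code:

- The average is taken over completed rows that have a score, rounded to two decimals.
- pending, enqueued, processing, and paused rows are assumed to keep the list in_progress.
- ListSummary and summarize are hypothetical names.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional

# Assumption: these research_results statuses mean the list is still in progress.
NOT_FINISHED = frozenset({"pending", "enqueued", "processing", "paused"})


@dataclass
class ListSummary:
    counts: dict[str, int]
    average_fitment_score: Optional[float]
    status: str


def summarize(results: list[tuple[str, Optional[float]]]) -> ListSummary:
    """results: (status, fitment_score) pairs for one list's fitment research_results."""
    counts = Counter(status for status, _ in results)
    # Average only over completed rows that actually have a score.
    scores = [score for status, score in results if status == "completed" and score is not None]
    average = round(sum(scores) / len(scores), 2) if scores else None
    status = "in_progress" if any(s in NOT_FINISHED for s in counts) else "completed"
    return ListSummary(counts=dict(counts), average_fitment_score=average, status=status)
```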


Stuck-processing cleanup (research_stuck_processing_cleanup_dag)

What it does: Recovers items that never finished because a worker died or Redis was restarted, so their messages were lost from the stream's pending entries list (PEL). It does not re-queue work; it only marks stuck rows as failed so the run can complete or be retried at a higher level.

Where: ems_services/airflow/dags/research_stuck_processing_cleanup_dag.py
Schedule: Every 1 minute.
Logic: Any row in research_results, research_items, or fitment_items with status = 'processing' and (processing_started_at is NULL or older than 8 minutes) is updated to status = 'failed'. For research_items and fitment_items, last_error is set to a fixed message indicating stuck processing (worker/Redis restart recovery).
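The cleanup logic above amounts to three guarded UPDATE statements. The sketch below runs them against Python's built-in sqlite3 so it is self-contained. Against the tenant MySQL DB, the placeholder style would depend on the driver (often %s rather than ?).

The sketch makes these assumptions:

- The last_error text and the name cleanup_stuck are placeholders.
- Timestamps are stored as 'YYYY-MM-DD HH:MM:SS' strings in the same timezone as `now`.

```python
import sqlite3
from datetime import datetime, timedelta

STUCK_AFTER = timedelta(minutes=8)
# Placeholder text; the real DAG sets its own fixed message.
STUCK_ERROR = "stuck in processing (worker/Redis restart recovery)"
STUCK_FILTER = (
    "status = 'processing' AND "
    "(processing_started_at IS NULL OR processing_started_at < ?)"
)


def cleanup_stuck(conn: sqlite3.Connection, now: datetime) -> int:
    """Mark stuck 'processing' rows as failed; return the number of rows updated."""
    cutoff = (now - STUCK_AFTER).strftime("%Y-%m-%d %H:%M:%S")
    updated = conn.execute(
        f"UPDATE research_results SET status = 'failed' WHERE {STUCK_FILTER}", (cutoff,)
    ).rowcount
    # research_items and fitment_items also record why they failed.
    for table in ("research_items", "fitment_items"):  # fixed names, safe to interpolate
        updated += conn.execute(
            f"UPDATE {table} SET status = 'failed', last_error = ? WHERE {STUCK_FILTER}",
            (STUCK_ERROR, cutoff),
        ).rowcount
    conn.commit()
    return updated
```

Because the statements only ever move processing → failed, running them every minute is safe to repeat: a row that is already failed no longer matches the filter.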

Status transition:

  • research_results: processing → failed
  • research_items: processing → failed (and last_error set)
  • fitment_items: processing → failed (and last_error set)

Who runs it: Airflow (scheduler runs the DAG every minute; task runs in Airflow’s environment and uses the same tenant DB). No Redis or fitment workers are involved; it only reads/updates the DB.

After cleanup, FitmentWorker’s “check if run is done” logic will count these as failed, so fitment_runs can transition to completed (or stay in progress if there are still pending items).
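The “check if run is done” step can be sketched as a pure function over the run's item statuses. This is a hypothetical Python illustration, not FitmentWorker's code. The document describes the condition as "no pending/processing items"; also treating enqueued as unfinished is an assumption. In the DB this would typically be a COUNT over fitment_items for the run_id.

```python
# Assumption: 'enqueued' also counts as unfinished (the text names pending/processing).
UNFINISHED_ITEM_STATUSES = frozenset({"pending", "enqueued", "processing"})


def next_run_status(run_status: str, item_statuses: list[str]) -> str:
    """Return the run status after a 'check if run is done' pass."""
    if run_status != "in_progress":
        # Queued, paused, cancelled, and terminal runs are left alone.
        return run_status
    if any(s in UNFINISHED_ITEM_STATUSES for s in item_statuses):
        return "in_progress"
    # No unfinished items remain (e.g. all succeeded or failed, including stuck-cleanup failures).
    return "completed"
```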


Component roles

Component | Role
User | Starts, stops, pauses, and resumes fitment; views status, results, and summary (via the frontend).
Backend | PHP API (api/lists.php). Validates requests, talks to the DB, creates the run and items, triggers the Airflow DAG, and implements stop/pause/resume (DB updates, plus Redis enqueue on resume).
DB | Tenant MySQL: fitment_runs, fitment_items, research_results, contact_list_members, contact_lists, list_research_summary, fitment_customize*.
Airflow | Runs research_fitment_dag (triggered by Backend) and research_stuck_processing_cleanup_dag (scheduled). Both DAGs read/write the DB; only research_fitment_dag pushes to Redis.
Redis streams | rs:fitment:score (fitment work) and rs:list:aggregate (list-level aggregates). No status is stored in Redis; status lives in the DB.
Docker consumers | FitmentWorker consumes rs:fitment:score, scores via AI, updates the DB, and pushes to rs:list:aggregate. ListAggregateWorker consumes rs:list:aggregate and updates contact_lists and list_research_summary.

Feature 1: Start fitment research (full list)

Business goal: Score every list member (company) 0–100 on how well they fit the client’s offering, using the client website and optional B2B/B2C criteria (fitment customization).

Flow overview (by component)


Status flow (start):

  1. Backend → DB: Create fitment_runs (status queued), research_results (pending or prospect_url_not_found), fitment_items (pending).
  2. Backend → Airflow: Trigger DAG. On success, Backend → DB: fitment_runs queued → in_progress.
  3. Airflow → Redis: Push messages to rs:fitment:score. Airflow → DB: research_results and fitment_items pending/enqueued → enqueued.
  4. Docker FitmentWorker: Consume from rs:fitment:score; fitment_items and research_results → processing; after the AI call and DB write, fitment_items → succeeded and research_results → completed (or both → failed on error); fitment_runs processed/failed counts updated. Push to rs:list:aggregate.
  5. Docker ListAggregateWorker: Consume rs:list:aggregate; update contact_lists and list_research_summary.
  6. FitmentWorker: When no pending/processing items remain, fitment_runs in_progress → completed, and push again to rs:list:aggregate.
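Step 1 depends only on whether each member has a prospect URL. Below is a minimal Python sketch of that planning step. It assumes a hypothetical Member shape and treats a blank or whitespace-only URL as missing. Member, RunPlan, and plan_run are illustrative names, not the Backend's PHP code.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Member:
    member_id: int
    prospect_url: Optional[str]


@dataclass
class RunPlan:
    run_status: str = "queued"
    # (member_id, status) rows to insert; names are illustrative.
    research_results: list[tuple[int, str]] = field(default_factory=list)
    fitment_items: list[tuple[int, str]] = field(default_factory=list)


def plan_run(members: list[Member]) -> RunPlan:
    """Decide which rows Backend creates for a new fitment run."""
    plan = RunPlan()
    for m in members:
        if m.prospect_url and m.prospect_url.strip():
            plan.research_results.append((m.member_id, "pending"))
            plan.fitment_items.append((m.member_id, "pending"))
        else:
            # No URL: record the outcome but never queue the member.
            plan.research_results.append((m.member_id, "prospect_url_not_found"))
    return plan
```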

Endpoints (Backend)

Action | Method | Path
Start fitment | POST | /lists/{listId}/research/fitment
Stop fitment | POST | /lists/{listId}/research/fitment/stop
Pause fitment | POST | /lists/{listId}/research/fitment/pause
Resume fitment | POST | /lists/{listId}/research/fitment/resume
Research status | GET | /lists/{listId}/research/status?research_type=fitment
Research results | GET | /lists/{listId}/research/results?research_type=fitment
Research summary | GET | /lists/{listId}/research/summary?research_type=fitment
Cancel fitment | DELETE | /lists/{listId}/research/fitment
Cost estimates | GET | /lists/{listId}/research/cost-estimates?research_type=fitment

All are under api/lists.php (JWT + RBAC).
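For scripting or testing against these endpoints, a request can be built with Python's standard urllib. This is a sketch with the following assumptions:

- BASE_URL is a placeholder.
- build_request and FITMENT_ACTIONS are hypothetical names.
- Sending the JWT as an Authorization: Bearer header is an assumption about what api/lists.php expects.

The request is only constructed here, not sent.

```python
import json
import urllib.request
from typing import Any, Optional

BASE_URL = "https://api.example.com"  # placeholder, not the real host

# (method, path template) per action, copied from the endpoint table.
FITMENT_ACTIONS: dict[str, tuple[str, str]] = {
    "start": ("POST", "/lists/{list_id}/research/fitment"),
    "stop": ("POST", "/lists/{list_id}/research/fitment/stop"),
    "pause": ("POST", "/lists/{list_id}/research/fitment/pause"),
    "resume": ("POST", "/lists/{list_id}/research/fitment/resume"),
    "cancel": ("DELETE", "/lists/{list_id}/research/fitment"),
    "status": ("GET", "/lists/{list_id}/research/status?research_type=fitment"),
    "results": ("GET", "/lists/{list_id}/research/results?research_type=fitment"),
    "summary": ("GET", "/lists/{list_id}/research/summary?research_type=fitment"),
    "cost_estimates": ("GET", "/lists/{list_id}/research/cost-estimates?research_type=fitment"),
}


def build_request(action: str, list_id: int, token: str,
                  body: Optional[dict[str, Any]] = None,
                  base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) a request for one fitment endpoint."""
    method, path = FITMENT_ACTIONS[action]
    data = json.dumps(body).encode("utf-8") if body is not None else None
    req = urllib.request.Request(base_url + path.format(list_id=list_id), data=data, method=method)
    # Assumption: the JWT is sent as a Bearer token.
    req.add_header("Authorization", f"Bearer {token}")
    if data is not None:
        req.add_header("Content-Type", "application/json")
    return req
```

Sending would be urllib.request.urlopen(req). For start, the optional body fields are client_website, selected_pitch_model, and outreach_mode.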

Text-based implementation (start)

  1. User starts fitment for a list (optional body: client_website, selected_pitch_model, outreach_mode).
  2. Backend validates the list and organization and loads the members. It resolves the client website from the request, the Knowledge Base, or the organization domain.
  3. Backend creates one fitment_runs row (status queued), then, for each member with a prospect URL, a research_results row (status pending) and a fitment_items row (status pending). Members without a URL get only a research_results row, with status prospect_url_not_found, and are not queued.
  4. If fitment customization is configured (fitment_customize_combine / fitment_customize, custom_value), Backend loads it and passes it in the DAG conf.
  5. Backend triggers Airflow DAG research_fitment_dag with list_id, research_ids, client_website, run_id, fitment_customize, etc. On failure, Backend sets fitment_runs and its items to failed and returns an error.
  6. On DAG trigger success, Backend sets fitment_runs to in_progress and returns dag_run_id, research_ids, total_members to User.
  7. Airflow (research_fitment_dag): loads research data, deduplicates by URL, pushes batches to Redis stream rs:fitment:score, then updates research_results and fitment_items to enqueued in DB.
  8. Docker FitmentWorker consumes rs:fitment:score, sets items to processing, calls AI (Perplexity), writes scores/narratives to research_results and contact_list_members, sets fitment_items to succeeded or failed, updates fitment_runs counts. After each batch it pushes (list_id, research_type) to rs:list:aggregate.
  9. Docker ListAggregateWorker consumes rs:list:aggregate and updates contact_lists (aggregate_fitment_score, fitment_research_type, fitment_last_updated) and list_research_summary in DB.
  10. When FitmentWorker sees no pending/processing fitment_items for the run, it sets fitment_runs to completed and pushes once more to rs:list:aggregate.
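Step 7 (deduplicate by URL, then push in batches) can be sketched as below. The normalization rule, the grouping of several research_ids under one URL, and the batch size are assumptions, and the actual DAG and message payload may differ. normalize_url, dedupe_by_url, and batches are hypothetical names.

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def normalize_url(url: str) -> str:
    # Assumption: URLs that differ only in case, surrounding whitespace,
    # or a trailing slash are treated as the same prospect.
    return url.strip().lower().rstrip("/")


def dedupe_by_url(rows: Iterable[tuple[int, str]]) -> dict[str, list[int]]:
    """Group research_ids by normalized prospect URL (first-seen order)."""
    groups: dict[str, list[int]] = {}
    for research_id, url in rows:
        groups.setdefault(normalize_url(url), []).append(research_id)
    return groups


def batches(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("batch size must be >= 1")
    batch: list[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch
```

Under this sketch, each batch of (url, research_ids) pairs would become one message on rs:fitment:score. Scoring once per URL means duplicate prospects share a single AI call.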

Feature 2: Stop, pause, and resume fitment

Business goal: Let the user cancel an active run (stop), temporarily halt it (pause), or continue it (resume). All state changes are done in the Backend and DB; Redis is only used on resume to re-enqueue work.

Flow overview (by component)


Status transitions:

  • Stop: Backend sets fitment_runs → cancelled, fitment_items (pending/enqueued/processing) → cancelled, and research_results (same statuses) → cancelled. Workers that later see the run skip it.
  • Pause: Backend sets fitment_runs → paused, fitment_items (pending/enqueued/processing) → paused, and research_results (same statuses) → paused. Nothing is enqueued to Redis.
  • Resume: Backend finds the run with status paused, sets fitment_runs → in_progress, fitment_items paused → pending, and research_results paused → pending, then pushes batches for those research_ids to Redis rs:fitment:score. Docker FitmentWorker consumes and processes them again.
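The per-item effect of stop, pause, and resume can be expressed as one function. Below is a hypothetical Python sketch. research_results would receive the same status changes, and the returned research_ids are the ones the Backend would push to rs:fitment:score on resume. The sketch follows the list above literally, so items that are already paused are not changed by stop.

```python
# Item statuses that stop/pause act on, per the list above.
ACTIVE_ITEM_STATUSES = frozenset({"pending", "enqueued", "processing"})
RUN_STATUS_AFTER = {"stop": "cancelled", "pause": "paused", "resume": "in_progress"}


def apply_user_action(action: str, items: dict[int, str]) -> tuple[str, dict[int, str], list[int]]:
    """Return (new run status, new fitment_items statuses, research_ids to re-enqueue)."""
    if action not in RUN_STATUS_AFTER:
        raise ValueError(f"unknown action: {action!r}")
    updated = dict(items)
    to_enqueue: list[int] = []
    for research_id, status in items.items():
        if action == "stop" and status in ACTIVE_ITEM_STATUSES:
            updated[research_id] = "cancelled"
        elif action == "pause" and status in ACTIVE_ITEM_STATUSES:
            updated[research_id] = "paused"
        elif action == "resume" and status == "paused":
            updated[research_id] = "pending"
            to_enqueue.append(research_id)  # pushed to rs:fitment:score on resume
    return RUN_STATUS_AFTER[action], updated, to_enqueue
```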

Feature 3: Get status, results, and summary

Business goal: Let the user (or UI) see run progress, per-member fitment data, and list-level aggregates. All read paths go User → Backend → DB; no Airflow or Redis involved.

Flow overview (by component)

  • Status: Backend reads research_results, list_research_summary, and optionally fitment_runs for the list and research_type fitment, returns counts by status and paginated member-level state.
  • Results: Backend reads research_results (fitment_score, fitment_narrative, rag_status, etc.) with optional pitch and member info from DB.
  • Summary: Backend reads list_research_summary and contact_lists (aggregate_fitment_score, fitment_research_type, fitment_last_updated).
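The Status response (counts by status plus paginated member-level state) can be sketched as follows. The response keys, default page size, and 1-based paging are assumptions, not the Backend's documented contract. status_response is a hypothetical name.

```python
from collections import Counter
from typing import Any


def status_response(rows: list[dict[str, Any]], page: int = 1, per_page: int = 50) -> dict[str, Any]:
    """Counts by status plus one page of member-level rows (1-based page numbers)."""
    if page < 1 or per_page < 1:
        raise ValueError("page and per_page must be >= 1")
    start = (page - 1) * per_page
    return {
        "counts": dict(Counter(row["status"] for row in rows)),
        "total": len(rows),
        "page": page,
        "per_page": per_page,
        "members": rows[start:start + per_page],
    }
```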

End-to-end sequence (components only)


Where stuck cleanup fits

Airflow runs research_stuck_processing_cleanup_dag every minute. It only touches the DB: it finds research_results, research_items, and fitment_items that have been in processing for more than 8 minutes (or have a NULL processing_started_at) and sets them to failed. FitmentWorker and ListAggregateWorker do not run this; they only react to the updated DB state (e.g. a run can move to completed once its stuck items are marked failed).


Tables (reference)

Table | Role
fitment_runs | One row per run; status: queued → in_progress → completed/failed/cancelled/paused.
fitment_items | One row per (run_id, research_id); status: pending → enqueued → processing → succeeded/failed (or cancelled/paused by user; stuck cleanup: processing → failed).
research_results | One row per member per run; status in sync with fitment_items; holds fitment_score, fitment_narrative, rag_status, etc.
contact_list_members | Per-member fitment_score, research_type, research_completed_at (updated by FitmentWorker).
contact_lists | aggregate_fitment_score, fitment_research_type, fitment_last_updated (updated by ListAggregateWorker).
list_research_summary | Per list + research_type: counts and average_fitment_score (updated by ListAggregateWorker).
fitment_customize_combine / fitment_customize, custom_value | Optional B2B/B2C criteria used when scoring.