Indexing Pipeline

Detailed workflow for the indexing pipeline

Overview

The indexing pipeline is an event-driven system that processes account data and makes it searchable. It handles data collection, tag extraction, and index updates asynchronously through a queue-based architecture.

Pipeline Architecture

flowchart TB
  subgraph Triggers[Event Triggers]
    A1[Term Approved]
    A2[Asset Uploaded]
    A3[Account Updated]
    A4[Social Data Changed]
  end
  subgraph Queue[Indexing Queue]
    B[likeness_need_indexing_queue]
  end
  subgraph Daemon[Indexing Daemon]
    C[Cron: Every 2 Minutes]
    D[Process Events]
  end
  subgraph Processing[Event Processing]
    E[Check Base Requirements]
    F[Check Data Sufficiency]
    G[Trigger Data Collection]
    H[Upsert to Index]
  end
  subgraph SubProcesses[Sub-Processes]
    I[AI Tag Generation]
    J[Social Scraping]
  end
  A1 --> B
  A2 --> B
  A3 --> B
  A4 --> B
  B --> C
  C --> D
  D --> E
  E --> F
  F -->|Sufficient| H
  F -->|Insufficient| G
  G --> I
  G --> J
  I --> B
  J --> B
  H --> K[Search Ready]

Event Lifecycle

Events progress through these statuses:

  1. PENDING - Waiting to be processed
  2. IN_PROGRESS - Currently being processed
  3. COMPLETED - Successfully processed
  4. FAILED - Permanently failed (max retries exceeded)
  5. DISCARDED - Duplicate or insufficient requirements
  6. TIMEOUT - Exceeded 5-minute processing window
  7. AWAITING_RETRY - Waiting for retry after recoverable error

Event Types

Events are created with different elementType values:

  • IMAGE_ASSET - Image asset uploaded
  • VOICE_ASSET - Voice asset uploaded
  • VOICE_TERM - VoiceOver term approved
  • IMAGE_TERM - Image term approved
  • LIKENESS_TERM - Likeness term approved
  • PROFILE_DESCRIPTION - Account description updated
  • SOCIAL_DATA - Social data changed
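
The statuses and event types above can be modeled as string unions. This is an illustrative sketch only; the actual schema types live in packages/db and may differ in naming and shape:

```typescript
// Hypothetical type definitions mirroring the statuses and element types above.
type EventStatus =
  | "PENDING"
  | "IN_PROGRESS"
  | "COMPLETED"
  | "FAILED"
  | "DISCARDED"
  | "TIMEOUT"
  | "AWAITING_RETRY";

type ElementType =
  | "IMAGE_ASSET"
  | "VOICE_ASSET"
  | "VOICE_TERM"
  | "IMAGE_TERM"
  | "LIKENESS_TERM"
  | "PROFILE_DESCRIPTION"
  | "SOCIAL_DATA";

interface IndexingEvent {
  id: string;
  accountId: string;
  elementType: ElementType;
  status: EventStatus;
  retryCount: number;
  retryAt: Date | null; // set when status is AWAITING_RETRY
  createdAt: Date;      // FIFO ordering key
}

// COMPLETED, FAILED, and DISCARDED are terminal; the other statuses can
// still transition (e.g. AWAITING_RETRY -> PENDING, IN_PROGRESS -> TIMEOUT).
const TERMINAL_STATUSES: EventStatus[] = ["COMPLETED", "FAILED", "DISCARDED"];

function isTerminal(status: EventStatus): boolean {
  return TERMINAL_STATUSES.includes(status);
}
```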

Daemon Workflow

The indexing daemon runs every 2 minutes via cron job (GET /api/indexing/process-queue).

Process Locking

Function: acquireLock(INDEXING_DAEMON_PID, 10)

Before processing begins, the daemon acquires a distributed process lock to prevent concurrent execution. This is critical because:

  • Cron Overlap Prevention: The cron job runs every 2 minutes, but processing can take up to 5 minutes (max duration). Without locking, multiple instances could run simultaneously.
  • Stale Lock Recovery: If a daemon crashes or times out, locks older than 10 minutes are automatically considered stale and can be acquired by a new instance.
  • Heartbeat Mechanism: During long-running batches, the daemon calls updateProcessLock() after each event to keep the lock alive.

Lock Lifecycle:

  1. Acquire: Attempts to acquire lock atomically (returns early if already locked)
  2. Heartbeat: Updates lastAcquired timestamp after each event
  3. Release: Always releases lock in finally block, even on errors

Implementation:

  • Lock functions: packages/db/src/access/processLock.ts
  • Lock table: process_lock (id: "INDEXING_DAEMON", lock_flag, last_acquired)
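
The acquisition rule above (take the lock if it is free, or if the holder's heartbeat is older than the staleness window) can be sketched as a pure predicate. Names here are illustrative; the real implementation in packages/db/src/access/processLock.ts acquires the lock atomically at the database level:

```typescript
// Sketch of the lock-acquisition rule described above.
interface ProcessLock {
  id: string;          // e.g. "INDEXING_DAEMON"
  lockFlag: boolean;   // true while held
  lastAcquired: Date;  // refreshed by the heartbeat after each event
}

const STALE_AFTER_MINUTES = 10;

// A lock can be taken if it is free, or if it is stale (holder presumed dead).
function canAcquire(
  lock: ProcessLock,
  now: Date,
  staleMinutes = STALE_AFTER_MINUTES,
): boolean {
  if (!lock.lockFlag) return true;
  const ageMs = now.getTime() - lock.lastAcquired.getTime();
  return ageMs > staleMinutes * 60_000;
}
```

Note that this check-then-set must be a single atomic statement (e.g. a conditional UPDATE) in the real code, otherwise two daemons could both pass the check and both "acquire" the lock.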

Step 1: Maintenance

Function: markTimedOutEvents()

Marks any IN_PROGRESS events older than 5 minutes as TIMEOUT. This handles cases where sub-processes fail silently or take too long.

Implementation: packages/db/src/access/likenessNeedIndexingQueue.ts

Step 2: Process Retries

Function: processRetryableEvents()

Moves AWAITING_RETRY events whose retryAt time has passed back to PENDING status. These events will be processed in the next iteration.

Implementation: packages/db/src/access/likenessNeedIndexingQueue.ts

Step 3: Process Queue

Function: indexingDaemon()

Processes up to INDEXING_DAEMON_BATCH_SIZE events (default: 250):

  1. Get oldest PENDING event (FIFO)
  2. Call processNextEvent() to process it
  3. Wait 1 second between events
  4. Update lock heartbeat after each event
  5. Stop if no more events

Implementation: packages/likeness-search/src/indexingDaemon.ts
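
The five steps above amount to a bounded FIFO drain with a heartbeat. This sketch injects its dependencies so the loop shape is visible in isolation; the real daemon in packages/likeness-search/src/indexingDaemon.ts wires in the actual queue and lock functions:

```typescript
// Illustrative shape of the batch loop; the helper signatures are assumptions.
const INDEXING_DAEMON_BATCH_SIZE = 250;

async function runDaemonBatch(
  processNextEvent: () => Promise<boolean>, // false when the queue is empty
  updateProcessLock: () => Promise<void>,   // heartbeat
  sleep: (ms: number) => Promise<void>,
): Promise<number> {
  let processed = 0;
  for (let i = 0; i < INDEXING_DAEMON_BATCH_SIZE; i++) {
    const hadEvent = await processNextEvent(); // oldest PENDING event (FIFO)
    if (!hadEvent) break;                      // stop when the queue drains
    processed++;
    await updateProcessLock();                 // keep the lock alive mid-batch
    await sleep(1000);                         // 1 s pause between events
  }
  return processed;
}
```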

Event Processing Flow

Each event goes through this detailed flow:

flowchart TD
  A[Get Oldest PENDING Event] --> B{Event Exists?}
  B -->|No| C[Return False]
  B -->|Yes| D{Active Event for Account?}
  D -->|Yes| E{Is TERM Event?}
  E -->|Yes| F[Return False - Wait]
  E -->|No| G[Mark DISCARDED]
  D -->|No| H[Mark IN_PROGRESS]
  H --> I[Discard Other PENDING Events]
  I --> J[Check Base Requirements]
  J --> K{Requirements Met?}
  K -->|No| L{Image Missing + Social Links?}
  L -->|Yes| M[Trigger Social Scraping]
  L -->|No| N[Mark DISCARDED]
  K -->|Yes| O[Check Data Sufficiency]
  O --> P{Sufficient Data?}
  P -->|Yes| Q[Upsert to Index]
  Q --> R[Mark COMPLETED]
  P -->|No| S{Can Collect Data?}
  S -->|Yes| T[Trigger Data Collection]
  S -->|No| U{Has Image Asset?}
  U -->|Yes| V[Best Effort Index]
  U -->|No| W[Mark DISCARDED]
  V --> R
  M --> X[Event Stays IN_PROGRESS]
  T --> X

Base Requirements Check

Function: checkBaseRequirements(accountId)

Location: packages/db/src/access/baseRequirementsChecker.ts

Verifies the account has:

  1. Display Name - account.displayName is not null/empty
  2. Image - account.imageUrl is not null/empty
  3. Approved Term - At least one ipTerms record with ipApprove = true
  4. Social Links - At least one entry in account_social_links table

Recovery: If image is missing but account has social links, trigger social scraping to fetch avatar.

Result: Returns BaseRequirements object with isComplete flag and missingFields array.
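
The check reduces to four predicates over account data. A minimal sketch as a pure function over already-loaded values (the real checker queries the database directly; the input shape here is an assumption):

```typescript
// Hedged sketch of checkBaseRequirements as a pure function.
interface BaseRequirementsInput {
  displayName: string | null;
  imageUrl: string | null;
  approvedTermCount: number; // ipTerms rows with ipApprove = true
  socialLinkCount: number;   // rows in account_social_links
}

interface BaseRequirements {
  isComplete: boolean;
  missingFields: string[];
}

function checkBaseRequirements(input: BaseRequirementsInput): BaseRequirements {
  const missingFields: string[] = [];
  if (!input.displayName) missingFields.push("displayName");
  if (!input.imageUrl) missingFields.push("imageUrl");
  if (input.approvedTermCount === 0) missingFields.push("approvedTerm");
  if (input.socialLinkCount === 0) missingFields.push("socialLinks");
  return { isComplete: missingFields.length === 0, missingFields };
}
```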

Data Sufficiency Check

Function: checkDataSufficiency(accountId)

Location: packages/db/src/access/dataSufficiencyChecker.ts

Counts tags from multiple sources:

  1. Account Description - Counts as 1 tag if present
  2. Asset Tags - Each asset with searchTags counts as 1 tag
  3. Follower Count - Counts as 1 tag if present

Minimum Threshold: MIN_TAGS = 5

Result: Returns DataSufficiency object with:

  • totalTags - Total tag count
  • isEnough - Whether totalTags >= MIN_TAGS
  • missingData - Object indicating what's missing:
    • hasUnprocessedAssets - Assets without tags
    • needsSocialScraping - Social links not scraped recently
    • hasUnprocessedVoiceAssets - Voice assets missing voice samples
    • unprocessedAssetIds - List of asset IDs needing processing
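
The counting rule above can be sketched as follows (input field names are assumptions; the real checker in dataSufficiencyChecker.ts derives these counts from the database):

```typescript
// Sketch of the tag-counting rule: description and follower count each
// contribute 1 tag, plus 1 per asset that already has searchTags.
const MIN_TAGS = 5;

interface SufficiencyInput {
  hasDescription: boolean;
  assetsWithSearchTags: number;
  hasFollowerCount: boolean;
}

function countTags(input: SufficiencyInput): { totalTags: number; isEnough: boolean } {
  const totalTags =
    (input.hasDescription ? 1 : 0) +
    input.assetsWithSearchTags +
    (input.hasFollowerCount ? 1 : 0);
  return { totalTags, isEnough: totalTags >= MIN_TAGS };
}
```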

Sub-Processes

AI Tag Generation

Endpoint: POST /api/indexing/generate-tags

Location: apps/zooly-app/app/api/indexing/generate-tags/route.ts

Triggered When:

  • hasUnprocessedAssets = true OR
  • hasUnprocessedVoiceAssets = true

Process:

  1. Get unprocessed assets (searchTags IS NULL and tagAttemptCount < 5)
  2. For each asset:
    • IMAGE: Call generateTagsFromImage() → Gemini vision analysis
    • VOICE: Call generateTagsFromVoice() → Gemini audio analysis
    • VOICE: Also call createVoiceSample() → ElevenLabs voice clone + TTS
      • Creates voice clone on ElevenLabs (addVoice)
      • Generates AI sample text (generateVoiceSampleText)
      • Creates TTS audio (generateVoiceForText)
      • Uploads TTS sample to S3
      • Persists voiceId to eleven_labs table (system-wide voice management)
      • Updates likenessAssets.voiceSampleUrl (for search)
  3. Update likenessAssets.searchTags with extracted tags
  4. On completion: Mark event COMPLETED + create new queue event

Service Functions:

  • generateTagsFromImage() - packages/likeness-search/src/generateTagsFromImage.ts
  • generateTagsFromVoice() - packages/likeness-search/src/generateTagsFromVoice.ts
  • createVoiceSample() - packages/likeness-search/src/createVoiceSample.ts

Social Scraping

Endpoint: POST /api/indexing/scrape-social

Location: apps/zooly-app/app/api/indexing/scrape-social/route.ts

Service Package: packages/social-scraper

Triggered When:

  • needsSocialScraping = true OR
  • Image missing and account has social links

Process:

  1. Get social links from account_social_links table
  2. For each link:
    • Check if recently scraped (<24h) and successful → skip
    • Check if max attempts (5) reached → skip
    • Check if exponential backoff period is active → skip
    • Parse URL to extract platform-specific identifier (username, channel ID, etc.)
    • Select platform-specific scraper function
    • Execute scraper via Scrapfly API:
      • Instagram: Extracts full_name, profile_pic_url_hd, edge_followed_by.count
      • TikTok: Extracts nickname, avatarLarger, followerCount
      • Twitter/X: Extracts display name, profile_image_url_https, followers_count
      • YouTube: Extracts channel name, avatar, subscriber count (parsed from text format)
      • LinkedIn: Extracts profile name, avatar, connection count
    • Save results to scrapes table (name, avatar, followers, rawData)
    • On success: Reset attempt count
    • On error: Increment attempt count and apply exponential backoff
  3. Update follower counts in account_social_links table per platform
  4. If account missing image and scrape has avatar:
    • Upload avatar to S3 via uploadImageFromUrl()
    • Set account.imageUrl
    • Create likenessAssets entry (type: IMAGE) for AI tag extraction
  5. Handle rate limit errors (429) with retry logic
  6. On completion: Mark event COMPLETED + create new queue event

Implementation Details:

  • Main Function: processSocialScraping(accountId) - packages/social-scraper/src/processSocialScraping.ts
  • Scraper Registry: getScraperForPlatform() - packages/social-scraper/src/scrapers/index.ts
  • URL Parsing: parseSocialUrl() - packages/social-scraper/src/parseSocialUrl.ts
  • Platform Scrapers: Individual scraper functions in packages/social-scraper/src/scrapers/
  • Scrapfly Integration: Uses Scrapfly SDK with ASP (Anti-Scraping Protection) enabled
  • Retry Logic: Exponential backoff (60s base, max 1 hour) with max 5 attempts per link
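
The URL-parsing step can be illustrated with a simplified parser. This handles only the obvious URL shapes for each platform; the real parseSocialUrl() likely covers more variants (mobile hosts, query-string forms, trailing paths):

```typescript
// Illustrative extraction of a platform-specific identifier from a profile URL.
type Platform = "instagram" | "tiktok" | "twitter" | "youtube" | "linkedin";

function parseSocialUrl(raw: string): { platform: Platform; identifier: string } | null {
  const url = new URL(raw);
  const host = url.hostname.replace(/^www\./, "");
  const segments = url.pathname.split("/").filter(Boolean);
  if (segments.length === 0) return null;
  if (host === "instagram.com") return { platform: "instagram", identifier: segments[0] };
  if (host === "tiktok.com") return { platform: "tiktok", identifier: segments[0].replace(/^@/, "") };
  if (host === "twitter.com" || host === "x.com") return { platform: "twitter", identifier: segments[0] };
  if (host === "youtube.com") return { platform: "youtube", identifier: segments[0].replace(/^@/, "") };
  if (host === "linkedin.com" && segments[0] === "in") return { platform: "linkedin", identifier: segments[1] ?? "" };
  return null; // unrecognized platform
}
```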

Index Upsert

Function: upsertToIndex(accountId)

Location: packages/likeness-search/src/upsertToIndex.ts

Once sufficient data is available, the account is upserted to both indexes:

SQL Index Upsert

  1. Aggregate Tags: aggregateAccountTags(accountId) collects tags from:
    • Account description
    • likenessAssets[].searchTags
    • Social follower count
  2. Merge Strategy: First non-null value wins for each field
  3. Upsert: upsertSearchIndex(accountId, aggregatedData) writes to likeness_search table

Implementation: packages/db/src/access/likenessSearch.ts
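
The "first non-null wins" merge can be sketched generically; aggregateAccountTags applies this rule per field across the three sources in priority order (the exact priority order is defined in the implementation):

```typescript
// Minimal sketch of a first-non-null merge across tag sources.
function mergeFirstNonNull(
  ...sources: Record<string, unknown>[]
): Record<string, unknown> {
  const merged: Record<string, unknown> = {};
  for (const source of sources) { // earlier sources take priority
    for (const [key, value] of Object.entries(source)) {
      if (merged[key] == null && value != null) merged[key] = value;
    }
  }
  return merged;
}
```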

Vector Index Upsert

  1. Normalize Tags: normalizeTagsForVector(aggregatedData) converts tags to text string
    • Example: "category: models, gender: female, hairColor: blonde, ..."
  2. Generate Embedding: generateEmbedding(vectorContent) uses OpenAI text-embedding-3-small
    • Returns 1536-dimensional vector
  3. Upsert: vectorUpsert(accountId, vectorContent, embedding) writes to likeness_search_vector table

Implementation:

  • packages/likeness-search/src/generateEmbedding.ts
  • packages/db/src/access/likenessSearchVector.ts

Tag Aggregation

Function: aggregateAccountTags(accountId)

Location: packages/likeness-search/src/aggregateAccountTags.ts

Aggregates tags from multiple sources using a "first non-null wins" merge strategy:

Sources

  1. Account Description - Direct account-provided tags (if any)
  2. Asset Tags - AI-extracted tags from likenessAssets[].searchTags
  3. Social Data - Follower count from account_social_links

Field Categories

Tags are merged by category:

  • Enum Fields (19 fields): category, gender, country, hairColor, eyeColor, etc.
  • Numeric Fields (5 fields): numberOfFollowers, birthYear, height, weight, engagementRate
  • Boolean Fields (4 fields): isVerified, wearGlasses, hasTattoos, hasPiercings

Normalization

For vector embedding, tags are normalized to a text string:

  • Enum values: ENUM_VALUE → enum value (lowercase, underscores to spaces)
  • Numeric values: included as-is
  • Boolean values: true → yes, false → no

Example Output: "category: models, gender: female, hairColor: blonde, numberOfFollowers: 1000000, wearGlasses: no"
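
These rules can be sketched directly (the real normalizeTagsForVector in packages/likeness-search may order and filter fields differently):

```typescript
// Sketch of the normalization rules: enums lowercased with underscores
// replaced by spaces, numerics as-is, booleans mapped to yes/no.
function normalizeTagsForVector(
  tags: Record<string, string | number | boolean | null>,
): string {
  const parts: string[] = [];
  for (const [key, value] of Object.entries(tags)) {
    if (value == null) continue; // skip missing fields
    if (typeof value === "boolean") {
      parts.push(`${key}: ${value ? "yes" : "no"}`);
    } else if (typeof value === "string") {
      parts.push(`${key}: ${value.toLowerCase().replace(/_/g, " ")}`);
    } else {
      parts.push(`${key}: ${value}`);
    }
  }
  return parts.join(", ");
}
```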

Error Handling

Event-Level Retries

Function: handleApiError(error, currentRetryCount)

Location: packages/likeness-search/src/apiRetryHandler.ts

Error Classification:

  • RATE_LIMIT: Rate limit errors from OpenAI, Gemini, ElevenLabs → Retry with backoff
  • TEMPORARY: Network errors (timeouts, connection errors, 5xx) → Retry with backoff
  • PERMANENT: Unknown errors or max retries exceeded → No retry

Retry Logic:

  • Max retries: 5 attempts
  • Exponential backoff: baseDelay * 2^(retryCount - 1)
  • Base delays: 60s for rate limits, 30s for temporary errors
  • Max delay: 1 hour

Status Management:

  • Retryable errors → Mark event AWAITING_RETRY with retryAt timestamp
  • Permanent errors → Mark event FAILED
  • Daemon moves AWAITING_RETRY events back to PENDING when retryAt passes
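
The delay schedule above works out to the following (function name is illustrative; see apiRetryHandler.ts for the real classification logic):

```typescript
// Backoff schedule from the text: baseDelay * 2^(retryCount - 1), capped at
// 1 hour, with at most 5 attempts. Returns null when no retry is allowed.
const MAX_RETRIES = 5;
const MAX_DELAY_MS = 60 * 60 * 1000; // 1 hour cap

function retryDelayMs(
  kind: "RATE_LIMIT" | "TEMPORARY",
  retryCount: number, // 1-based attempt number
): number | null {
  if (retryCount > MAX_RETRIES) return null; // permanent failure
  const baseMs = kind === "RATE_LIMIT" ? 60_000 : 30_000;
  return Math.min(baseMs * 2 ** (retryCount - 1), MAX_DELAY_MS);
}
```

For example, a rate-limited event waits 60 s, 120 s, 240 s, 480 s, then 960 s before being marked FAILED.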

Per-Asset Retries

Fields: likenessAssets.tagAttemptCount, likenessAssets.tagLastAttemptAt

Behavior:

  • Assets skipped if tagAttemptCount >= 5
  • Assets skipped if still in exponential backoff period
  • On success: tagAttemptCount reset to 0
  • On failure: tagAttemptCount incremented, tagLastAttemptAt updated

Implementation: packages/db/src/access/likenessAssets.ts

Per-Scrape Retries

Fields: scrapes.attemptCount, scrapes.lastAttemptAt

Behavior:

  • Links skipped if attemptCount >= 5
  • Links skipped if still in exponential backoff period
  • On success: attemptCount reset to 0
  • On failure: attemptCount incremented, lastAttemptAt updated

Implementation: packages/db/src/access/scrapes.ts

Voice Sample Creation Pipeline

When a voice asset is processed, a complete voice sample is created:

sequenceDiagram
  participant API as Generate Tags API
  participant Validate as URL Validator
  participant ElevenLabs
  participant Gemini
  participant S3
  participant DB as Database
  API->>Validate: Validate Audio URL
  Validate-->>API: URL Valid
  API->>API: Fetch Audio Blob
  API->>Validate: Validate Content Type
  Validate-->>API: Content Type Valid
  API->>ElevenLabs: Create Voice Clone
  ElevenLabs-->>API: Voice ID
  API->>Gemini: Generate Demo Script
  Gemini-->>API: Sample Text
  API->>ElevenLabs: Generate TTS Audio
  ElevenLabs-->>API: Audio Stream
  API->>S3: Upload Voice Sample
  S3-->>API: S3 URL
  API->>DB: Update voiceSampleUrl
  DB-->>API: Updated

Steps:

  1. Validate Audio URL - Reject video platform URLs (YouTube, Vimeo, etc.)
  2. Fetch Audio Blob - Download audio from content URL
  3. Validate Content Type - Ensure valid audio MIME type
  4. Create Voice Clone - ElevenLabs addVoice() creates clone
  5. Generate Demo Script - Gemini generates 15-30 word script matching voice characteristics
  6. Generate TTS Audio - ElevenLabs textToSpeech.convert() creates audio
  7. Upload to S3 - Store at likeness-voice-samples/voice-sample-{assetId}-{uuid}.mp3
  8. Update Database - Set likenessAssets.voiceSampleUrl

Implementation: packages/likeness-search/src/createVoiceSample.ts

Cron Configuration

File: apps/zooly-app/vercel.json

{
  "crons": [
    {
      "path": "/api/indexing/process-queue",
      "schedule": "*/2 * * * *"
    }
  ]
}

The cron job runs every 2 minutes and calls the process-queue endpoint with CRON_SECRET bearer token authentication.

Monitoring & Observability

Logging Points

The system logs at key points:

  1. Daemon Start/End - Event counts (timed out, retried, processed)
  2. Event Status Transitions - PENDING → IN_PROGRESS → COMPLETED/FAILED
  3. Sub-Process Triggers - When AI generation or scraping is triggered
  4. AI Generation Results - Tag extraction success/failure
  5. Search Queries - Query patterns and result counts

Metrics to Track

  • Queue depth (PENDING events count)
  • Processing time per event
  • Success/failure rates
  • Search latency (SQL vs vector fallback)
  • Vector fallback frequency
  • AI generation success rates
  • Voice sample creation success rates