The indexing pipeline is an event-driven system that processes account data and makes it searchable. It handles data collection, tag extraction, and index updates asynchronously through a queue-based architecture.
Events progress through these statuses:

- PENDING - Waiting for the daemon to pick it up
- IN_PROGRESS - Currently being processed
- AWAITING_RETRY - Hit a retryable error; waiting for its retryAt time
- TIMEOUT - Stuck IN_PROGRESS for more than 5 minutes
- FAILED - Retries exhausted
- COMPLETED - Processed successfully
Events are created with different elementType values:
- IMAGE_ASSET - Image asset uploaded
- VOICE_ASSET - Voice asset uploaded
- VOICE_TERM - VoiceOver term approved
- IMAGE_TERM - Image term approved
- LIKENESS_TERM - Likeness term approved
- PROFILE_DESCRIPTION - Account description updated
- SOCIAL_DATA - Social data changed

The indexing daemon runs every 2 minutes via cron job (GET /api/indexing/process-queue).
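The element types above, and the statuses referenced throughout this document, can be captured as union types. This is a sketch; the event-row shape (`IndexingEvent`) and its field names are hypothetical, not taken from the codebase:

```typescript
// Queue event statuses referenced throughout this document.
type EventStatus =
  | "PENDING"
  | "IN_PROGRESS"
  | "AWAITING_RETRY"
  | "TIMEOUT"
  | "FAILED"
  | "COMPLETED";

// elementType values that create indexing events.
type ElementType =
  | "IMAGE_ASSET"
  | "VOICE_ASSET"
  | "VOICE_TERM"
  | "IMAGE_TERM"
  | "LIKENESS_TERM"
  | "PROFILE_DESCRIPTION"
  | "SOCIAL_DATA";

// Hypothetical shape of a queue row; field names are illustrative only.
interface IndexingEvent {
  id: string;
  accountId: string;
  elementType: ElementType;
  status: EventStatus;
  retryCount: number;
  retryAt: Date | null;
}
```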
Function: acquireLock(INDEXING_DAEMON_PID, 10)
Before processing begins, the daemon acquires a distributed process lock to prevent concurrent execution: two daemon instances draining the same queue would process the same events twice.

The daemon calls updateProcessLock() after each event to keep the lock alive.

Lock Lifecycle:

- Acquired before processing starts (acquireLock(INDEXING_DAEMON_PID, 10))
- lastAcquired timestamp refreshed after each event
- Released in a finally block, even on errors

Implementation:

- packages/db/src/access/processLock.ts
- Table: process_lock (id: "INDEXING_DAEMON", lock_flag, last_acquired)

Function: markTimedOutEvents()
Marks any IN_PROGRESS events older than 5 minutes as TIMEOUT. This handles cases where sub-processes fail silently or take too long.
Implementation: packages/db/src/access/likenessNeedIndexingQueue.ts
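A minimal sketch of the timeout sweep. The 5-minute cutoff comes from the text above; the `db.execute` helper, the table name `likeness_need_indexing_queue` (guessed from the access-file name), and the `updated_at` column are assumptions:

```typescript
const TIMEOUT_MINUTES = 5;

// Pure helper: should an IN_PROGRESS event be considered timed out?
function isTimedOut(startedAt: Date, now: Date): boolean {
  const ageMs = now.getTime() - startedAt.getTime();
  return ageMs > TIMEOUT_MINUTES * 60_000;
}

// Hypothetical DB-facing wrapper; the query shape is illustrative only.
async function markTimedOutEvents(db: {
  execute: (sql: string, params: unknown[]) => Promise<void>;
}): Promise<void> {
  const cutoff = new Date(Date.now() - TIMEOUT_MINUTES * 60_000);
  await db.execute(
    `UPDATE likeness_need_indexing_queue
        SET status = 'TIMEOUT'
      WHERE status = 'IN_PROGRESS' AND updated_at < ?`,
    [cutoff]
  );
}
```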
Function: processRetryableEvents()
Moves AWAITING_RETRY events whose retryAt time has passed back to PENDING status. These events will be processed in the next iteration.
Implementation: packages/db/src/access/likenessNeedIndexingQueue.ts
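The retry sweep is the mirror image of the timeout sweep; a sketch under the same assumptions (hypothetical `db.execute`, guessed table and column names):

```typescript
// Pure helper: has an AWAITING_RETRY event's retryAt time passed?
function isRetryDue(retryAt: Date, now: Date): boolean {
  return retryAt.getTime() <= now.getTime();
}

// Hypothetical DB-facing wrapper; the query shape is illustrative only.
async function processRetryableEvents(db: {
  execute: (sql: string, params: unknown[]) => Promise<void>;
}): Promise<void> {
  await db.execute(
    `UPDATE likeness_need_indexing_queue
        SET status = 'PENDING'
      WHERE status = 'AWAITING_RETRY' AND retry_at <= ?`,
    [new Date()]
  );
}
```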
Function: indexingDaemon()
Processes up to INDEXING_DAEMON_BATCH_SIZE events (default: 250):
- Fetches the oldest PENDING event (FIFO)
- Calls processNextEvent() to process it

Implementation: packages/likeness-search/src/indexingDaemon.ts
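The three daemon steps above, plus the lock lifecycle, can be sketched as one loop. The interfaces and function signatures here are assumptions; only the step order, the batch size default, the per-event lock refresh, and the finally-block release come from this document:

```typescript
// Hypothetical interfaces; real signatures live in the packages named above.
interface Queue {
  markTimedOutEvents(): Promise<void>;
  processRetryableEvents(): Promise<void>;
  nextPendingEvent(): Promise<{ id: string } | null>; // oldest PENDING (FIFO)
}

const INDEXING_DAEMON_BATCH_SIZE = 250;

async function indexingDaemon(
  queue: Queue,
  processNextEvent: (event: { id: string }) => Promise<void>,
  acquireLock: (pid: string, ttlMinutes: number) => Promise<boolean>,
  updateProcessLock: () => Promise<void>,
  releaseLock: () => Promise<void>
): Promise<number> {
  // Skip this run entirely if another daemon instance holds the lock.
  if (!(await acquireLock("INDEXING_DAEMON", 10))) return 0;
  let processed = 0;
  try {
    await queue.markTimedOutEvents();      // Step 1: maintenance
    await queue.processRetryableEvents();  // Step 2: retries back to PENDING
    while (processed < INDEXING_DAEMON_BATCH_SIZE) {
      const event = await queue.nextPendingEvent();
      if (!event) break;
      await processNextEvent(event);
      await updateProcessLock();           // keep the lock alive per event
      processed++;
    }
  } finally {
    await releaseLock();                   // released even on errors
  }
  return processed;
}
```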
Each event goes through this detailed flow:
Function: checkBaseRequirements(accountId)
Location: packages/db/src/access/baseRequirementsChecker.ts
Verifies the account has:
- account.displayName is not null/empty
- account.imageUrl is not null/empty
- An ipTerms record with ipApprove = true
- At least one entry in the account_social_links table

Recovery: If the image is missing but the account has social links, trigger social scraping to fetch the avatar.
Result: Returns BaseRequirements object with isComplete flag and missingFields array.
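The check above can be sketched as a pure function. `BaseRequirements` with `isComplete` and `missingFields` comes from the text; the `AccountData` input shape and the individual field-name strings are assumptions:

```typescript
// Input shape is hypothetical; it condenses the four checks listed above.
interface AccountData {
  displayName: string | null;
  imageUrl: string | null;
  ipApprove: boolean;        // from the ipTerms record
  socialLinkCount: number;   // rows in account_social_links
}

interface BaseRequirements {
  isComplete: boolean;
  missingFields: string[];
}

function checkBaseRequirements(account: AccountData): BaseRequirements {
  const missingFields: string[] = [];
  if (!account.displayName) missingFields.push("displayName");
  if (!account.imageUrl) missingFields.push("imageUrl");
  if (!account.ipApprove) missingFields.push("ipApprove");
  if (account.socialLinkCount === 0) missingFields.push("socialLinks");
  return { isComplete: missingFields.length === 0, missingFields };
}
```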
Function: checkDataSufficiency(accountId)
Location: packages/db/src/access/dataSufficiencyChecker.ts
Counts tags from multiple sources:
- Each entry in likenessAssets[].searchTags counts as 1 tag

Minimum Threshold: MIN_TAGS = 5
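The counting rule and threshold can be sketched as pure helpers. Treating `searchTags` as a key/value record and counting its keys is an assumption; `MIN_TAGS = 5` comes from the text:

```typescript
const MIN_TAGS = 5;

// Count tags across assets; each key in an asset's searchTags counts as 1.
// The Record<string, unknown> shape of searchTags is an assumption.
function countTags(assets: { searchTags: Record<string, unknown> | null }[]): number {
  return assets.reduce(
    (total, asset) =>
      total + (asset.searchTags ? Object.keys(asset.searchTags).length : 0),
    0
  );
}

function isEnough(totalTags: number): boolean {
  return totalTags >= MIN_TAGS;
}
```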
Result: Returns DataSufficiency object with:
- totalTags - Total tag count
- isEnough - Whether totalTags >= MIN_TAGS
- missingData - Object indicating what's missing:
  - hasUnprocessedAssets - Assets without tags
  - needsSocialScraping - Social links not scraped recently
  - hasUnprocessedVoiceAssets - Voice assets missing voice samples
  - unprocessedAssetIds - List of asset IDs needing processing

Endpoint: POST /api/indexing/generate-tags
Location: apps/zooly-app/app/api/indexing/generate-tags/route.ts
Triggered When:
- hasUnprocessedAssets = true, OR
- hasUnprocessedVoiceAssets = true

Process:
- Selects unprocessed assets (searchTags IS NULL and tagAttemptCount < 5)
- generateTagsFromImage() → Gemini vision analysis
- generateTagsFromVoice() → Gemini audio analysis
- createVoiceSample() → ElevenLabs voice clone + TTS
- Clone the voice (addVoice)
- Generate sample text (generateVoiceSampleText)
- Generate audio (generateVoiceForText)
- Save voiceId to the eleven_labs table (system-wide voice management)
- Save the audio URL to likenessAssets.voiceSampleUrl (for search)
- Update likenessAssets.searchTags with the extracted tags
- Mark the event COMPLETED + create a new queue event

Service Functions:
- generateTagsFromImage() - packages/likeness-search/src/generateTagsFromImage.ts
- generateTagsFromVoice() - packages/likeness-search/src/generateTagsFromVoice.ts
- createVoiceSample() - packages/likeness-search/src/createVoiceSample.ts

Endpoint: POST /api/indexing/scrape-social
Location: apps/zooly-app/app/api/indexing/scrape-social/route.ts
Service Package: packages/social-scraper
Triggered When:
- needsSocialScraping = true, OR
- The base requirements check needs an avatar recovered from social links (see Base Requirements Check above)

Process:
- Reads links from the account_social_links table
- Instagram: full_name, profile_pic_url_hd, edge_followed_by.count
- TikTok: nickname, avatarLarger, followerCount
- Twitter/X: profile_image_url_https, followers_count
- Saves results to the scrapes table (name, avatar, followers, rawData)
- Updates the account_social_links table per platform
- If the account image is missing: uploadImageFromUrl() saves the scraped avatar to account.imageUrl and creates a likenessAssets entry (type: IMAGE) for AI tag extraction
- Marks the event COMPLETED + creates a new queue event

Implementation Details:
- processSocialScraping(accountId) - packages/social-scraper/src/processSocialScraping.ts
- getScraperForPlatform() - packages/social-scraper/src/scrapers/index.ts
- parseSocialUrl() - packages/social-scraper/src/parseSocialUrl.ts
- Platform scrapers - packages/social-scraper/src/scrapers/

Function: upsertToIndex(accountId)
Location: packages/likeness-search/src/upsertToIndex.ts
Once sufficient data is available, the account is upserted to both indexes:
aggregateAccountTags(accountId) collects tags from:
- likenessAssets[].searchTags
- account_social_links

upsertSearchIndex(accountId, aggregatedData) writes to the likeness_search table.

Implementation: packages/db/src/access/likenessSearch.ts
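A sketch of the SQL-side upsert. Only the function name, its parameters, and the likeness_search table come from this document; the `db.execute` helper, the column names, and the ON CONFLICT form are assumptions:

```typescript
// Hypothetical upsert; column names and conflict handling are illustrative.
async function upsertSearchIndex(
  db: { execute: (sql: string, params: unknown[]) => Promise<void> },
  accountId: string,
  aggregatedData: Record<string, unknown>
): Promise<void> {
  await db.execute(
    `INSERT INTO likeness_search (account_id, tags, updated_at)
     VALUES (?, ?, NOW())
     ON CONFLICT (account_id)
     DO UPDATE SET tags = EXCLUDED.tags, updated_at = NOW()`,
    [accountId, JSON.stringify(aggregatedData)]
  );
}
```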
normalizeTagsForVector(aggregatedData) converts tags to a text string:

"category: models, gender: female, hairColor: blonde, ..."

generateEmbedding(vectorContent) uses OpenAI text-embedding-3-small.

vectorUpsert(accountId, vectorContent, embedding) writes to the likeness_search_vector table.

Implementation:

- packages/likeness-search/src/generateEmbedding.ts
- packages/db/src/access/likenessSearchVector.ts

Function: aggregateAccountTags(accountId)
Location: packages/likeness-search/src/aggregateAccountTags.ts
Aggregates tags from multiple sources using a "first non-null wins" merge strategy:
- likenessAssets[].searchTags
- account_social_links

Tags are merged by category:

- Enum fields: category, gender, country, hairColor, eyeColor, etc.
- Numeric fields: numberOfFollowers, birthYear, height, weight, engagementRate
- Boolean fields: isVerified, wearGlasses, hasTattoos, hasPiercings

For vector embedding, tags are normalized to a text string:

- ENUM_VALUE → enum value (lowercase, underscores to spaces)
- true → yes, false → no

Example Output: "category: models, gender: female, hairColor: blonde, numberOfFollowers: 1000000, wearGlasses: no"
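The normalization rules above can be sketched as a single function. This follows the two rules and the example output; the key ordering (object insertion order) and the numeric pass-through are assumptions:

```typescript
// Normalize aggregated tags into the embedding input string.
function normalizeTagsForVector(tags: Record<string, unknown>): string {
  return Object.entries(tags)
    .map(([key, value]) => {
      let v: string;
      if (typeof value === "boolean") {
        v = value ? "yes" : "no";                   // true → yes, false → no
      } else if (typeof value === "string") {
        v = value.toLowerCase().replace(/_/g, " "); // ENUM_VALUE → "enum value"
      } else {
        v = String(value);                          // numbers pass through
      }
      return `${key}: ${v}`;
    })
    .join(", ");
}

// normalizeTagsForVector({ category: "MODELS", gender: "FEMALE",
//   hairColor: "BLONDE", numberOfFollowers: 1000000, wearGlasses: false })
// → "category: models, gender: female, hairColor: blonde, numberOfFollowers: 1000000, wearGlasses: no"
```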
Function: handleApiError(error, currentRetryCount)
Location: packages/likeness-search/src/apiRetryHandler.ts
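handleApiError computes an exponentially growing retry delay; a minimal sketch of that calculation. The formula baseDelay * 2^(retryCount - 1) comes from this section, but BASE_DELAY_MS is an assumed constant, not a value from the codebase:

```typescript
// Assumed base delay of one minute; the real value may differ.
const BASE_DELAY_MS = 60_000;

function retryDelayMs(retryCount: number): number {
  // retryCount is 1-based: 1 → baseDelay, 2 → 2x, 3 → 4x, ...
  return BASE_DELAY_MS * 2 ** (retryCount - 1);
}

// Compute the retryAt timestamp stored with an AWAITING_RETRY event.
function nextRetryAt(retryCount: number, now: Date): Date {
  return new Date(now.getTime() + retryDelayMs(retryCount));
}
```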
Error Classification: determines whether an error is retryable or permanent.

Retry Logic: exponential backoff, delay = baseDelay * 2^(retryCount - 1)

Status Management:

- Retryable error → event moves to AWAITING_RETRY with a retryAt timestamp
- Retries exhausted → event moves to FAILED
- The maintenance step moves AWAITING_RETRY events back to PENDING when retryAt passes

Fields: likenessAssets.tagAttemptCount, likenessAssets.tagLastAttemptAt
Behavior:
- Assets with tagAttemptCount >= 5 are skipped
- On success: tagAttemptCount reset to 0
- On failure: tagAttemptCount incremented, tagLastAttemptAt updated

Implementation: packages/db/src/access/likenessAssets.ts
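The per-asset counter behavior can be sketched as two pure helpers. Field names follow the Fields line above; the 5-attempt cap comes from the text, and updating the timestamp on success as well as failure is an assumption:

```typescript
const MAX_ATTEMPTS = 5;

interface AttemptState {
  tagAttemptCount: number;
  tagLastAttemptAt: Date | null;
}

// An asset is eligible for tag generation until it hits the attempt cap.
function isEligible(state: AttemptState): boolean {
  return state.tagAttemptCount < MAX_ATTEMPTS;
}

// Success resets the counter; failure increments it.
function afterAttempt(state: AttemptState, succeeded: boolean, now: Date): AttemptState {
  return succeeded
    ? { tagAttemptCount: 0, tagLastAttemptAt: now }
    : { tagAttemptCount: state.tagAttemptCount + 1, tagLastAttemptAt: now };
}
```

The same pattern applies to the per-scrape counters (attemptCount, lastAttemptAt) described in the next subsection.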
Fields: scrapes.attemptCount, scrapes.lastAttemptAt
Behavior:
- Scrapes with attemptCount >= 5 are skipped
- On success: attemptCount reset to 0
- On failure: attemptCount incremented, lastAttemptAt updated

Implementation: packages/db/src/access/scrapes.ts
When a voice asset is processed, a complete voice sample is created:
Steps:
- addVoice() creates the clone
- textToSpeech.convert() creates the audio
- The file is stored at likeness-voice-samples/voice-sample-{assetId}-{uuid}.mp3
- The URL is saved to likenessAssets.voiceSampleUrl

Implementation: packages/likeness-search/src/createVoiceSample.ts
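The storage path follows the pattern above; a small helper makes the convention explicit (the helper itself is hypothetical, not a function from the codebase):

```typescript
import { randomUUID } from "node:crypto";

// Build the storage path for a generated voice sample, matching
// likeness-voice-samples/voice-sample-{assetId}-{uuid}.mp3
function voiceSamplePath(assetId: string, uuid: string = randomUUID()): string {
  return `likeness-voice-samples/voice-sample-${assetId}-${uuid}.mp3`;
}
```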
File: apps/zooly-app/vercel.json
```json
{
  "crons": [
    {
      "path": "/api/indexing/process-queue",
      "schedule": "*/2 * * * *"
    }
  ]
}
```
The cron job runs every 2 minutes and calls the process-queue endpoint with CRON_SECRET bearer token authentication.
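The bearer-token check on the endpoint can be sketched as follows; the exact header handling in the real route handler is an assumption, only the CRON_SECRET bearer-token scheme comes from the text:

```typescript
// Return true only when the Authorization header carries the cron secret.
function isAuthorizedCronRequest(
  authorizationHeader: string | null,
  cronSecret: string
): boolean {
  return authorizationHeader === `Bearer ${cronSecret}`;
}
```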
The system logs at key points throughout the pipeline.