agregarr_agregarr/server/lib/collections/services/IndividualCollectionScheduler.ts
bitr8 bcda04dc64
fix(scheduler): startNow immediate sync and deadlock bugs (#348)
Bug 1: startNow doesn't trigger immediate sync
- When startNow=true, firstSyncAt was set to current time
- calculateNextRunFromFirstSync() returned firstSyncAt + interval (12h later)
- No immediate sync was triggered
- Fix: If firstSyncAt is within 60s of now, queue immediate sync

Bug 2: Deadlock when scheduled sync overlaps full sync
- Scheduled sync acquired API lock in queueCollectionSync()
- Queued sync to process "after main sync completes"
- But main sync was waiting for the same API lock
- Fix: Move API lock acquisition from queue time to execution time
  in processLibraryQueue(), after fullSyncRunning check

Co-authored-by: bitr8 <bitr8@users.noreply.github.com>
2026-01-14 10:16:02 +13:00

1179 lines
35 KiB
TypeScript

import PlexAPI from '@server/api/plexapi';
import type {
CustomSyncSchedule,
MultiSourceCombineMode,
MultiSourceType,
} from '@server/lib/settings';
import { getSettings, SYNC_SCHEDULE_PRESETS } from '@server/lib/settings';
import logger from '@server/logger';
import schedule from 'node-schedule';
import { collectionSyncService } from './CollectionSyncService';
interface IndividualCollectionJob {
collectionId: string;
job: schedule.Job;
intervalHours?: number; // For preset schedules
customCron?: string; // For custom cron schedules
}
interface QueuedCollectionSync {
collectionId: string;
collectionName: string;
libraryId: string;
priority: number; // Lower numbers = higher priority
scheduledAt: Date;
}
interface LibraryQueue {
libraryId: string;
running: boolean;
queue: QueuedCollectionSync[];
currentCollection?: string;
}
interface ApiWaitingItem {
collectionId: string;
collectionName: string;
libraryId: string;
resolve: () => void;
scheduledAt: Date;
}
interface ApiQueue {
apiType: string;
inUse: boolean;
waitingQueue: ApiWaitingItem[];
}
/**
* IndividualCollectionScheduler - Manages custom sync schedules for collections
*
* Handles per-collection sync scheduling separate from the main sync job.
* Supports decimal hour intervals (e.g., 0.5 for 30 minutes, 2.5 for 2 hours 30 minutes).
*/
export class IndividualCollectionScheduler {
private static jobs: Map<string, IndividualCollectionJob> = new Map();
private static initialized = false;
private static libraryQueues: Map<string, LibraryQueue> = new Map();
private static fullSyncRunning = false;
private static apiQueues: Map<string, ApiQueue> = new Map();
/**
* Parse custom sync schedule and return either interval hours or cron expression
*/
private static parseCustomSyncSchedule(schedule: CustomSyncSchedule): {
intervalHours?: number;
customCron?: string;
startDate?: string;
startTime?: string;
firstSyncAt?: string;
} {
if (!schedule.enabled) {
return {};
}
// Handle legacy format (direct intervalHours)
if (schedule.intervalHours && !schedule.scheduleType) {
return { intervalHours: schedule.intervalHours };
}
// Handle new format
if (schedule.scheduleType === 'preset' && schedule.preset) {
const preset = SYNC_SCHEDULE_PRESETS.find(
(p) => p.key === schedule.preset
);
if (preset) {
return {
intervalHours: preset.intervalHours,
startDate: schedule.startNow ? undefined : schedule.startDate,
startTime: schedule.startNow ? undefined : schedule.startTime,
firstSyncAt: schedule.firstSyncAt,
};
}
} else if (schedule.scheduleType === 'custom' && schedule.customCron) {
return { customCron: schedule.customCron };
}
// Fallback to legacy intervalHours if present
if (schedule.intervalHours) {
return { intervalHours: schedule.intervalHours };
}
return {};
}
/**
* Generate a cron expression for an interval with a specific start date/time
*/
private static generateCronWithStartDateTime(
intervalHours: number,
startDate: string,
startTime: string
): string {
const [hour, minute] = startTime.split(':').map(Number);
const [day, month] = startDate.split('-').map(Number);
if (intervalHours === 24) {
// Daily: run at the same time every day
return `${minute} ${hour} * * *`;
} else if (intervalHours === 168) {
// Weekly: run on the same day/time every week (assume Monday if not specified)
return `${minute} ${hour} * * 1`;
} else if (intervalHours === 720) {
// Monthly: run on the same day of month
return `${minute} ${hour} ${day} * *`;
} else if (intervalHours === 8760) {
// Yearly: run on the specific date/time
return `${minute} ${hour} ${day} ${month} *`;
} else {
// For other intervals, fall back to regular cron
const hours = Math.floor(intervalHours);
return `${minute} */${hours} * * *`;
}
}
/**
* Parse start date and time into a Date object for the next occurrence
*/
private static parseStartDateTime(
startDate: string,
startTime: string
): Date {
const [hour, minute] = startTime.split(':').map(Number);
const [day, month] = startDate.split('-').map(Number);
const now = new Date();
const targetDate = new Date(
now.getFullYear(),
month - 1,
day,
hour,
minute,
0
);
// If the target date is in the past, move to next year
if (targetDate < now) {
targetDate.setFullYear(now.getFullYear() + 1);
}
return targetDate;
}
/**
* Calculate the next run time based on the first sync time and interval
*/
private static calculateNextRunFromFirstSync(
firstSyncAt: string,
intervalHours: number
): Date {
const firstSync = new Date(firstSyncAt);
const now = new Date();
const intervalMs = intervalHours * 60 * 60 * 1000;
// Calculate how many intervals have passed since the first sync
const elapsed = now.getTime() - firstSync.getTime();
const intervalsPassed = Math.floor(elapsed / intervalMs);
// Calculate the next run time
const nextRun = new Date(
firstSync.getTime() + (intervalsPassed + 1) * intervalMs
);
// If the calculated next run is in the past (shouldn't happen, but safety check), use now + interval
if (nextRun <= now) {
return new Date(now.getTime() + intervalMs);
}
return nextRun;
}
/**
* Initialize the scheduler by setting up jobs for existing collections
*/
public static async initialize(): Promise<void> {
if (this.initialized) {
return;
}
try {
await this.refreshAllJobs();
this.initialized = true;
logger.info('IndividualCollectionScheduler initialized successfully', {
label: 'Individual Collection Scheduler',
activeJobs: this.jobs.size,
});
} catch (error) {
logger.error(
`Failed to initialize IndividualCollectionScheduler: ${error}`,
{
label: 'Individual Collection Scheduler',
error: error instanceof Error ? error.message : String(error),
}
);
}
}
/**
* Refresh all jobs based on current collection configurations
*/
public static async refreshAllJobs(): Promise<void> {
// Clear existing jobs
this.clearAllJobs();
const settings = getSettings();
const collections = settings.plex.collectionConfigs || [];
for (const collection of collections) {
const customSync = collection.customSyncSchedule;
if (customSync?.enabled) {
const parsed = this.parseCustomSyncSchedule(customSync);
if (parsed.intervalHours || parsed.customCron) {
await this.scheduleCollectionSync(
collection.id,
parsed.intervalHours,
parsed.customCron,
parsed.startDate,
parsed.startTime,
parsed.firstSyncAt
);
}
}
}
}
/**
* Schedule a collection for individual sync
*/
public static async scheduleCollectionSync(
collectionId: string,
intervalHours?: number,
customCron?: string,
startDate?: string,
startTime?: string,
firstSyncAt?: string
): Promise<void> {
// Cancel existing job if it exists
this.cancelCollectionSync(collectionId);
try {
let finalCronExpression: string;
// Use custom cron if provided
if (customCron) {
finalCronExpression = customCron;
} else if (intervalHours && intervalHours > 0) {
// Generate cron from interval hours with optional start date/time
if (startDate && startTime && intervalHours >= 24) {
// For daily+ intervals with start date/time, create specific cron
finalCronExpression = this.generateCronWithStartDateTime(
intervalHours,
startDate,
startTime
);
} else if (firstSyncAt && intervalHours >= 1) {
// Use firstSyncAt to calculate next run time for persistent scheduling
const nextRunTime = this.calculateNextRunFromFirstSync(
firstSyncAt,
intervalHours
);
// If firstSyncAt is very recent (within 60 seconds), trigger immediate sync
// This handles the "startNow" case where user expects immediate execution
const firstSyncDate = new Date(firstSyncAt);
const now = new Date();
const timeSinceFirstSync = now.getTime() - firstSyncDate.getTime();
const immediateWindowMs = 60 * 1000; // 60 second grace window
if (
timeSinceFirstSync >= 0 &&
timeSinceFirstSync <= immediateWindowMs
) {
logger.info(
`Triggering immediate sync for newly scheduled collection (startNow)`,
{
label: 'Individual Collection Scheduler',
collectionId,
intervalHours,
firstSyncAt,
timeSinceFirstSyncMs: timeSinceFirstSync,
}
);
// Queue immediate sync (don't await to avoid blocking scheduler setup)
this.queueCollectionSync(collectionId).catch((err) => {
logger.error(`Failed to queue immediate sync: ${err}`, {
label: 'Individual Collection Scheduler',
collectionId,
});
});
}
// Create a one-time job for the next calculated run, then reschedule
const job = schedule.scheduleJob(nextRunTime, async () => {
await this.queueCollectionSync(collectionId);
// Reschedule for next interval
await this.scheduleCollectionSync(
collectionId,
intervalHours,
undefined,
undefined,
undefined,
firstSyncAt
);
});
if (job) {
this.jobs.set(collectionId, {
collectionId,
job,
intervalHours,
customCron,
});
logger.info(
`Scheduled individual collection sync with persistent timing`,
{
label: 'Individual Collection Scheduler',
collectionId,
intervalHours,
firstSyncAt,
nextRun: nextRunTime.toISOString(),
}
);
}
return; // Exit early for persistent scheduling
} else {
// Legacy behavior for immediate start or sub-daily intervals
if (intervalHours < 1) {
const minutes = Math.round(intervalHours * 60);
finalCronExpression = `*/${minutes} * * * *`;
} else if (intervalHours === Math.floor(intervalHours)) {
// Whole hours
const hours = Math.floor(intervalHours);
finalCronExpression = `0 */${hours} * * *`;
} else {
// Decimal hours - use minute-based approach
const totalMinutes = Math.round(intervalHours * 60);
finalCronExpression = `*/${totalMinutes} * * * *`;
}
}
} else {
throw new Error('Either intervalHours or customCron must be provided');
}
const job = schedule.scheduleJob(finalCronExpression, async () => {
await this.queueCollectionSync(collectionId);
});
if (job) {
this.jobs.set(collectionId, {
collectionId,
job,
intervalHours,
customCron,
});
logger.info(`Scheduled individual collection sync`, {
label: 'Individual Collection Scheduler',
collectionId,
intervalHours,
customCron,
cronExpression: finalCronExpression,
});
} else {
throw new Error('Failed to create scheduled job');
}
} catch (error) {
logger.error(
`Failed to schedule collection sync for ${collectionId}: ${error}`,
{
label: 'Individual Collection Scheduler',
collectionId,
intervalHours,
customCron,
error: error instanceof Error ? error.message : String(error),
}
);
}
}
/**
* Cancel individual sync for a collection
*/
public static cancelCollectionSync(collectionId: string): void {
const existingJob = this.jobs.get(collectionId);
if (existingJob) {
existingJob.job.cancel();
this.jobs.delete(collectionId);
logger.info(`Cancelled individual collection sync`, {
label: 'Individual Collection Scheduler',
collectionId,
});
}
}
/**
* Set full sync running status to prevent individual syncs
*/
public static setFullSyncRunning(running: boolean): void {
this.fullSyncRunning = running;
logger.debug(
`Full sync status changed: ${running ? 'running' : 'stopped'}`,
{
label: 'Individual Collection Scheduler',
}
);
}
/**
* Check if full sync is currently running
*/
public static isFullSyncRunning(): boolean {
return this.fullSyncRunning;
}
/**
* Wait for API to become available, then acquire lock
*/
public static async waitForApiAccess(
apiType: string,
collectionId: string,
collectionName: string,
libraryId: string
): Promise<void> {
return new Promise((resolve) => {
// Get or create API queue
let apiQueue = this.apiQueues.get(apiType);
if (!apiQueue) {
apiQueue = {
apiType,
inUse: false,
waitingQueue: [],
};
this.apiQueues.set(apiType, apiQueue);
}
// If API is not in use, acquire it immediately
if (!apiQueue.inUse) {
apiQueue.inUse = true;
logger.debug(`API acquired immediately: ${apiType}`, {
label: 'Individual Collection Scheduler',
apiType,
collectionId,
collectionName,
});
resolve();
return;
}
// API is in use, add to waiting queue
const waitingItem: ApiWaitingItem = {
collectionId,
collectionName,
libraryId,
resolve,
scheduledAt: new Date(),
};
apiQueue.waitingQueue.push(waitingItem);
logger.info(
`Collection waiting for API access: ${collectionName} (${apiType} API, position ${apiQueue.waitingQueue.length} in queue)`,
{
label: 'Individual Collection Scheduler',
apiType,
collectionId,
collectionName,
queuePosition: apiQueue.waitingQueue.length,
}
);
});
}
/**
* Release API and process next item in waiting queue
*/
public static releaseApiAccess(apiType: string): void {
const apiQueue = this.apiQueues.get(apiType);
if (!apiQueue) {
return;
}
// Process next waiting item
const nextItem = apiQueue.waitingQueue.shift();
if (nextItem) {
logger.info(
`API access granted to next waiting collection: ${nextItem.collectionName} (${apiType} API)`,
{
label: 'Individual Collection Scheduler',
apiType,
collectionId: nextItem.collectionId,
collectionName: nextItem.collectionName,
waitingQueueRemaining: apiQueue.waitingQueue.length,
}
);
// Keep API in use for the next item
nextItem.resolve();
} else {
// No more waiting items, release the API
apiQueue.inUse = false;
logger.debug(`API released: ${apiType}`, {
label: 'Individual Collection Scheduler',
apiType,
});
}
}
/**
* Check if an API is currently in use
*/
public static isApiInUse(apiType: string): boolean {
const apiQueue = this.apiQueues.get(apiType);
return apiQueue?.inUse || false;
}
/**
* Get all APIs currently in use
*/
public static getApisInUse(): string[] {
return Array.from(this.apiQueues.entries())
.filter(([, queue]) => queue.inUse)
.map(([apiType]) => apiType);
}
/**
* Get API queue status for debugging
*/
public static getApiQueuesStatus(): {
apiType: string;
inUse: boolean;
waitingCount: number;
waitingCollections: string[];
}[] {
return Array.from(this.apiQueues.entries()).map(([apiType, queue]) => ({
apiType,
inUse: queue.inUse,
waitingCount: queue.waitingQueue.length,
waitingCollections: queue.waitingQueue.map((item) => item.collectionName),
}));
}
/**
* Queue a collection for sync with collision detection and library-based queuing
*/
private static async queueCollectionSync(
collectionId: string
): Promise<void> {
try {
// Get collection configuration
const settings = getSettings();
const collectionConfig = settings.plex.collectionConfigs?.find(
(config) => config.id === collectionId
);
if (!collectionConfig) {
logger.warn(
`Collection config not found for queued sync: ${collectionId}`,
{
label: 'Individual Collection Scheduler',
collectionId,
}
);
this.cancelCollectionSync(collectionId);
return;
}
// Note: API lock is acquired in processLibraryQueue() just before execution,
// not here. Acquiring it here causes deadlock when scheduled sync triggers
// during full sync (both waiting on same lock).
const libraryId = collectionConfig.libraryId;
// Get or create library queue
let libraryQueue = this.libraryQueues.get(libraryId);
if (!libraryQueue) {
libraryQueue = {
libraryId,
running: false,
queue: [],
};
this.libraryQueues.set(libraryId, libraryQueue);
}
// Check if this collection is already queued
const alreadyQueued = libraryQueue.queue.some(
(item) => item.collectionId === collectionId
);
if (alreadyQueued) {
logger.debug(
`Collection already queued for sync: ${collectionConfig.name}`,
{
label: 'Individual Collection Scheduler',
collectionId,
libraryId,
}
);
return;
}
// Add to queue
const queuedSync: QueuedCollectionSync = {
collectionId,
collectionName: collectionConfig.name,
libraryId,
priority: Date.now(), // FIFO ordering
scheduledAt: new Date(),
};
libraryQueue.queue.push(queuedSync);
libraryQueue.queue.sort((a, b) => a.priority - b.priority);
// Log differently if queuing during main sync
if (this.fullSyncRunning) {
logger.info(
`Queued collection sync (will process after main sync completes): ${collectionConfig.name} (queue position: ${libraryQueue.queue.length})`,
{
label: 'Individual Collection Scheduler',
collectionId,
libraryId,
queueSize: libraryQueue.queue.length,
deferredUntilMainSyncComplete: true,
}
);
} else {
logger.info(
`Queued collection sync: ${collectionConfig.name} (queue position: ${libraryQueue.queue.length})`,
{
label: 'Individual Collection Scheduler',
collectionId,
libraryId,
queueSize: libraryQueue.queue.length,
}
);
}
// Process queue for this library if not already running
await this.processLibraryQueue(libraryId);
} catch (error) {
logger.error(
`Failed to queue collection sync for ${collectionId}: ${error}`,
{
label: 'Individual Collection Scheduler',
collectionId,
error: error instanceof Error ? error.message : String(error),
}
);
}
}
/**
* Process the queue for a specific library
*/
private static async processLibraryQueue(libraryId: string): Promise<void> {
const libraryQueue = this.libraryQueues.get(libraryId);
if (
!libraryQueue ||
libraryQueue.running ||
libraryQueue.queue.length === 0
) {
return;
}
libraryQueue.running = true;
try {
while (!this.fullSyncRunning) {
const nextSync = libraryQueue.queue.shift();
if (!nextSync) {
// Queue is empty
break;
}
libraryQueue.currentCollection = nextSync.collectionName;
// Acquire API lock here, just before execution, not at queue time.
// This prevents deadlock when scheduled sync triggers during full sync.
const settings = getSettings();
const collectionConfig = settings.plex.collectionConfigs?.find(
(config) => config.id === nextSync.collectionId
);
if (!collectionConfig) {
logger.warn(
`Collection config not found during queue processing: ${nextSync.collectionId}`,
{
label: 'Individual Collection Scheduler',
collectionId: nextSync.collectionId,
}
);
continue;
}
const apiType = collectionConfig.type;
await this.waitForApiAccess(
apiType,
nextSync.collectionId,
nextSync.collectionName,
nextSync.libraryId
);
logger.info(
`Processing queued collection sync: ${nextSync.collectionName} (${libraryQueue.queue.length} remaining in queue)`,
{
label: 'Individual Collection Scheduler',
collectionId: nextSync.collectionId,
libraryId,
queueRemaining: libraryQueue.queue.length,
}
);
await this.executeCollectionSync(nextSync.collectionId);
// Brief pause between collections in the same library to prevent overwhelming Plex
if (libraryQueue.queue.length > 0) {
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}
if (this.fullSyncRunning && libraryQueue.queue.length > 0) {
logger.info(
`Stopping library queue processing: full sync started (${libraryQueue.queue.length} collections still queued)`,
{
label: 'Individual Collection Scheduler',
libraryId,
queueRemaining: libraryQueue.queue.length,
}
);
}
} catch (error) {
logger.error(
`Error processing library queue for ${libraryId}: ${error}`,
{
label: 'Individual Collection Scheduler',
libraryId,
error: error instanceof Error ? error.message : String(error),
}
);
} finally {
libraryQueue.running = false;
libraryQueue.currentCollection = undefined;
}
}
/**
* Execute sync for a specific collection
*/
private static async executeCollectionSync(
collectionId: string
): Promise<void> {
try {
const settings = getSettings();
const collectionConfig = settings.plex.collectionConfigs?.find(
(config) => config.id === collectionId
);
if (!collectionConfig) {
logger.warn(
`Collection config not found for scheduled sync: ${collectionId}`,
{
label: 'Individual Collection Scheduler',
collectionId,
}
);
// Cancel the job since the collection no longer exists
this.cancelCollectionSync(collectionId);
return;
}
const apiType = collectionConfig.type;
logger.info(
`Executing scheduled sync for collection: ${collectionConfig.name} (API: ${apiType})`,
{
label: 'Individual Collection Scheduler',
collectionId,
collectionName: collectionConfig.name,
apiType,
}
);
// Get admin user for Plex API (same approach as full sync)
const { getAdminUser } = await import(
'@server/lib/collections/core/CollectionUtilities'
);
const admin = await getAdminUser();
if (!admin?.plexToken) {
throw new Error('No admin Plex token found');
}
const plexClient = new PlexAPI({
plexToken: admin.plexToken,
plexSettings: settings.plex,
});
// Determine if this is multi-source
const extendedConfig = collectionConfig as typeof collectionConfig & {
isMultiSource?: boolean;
sources?: {
id: string;
type: string;
subtype?: string;
customUrl?: string;
timePeriod?: string;
customDays?: number;
minimumPlays?: number;
priority: number;
networksCountry?: string;
radarrTagServerId?: number;
radarrTagId?: number;
radarrTagLabel?: string;
sonarrTagServerId?: number;
sonarrTagId?: number;
sonarrTagLabel?: string;
}[];
};
const isMultiSource =
extendedConfig.isMultiSource &&
(extendedConfig.sources?.length ?? 0) > 0;
const allCollections = await plexClient.getAllCollections();
// Use shared library cache for better matching
const { libraryCacheService } = await import('./LibraryCacheService');
const libraryCache = await libraryCacheService.getCache(plexClient);
let result;
if (isMultiSource) {
// Use multi-source orchestrator
const { MultiSourceOrchestrator } = await import(
'./MultiSourceOrchestrator'
);
const orchestrator = new MultiSourceOrchestrator();
// Convert to MultiSourceCollectionConfig format
const multiSourceConfig = {
...extendedConfig,
type: 'multi-source' as const,
sources:
extendedConfig.sources?.map((source) => ({
id: source.id,
type: source.type as MultiSourceType,
subtype: source.subtype || '',
customUrl: source.customUrl,
timePeriod: source.timePeriod as
| 'daily'
| 'weekly'
| 'monthly'
| 'all',
customDays: source.customDays,
minimumPlays: source.minimumPlays,
priority: source.priority,
networksCountry: source.networksCountry,
radarrTagServerId: source.radarrTagServerId,
radarrTagId: source.radarrTagId,
radarrTagLabel: source.radarrTagLabel,
sonarrTagServerId: source.sonarrTagServerId,
sonarrTagId: source.sonarrTagId,
sonarrTagLabel: source.sonarrTagLabel,
})) || [],
combineMode:
(extendedConfig.combineMode as MultiSourceCombineMode) ||
'list_order',
};
result = await orchestrator.processMultiSourceCollection(
multiSourceConfig,
plexClient,
allCollections,
new Set(),
libraryCache
);
} else {
// Use normal single-source sync
const syncService = await collectionSyncService.createSyncService(
collectionConfig.type
);
result = await syncService.processCollections(
[collectionConfig],
plexClient,
allCollections,
new Set(),
libraryCache
);
}
// Mark collection as synced (update needsSync status)
settings.markCollectionSynced(collectionId, 'collection');
settings.save();
// Sync Plex collection ordering after collection sync
const { HubSyncService } = await import(
'@server/lib/collections/plex/HubSyncService'
);
const hubSyncService = new HubSyncService();
await hubSyncService.syncUnifiedOrdering(plexClient);
logger.info(
`Scheduled collection sync completed: ${collectionConfig.name}`,
{
label: 'Individual Collection Scheduler',
collectionId,
collectionName: collectionConfig.name,
result,
}
);
} catch (error) {
logger.error(
`Scheduled collection sync failed for ${collectionId}: ${error}`,
{
label: 'Individual Collection Scheduler',
collectionId,
error: error instanceof Error ? error.message : String(error),
}
);
} finally {
// Always release the API, regardless of success or failure
const settings = getSettings();
const collectionConfig = settings.plex.collectionConfigs?.find(
(config) => config.id === collectionId
);
if (collectionConfig) {
this.releaseApiAccess(collectionConfig.type);
}
}
}
/**
* Clear all scheduled jobs
*/
public static clearAllJobs(): void {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for (const [_, jobInfo] of this.jobs) {
jobInfo.job.cancel();
}
this.jobs.clear();
}
/**
* Get status of all scheduled jobs
*/
public static getJobsStatus(): {
collectionId: string;
intervalHours?: number;
customCron?: string;
nextRun: Date | null;
}[] {
const status: {
collectionId: string;
intervalHours?: number;
customCron?: string;
nextRun: Date | null;
}[] = [];
for (const [collectionId, jobInfo] of this.jobs) {
status.push({
collectionId,
intervalHours: jobInfo.intervalHours,
customCron: jobInfo.customCron,
nextRun: jobInfo.job.nextInvocation(),
});
}
return status;
}
/**
* Check if a collection has an active individual sync schedule
*/
public static hasActiveJob(collectionId: string): boolean {
return this.jobs.has(collectionId);
}
/**
* Get status of all library queues and API locks
*/
public static getLibraryQueuesStatus(): {
libraryId: string;
running: boolean;
currentCollection?: string;
queueSize: number;
queuedCollections: string[];
}[] {
const status: {
libraryId: string;
running: boolean;
currentCollection?: string;
queueSize: number;
queuedCollections: string[];
}[] = [];
for (const [libId, queue] of this.libraryQueues) {
status.push({
libraryId: libId,
running: queue.running,
currentCollection: queue.currentCollection,
queueSize: queue.queue.length,
queuedCollections: queue.queue.map((q) => q.collectionName),
});
}
return status;
}
/**
* Clear all queues (for emergency stop or restart)
*/
public static clearAllQueues(): void {
let totalCleared = 0;
for (const [libraryId, queue] of this.libraryQueues) {
const queueSize = queue.queue.length;
totalCleared += queueSize;
if (queueSize > 0) {
logger.debug(
`Clearing queue for library ${libraryId}: ${queueSize} items`,
{
label: 'Individual Collection Scheduler',
libraryId,
queueSize,
}
);
}
queue.queue = [];
queue.running = false;
queue.currentCollection = undefined;
}
// Clear all API queues as well
const apiQueuesStatus = this.getApiQueuesStatus();
let totalWaitingCleared = 0;
for (const [apiType, apiQueue] of this.apiQueues) {
totalWaitingCleared += apiQueue.waitingQueue.length;
// Reject all waiting promises to prevent hanging
for (const waitingItem of apiQueue.waitingQueue) {
// We can't really reject them gracefully since they're waiting for resolve()
// But clearing the queue will prevent them from ever resolving
logger.debug(
`Clearing waiting collection: ${waitingItem.collectionName}`,
{
label: 'Individual Collection Scheduler',
apiType,
collectionId: waitingItem.collectionId,
}
);
}
}
this.apiQueues.clear();
logger.info(`Cleared all individual collection queues and API queues`, {
label: 'Individual Collection Scheduler',
totalCleared,
librariesCleared: this.libraryQueues.size,
apiQueuesCleared: apiQueuesStatus.length,
totalWaitingCleared,
});
}
/**
* Wait for individual syncs to complete before allowing full sync
* No timeout - waits indefinitely (consistent with main sync behavior)
* Main sync flag will cause individual syncs to exit their processing loops
*/
public static async waitForIndividualSyncsToComplete(): Promise<void> {
const checkInterval = 1000; // Check every second
let lastLogTime = Date.now();
const logIntervalMs = 10000; // Log status every 10 seconds to avoid spam
let anyRunning = true;
while (anyRunning) {
anyRunning = Array.from(this.libraryQueues.values()).some(
(queue) => queue.running || queue.queue.length > 0
);
if (!anyRunning) {
logger.debug('All individual collection syncs have completed', {
label: 'Individual Collection Scheduler',
});
return;
}
// Log status periodically (not every second to avoid spam)
const now = Date.now();
if (now - lastLogTime >= logIntervalMs) {
logger.debug('Waiting for individual collection syncs to complete...', {
label: 'Individual Collection Scheduler',
runningQueues: Array.from(this.libraryQueues.values())
.filter((q) => q.running || q.queue.length > 0)
.map((q) => ({
libraryId: q.libraryId,
queueSize: q.queue.length,
running: q.running,
currentCollection: q.currentCollection,
})),
});
lastLogTime = now;
}
await new Promise((resolve) => setTimeout(resolve, checkInterval));
}
}
/**
* Process all pending queues that accumulated during main sync
* Called after main sync completes to handle collections that were queued
*/
public static async processPendingQueues(): Promise<void> {
const queuesWithPendingItems = Array.from(this.libraryQueues.entries())
.filter(([, queue]) => queue.queue.length > 0 && !queue.running)
.map(([libraryId]) => libraryId);
if (queuesWithPendingItems.length === 0) {
logger.debug('No pending individual collection syncs to process', {
label: 'Individual Collection Scheduler',
});
return;
}
logger.info(
`Processing ${queuesWithPendingItems.length} library queues with pending individual syncs`,
{
label: 'Individual Collection Scheduler',
librariesWithPendingItems: queuesWithPendingItems.length,
totalPendingCollections: Array.from(this.libraryQueues.values()).reduce(
(sum, queue) => sum + queue.queue.length,
0
),
}
);
// Process each library queue with pending items
for (const libraryId of queuesWithPendingItems) {
try {
await this.processLibraryQueue(libraryId);
} catch (error) {
logger.error(
`Failed to process pending queue for library ${libraryId}: ${error}`,
{
label: 'Individual Collection Scheduler',
libraryId,
error: error instanceof Error ? error.message : String(error),
}
);
}
}
logger.info('Finished processing pending individual collection syncs', {
label: 'Individual Collection Scheduler',
});
}
}
// Export singleton instance
export const individualCollectionScheduler = IndividualCollectionScheduler;