feat(upload): Implement persistent state via metadata for resumability (#50) (#51)

* feat: Enhance chunk upload functionality with configurable retry logic

- Introduced MAX_RETRIES configuration to allow dynamic adjustment of retry attempts for chunk uploads.
- Updated index.html to read MAX_RETRIES from server-side configuration, providing a default value if not set.
- Implemented retry logic in uploadChunkWithRetry method, including exponential backoff and error handling for network issues.
- Added console warnings for invalid or missing MAX_RETRIES values to improve debugging.

This commit improves the robustness of file uploads by allowing configurable retry behavior, enhancing user experience during upload failures.

* feat: Enhance upload functionality with metadata management and improved error handling

- Introduced persistent metadata management for uploads, allowing resumability and better tracking of upload states.
- Added special handling for 404 responses during chunk uploads, logging warnings and marking uploads as complete if previously finished.
- Implemented metadata directory creation and validation in app.js to ensure proper upload management.
- Updated upload.js to include metadata read/write functions, improving the robustness of the upload process.
- Enhanced cleanup routines to handle stale metadata and incomplete uploads, ensuring a cleaner state.

This commit significantly improves the upload process by adding metadata support, enhancing error handling, and ensuring better resource management during uploads.

Fixes #24
This commit is contained in:
Greirson Lee-Thorp 2025-05-04 16:30:16 -07:00 committed by GitHub
parent bf1c9a2dbd
commit b256311822
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 783 additions and 388 deletions

View File

@ -53,9 +53,26 @@
<script defer>
const CHUNK_SIZE = 1024 * 1024; // 1MB chunks
const MAX_RETRIES = 3;
const RETRY_DELAY = 1000;
const AUTO_UPLOAD = ['true', '1', 'yes'].includes('{{AUTO_UPLOAD}}'.toLowerCase());
const RETRY_DELAY = 1000; // 1 second delay between retries
// Read MAX_RETRIES from the injected server value, with a fallback
const MAX_RETRIES_STR = '{{MAX_RETRIES}}';
let maxRetries = 5; // Default value
if (MAX_RETRIES_STR && MAX_RETRIES_STR !== '{{MAX_RETRIES}}') {
const parsedRetries = parseInt(MAX_RETRIES_STR, 10);
if (!isNaN(parsedRetries) && parsedRetries >= 0) {
maxRetries = parsedRetries;
} else {
console.warn(`Invalid MAX_RETRIES value "${MAX_RETRIES_STR}" received from server, defaulting to ${maxRetries}.`);
}
} else {
console.warn('MAX_RETRIES not injected by server, defaulting to 5.');
}
window.MAX_RETRIES = maxRetries; // Assign to window for potential global use/debugging
console.log(`Max retries for chunk uploads: ${window.MAX_RETRIES}`);
const AUTO_UPLOAD_STR = '{{AUTO_UPLOAD}}';
const AUTO_UPLOAD = ['true', '1', 'yes'].includes(AUTO_UPLOAD_STR.toLowerCase());
// Utility function to generate a unique batch ID
function generateBatchId() {
@ -82,12 +99,21 @@
this.lastUploadedBytes = 0;
this.lastUploadTime = null;
this.uploadRate = 0;
this.maxRetries = window.MAX_RETRIES; // Use configured retries
this.retryDelay = RETRY_DELAY; // Use constant
}
async start() {
try {
this.updateProgress(0); // Initial progress update
await this.initUpload();
await this.uploadChunks();
if (this.file.size > 0) { // Only upload chunks if file is not empty
await this.uploadChunks();
} else {
console.log(`Skipping chunk upload for zero-byte file: ${this.file.name}`);
// Server handles zero-byte completion in /init
this.updateProgress(100); // Mark as complete on client too
}
return true;
} catch (error) {
console.error('Upload failed:', error);
@ -139,10 +165,22 @@
async uploadChunks() {
this.createProgressElement();
let currentChunkStartPosition = this.position; // Track start position for retries
while (this.position < this.file.size) {
const chunk = await this.readChunk();
await this.uploadChunk(chunk);
const chunk = await this.readChunk(); // Reads based on current this.position
try {
// Attempt to upload the chunk with retry logic
// Pass the position *before* reading the chunk, as that's the start of the data being sent
await this.uploadChunkWithRetry(chunk, currentChunkStartPosition);
// If successful, update the start position for the *next* chunk read
// this.position is updated internally by readChunk, so currentChunkStartPosition reflects the next read point
currentChunkStartPosition = this.position;
} catch (error) {
// If uploadChunkWithRetry fails after all retries, propagate the error
console.error(`UploadChunks failed after retries for chunk starting at ${currentChunkStartPosition}. File: ${this.file.webkitRelativePath || this.file.name}`);
throw error; // Propagate up to the start() method's catch block
}
}
}
@ -154,25 +192,94 @@
return await blob.arrayBuffer();
}
async uploadChunk(chunk) {
// Remove leading slash from API path before concatenating
async uploadChunkWithRetry(chunk, chunkStartPosition) {
const chunkApiUrlPath = `/api/upload/chunk/${this.uploadId}`;
const chunkApiUrl = chunkApiUrlPath.startsWith('/') ? chunkApiUrlPath.substring(1) : chunkApiUrlPath;
const response = await fetch(window.BASE_URL + chunkApiUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/octet-stream',
'X-Batch-ID': this.batchId
},
body: chunk
});
let lastError = null;
if (!response.ok) {
throw new Error(`Failed to upload chunk: ${response.statusText}`);
for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
try {
if (attempt > 0) {
console.warn(`Retrying chunk (start: ${chunkStartPosition}) upload for ${this.file.webkitRelativePath || this.file.name} (Attempt ${attempt}/${this.maxRetries})...`);
this.updateProgressElementInfo(`Retrying attempt ${attempt}...`, 'var(--warning-color)');
}
// Use AbortController for potential timeout or cancellation during fetch
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 30000); // 30-second timeout per attempt
const response = await fetch(window.BASE_URL + chunkApiUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/octet-stream',
'X-Batch-ID': this.batchId
// Consider adding 'Content-Range': `bytes ${chunkStartPosition}-${chunkStartPosition + chunk.byteLength - 1}/${this.file.size}`
// If the server supports handling potential duplicate chunks via Content-Range
},
body: chunk,
signal: controller.signal // Add abort signal
});
clearTimeout(timeoutId); // Clear timeout if fetch completes
if (response.ok) {
const data = await response.json();
if (attempt > 0) {
console.log(`Chunk upload successful on retry attempt ${attempt} for ${this.file.webkitRelativePath || this.file.name}`);
}
// Update progress based on server response
// this.position is updated by readChunk(), so progress reflects total uploaded
this.updateProgress(data.progress);
// Success! Exit the retry loop.
this.updateProgressElementInfo('uploading...'); // Reset info message
return;
} else {
// Server responded with an error status (4xx, 5xx)
let errorText = 'Unknown server error';
try {
errorText = await response.text();
} catch (textError) { /* ignore if reading text fails */ }
// --- Add Special 404 Handling ---
if (response.status === 404 && attempt > 0) {
console.warn(`Received 404 Not Found on retry attempt ${attempt} for ${this.file.webkitRelativePath || this.file.name}. Assuming upload completed previously.`);
this.updateProgress(100); // Mark as complete
return; // Exit retry loop successfully
}
// --- End Special 404 Handling ---
lastError = new Error(`Failed to upload chunk: ${response.status} ${response.statusText}. Server response: ${errorText}`);
console.error(`Chunk upload attempt ${attempt} failed: ${lastError.message}`);
this.updateProgressElementInfo(`Attempt ${attempt} failed: ${response.statusText}`, 'var(--danger-color)');
}
} catch (error) {
// Network error, fetch failed completely, or timeout
lastError = error;
if (error.name === 'AbortError') {
console.error(`Chunk upload attempt ${attempt} timed out after 30 seconds.`);
this.updateProgressElementInfo(`Attempt ${attempt} timed out`, 'var(--danger-color)');
} else {
console.error(`Chunk upload attempt ${attempt} failed with network error: ${error.message}`);
this.updateProgressElementInfo(`Attempt ${attempt} network error`, 'var(--danger-color)');
}
}
// If not the last attempt, wait before retrying
if (attempt < this.maxRetries) {
// Exponential backoff: 1s, 2s, 4s, ... but capped
const delay = Math.min(this.retryDelay * Math.pow(2, attempt), 30000); // Max 30s delay
await new Promise(resolve => setTimeout(resolve, delay));
}
}
const data = await response.json();
this.updateProgress(data.progress);
// If we exit the loop, all retries have failed.
// Position reset is tricky. If the server *did* receive a chunk but failed to respond OK,
// simply resending might corrupt data unless the server handles it idempotently.
// Failing the whole upload is often safer.
// this.position = chunkStartPosition; // Re-enable if server can handle duplicate chunks safely
console.error(`Chunk upload failed permanently after ${this.maxRetries} retries for ${this.file.webkitRelativePath || this.file.name}, chunk starting at ${chunkStartPosition}.`);
this.updateProgressElementInfo(`Upload failed after ${this.maxRetries} retries`, 'var(--danger-color)');
throw lastError || new Error(`Chunk upload failed after ${this.maxRetries} retries.`);
}
createProgressElement() {
@ -240,8 +347,12 @@
}
// Update progress info
this.progressElement.infoSpan.textContent = `${rateText} · ${percent < 100 ? 'uploading...' : 'complete'}`;
this.progressElement.detailsSpan.textContent =
const statusText = percent < 100 ? 'uploading...' : 'complete';
// Use the helper for info updates, only update if not showing a retry message
if (!this.progressElement.infoSpan.textContent.startsWith('Retry') && !this.progressElement.infoSpan.textContent.startsWith('Attempt')) {
this.updateProgressElementInfo(`${rateText} · ${statusText}`);
}
this.progressElement.detailsSpan.textContent =
`${formatFileSize(this.position)} of ${formatFileSize(this.file.size)} (${percent.toFixed(1)}%)`;
// Update tracking variables
@ -255,6 +366,31 @@
}
}
}
// Helper to update the info message and color in the progress element
updateProgressElementInfo(message, color = '') {
if (this.progressElement && this.progressElement.infoSpan) {
this.progressElement.infoSpan.textContent = message;
this.progressElement.infoSpan.style.color = color; // Reset if color is empty string
}
}
// Helper to attempt cancellation on the server
async cancelUploadOnServer() {
if (!this.uploadId) return;
console.log(`Attempting to cancel upload ${this.uploadId} on server due to error.`);
try {
const cancelApiUrlPath = `/api/upload/cancel/${this.uploadId}`;
const cancelApiUrl = cancelApiUrlPath.startsWith('/') ? cancelApiUrlPath.substring(1) : cancelApiUrlPath;
// No need to wait for response here, just fire and forget
fetch(window.BASE_URL + cancelApiUrl, { method: 'POST' }).catch(err => {
console.warn(`Sending cancel request failed for upload ${this.uploadId}:`, err);
});
} catch (cancelError) {
// Catch synchronous errors, though unlikely with fetch
console.warn(`Error initiating cancel request for upload ${this.uploadId}:`, cancelError);
} // Add closing brace for try block
}
}
// UI Event Handlers
@ -830,17 +966,6 @@
const savedTheme = localStorage.getItem('theme') ||
(window.matchMedia('(prefers-color-scheme: dark)').matches ? 'dark' : 'light');
setTheme(savedTheme);
document.addEventListener('DOMContentLoaded', function() {
// Rewrite asset URLs to use BASE_URL as prefix if not absolute
const baseUrl = window.BASE_URL;
document.querySelectorAll('link[rel="stylesheet"], link[rel="manifest"], link[rel="icon"]').forEach(link => {
const href = link.getAttribute('href');
if (href && !href.startsWith('http') && !href.startsWith('data:') && !href.startsWith(baseUrl)) {
link.setAttribute('href', baseUrl + href.replace(/^\//, ''));
}
});
});
</script>
</body>
</html>

View File

@ -9,6 +9,7 @@ const cors = require('cors');
const cookieParser = require('cookie-parser');
const path = require('path');
const fs = require('fs');
const fsPromises = require('fs').promises;
const { config, validateConfig } = require('./config');
const logger = require('./utils/logger');
@ -53,6 +54,7 @@ app.get('/', (req, res) => {
let html = fs.readFileSync(path.join(__dirname, '../public', 'index.html'), 'utf8');
html = html.replace(/{{SITE_TITLE}}/g, config.siteTitle);
html = html.replace('{{AUTO_UPLOAD}}', config.autoUpload.toString());
html = html.replace('{{MAX_RETRIES}}', config.clientMaxRetries.toString());
// Ensure baseUrl has a trailing slash for correct asset linking
const baseUrlWithSlash = config.baseUrl.endsWith('/') ? config.baseUrl : config.baseUrl + '/';
html = html.replace(/{{BASE_URL}}/g, baseUrlWithSlash);
@ -88,6 +90,7 @@ app.use((req, res, next) => {
html = html.replace(/{{SITE_TITLE}}/g, config.siteTitle);
if (req.path === '/index.html' || req.path === 'index.html') {
html = html.replace('{{AUTO_UPLOAD}}', config.autoUpload.toString());
html = html.replace('{{MAX_RETRIES}}', config.clientMaxRetries.toString());
}
// Ensure baseUrl has a trailing slash
const baseUrlWithSlash = config.baseUrl.endsWith('/') ? config.baseUrl : config.baseUrl + '/';
@ -111,6 +114,10 @@ app.use((err, req, res, next) => { // eslint-disable-line no-unused-vars
});
});
// --- Add this after config is loaded ---
const METADATA_DIR = path.join(config.uploadDir, '.metadata');
// --- End addition ---
/**
* Initialize the application
* Sets up required directories and validates configuration
@ -122,6 +129,25 @@ async function initialize() {
// Ensure upload directory exists and is writable
await ensureDirectoryExists(config.uploadDir);
// --- Add this section ---
// Ensure metadata directory exists
try {
if (!fs.existsSync(METADATA_DIR)) {
await fsPromises.mkdir(METADATA_DIR, { recursive: true });
logger.info(`Created metadata directory: ${METADATA_DIR}`);
} else {
logger.info(`Metadata directory exists: ${METADATA_DIR}`);
}
// Check writability (optional but good practice)
await fsPromises.access(METADATA_DIR, fs.constants.W_OK);
logger.success(`Metadata directory is writable: ${METADATA_DIR}`);
} catch (err) {
logger.error(`Metadata directory error (${METADATA_DIR}): ${err.message}`);
// Decide if this is fatal. If resumability is critical, maybe throw.
throw new Error(`Failed to access or create metadata directory: ${METADATA_DIR}`);
}
// --- End added section ---
// Log configuration
logger.info(`Maximum file size set to: ${config.maxFileSize / (1024 * 1024)}MB`);

View File

@ -5,10 +5,17 @@ console.log('Loaded ENV:', {
LOCAL_UPLOAD_DIR: process.env.LOCAL_UPLOAD_DIR,
NODE_ENV: process.env.NODE_ENV
});
console.log('Loaded ENV:', {
PORT: process.env.PORT,
UPLOAD_DIR: process.env.UPLOAD_DIR,
LOCAL_UPLOAD_DIR: process.env.LOCAL_UPLOAD_DIR,
NODE_ENV: process.env.NODE_ENV
});
const { validatePin } = require('../utils/security');
const logger = require('../utils/logger');
const fs = require('fs');
const path = require('path');
const { version } = require('../../package.json'); // Get version from package.json
/**
* Environment Variables Reference
@ -35,6 +42,18 @@ const logConfig = (message, level = 'info') => {
console.log(`${prefix} CONFIGURATION: ${message}`);
};
// Default configurations
const DEFAULT_PORT = 3000;
const DEFAULT_CHUNK_SIZE = 1024 * 1024 * 100; // 100MB
const DEFAULT_SITE_TITLE = 'DumbDrop';
const DEFAULT_BASE_URL = 'http://localhost:3000';
const DEFAULT_CLIENT_MAX_RETRIES = 5; // Default retry count
const logAndReturn = (key, value, isDefault = false) => {
logConfig(`${key}: ${value}${isDefault ? ' (default)' : ''}`);
return value;
};
/**
* Determine the upload directory based on environment variables.
* Priority:
@ -94,6 +113,7 @@ ensureLocalUploadDirExists(resolvedUploadDir);
* Loads and validates environment variables
*/
const config = {
// =====================
// =====================
// Server settings
// =====================
@ -101,7 +121,7 @@ const config = {
* Port for the server (default: 3000)
* Set via PORT in .env
*/
port: process.env.PORT || 3000,
port: process.env.PORT || DEFAULT_PORT,
/**
* Node environment (default: 'development')
* Set via NODE_ENV in .env
@ -111,8 +131,9 @@ const config = {
* Base URL for the app (default: http://localhost:${PORT})
* Set via BASE_URL in .env
*/
baseUrl: process.env.BASE_URL || `http://localhost:${process.env.PORT || 3000}`,
baseUrl: process.env.BASE_URL || DEFAULT_BASE_URL,
// =====================
// =====================
// Upload settings
// =====================
@ -138,6 +159,7 @@ const config = {
*/
autoUpload: process.env.AUTO_UPLOAD === 'true',
// =====================
// =====================
// Security
// =====================
@ -147,6 +169,7 @@ const config = {
*/
pin: validatePin(process.env.DUMBDROP_PIN),
// =====================
// =====================
// UI settings
// =====================
@ -154,8 +177,9 @@ const config = {
* Site title (default: 'DumbDrop')
* Set via DUMBDROP_TITLE in .env
*/
siteTitle: process.env.DUMBDROP_TITLE || 'DumbDrop',
siteTitle: process.env.DUMBDROP_TITLE || DEFAULT_SITE_TITLE,
// =====================
// =====================
// Notification settings
// =====================
@ -175,6 +199,7 @@ const config = {
*/
appriseSizeUnit: process.env.APPRISE_SIZE_UNIT,
// =====================
// =====================
// File extensions
// =====================
@ -188,7 +213,30 @@ const config = {
allowedIframeOrigins: process.env.ALLOWED_IFRAME_ORIGINS
? process.env.ALLOWED_IFRAME_ORIGINS.split(',').map(origin => origin.trim()).filter(Boolean)
: null
: null,
/**
* Max number of retries for client-side chunk uploads (default: 5)
* Set via CLIENT_MAX_RETRIES in .env
*/
clientMaxRetries: (() => {
const envValue = process.env.CLIENT_MAX_RETRIES;
const defaultValue = DEFAULT_CLIENT_MAX_RETRIES;
if (envValue === undefined) {
return logAndReturn('CLIENT_MAX_RETRIES', defaultValue, true);
}
const retries = parseInt(envValue, 10);
if (isNaN(retries) || retries < 0) {
logConfig(
`Invalid CLIENT_MAX_RETRIES value: "${envValue}". Using default: ${defaultValue}`,
'warning',
);
return logAndReturn('CLIENT_MAX_RETRIES', defaultValue, true);
}
return logAndReturn('CLIENT_MAX_RETRIES', retries);
})(),
uploadPin: logAndReturn('UPLOAD_PIN', process.env.UPLOAD_PIN || null),
};
console.log(`Upload directory configured as: ${config.uploadDir}`);

View File

@ -1,413 +1,456 @@
/**
* File upload route handlers and batch upload management.
* Handles file uploads, chunked transfers, and folder creation.
* Manages upload sessions, batch timeouts, and cleanup.
* Manages upload sessions using persistent metadata for resumability.
*/
const express = require('express');
const router = express.Router();
const crypto = require('crypto');
const path = require('path');
const fs = require('fs').promises; // Use promise-based fs
const fsSync = require('fs'); // For sync checks like existsSync
const { config } = require('../config');
const logger = require('../utils/logger');
const { getUniqueFilePath, getUniqueFolderPath, sanitizeFilename, sanitizePathPreserveDirs } = require('../utils/fileUtils');
const { getUniqueFilePath, getUniqueFolderPath, sanitizeFilename, sanitizePathPreserveDirs, isValidBatchId } = require('../utils/fileUtils');
const { sendNotification } = require('../services/notifications');
const fs = require('fs');
const { cleanupIncompleteUploads } = require('../utils/cleanup');
const { isDemoMode, createMockUploadResponse } = require('../utils/demoMode');
const { isDemoMode } = require('../utils/demoMode');
// Store ongoing uploads
const uploads = new Map();
// Store folder name mappings for batch uploads with timestamps
// --- Persistence Setup ---
const METADATA_DIR = path.join(config.uploadDir, '.metadata');
// --- In-Memory Maps (Still useful for session-level data) ---
// Store folder name mappings for batch uploads (avoids FS lookups during session)
const folderMappings = new Map();
// Store batch activity timestamps
// Store batch activity timestamps (for cleaning up stale batches/folder mappings)
const batchActivity = new Map();
// Store upload to batch mappings
const uploadToBatch = new Map();
const BATCH_TIMEOUT = 30 * 60 * 1000; // 30 minutes
const BATCH_TIMEOUT = 30 * 60 * 1000; // 30 minutes for batch/folderMapping cleanup
let cleanupInterval;
// --- Helper Functions for Metadata ---
/**
* Start the cleanup interval for inactive batches
* @returns {NodeJS.Timeout} The interval handle
*/
function startBatchCleanup() {
if (cleanupInterval) {
clearInterval(cleanupInterval);
async function readUploadMetadata(uploadId) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
logger.warn(`Attempted to read metadata with invalid uploadId: ${uploadId}`);
return null;
}
cleanupInterval = setInterval(() => {
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
try {
const data = await fs.readFile(metaFilePath, 'utf8');
return JSON.parse(data);
} catch (err) {
if (err.code === 'ENOENT') {
return null; // Metadata file doesn't exist - normal case for new/finished uploads
}
logger.error(`Error reading metadata for ${uploadId}: ${err.message}`);
throw err; // Rethrow other errors
}
}
async function writeUploadMetadata(uploadId, metadata) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
logger.error(`Attempted to write metadata with invalid uploadId: ${uploadId}`);
return; // Prevent writing
}
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
metadata.lastActivity = Date.now(); // Update timestamp on every write
try {
// Write atomically if possible (write to temp then rename) for more safety
const tempMetaPath = `${metaFilePath}.${crypto.randomBytes(4).toString('hex')}.tmp`;
await fs.writeFile(tempMetaPath, JSON.stringify(metadata, null, 2));
await fs.rename(tempMetaPath, metaFilePath);
} catch (err) {
logger.error(`Error writing metadata for ${uploadId}: ${err.message}`);
// Attempt to clean up temp file if rename failed
try { await fs.unlink(tempMetaPath); } catch (unlinkErr) {/* ignore */}
throw err;
}
}
async function deleteUploadMetadata(uploadId) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
logger.warn(`Attempted to delete metadata with invalid uploadId: ${uploadId}`);
return;
}
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
try {
await fs.unlink(metaFilePath);
logger.debug(`Deleted metadata file for upload: ${uploadId}.meta`);
} catch (err) {
if (err.code !== 'ENOENT') { // Ignore if already deleted
logger.error(`Error deleting metadata file ${uploadId}.meta: ${err.message}`);
}
}
}
// --- Batch Cleanup (Focuses on batchActivity map, not primary upload state) ---
let batchCleanupInterval;
function startBatchCleanup() {
if (batchCleanupInterval) clearInterval(batchCleanupInterval);
batchCleanupInterval = setInterval(() => {
const now = Date.now();
logger.info(`Running batch cleanup, checking ${batchActivity.size} active batches`);
logger.info(`Running batch cleanup, checking ${batchActivity.size} active batch sessions`);
let cleanedCount = 0;
for (const [batchId, lastActivity] of batchActivity.entries()) {
if (now - lastActivity >= BATCH_TIMEOUT) {
logger.info(`Cleaning up inactive batch: ${batchId}`);
logger.info(`Cleaning up inactive batch session: ${batchId}`);
batchActivity.delete(batchId);
// Clean up associated folder mappings for this batch
for (const key of folderMappings.keys()) {
if (key.endsWith(`-${batchId}`)) {
folderMappings.delete(key);
}
}
cleanedCount++;
}
}
}, 5 * 60 * 1000); // 5 minutes
return cleanupInterval;
if (cleanedCount > 0) logger.info(`Cleaned up ${cleanedCount} inactive batch sessions.`);
}, 5 * 60 * 1000); // Check every 5 minutes
batchCleanupInterval.unref(); // Allow process to exit if this is the only timer
return batchCleanupInterval;
}
/**
* Stop the batch cleanup interval
*/
function stopBatchCleanup() {
if (cleanupInterval) {
clearInterval(cleanupInterval);
cleanupInterval = null;
if (batchCleanupInterval) {
clearInterval(batchCleanupInterval);
batchCleanupInterval = null;
}
}
// Start cleanup interval unless disabled
if (!process.env.DISABLE_BATCH_CLEANUP) {
startBatchCleanup();
}
// Run cleanup periodically
const CLEANUP_INTERVAL = 5 * 60 * 1000; // 5 minutes
const cleanupTimer = setInterval(() => {
cleanupIncompleteUploads(uploads, uploadToBatch, batchActivity)
.catch(err => logger.error(`Cleanup failed: ${err.message}`));
}, CLEANUP_INTERVAL);
// Handle cleanup timer errors
cleanupTimer.unref(); // Don't keep process alive just for cleanup
process.on('SIGTERM', () => {
clearInterval(cleanupTimer);
// Final cleanup
cleanupIncompleteUploads(uploads, uploadToBatch, batchActivity)
.catch(err => logger.error(`Final cleanup failed: ${err.message}`));
});
/**
* Log the current state of uploads and mappings
* @param {string} context - The context where this log is being called from
*/
function logUploadState(context) {
logger.debug(`Upload State [${context}]:
Active Uploads: ${uploads.size}
Active Batches: ${batchActivity.size}
Folder Mappings: ${folderMappings.size}
Upload-Batch Mappings: ${uploadToBatch.size}
`);
}
/**
* Validate batch ID format
* @param {string} batchId - Batch ID to validate
* @returns {boolean} True if valid
*/
function isValidBatchId(batchId) {
return /^\d+-[a-z0-9]{9}$/.test(batchId);
}
// --- Routes ---
// Initialize upload
router.post('/init', async (req, res) => {
// DEMO MODE CHECK - Bypass persistence if in demo mode
if (isDemoMode()) {
const { filename, fileSize } = req.body;
const uploadId = 'demo-' + crypto.randomBytes(16).toString('hex');
logger.info(`[DEMO] Initialized upload for ${filename} (${fileSize} bytes) with ID ${uploadId}`);
// Simulate zero-byte completion for demo
if (Number(fileSize) === 0) {
logger.success(`[DEMO] Completed zero-byte file upload: ${filename}`);
sendNotification(filename, 0, config); // Still send notification if configured
}
return res.json({ uploadId });
}
const { filename, fileSize } = req.body;
const clientBatchId = req.headers['x-batch-id'];
// --- Basic validations ---
if (!filename) return res.status(400).json({ error: 'Missing filename' });
if (fileSize === undefined || fileSize === null) return res.status(400).json({ error: 'Missing fileSize' });
const size = Number(fileSize);
if (isNaN(size) || size < 0) return res.status(400).json({ error: 'Invalid file size' });
const maxSizeInBytes = config.maxFileSize;
if (size > maxSizeInBytes) return res.status(413).json({ error: 'File too large', limit: maxSizeInBytes });
const batchId = clientBatchId || `${Date.now()}-${crypto.randomBytes(4).toString('hex').substring(0, 9)}`;
if (clientBatchId && !isValidBatchId(batchId)) return res.status(400).json({ error: 'Invalid batch ID format' });
batchActivity.set(batchId, Date.now()); // Track batch session activity
try {
// Log request details for debugging
if (process.env.DEBUG === 'true' || process.env.NODE_ENV === 'development') {
logger.info(`Upload init request:
Filename: ${filename}
Size: ${fileSize} (${typeof fileSize})
Batch ID: ${clientBatchId || 'none'}
`);
} else {
logger.info(`Upload init request: ${filename} (${fileSize} bytes)`);
}
// Validate required fields with detailed errors
if (!filename) {
return res.status(400).json({
error: 'Missing filename',
details: 'The filename field is required'
});
}
if (fileSize === undefined || fileSize === null) {
return res.status(400).json({
error: 'Missing fileSize',
details: 'The fileSize field is required'
});
}
// Convert fileSize to number if it's a string
const size = Number(fileSize);
if (isNaN(size) || size < 0) { // Changed from size <= 0 to allow zero-byte files
return res.status(400).json({
error: 'Invalid file size',
details: `File size must be a non-negative number, received: ${fileSize} (${typeof fileSize})`
});
}
// Validate file size
const maxSizeInBytes = config.maxFileSize;
if (size > maxSizeInBytes) {
const message = `File size ${size} bytes exceeds limit of ${maxSizeInBytes} bytes`;
logger.warn(message);
return res.status(413).json({
error: 'File too large',
message,
limit: maxSizeInBytes,
limitInMB: Math.floor(maxSizeInBytes / (1024 * 1024))
});
}
// Generate batch ID from header or create new one
const batchId = req.headers['x-batch-id'] || `${Date.now()}-${crypto.randomBytes(4).toString('hex').substring(0, 9)}`;
// Validate batch ID if provided in header
if (req.headers['x-batch-id'] && !isValidBatchId(batchId)) {
return res.status(400).json({
error: 'Invalid batch ID format',
details: `Batch ID must match format: timestamp-[9 alphanumeric chars], received: ${batchId}`
});
}
// Update batch activity
batchActivity.set(batchId, Date.now());
// Sanitize filename and convert to forward slashes, preserving directory structure
// --- Path handling and Sanitization ---
const sanitizedFilename = sanitizePathPreserveDirs(filename);
const safeFilename = path.normalize(sanitizedFilename)
.replace(/^(\.\.(\/|\\|$))+/, '')
.replace(/\\/g, '/')
.replace(/^\/+/, ''); // Remove leading slashes
// Log sanitized filename
logger.info(`Processing upload: ${safeFilename}`);
// Validate file extension if configured
.replace(/^\/+/, '');
logger.info(`Upload init request for: ${safeFilename}`);
// --- Extension Check ---
if (config.allowedExtensions) {
const fileExt = path.extname(safeFilename).toLowerCase();
if (!config.allowedExtensions.includes(fileExt)) {
return res.status(400).json({
error: 'File type not allowed',
allowedExtensions: config.allowedExtensions,
receivedExtension: fileExt
});
if (fileExt && !config.allowedExtensions.includes(fileExt)) {
logger.warn(`File type not allowed: ${safeFilename} (Extension: ${fileExt})`);
return res.status(400).json({ error: 'File type not allowed', receivedExtension: fileExt });
}
}
// --- Determine Paths & Handle Folders ---
const uploadId = crypto.randomBytes(16).toString('hex');
let filePath = path.join(config.uploadDir, safeFilename);
let fileHandle;
try {
// Handle file/folder paths
const pathParts = safeFilename.split('/').filter(Boolean); // Remove empty parts
if (pathParts.length > 1) {
// The first part is the root folder name from the client
const originalFolderName = pathParts[0];
// Always use a consistent mapping for this batch to avoid collisions
// This ensures all files in the batch go into the same (possibly renamed) root folder
let newFolderName = folderMappings.get(`${originalFolderName}-${batchId}`);
const folderPath = path.join(config.uploadDir, newFolderName || originalFolderName);
if (!newFolderName) {
try {
// Ensure parent directories exist
await fs.promises.mkdir(path.dirname(folderPath), { recursive: true });
// Try to create the target folder
await fs.promises.mkdir(folderPath, { recursive: false });
newFolderName = originalFolderName;
} catch (err) {
if (err.code === 'EEXIST') {
// If the folder exists, generate a unique folder name for this batch
const uniqueFolderPath = await getUniqueFolderPath(folderPath);
newFolderName = path.basename(uniqueFolderPath);
logger.info(`Folder "${originalFolderName}" exists, using "${newFolderName}" for batch ${batchId}`);
} else {
throw err;
}
let finalFilePath = path.join(config.uploadDir, safeFilename);
const pathParts = safeFilename.split('/').filter(Boolean);
if (pathParts.length > 1) {
const originalFolderName = pathParts[0];
let newFolderName = folderMappings.get(`${originalFolderName}-${batchId}`);
const baseFolderPath = path.join(config.uploadDir, newFolderName || originalFolderName);
if (!newFolderName) {
await fs.mkdir(path.dirname(baseFolderPath), { recursive: true });
try {
await fs.mkdir(baseFolderPath, { recursive: false });
newFolderName = originalFolderName;
} catch (err) {
if (err.code === 'EEXIST') {
const uniqueFolderPath = await getUniqueFolderPath(baseFolderPath);
newFolderName = path.basename(uniqueFolderPath);
logger.info(`Folder "${originalFolderName}" exists or conflict, using unique "${newFolderName}" for batch ${batchId}`);
await fs.mkdir(path.join(config.uploadDir, newFolderName), { recursive: true });
} else {
throw err;
}
// Store the mapping for this batch
folderMappings.set(`${originalFolderName}-${batchId}`, newFolderName);
}
// Always apply the mapping for this batch
pathParts[0] = newFolderName;
filePath = path.join(config.uploadDir, ...pathParts);
// Ensure all parent directories exist for the file
await fs.promises.mkdir(path.dirname(filePath), { recursive: true });
folderMappings.set(`${originalFolderName}-${batchId}`, newFolderName);
}
// Get unique file path and handle
const result = await getUniqueFilePath(filePath);
filePath = result.path;
fileHandle = result.handle;
// Create upload entry
uploads.set(uploadId, {
safeFilename: path.relative(config.uploadDir, filePath),
filePath,
fileSize: size,
bytesReceived: 0,
writeStream: fileHandle.createWriteStream()
});
// Associate upload with batch
uploadToBatch.set(uploadId, batchId);
logger.info(`Initialized upload for ${path.relative(config.uploadDir, filePath)} (${size} bytes)`);
// Log state after initialization
logUploadState('After Upload Init');
// Handle zero-byte files immediately
if (size === 0) {
const upload = uploads.get(uploadId);
upload.writeStream.end();
uploads.delete(uploadId);
logger.success(`Completed zero-byte file upload: ${upload.safeFilename}`);
sendNotification(upload.safeFilename, 0, config);
}
// Send response
return res.json({ uploadId });
} catch (err) {
if (fileHandle) {
await fileHandle.close().catch(() => {});
fs.promises.unlink(filePath).catch(() => {});
}
throw err;
pathParts[0] = newFolderName;
finalFilePath = path.join(config.uploadDir, ...pathParts);
await fs.mkdir(path.dirname(finalFilePath), { recursive: true });
} else {
await fs.mkdir(config.uploadDir, { recursive: true }); // Ensure base upload dir exists
}
// --- Check Final Path Collision & Get Unique Name if Needed ---
let checkPath = finalFilePath;
let counter = 1;
while (fsSync.existsSync(checkPath)) {
logger.warn(`Final destination file already exists: ${checkPath}. Generating unique name.`);
const dir = path.dirname(finalFilePath);
const ext = path.extname(finalFilePath);
const baseName = path.basename(finalFilePath, ext);
checkPath = path.join(dir, `${baseName} (${counter})${ext}`);
counter++;
}
if (checkPath !== finalFilePath) {
logger.info(`Using unique final path: ${checkPath}`);
finalFilePath = checkPath;
// If path changed, ensure directory exists (might be needed if baseName contained '/')
await fs.mkdir(path.dirname(finalFilePath), { recursive: true });
}
const partialFilePath = finalFilePath + '.partial';
// --- Create and Persist Metadata ---
const metadata = {
uploadId,
originalFilename: safeFilename, // Store the path as received by client
filePath: finalFilePath, // The final, possibly unique, path
partialFilePath,
fileSize: size,
bytesReceived: 0,
batchId,
createdAt: Date.now(),
lastActivity: Date.now()
};
await writeUploadMetadata(uploadId, metadata);
logger.info(`Initialized persistent upload: ${uploadId} for ${safeFilename} -> ${finalFilePath}`);
// --- Handle Zero-Byte Files --- // (Important: Handle *after* metadata potentially exists)
if (size === 0) {
try {
await fs.writeFile(finalFilePath, ''); // Create the empty file
logger.success(`Completed zero-byte file upload: ${metadata.originalFilename} as ${finalFilePath}`);
await deleteUploadMetadata(uploadId); // Clean up metadata since it's done
sendNotification(metadata.originalFilename, 0, config);
} catch (writeErr) {
logger.error(`Failed to create zero-byte file ${finalFilePath}: ${writeErr.message}`);
await deleteUploadMetadata(uploadId).catch(() => {}); // Attempt cleanup on error
throw writeErr; // Let the main catch block handle it
}
}
res.json({ uploadId });
} catch (err) {
logger.error(`Upload initialization failed:
Error: ${err.message}
Stack: ${err.stack}
Filename: ${filename}
Size: ${fileSize}
Batch ID: ${clientBatchId || 'none'}
`);
return res.status(500).json({
error: 'Failed to initialize upload',
details: err.message
});
logger.error(`Upload initialization failed: ${err.message} ${err.stack}`);
return res.status(500).json({ error: 'Failed to initialize upload', details: err.message });
}
});
// Upload chunk
router.post('/chunk/:uploadId', express.raw({
limit: '10mb',
limit: config.maxFileSize + (10 * 1024 * 1024), // Generous limit for raw body
type: 'application/octet-stream'
}), async (req, res) => {
const { uploadId } = req.params;
const upload = uploads.get(uploadId);
const chunkSize = req.body.length;
const batchId = req.headers['x-batch-id'];
if (!upload) {
logger.warn(`Upload not found: ${uploadId}, Batch ID: ${batchId || 'none'}`);
return res.status(404).json({ error: 'Upload not found' });
// DEMO MODE CHECK
if (isDemoMode()) {
const { uploadId } = req.params;
logger.debug(`[DEMO] Received chunk for ${uploadId}`);
// Fake progress - requires knowing file size which isn't easily available here in demo
const demoProgress = Math.min(100, Math.random() * 100); // Placeholder
return res.json({ bytesReceived: 0, progress: demoProgress });
}
const { uploadId } = req.params;
let chunk = req.body;
let chunkSize = chunk.length;
const clientBatchId = req.headers['x-batch-id']; // Logged but not used directly here
if (!chunkSize) return res.status(400).json({ error: 'Empty chunk received' });
let metadata;
let fileHandle;
try {
// Update batch activity if batch ID provided
if (batchId && isValidBatchId(batchId)) {
batchActivity.set(batchId, Date.now());
}
metadata = await readUploadMetadata(uploadId);
// Write chunk
await new Promise((resolve, reject) => {
upload.writeStream.write(Buffer.from(req.body), (err) => {
if (err) reject(err);
else resolve();
});
});
upload.bytesReceived += chunkSize;
// Calculate progress, ensuring it doesn't exceed 100%
const progress = Math.min(
Math.round((upload.bytesReceived / upload.fileSize) * 100),
100
);
logger.debug(`Chunk received:
File: ${upload.safeFilename}
Progress: ${progress}%
Bytes Received: ${upload.bytesReceived}/${upload.fileSize}
Chunk Size: ${chunkSize}
Upload ID: ${uploadId}
Batch ID: ${batchId || 'none'}
`);
// Check if upload is complete
if (upload.bytesReceived >= upload.fileSize) {
await new Promise((resolve, reject) => {
upload.writeStream.end((err) => {
if (err) reject(err);
else resolve();
});
});
uploads.delete(uploadId);
// Format completion message based on debug mode
if (process.env.DEBUG === 'true' || process.env.NODE_ENV === 'development') {
logger.success(`Upload completed:
File: ${upload.safeFilename}
Size: ${upload.fileSize}
Upload ID: ${uploadId}
Batch ID: ${batchId || 'none'}
`);
} else {
logger.success(`Upload completed: ${upload.safeFilename} (${upload.fileSize} bytes)`);
if (!metadata) {
logger.warn(`Upload metadata not found for chunk request: ${uploadId}. Client Batch ID: ${clientBatchId || 'none'}. Upload may be complete or cancelled.`);
// Check if the final file exists as a fallback for completed uploads
// This is a bit fragile, but handles cases where metadata was deleted slightly early
try {
// Need to guess the final path - THIS IS NOT ROBUST
// A better approach might be needed if this is common
// For now, just return 404
// await fs.access(potentialFinalPath);
// return res.json({ bytesReceived: fileSizeGuess, progress: 100 });
return res.status(404).json({ error: 'Upload session not found or already completed' });
} catch (finalCheckErr) {
return res.status(404).json({ error: 'Upload session not found or already completed' });
}
// Send notification
sendNotification(upload.safeFilename, upload.fileSize, config);
logUploadState('After Upload Complete');
}
res.json({
bytesReceived: upload.bytesReceived,
progress
});
// Update batch activity using metadata's batchId
if (metadata.batchId && isValidBatchId(metadata.batchId)) {
batchActivity.set(metadata.batchId, Date.now());
}
// --- Sanity Checks & Idempotency ---
if (metadata.bytesReceived >= metadata.fileSize) {
logger.warn(`Received chunk for already completed upload ${uploadId} (${metadata.originalFilename}). Finalizing again if needed.`);
// Ensure finalization if possible, then return success
try {
await fs.access(metadata.filePath); // Check if final file exists
logger.info(`Upload ${uploadId} already finalized at ${metadata.filePath}.`);
} catch (accessErr) {
// Final file doesn't exist, attempt rename
try {
await fs.rename(metadata.partialFilePath, metadata.filePath);
logger.info(`Finalized ${uploadId} on redundant chunk request (renamed ${metadata.partialFilePath} -> ${metadata.filePath}).`);
} catch (renameErr) {
if (renameErr.code === 'ENOENT') {
logger.warn(`Partial file ${metadata.partialFilePath} missing during redundant chunk finalization for ${uploadId}.`);
} else {
logger.error(`Error finalizing ${uploadId} on redundant chunk: ${renameErr.message}`);
}
}
}
// Regardless of rename outcome, delete metadata if it still exists
await deleteUploadMetadata(uploadId);
return res.json({ bytesReceived: metadata.fileSize, progress: 100 });
}
// Prevent writing beyond expected file size (simple protection)
if (metadata.bytesReceived + chunkSize > metadata.fileSize) {
logger.warn(`Chunk for ${uploadId} exceeds expected file size. Received ${metadata.bytesReceived + chunkSize}, expected ${metadata.fileSize}. Truncating chunk.`);
const bytesToWrite = metadata.fileSize - metadata.bytesReceived;
chunk = chunk.slice(0, bytesToWrite);
chunkSize = chunk.length;
if (chunkSize <= 0) { // If we already have exactly the right amount
logger.info(`Upload ${uploadId} already has expected bytes. Skipping write, proceeding to finalize.`);
// Skip write, proceed to finalization check below
metadata.bytesReceived = metadata.fileSize; // Ensure state is correct for finalization
} else {
logger.info(`Truncated chunk for ${uploadId} to ${chunkSize} bytes.`);
}
}
// --- Write Chunk (Append Mode) --- // Only write if chunk has size after potential truncation
if (chunkSize > 0) {
fileHandle = await fs.open(metadata.partialFilePath, 'a');
const writeResult = await fileHandle.write(chunk);
await fileHandle.close(); // Close immediately
if (writeResult.bytesWritten !== chunkSize) {
// This indicates a partial write, which is problematic.
logger.error(`Partial write for chunk ${uploadId}! Expected ${chunkSize}, wrote ${writeResult.bytesWritten}. Disk full?`);
// How to recover? Maybe revert bytesReceived? For now, throw.
throw new Error(`Failed to write full chunk for ${uploadId}`);
}
metadata.bytesReceived += writeResult.bytesWritten;
}
// --- Update State --- (bytesReceived updated above or set if truncated to zero)
const progress = metadata.fileSize === 0 ? 100 :
Math.min( Math.round((metadata.bytesReceived / metadata.fileSize) * 100), 100);
logger.debug(`Chunk written for ${uploadId}: ${metadata.bytesReceived}/${metadata.fileSize} (${progress}%)`);
// --- Persist Updated Metadata (Before potential finalization) ---
await writeUploadMetadata(uploadId, metadata);
// --- Check for Completion --- // Now happens after metadata update
if (metadata.bytesReceived >= metadata.fileSize) {
logger.info(`Upload ${uploadId} (${metadata.originalFilename}) completed ${metadata.bytesReceived} bytes.`);
try {
await fs.rename(metadata.partialFilePath, metadata.filePath);
logger.success(`Upload completed and finalized: ${metadata.originalFilename} as ${metadata.filePath} (${metadata.fileSize} bytes)`);
await deleteUploadMetadata(uploadId); // Clean up metadata file AFTER successful rename
sendNotification(metadata.originalFilename, metadata.fileSize, config);
} catch (renameErr) {
if (renameErr.code === 'ENOENT') {
logger.warn(`Partial file ${metadata.partialFilePath} not found during finalization for ${uploadId}. Assuming already finalized elsewhere.`);
// Attempt to delete metadata anyway if partial is gone
await deleteUploadMetadata(uploadId).catch(() => {});
} else {
logger.error(`CRITICAL: Failed to rename partial file ${metadata.partialFilePath} to ${metadata.filePath}: ${renameErr.message}`);
// Keep metadata and partial file for manual recovery.
// Return success to client as data is likely there, but log server issue.
}
}
}
res.json({ bytesReceived: metadata.bytesReceived, progress });
} catch (err) {
logger.error(`Chunk upload failed:
Error: ${err.message}
Stack: ${err.stack}
File: ${upload.safeFilename}
Upload ID: ${uploadId}
Batch ID: ${batchId || 'none'}
Bytes Received: ${upload.bytesReceived}/${upload.fileSize}
`);
res.status(500).json({ error: 'Failed to process chunk' });
// Ensure file handle is closed on error
if (fileHandle) {
await fileHandle.close().catch(closeErr => logger.error(`Error closing file handle for ${uploadId} after error: ${closeErr.message}`));
}
logger.error(`Chunk upload failed for ${uploadId}: ${err.message} ${err.stack}`);
// Don't delete metadata on generic chunk errors, let client retry or cleanup handle stale files
res.status(500).json({ error: 'Failed to process chunk', details: err.message });
}
});
// Cancel upload
router.post('/cancel/:uploadId', async (req, res) => {
const { uploadId } = req.params;
const upload = uploads.get(uploadId);
if (upload) {
upload.writeStream.end();
try {
await fs.promises.unlink(upload.filePath);
} catch (err) {
logger.error(`Failed to delete incomplete upload: ${err.message}`);
}
uploads.delete(uploadId);
uploadToBatch.delete(uploadId);
logger.info(`Upload cancelled: ${upload.safeFilename}`);
// DEMO MODE CHECK
if (isDemoMode()) {
logger.info(`[DEMO] Upload cancelled: ${req.params.uploadId}`);
return res.json({ message: 'Upload cancelled (Demo)' });
}
res.json({ message: 'Upload cancelled' });
const { uploadId } = req.params;
logger.info(`Received cancel request for upload: ${uploadId}`);
try {
const metadata = await readUploadMetadata(uploadId);
if (metadata) {
// Delete partial file first
try {
await fs.unlink(metadata.partialFilePath);
logger.info(`Deleted partial file on cancellation: ${metadata.partialFilePath}`);
} catch (unlinkErr) {
if (unlinkErr.code !== 'ENOENT') { // Ignore if already gone
logger.error(`Failed to delete partial file ${metadata.partialFilePath} on cancel: ${unlinkErr.message}`);
}
}
// Then delete metadata file
await deleteUploadMetadata(uploadId);
logger.info(`Upload cancelled and cleaned up: ${uploadId} (${metadata.originalFilename})`);
} else {
logger.warn(`Cancel request for non-existent or already completed upload: ${uploadId}`);
}
res.json({ message: 'Upload cancelled or already complete' });
} catch (err) {
logger.error(`Error during upload cancellation for ${uploadId}: ${err.message}`);
res.status(500).json({ error: 'Failed to cancel upload' });
}
});
module.exports = {
router,
startBatchCleanup,
stopBatchCleanup,
// Export for testing
batchActivity,
BATCH_TIMEOUT
// Export for testing if required
readUploadMetadata,
writeUploadMetadata,
deleteUploadMetadata
};

View File

@ -4,23 +4,22 @@
* Provides cleanup task registration and execution system.
*/
const fs = require('fs');
const fs = require('fs').promises;
const path = require('path');
const logger = require('./logger');
const { config } = require('../config');
/**
* Stores cleanup tasks that need to be run during shutdown
* @type {Set<Function>}
*/
const cleanupTasks = new Set();
const METADATA_DIR = path.join(config.uploadDir, '.metadata');
const UPLOAD_TIMEOUT = config.uploadTimeout || 30 * 60 * 1000; // Use a config or default (e.g., 30 mins)
let cleanupTasks = [];
/**
* Register a cleanup task to be executed during shutdown
* @param {Function} task - Async function to be executed during cleanup
*/
function registerCleanupTask(task) {
cleanupTasks.add(task);
cleanupTasks.push(task);
}
/**
@ -28,7 +27,7 @@ function registerCleanupTask(task) {
* @param {Function} task - Task to remove
*/
function removeCleanupTask(task) {
cleanupTasks.delete(task);
cleanupTasks = cleanupTasks.filter((t) => t !== task);
}
/**
@ -37,7 +36,7 @@ function removeCleanupTask(task) {
* @returns {Promise<void>}
*/
async function executeCleanup(timeout = 1000) {
const taskCount = cleanupTasks.size;
const taskCount = cleanupTasks.length;
if (taskCount === 0) {
logger.info('No cleanup tasks to execute');
return;
@ -49,7 +48,7 @@ async function executeCleanup(timeout = 1000) {
// Run all cleanup tasks in parallel with timeout
await Promise.race([
Promise.all(
Array.from(cleanupTasks).map(async (task) => {
cleanupTasks.map(async (task) => {
try {
await Promise.race([
task(),
@ -80,7 +79,7 @@ async function executeCleanup(timeout = 1000) {
}
} finally {
// Clear all tasks regardless of success/failure
cleanupTasks.clear();
cleanupTasks = [];
}
}
@ -113,7 +112,7 @@ async function cleanupIncompleteUploads(uploads, uploadToBatch, batchActivity) {
// Delete incomplete file
try {
await fs.promises.unlink(upload.filePath);
await fs.unlink(upload.filePath);
logger.info(`Cleaned up incomplete upload: ${upload.safeFilename}`);
} catch (err) {
if (err.code !== 'ENOENT') {
@ -138,31 +137,173 @@ async function cleanupIncompleteUploads(uploads, uploadToBatch, batchActivity) {
}
}
/**
* Clean up stale/incomplete uploads based on metadata files.
*/
async function cleanupIncompleteMetadataUploads() {
logger.info('Running cleanup for stale metadata/partial uploads...');
let cleanedCount = 0;
let checkedCount = 0;
try {
// Ensure metadata directory exists before trying to read it
try {
await fs.access(METADATA_DIR);
} catch (accessErr) {
if (accessErr.code === 'ENOENT') {
logger.info('Metadata directory does not exist, skipping metadata cleanup.');
return;
}
throw accessErr; // Rethrow other access errors
}
const files = await fs.readdir(METADATA_DIR);
const now = Date.now();
for (const file of files) {
if (file.endsWith('.meta')) {
checkedCount++;
const uploadId = file.replace('.meta', '');
const metaFilePath = path.join(METADATA_DIR, file);
let metadata;
try {
const data = await fs.readFile(metaFilePath, 'utf8');
metadata = JSON.parse(data);
// Check inactivity based on lastActivity timestamp in metadata
if (now - (metadata.lastActivity || metadata.createdAt || 0) > UPLOAD_TIMEOUT) {
logger.warn(`Found stale upload metadata: ${file}. Last activity: ${new Date(metadata.lastActivity || metadata.createdAt)}`);
// Attempt to delete partial file
if (metadata.partialFilePath) {
try {
await fs.unlink(metadata.partialFilePath);
logger.info(`Deleted stale partial file: ${metadata.partialFilePath}`);
} catch (unlinkPartialErr) {
if (unlinkPartialErr.code !== 'ENOENT') { // Ignore if already gone
logger.error(`Failed to delete stale partial file ${metadata.partialFilePath}: ${unlinkPartialErr.message}`);
}
}
}
// Attempt to delete metadata file
try {
await fs.unlink(metaFilePath);
logger.info(`Deleted stale metadata file: ${file}`);
cleanedCount++;
} catch (unlinkMetaErr) {
logger.error(`Failed to delete stale metadata file ${metaFilePath}: ${unlinkMetaErr.message}`);
}
}
} catch (readErr) {
logger.error(`Error reading or parsing metadata file ${metaFilePath} during cleanup: ${readErr.message}. Skipping.`);
// Optionally attempt to delete the corrupt meta file?
// await fs.unlink(metaFilePath).catch(()=>{});
}
} else if (file.endsWith('.tmp')) {
// Clean up potential leftover temp metadata files
const tempMetaPath = path.join(METADATA_DIR, file);
try {
const stats = await fs.stat(tempMetaPath);
if (now - stats.mtime.getTime() > UPLOAD_TIMEOUT) { // If temp file is also old
logger.warn(`Deleting stale temporary metadata file: ${file}`);
await fs.unlink(tempMetaPath);
}
} catch (statErr) {
if (statErr.code !== 'ENOENT') { // Ignore if already gone
logger.error(`Error checking temporary metadata file ${tempMetaPath}: ${statErr.message}`);
}
}
}
}
if (checkedCount > 0 || cleanedCount > 0) {
logger.info(`Metadata cleanup finished. Checked: ${checkedCount}, Cleaned stale: ${cleanedCount}.`);
}
} catch (err) {
// Handle errors reading the METADATA_DIR itself
if (err.code === 'ENOENT') {
logger.info('Metadata directory not found during cleanup scan.'); // Should have been created on init
} else {
logger.error(`Error during metadata cleanup scan: ${err.message}`);
}
}
// Also run empty folder cleanup
await cleanupEmptyFolders(config.uploadDir);
}
// Schedule the new cleanup function
const METADATA_CLEANUP_INTERVAL = 15 * 60 * 1000; // e.g., every 15 minutes
let metadataCleanupTimer = setInterval(cleanupIncompleteMetadataUploads, METADATA_CLEANUP_INTERVAL);
metadataCleanupTimer.unref(); // Allow process to exit if this is the only timer
process.on('SIGTERM', () => clearInterval(metadataCleanupTimer));
process.on('SIGINT', () => clearInterval(metadataCleanupTimer));
/**
* Recursively remove empty folders
* @param {string} dir - Directory to clean
*/
async function cleanupEmptyFolders(dir) {
try {
const files = await fs.promises.readdir(dir);
// Avoid trying to clean the special .metadata directory itself
if (path.basename(dir) === '.metadata') {
logger.debug(`Skipping cleanup of metadata directory: ${dir}`);
return;
}
const files = await fs.readdir(dir);
for (const file of files) {
const fullPath = path.join(dir, file);
const stats = await fs.promises.stat(fullPath);
// Skip the metadata directory during traversal
if (path.basename(fullPath) === '.metadata') {
logger.debug(`Skipping traversal into metadata directory: ${fullPath}`);
continue;
}
let stats;
try {
stats = await fs.stat(fullPath);
} catch (statErr) {
if (statErr.code === 'ENOENT') continue; // File might have been deleted concurrently
throw statErr;
}
if (stats.isDirectory()) {
await cleanupEmptyFolders(fullPath);
// Check if directory is empty after cleaning subdirectories
const remaining = await fs.promises.readdir(fullPath);
let remaining = [];
try {
remaining = await fs.readdir(fullPath);
} catch (readErr) {
if (readErr.code === 'ENOENT') continue; // Directory was deleted
throw readErr;
}
if (remaining.length === 0) {
await fs.promises.rmdir(fullPath);
logger.info(`Removed empty directory: ${fullPath}`);
// Make sure we don't delete the main upload dir
if (fullPath !== path.resolve(config.uploadDir)) {
try {
await fs.rmdir(fullPath);
logger.info(`Removed empty directory: ${fullPath}`);
} catch (rmErr) {
if (rmErr.code !== 'ENOENT') { // Ignore if already deleted
logger.error(`Failed to remove supposedly empty directory ${fullPath}: ${rmErr.message}`);
}
}
}
}
}
}
} catch (err) {
logger.error(`Failed to clean empty folders: ${err.message}`);
if (err.code !== 'ENOENT') { // Ignore if dir was already deleted
logger.error(`Failed to clean empty folders in ${dir}: ${err.message}`);
}
}
}
@ -171,5 +312,6 @@ module.exports = {
removeCleanupTask,
executeCleanup,
cleanupIncompleteUploads,
cleanupIncompleteMetadataUploads,
cleanupEmptyFolders
};

View File

@ -160,6 +160,16 @@ function sanitizePathPreserveDirs(filePath) {
.join('/');
}
/**
* Validate batch ID format
* @param {string} batchId - Batch ID to validate
* @returns {boolean} True if valid (matches timestamp-9_alphanumeric format)
*/
function isValidBatchId(batchId) {
if (!batchId) return false;
return /^\d+-[a-z0-9]{9}$/.test(batchId);
}
module.exports = {
formatFileSize,
calculateDirectorySize,
@ -167,5 +177,6 @@ module.exports = {
getUniqueFilePath,
getUniqueFolderPath,
sanitizeFilename,
sanitizePathPreserveDirs
sanitizePathPreserveDirs,
isValidBatchId
};