import { getMediaStatus, uploadMedia } from '../../src/actions/media';
import { mediaQueue } from '../../src/lib/queue/index';
import { startMediaWorker } from '../../src/workers/media-processor';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';
import { cleanupMediaRecord } from '../helpers/media-cleanup';

/**
 * End-to-end check of the media pipeline: upload a sample mp4 through
 * `uploadMedia`, start the media worker, and poll `getMediaStatus` until the
 * record reports `completed` or `failed`, or the polling budget runs out.
 *
 * The process always terminates via `process.exit`: exit code 0 only when the
 * record reached `completed` and every cleanup step succeeded, 1 otherwise.
 *
 * Requires a small `test_sample.mp4` in the current working directory.
 */
async function runTest(): Promise<void> {
  console.log('๐Ÿงช Starting Media Pipeline Integration Test...');

  const sampleFile = 'test_sample.mp4';
  const pollIntervalMs = 1000;
  const maxAttempts = 60; // ~60 seconds with a 1s poll interval

  let exitCode = 1;
  let mediaId: string | null = null;
  let testDir: string | null = null;
  let worker: Awaited<ReturnType<typeof startMediaWorker>> | null = null;

  // Check the precondition before creating anything on disk, so that the
  // early exit below leaves nothing behind.
  if (!fs.existsSync(sampleFile)) {
    console.error('โŒ Error: Please place a small "test_sample.mp4" file in the project root for testing.');
    process.exit(1);
  }

  try {
    // 1. Setup: copy the sample video into a private temp directory.
    // Done inside the try so a failed copy still goes through cleanup.
    testDir = fs.mkdtempSync(path.join(os.tmpdir(), 'test-media-'));
    const testFilePath = path.join(testDir, 'test_video.mp4');
    fs.copyFileSync(sampleFile, testFilePath);

    // Start from an empty queue so leftovers from earlier runs cannot interfere.
    await mediaQueue.drain(true);
    await mediaQueue.clean(0, 1000, 'failed');
    await mediaQueue.clean(0, 1000, 'completed');
    console.log('โœ… Queue cleaned');

    // 2. Start Worker (consumer)
    worker = await startMediaWorker();
    console.log('โœ… Worker started');

    // 3. Simulate Upload (producer) with a minimal File-like object.
    console.log('๐Ÿ“ค Simulating upload...');
    const fileMock = {
      name: 'e2e-pipeline-test_sample.mp4',
      type: 'video/mp4',
      size: fs.statSync(testFilePath).size,
      arrayBuffer: async () => {
        const fileBuffer = fs.readFileSync(testFilePath);
        // A Node Buffer may be a view into a larger pooled ArrayBuffer;
        // slice out exactly the bytes that belong to this file.
        return fileBuffer.buffer.slice(
          fileBuffer.byteOffset,
          fileBuffer.byteOffset + fileBuffer.byteLength
        );
      },
    };

    // The mock only provides the members listed above, hence the cast.
    const record = await uploadMedia(fileMock as any);
    mediaId = record.media.id;
    console.log(`โœ… Uploaded. Media ID: ${record.media.id}, Status: ${record.media.status}`);

    // 4. Wait for Worker to process (polling)
    console.log('โณ Waiting for worker to complete processing...');
    let outcome: 'completed' | 'failed' | 'timeout' = 'timeout';

    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      await new Promise<void>((resolve) => setTimeout(resolve, pollIntervalMs));
      const updatedRecord = await getMediaStatus(record.media.id);

      if (updatedRecord?.status === 'completed') {
        console.log('๐ŸŽ‰ Success! Media status is COMPLETED');
        console.log('Metadata:', updatedRecord.metadata);
        console.log('HLS URL generated:', Boolean(updatedRecord.hlsUrl));
        outcome = 'completed';
        break;
      }
      if (updatedRecord?.status === 'failed') {
        console.error('โŒ Processing failed according to DB status');
        outcome = 'failed';
        break;
      }

      process.stdout.write('.');
    }

    if (outcome === 'completed') {
      console.log('\nโœจ Integration Test Passed!');
      exitCode = 0;
    } else if (outcome === 'timeout') {
      // Only report a timeout when the polling budget was really exhausted;
      // a 'failed' status has already been reported above.
      console.error('\nโŒ Timeout: Worker did not complete processing in time.');
    }
  } catch (err) {
    console.error('โŒ Test failed with error:', err);
  } finally {
    // Each cleanup step runs in isolation so that one failure cannot skip the
    // remaining steps or the final process.exit. The worker is closed first so
    // it is no longer active when the media record is deleted.
    const cleanupSteps: Array<[string, () => unknown]> = [
      ['close worker', () => worker?.close()],
      [
        'remove temp directory',
        () => {
          if (testDir) {
            fs.rmSync(testDir, { recursive: true, force: true });
          }
        },
      ],
      ['delete media record', () => (mediaId ? cleanupMediaRecord(mediaId) : undefined)],
      ['close queue', () => mediaQueue.close()],
    ];

    for (const [label, step] of cleanupSteps) {
      try {
        await step();
      } catch (cleanupErr) {
        console.error(`Cleanup step "${label}" failed:`, cleanupErr);
        exitCode = 1;
      }
    }

    process.exit(exitCode);
  }
}

// Last-resort handler: never leave a rejected promise floating.
runTest().catch((err: unknown) => {
  console.error('Unhandled error in media pipeline test:', err);
  process.exit(1);
});