Jelajahi Sumber

重启之后的重新开发前文档

Y7000\张扬阳 2 minggu lalu
induk
melakukan
67e33e907d

+ 67 - 0
.claude/plans/wiggly-tinkering-origami-agent-abb4ef1b1734c439f.md

@@ -0,0 +1,67 @@
+# Media Pipeline Prototype Implementation Plan
+
+## 1. Overview
+This plan outlines the implementation of a prototype media processing pipeline, following the "non-blocking" principle described in `ARCHITECTURE.md`. The goal is to move from file upload to background transcoding using MinIO, BullMQ, and FFmpeg.
+
+## 2. Data Flow
+**User Upload** $\rightarrow$ **Next.js Server Action** (Multipart Form Data) $\rightarrow$ **MinIO** (Store original file + create DB record `status: PENDING`) $\rightarrow$ **BullMQ Producer** (Add job to queue with metadata) $\rightarrow$ **Redis** (Job persistence) $\rightarrow$ **FFmpeg Worker** (Consumes job, downloads from MinIO, transcodes) $\rightarrow$ **MinIO** (Upload processed variants) $\rightarrow$ **Database Update** (`status: COMPLETED`, store variant URLs).
+
+## 3. Proposed File Structure
+```text
+src/
+├── app/
+│   └── api/
+│       └── media/
+│           └── upload/          # Route handler for large file uploads (optional fallback)
+├── actions/
+│   └── media.ts                 # Server Actions: uploadMedia, getProcessingStatus
+├── lib/
+│   ├── minio.ts                 # MinIO client singleton & utility functions
+│   ├── queue/
+│   │   ├── connection.ts        # Redis connection config
+│   │   ├── producer.ts          # BullMQ job addition logic
+│   │   └── worker-types.ts      # Shared interfaces for jobs
+│   └── ffmpeg/                  # (Shared types/utils for FFmpeg)
+├── workers/
+│   └── media-processor.ts       # Independent Node.js process consuming BullMQ jobs
+└── db/
+    └── schema/
+        └── media.ts             # New schema: Media files, variants, processing status
+```
+
+## 4. Key Components & Implementation Details
+
+### A. Storage & Database (MinIO + Drizzle)
+- **Schema**: `media` table tracking `id`, `original_key`, `mime_type`, `status` (`pending`, `processing`, `completed`, `failed`), and a JSONB column for `variants` (resolutions, URLs).
+- **Direct Uploads**: Server Action receives the file, streams it to MinIO via the `minio` SDK, then initiates the queue.
+
+### B. Asynchronous Queue (BullMQ + Redis)
+- **Job Payload**: `{ mediaId: string, originalKey: string, filename: string }`.
+- **Progress Tracking**: The worker will use BullMQ's `job.updateProgress(n)` to report percentage. This can be polled via a Server Action or pushed via WebSockets/SSE in the future.
+
+### C. FFmpeg Worker (The "Heavy Lifter")
+- A standalone script (`src/workers/media-processor.ts`) that runs as a separate process.
+- **Steps**:
+  1. Pull job from queue.
+  2. Download original file from MinIO to `/tmp`.
+  3. Run FFmpeg command (e.g., `ffmpeg -i input -vf scale=-1:720 ...`).
+  4. Upload resulting `.mp4` or HLS segments back to MinIO.
+  5. Update PostgreSQL via Drizzle with the new variant metadata and set status to `completed`.
+
+## 5. Testing Strategy
+1. **Unit Tests**: Verify MinIO upload utility and BullMQ job creation logic using mocks.
+2. **Integration Test (Local)**: 
+   - Spin up local Docker containers for MinIO, Redis, and Postgres.
+   - Execute the Server Action with a sample `.mp4`.
+   - Monitor Redis to ensure the job is queued.
+   - Run the worker and verify:
+     - FFmpeg successfully generates files in `/tmp`.
+     - Files appear in MinIO.
+     - Database record status changes from `pending` $\rightarrow$ `processing` $\rightarrow$ `completed`.
+3. **Failure Resilience**: Simulate a worker crash during transcoding to ensure BullMQ retries the job or marks it as `failed`.
+
+## 6. Success Criteria
+- [ ] User receives immediate "Upload Successful" after file reaches MinIO.
+- [ ] Database shows the file record with `status: pending`.
+- [ ] Background worker picks up the task and updates status to `processing`.
+- [ ] Final output variants are accessible via MinIO URLs and recorded in the DB.

+ 2 - 1
.claude/settings.local.json

@@ -5,7 +5,8 @@
       "Bash(npm init *)",
       "Bash(npm install *)",
       "Bash(npx tsx *)",
-      "Bash(npx drizzle-kit *)"
+      "Bash(npx drizzle-kit *)",
+      "Bash(npx vitest *)"
     ]
   }
 }

+ 1 - 0
node_modules/.vite/vitest/da39a3ee5e6b4b0d3255bfef95601890afd80709/results.json

@@ -0,0 +1 @@
+{"version":"4.1.4","results":[[":tests/auth/permission.test.ts",{"duration":979.8820999999999,"failed":false}]]}

+ 3 - 1
src/db/index.ts

@@ -1,6 +1,8 @@
 import { drizzle } from 'drizzle-orm/node-postgres';
 import { Pool } from 'pg';
 import * as dotenv from 'dotenv';
+import * as authSchema from './schema/auth';
+import * as resourceSchema from './schema/resource';
 
 dotenv.config();
 
@@ -8,7 +10,7 @@ const pool = new Pool({
   connectionString: process.env.DATABASE_URL,
 });
 
-export const db = drizzle(pool);
+export const db = drizzle(pool, { schema: { ...authSchema, ...resourceSchema } });
 
 // 用于测试连接的函数
 export async function testConnection() {

+ 77 - 38
src/db/seed.ts

@@ -1,94 +1,133 @@
 import { db } from './index';
 import { users, groups, roles, permissions, userGroups, groupRoles, rolePermissions, userRoles, resources, aclRules } from './schema/auth';
 import { resources as resourceSchema, aclRules as aclRulesSchema } from './schema/resource';
-import { eq } from 'drizzle-orm';
+import { eq, and } from 'drizzle-orm';
 
 async function seed() {
-  console.log('🌱 开始执行 Seed 脚本...');
+  console.log('🌱 开始执行 Seed 脚本 (稳健 Upsert 模式)...');
 
   try {
-    // 1. 清理旧数据 (为了保证多次运行 seed 的幂等性)
-    // 注意:在生产环境严禁这样做,这里仅用于开发测试
-    console.log('🧹 清理现有数据...');
-    // 由于存在外键约束,需要按顺序删除或使用 TRUNCATE
-    // 这里简单处理,直接尝试删除(实际开发中建议用更优雅的清理方式)
-    // 为了演示方便,我们假设是干净的环境
-
-    // 2. 创建基础角色和权限
-    console.log('🔑 创建基础角色与权限...');
+    // --- 1. 创建基础角色与权限 ---
+    console.log('🔑 正在同步角色与权限...');
+
     const [adminRole] = await db.insert(roles).values({
       name: 'admin',
       description: '系统管理员,拥有最高权限'
+    }).onConflictDoUpdate({
+      target: roles.name,
+      set: { description: '系统管理员,拥有最高权限' }
     }).returning();
 
     const [editorRole] = await db.insert(roles).values({
       name: 'editor',
       description: '内容编辑者,可以读写资源'
+    }).onConflictDoUpdate({
+      target: roles.name,
+      set: { description: '内容编辑者,可以读写资源' }
     }).returning();
 
     const [viewerRole] = await db.insert(roles).values({
       name: 'viewer',
       description: '普通查看者,仅限读取'
+    }).onConflictDoUpdate({
+      target: roles.name,
+      set: { description: '普通查看者,仅限读取' }
     }).returning();
 
+    // 权限定义 (使用 upsert 确保唯一性)
     const [readPerm] = await db.insert(permissions).values({
       action: 'read',
       resourceType: 'document'
-    }).returning();
+    }).onConflictDoNothing().returning();
 
     const [writePerm] = await db.insert(permissions).values({
       action: 'write',
       resourceType: 'document'
-    }).returning();
+    }).onConflictDoNothing().returning();
 
-    // 绑定权限到角色
+    // 绑定权限到角色 (使用 upsert 避免重复插入关联)
     await db.insert(rolePermissions).values([
       { roleId: adminRole.id, permissionId: readPerm.id },
       { roleId: adminRole.id, permissionId: writePerm.id },
       { roleId: editorRole.id, permissionId: readPerm.id },
       { roleId: editorRole.id, permissionId: writePerm.id },
       { roleId: viewerRole.id, permissionId: readPerm.id },
-    ]);
+    ]).onConflictDoNothing();
+
+    // --- 2. 创建用户与组 ---
+    console.log('👥 正在同步测试用户与组织...');
 
-    // 3. 创建用户和组
-    console.log('👥 创建测试用户与组织...');
     const [adminUser] = await db.insert(users).values({
       email: 'admin@ekb.com',
       name: 'System Admin',
-      passwordHash: 'hashed_password_here' // 实际应使用 bcrypt/argon2
+      passwordHash: 'hashed_password_here'
+    }).onConflictDoUpdate({
+      target: users.email,
+      set: { name: 'System Admin' }
     }).returning();
 
     const [testUser] = await db.insert(users).values({
       email: 'tester@ekb.com',
       name: 'Test User',
       passwordHash: 'hashed_password_here'
+    }).onConflictDoUpdate({
+      target: users.email,
+      set: { name: 'Test User' }
     }).returning();
 
     const [engGroup] = await db.insert(groups).values({
       name: 'Engineering Department'
-    }).returning();
+    }).onConflictDoNothing().returning();
 
-    // 将 admin 加入工程组,并赋予 editor 角色
-    await db.insert(userGroups).values({ userId: adminUser.id, groupId: engGroup.id });
-    await db.insert(groupRoles).values({ groupId: engGroup.id, roleId: editorRole.id });
-
-    // 4. 创建资源与 ACL 测试 (核心:测试 Deny-Override)
-    console.log('📂 创建测试资源与 ACL 规则...');
-    const [publicFolder] = await db.insert(resourceSchema).values({
-      name: 'Public Docs',
-      path: '/public',
-      type: 'folder',
-      ownerId: adminUser.id
-    }).returning();
+    // 关联用户到组和角色 (使用 upsert)
+    await db.insert(userGroups).values({ userId: adminUser.id, groupId: engGroup.id })
+      .onConflictDoNothing();
 
-    const [secretFolder] = await db.insert(resourceSchema).values({
-      name: 'Secret Projects',
-      path: '/projects/secret',
-      type: 'folder',
-      ownerId: adminUser.id
-    }).returning();
+    await db.insert(groupRoles).values({ groupId: engGroup.id, roleId: editorRole.id })
+      .onConflictDoNothing();
+
+    await db.insert(userRoles).values({ userId: adminUser.id, roleId: adminRole.id })
+      .onConflictDoNothing();
+
+    // --- 3. 创建资源与 ACL 测试 (核心:测试 Deny-Override) ---
+    console.log('📂 正在同步测试资源与 ACL 规则...');
+
+    // 获取或创建 Public Folder
+    let publicFolder = await db.query.resources.findFirst({
+      where: eq(resourceSchema.path, '/public')
+    });
+    if (!publicFolder) {
+      [publicFolder] = await db.insert(resourceSchema).values({
+        name: 'Public Docs',
+        path: '/public',
+        type: 'folder',
+        ownerId: adminUser.id
+      }).returning();
+    }
+
+    // 获取或创建 Secret Folder
+    let secretFolder = await db.query.resources.findFirst({
+      where: eq(resourceSchema.path, '/projects/secret')
+    });
+    if (!secretFolder) {
+      [secretFolder] = await db.insert(resourceSchema).values({
+        name: 'Secret Projects',
+        path: '/projects/secret',
+        type: 'folder',
+        ownerId: adminUser.id
+      }).returning();
+    }
+
+    // 为 secretFolder 设置一条针对 testUser 的显式 DENY 规则 (使用 upsert 逻辑)
+    // 先清理旧的该用户在该资源上的规则,确保测试环境干净
+    await db.delete(aclRulesSchema).where(
+      and(
+        eq(aclRulesSchema.resourceId, secretFolder.id),
+        eq(aclRulesSchema.subjectId, testUser.id),
+        eq(aclRulesSchema.subjectType, 'user')
+      )
+    );
 
-    // 为 secretFolder 设置一条针对 testUser 的显式 DENY 规则
     await db.insert(aclRulesSchema).values({
       resourceId: secretFolder.id,
       subjectType: 'user',

+ 96 - 0
src/lib/auth/permission.ts

@@ -0,0 +1,96 @@
+import { eq, and, inArray, sql } from 'drizzle-orm';
+import { db } from '../../db';
+import { users, groups, roles, permissions, userGroups, groupRoles, rolePermissions, userRoles } from '../../db/schema/auth';
+import { resources, aclRules } from '../../db/schema/resource';
+import { AuthContext, PermissionAction, ResourceType, PermissionResult } from './types';
+
+/**
+ * 核心权限校验引擎
+ * 实现逻辑:
+ * 1. 收集用户的所有身份 (userId + groupIds)
+ * 2. 检查 ACL 规则 (路径继承 + Deny-Override)
+ * 3. 如果没有 ACL 覆盖,则检查 RBAC 角色权限
+ */
+export async function checkPermission(
+  ctx: AuthContext,
+  resourcePath: string,
+  action: PermissionAction,
+  resourceType: ResourceType
+): Promise<PermissionResult> {
+  const { userId, groupIds } = ctx;
+  const allSubjectIds = [userId, ...groupIds];
+
+  // 1. 构建路径层级列表 (用于实现权限继承)
+  // 例如: /a/b/c -> ['/a/b/c', '/a/b', '/a', '/']
+  const pathParts = resourcePath.split('/').filter(Boolean);
+  const hierarchyPaths: string[] = ['/'];
+  let currentPath = '';
+  for (const part of pathParts) {
+    currentPath += `/${part}`;
+    hierarchyPaths.push(currentPath);
+  }
+
+  // 2. 检查 ACL 规则 (优先级最高)
+  // 我们需要查询该路径及其所有父路径上,针对当前用户或其所属组的 ACL 规则
+  const aclRulesResult = await db.query.aclRules.findMany({
+    where: and(
+      inArray(aclRules.subjectId, allSubjectIds),
+      inArray(aclRules.action, [action]),
+      // 这里需要通过 SQL 实现路径匹配,Drizzle query API 对此支持有限,改用 sql 辅助
+    ),
+  });
+
+  // 由于 Drizzle query API 在处理复杂的路径前缀匹配时不够灵活,
+  // 我们使用更直接的 SQL 查询来获取所有相关的 ACL 规则
+  const relevantAclRules = await db.execute(sql`
+    SELECT subject_type, subject_id, permission_type, action
+    FROM acl_rules
+    WHERE subject_id IN (${sql.join(allSubjectIds.map(id => sql`${id}`), sql`, `)})
+      AND action = ${action}
+      AND resource_id IN (
+        SELECT id FROM resources WHERE path IN (${sql.join(hierarchyPaths.map(p => sql`${p}`), sql`, `)})
+      )
+  `);
+
+  // 评估 ACL 规则:Deny-Override 原则
+  if (relevantAclRules.rows.length > 0) {
+    // 检查是否存在任何显式的 'deny'
+    const hasDeny = relevantAclRules.rows.some(
+      (rule: any) => rule.permission_type === 'deny'
+    );
+
+    if (hasDeny) {
+      return { granted: false, reason: 'Explicitly denied by ACL rule' };
+    }
+
+    // 检查是否存在显式的 'allow'
+    const hasAllow = relevantAclRules.rows.some(
+      (rule: any) => rule.permission_type === 'allow'
+    );
+
+    if (hasAllow) {
+      return { granted: true, reason: 'Explicitly allowed by ACL rule' };
+    }
+  }
+
+  // 3. 检查 RBAC 权限 (兜底逻辑)
+  // 查询用户及其所属组所拥有的所有权限
+  const rbacPermissions = await db.execute(sql`
+    SELECT p.action, p.resource_type
+    FROM permissions p
+    JOIN role_permissions rp ON p.id = rp.permission_id
+    LEFT JOIN user_roles ur ON rp.role_id = ur.role_id
+    LEFT JOIN group_roles gr ON rp.role_id = gr.role_id
+    LEFT JOIN user_groups ug ON gr.group_id = ug.group_id
+    WHERE (ur.user_id = ${userId} OR ug.user_id = ${userId})
+      AND p.action = ${action}
+      AND p.resource_type = ${resourceType}
+  `);
+
+  if (rbacPermissions.rows.length > 0) {
+    return { granted: true, reason: 'Granted by RBAC role' };
+  }
+
+  // 4. 默认拒绝
+  return { granted: false, reason: 'No matching permission found (Default Deny)' };
+}

+ 15 - 0
src/lib/auth/types.ts

@@ -0,0 +1,15 @@
+export type PermissionAction = 'read' | 'write' | 'delete';
+
+export type ResourceType = 'document' | 'video' | 'folder';
+
+export type SubjectType = 'user' | 'group';
+
+export interface AuthContext {
+  userId: string;
+  groupIds: string[];
+}
+
+export interface PermissionResult {
+  granted: boolean;
+  reason?: string;
+}

+ 77 - 0
tests/auth/permission.test.ts

@@ -0,0 +1,77 @@
+import { describe, it, expect } from 'vitest';
+import { db } from '../../src/db';
+import { checkPermission } from '../../src/lib/auth/permission';
+import { eq } from 'drizzle-orm';
+import { users, groups, roles, permissions, userGroups, groupRoles, rolePermissions, userRoles, resources, aclRules } from '../../src/db/schema/auth';
+import { resources as resourceSchema, aclRules as aclRulesSchema } from '../../src/db/schema/resource';
+
+describe('Permission Engine Integration Tests', () => {
+  // 注意:这些测试依赖于 Seed 脚本注入的数据
+  // 管理员: admin@ekb.com
+  // 测试用户: tester@ekb.com (已被禁止访问 /projects/secret)
+
+  const ADMIN_EMAIL = 'admin@ekb.com';
+  const TESTER_EMAIL = 'tester@ekb.com';
+
+  it('should allow admin to access any resource via RBAC', async () => {
+    // 获取管理员 ID
+    const [admin] = await db.select().from(users).where(eq(users.email, ADMIN_EMAIL));
+
+    // 模拟管理员上下文 (假设他属于一个拥有 editor 角色的组)
+    // 在 Seed 中,admin 被赋予了 editor 角色
+    const [engGroup] = await db.select().from(groups).where(eq(groups.name, 'Engineering Department'));
+
+    const result = await checkPermission(
+      { userId: admin.id, groupIds: [engGroup.id] },
+      '/any/path',
+      'read',
+      'document'
+    );
+
+    expect(result.granted).toBe(true);
+  });
+
+  it('should deny tester access to secret folder due to explicit ACL deny rule', async () => {
+    // 获取测试用户 ID
+    const [tester] = await db.select().from(users).where(eq(users.email, TESTER_EMAIL));
+
+    const result = await checkPermission(
+      { userId: tester.id, groupIds: [] },
+      '/projects/secret',
+      'read',
+      'document'
+    );
+
+    expect(result.granted).toBe(false);
+    expect(result.reason).toBe('Explicitly denied by ACL rule');
+  });
+
+  it('should allow tester access to public folder (no deny rule)', async () => {
+    const [tester] = await db.select().from(users).where(eq(users.email, TESTER_EMAIL));
+
+    const result = await checkPermission(
+      { userId: tester.id, groupIds: [] },
+      '/public',
+      'read',
+      'document'
+    );
+
+    // 注意:由于 tester 本身没有 RBAC 权限,这里应该返回 false (Default Deny)
+    expect(result.granted).toBe(false);
+  });
+
+  it('should respect path inheritance for ACL rules', async () => {
+    const [tester] = await db.select().from(users).where(eq(users.email, TESTER_EMAIL));
+
+    // 测试访问子路径 /projects/secret/sub-file,应该同样被拒绝
+    const result = await checkPermission(
+      { userId: tester.id, groupIds: [] },
+      '/projects/secret/sub-file',
+      'read',
+      'document'
+    );
+
+    expect(result.granted).toBe(false);
+    expect(result.reason).toBe('Explicitly denied by ACL rule');
+  });
+});