| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716 |
- import os
- import cv2
- import base64
- import uuid
- import shutil
- import logging
- import requests
- from typing import Optional
- from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Body
- from fastapi.responses import JSONResponse, StreamingResponse
- from fastapi.staticfiles import StaticFiles
- from fastapi.middleware.cors import CORSMiddleware
- from pydantic import BaseModel
- from openai import OpenAI
- from scenedetect import open_video, SceneManager
- from scenedetect.detectors import ContentDetector
- import asyncio
- from concurrent.futures import ThreadPoolExecutor
- from minio import Minio
- import traceback
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
- )
- logger = logging.getLogger(__name__)
- app = FastAPI()
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- class Config:
- # API_KEY = "sk-0bfcef4bcb124cba8484bb196a8befc6"
- API_KEY = "empety"
- # BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
- BASE_URL = "http://192.168.3.33:11434/v1"
- # MODEL_NAME = "qwen3-vl-flash"
- MODEL_NAME = "qwen3-vl:8b"
-
- # 视频处理参数
- KEYFRAMES_PER_SCENE = 3 # 每个镜头抽取的关键帧数量
- EXTRACT_INTERVAL = 3 # 无镜头检测时的抽帧间隔(秒)
- TEMP_DIR = "./temp_file" # 临时文件目录
- UPLOAD_DIR = "./uploads" # 上传文件目录
-
- # 提示词模板
- PROMPT_TEXT = "请回答用户的问题。"
- # PROMPT_IMAGE = "请描述图像中的主要内容、场景与人物。"
- PROMPT_VIDEO = "请描述镜头中的主要内容、场景、动作与事件。"
- PROMPT_IMAGE = """
- 你是一名专业的“应急灾害影像分析员”,需要根据图像内容对以下 6 类应急场景进行检测与风险分析:
- 烟火 / 烟雾 / 热异常
- 积水 / 涝情
- 山体滑坡 / 崩塌 / 灾害迹象
- 道路交通状况
- 基础设施受损情况
- 人员活动 / 救援迹象
- 请严格按照以下规则执行任务:
- 【任务要求】
- 1. 对每一个类别输出一个 JSON 对象,共 6 个
- 类别列表:
- "fire_smoke_detection"
- "water_accumulation"
- "landslide_signs"
- "road_traffic_status"
- "critical_infrastructure"
- "human_activity_signs"
- 2. 每个类别内部必须包含以下字段(字段名固定):
- {
- "category": "",
- "is_detected": false,
- "confidence": null,
- "description": "",
- "risk_assessment": "",
- "location_reference": ""
- }
- 字段含义如下:
- 字段名 说明
- category 以上六个类别之一,保持字符串一致
- is_detected 是否检测到关键要素(true/false)
- confidence [可选] 你对判断的置信度 0~1
- description 详细描述:出现的对象、规模、范围、位置、特征
- risk_assessment 风险等级:"低" / "中" / "高"
- location_reference 相对位置,如:道路北侧、桥梁附近、画面左下角等
- 3. 必须严格输出 JSON 数组格式,不得输出解释性文字
- [
- {... 六个类别的 JSON ...}
- ]
- 4. 六类检测提示词(模型的视觉分析指南)
- 【六类视觉检测指引】
- 1. fire_smoke_detection(烟火监测)
- 分析要点:
- 是否有烟雾、明火、热异常、烧痕
- 位置:如“山腰东南方向”“画面左侧”
- 烟雾颜色:白/灰/黑
- 火势规模:小片区 / 中等 / 大范围
- 风险评估:低/中/高
- 目标元素: 烟雾、明火、热异常区、过火痕迹
- 评估维度: 有无、位置、范围、趋势、风险等级
- 2. water_accumulation(积水)
- 分析要点:
- 路面积水、低洼区积水、河道上涨
- 水面范围、估算深度
- 是否影响道路、房屋、车辆
- 是否有潜在次生灾害(淹没道路、冲刷边坡等)
- 目标元素: 路面漫水、低洼积水、河道上涨、排水设施
- 评估维度: 有无、位置、范围、深度、风险性
- 3. landslide_signs(山体异常)
- 分析要点:
- 滑坡体、崩塌体、碎屑流、裂缝、树木倾斜
- 新鲜裸露土体 / 老滑坡复活迹象
- 是否影响道路、河道或建筑
- 活动性(新滑坡 / 发展中)
- 目标元素: 滑坡体、崩塌、裂缝、碎屑流、植被破坏
- 评估维度: 有无、类型、规模、位置、活动性、影响范围
- 4. road_traffic_status(道路交通)
- 分析要点:
- 道路是否通畅、受阻、中断
- 是否有事故、拥堵、车辆滞留
- 路面损坏:塌陷、裂缝、障碍物
- 是否有救援力量
- 目标元素: 通行中断、拥堵、车辆滞留、损坏、救援车辆
- 评估维度: 通行状态 / 原因 / 影响程度 / 救援通道是否畅通
- 5. critical_infrastructure(关键基建)
- 分析要点:
- 桥梁是否倾斜 / 断裂
- 隧道口是否堵塞
- 输电线塔是否倒伏
- 通信基站是否损坏
- 损伤程度及影响功能
- 目标元素: 桥梁、隧道、线塔、重要设施
- 评估维度: 结构完整性 / 功能状态 / 受损程度
- 6. human_activity_signs(人员活动与救援)
- 分析要点:
- 有无人员:正常/受困
- 救援队伍、工程车辆、临时安置点
- 位置信息
- 该区域是否安全
- 目标元素: 受困人员、救援人员、工程车、临时设施
- 评估维度: 活动类型 / 人员规模 / 位置 / 安全性
- 最终输出格式(必须严格遵守)
- 输出示例结构如下(内容仅示意):
- [
- {
- "category": "fire_smoke_detection",
- "is_detected": false,
- "confidence": 0.12,
- "description": "",
- "risk_assessment": "低",
- "location_reference": ""
- },
- {
- "category": "water_accumulation",
- "is_detected": true,
- "confidence": 0.78,
- "description": "画面右下角出现浅层路面积水,范围较小。",
- "risk_assessment": "低",
- "location_reference": "画面右下区域"
- },
- ...
- ]
- 禁止输出以下内容:
- 解释文字
- Markdown
- 图片描述以外的闲聊
- 未定义字段
- JSON 外的任何文字
- """
-
- # MinIO配置
- # MINIO_ENDPOINT = 'xia0miduo.gicp.net:9000'
- # MINIO_ACCESS_KEY = 'minioadmin'
- # MINIO_SECRET_KEY = 'minioadmin'
- # MINIO_BUCKET = 'papbtest'
- # MINIO_URL = "http://xia0miduo.gicp.net:9000"
- # MINIO_SECURE = False
- # MINIO_ENDPOINT = '127.0.0.1:30802'
- # MINIO_ACCESS_KEY = 'admin'
- # MINIO_SECRET_KEY = 'Ryu304307910'
- # MINIO_BUCKET = 'file-storage-privatization'
- # MINIO_URL = "http://127.0.0.1:30802"
- # MINIO_SECURE = False
- # MINIO_ENDPOINT = '192.168.3.33:9000'
- # MINIO_ACCESS_KEY = 'minioadmin'
- # MINIO_SECRET_KEY = 'minioadmin'
- # MINIO_BUCKET = 'dji-fh'
- # MINIO_URL = "http://192.168.3.33:9000"
- # MINIO_SECURE = False
- MINIO_ENDPOINT = 'minio.ryuiso.com:59000'
- MINIO_ACCESS_KEY = 'oss_library'
- MINIO_SECRET_KEY = 'yDkG9YJiC92G3vk52goST'
- MINIO_BUCKET = 'dji-cloudapi'
- MINIO_URL = "http://minio.ryuiso.com:59000"
- MINIO_SECURE = False
- # 创建目录
- os.makedirs(Config.TEMP_DIR, exist_ok=True)
- os.makedirs(Config.UPLOAD_DIR, exist_ok=True)
- # 线程池用于异步处理
- executor = ThreadPoolExecutor(max_workers=4)
- # MinIO客户端初始化
- try:
- minio_client = Minio(
- Config.MINIO_ENDPOINT,
- access_key=Config.MINIO_ACCESS_KEY,
- secret_key=Config.MINIO_SECRET_KEY,
- secure=Config.MINIO_SECURE,
- )
- logger.info("MinIO客户端初始化成功")
- except Exception as e:
- logger.error(f"MinIO客户端初始化失败: {e}")
- minio_client = None
- def build_image_content(image_path_or_url: str) -> dict:
- """
- 构建图像请求格式内容
- """
- if image_path_or_url.startswith("http://") or image_path_or_url.startswith("https://"):
- try:
- # 下载网络图片
- logger.info(f"正在下载网络图片: {image_path_or_url}")
- response = requests.get(image_path_or_url, timeout=30)
- response.raise_for_status()
- img_bytes = response.content
-
- # 根据Content-Type或URL扩展名确定MIME类型
- mime = "image/jpeg" # 默认MIME类型
- content_type = response.headers.get('Content-Type', '')
- if 'image/png' in content_type or image_path_or_url.lower().endswith(".png"):
- mime = "image/png"
- elif 'image/gif' in content_type or image_path_or_url.lower().endswith(".gif"):
- mime = "image/gif"
- elif 'image/webp' in content_type or image_path_or_url.lower().endswith(".webp"):
- mime = "image/webp"
-
- # 转换为base64编码
- b64 = base64.b64encode(img_bytes).decode("utf-8")
- logger.info(f"网络图片下载并转换成功: {image_path_or_url}")
-
- return {"type": "image_url", "image_url": {"url": f"data:{mime};base64,{b64}"}}
- except requests.exceptions.RequestException as e:
- logger.error(f"下载网络图片失败: {image_path_or_url}, 错误: {str(e)}")
- raise ValueError(f"无法下载网络图片: {image_path_or_url}, 错误: {str(e)}")
- except Exception as e:
- logger.error(f"处理网络图片时出错: {image_path_or_url}, 错误: {str(e)}")
- raise ValueError(f"处理网络图片时出错: {image_path_or_url}, 错误: {str(e)}")
-
- elif os.path.exists(image_path_or_url):
- with open(image_path_or_url, "rb") as f:
- img_bytes = f.read()
- # 转码
- b64 = base64.b64encode(img_bytes).decode("utf-8")
-
- # 根据文件扩展名确定MIME类型
- mime = "image/jpeg"
- if image_path_or_url.lower().endswith(".png"):
- mime = "image/png"
- elif image_path_or_url.lower().endswith(".gif"):
- mime = "image/gif"
- elif image_path_or_url.lower().endswith(".webp"):
- mime = "image/webp"
-
- return {"type": "image_url", "image_url": {"url": f"data:{mime};base64,{b64}"}}
- else:
- raise ValueError(f"无效的图片路径或URL: {image_path_or_url}")
- def detect_scenes(video_path: str) -> list:
- """
- 使用PySceneDetect检测视频中的镜头切换
- """
- # 有关PySceneDetect的镜头检测,详见https://www.scenedetect.com/docs/latest/api/scene_manager.html#usage
- video = open_video(video_path)
- scene_manager = SceneManager()
- scene_manager.add_detector(ContentDetector(threshold=20.0))
- scene_manager.detect_scenes(video)
- scenes = scene_manager.get_scene_list()
- print(f"检测到 {len(scenes)} 个镜头片段")
- return scenes
- def extract_frames_every(video_path: str, seconds: int = 3) -> list:
- """
- 按固定时间间隔从视频中提取帧(没有检测到镜头时)
- """
- cap = cv2.VideoCapture(video_path)
- fps = cap.get(cv2.CAP_PROP_FPS)
- interval = int(fps * seconds)
- frame_paths = []
- count = 0
-
- while cap.isOpened():
- ret, frame = cap.read()
- if not ret:
- break
- if count % interval == 0:
- path = f"{Config.TEMP_DIR}/frame_{count//interval+1}.jpg"
- cv2.imwrite(path, frame)
- frame_paths.append(path)
- count += 1
-
- cap.release()
- print(f"基于间隔抽取 {len(frame_paths)} 张帧")
- return frame_paths
- def extract_keyframes(video_path: str, scenes: list, num_frames: int = 3) -> list:
- """
- 从每个检测到的镜头中提取关键帧
- """
- cap = cv2.VideoCapture(video_path)
- keyframes = []
-
- for i, (start, end) in enumerate(scenes):
- start_s, end_s = start.get_seconds(), end.get_seconds()
- duration = end_s - start_s
- frames = []
-
- for j in range(num_frames):
- # 在镜头时间范围内均匀抽帧
- t = start_s + (j + 1) * duration / (num_frames + 1)
- cap.set(cv2.CAP_PROP_POS_MSEC, t * 1000)
- ret, frame = cap.read()
-
- if ret:
- gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
- # 过滤过暗的帧
- if gray.mean() > 30:
- path = f"{Config.TEMP_DIR}/scene{i+1}_{j+1}.jpg"
- cv2.imwrite(path, frame)
- frames.append(path)
-
- if frames:
- keyframes.append(frames)
-
- cap.release()
- return keyframes
- def describe_scene(client: OpenAI, model_name: str, image_paths: list, scene_idx: int, prompt: str = None) -> str:
- """
- 调用多模态模型描述单个场景
- """
- if not prompt:
- prompt = Config.PROMPT_VIDEO
- contents = [build_image_content(p) for p in image_paths]
- contents.append({"type": "text", "text": f"{prompt}(镜头 {scene_idx})"})
-
- response = client.chat.completions.create(
- model=model_name,
- messages=[{"role": "user", "content": contents}],
- max_tokens=5120,
- extra_body={"mm_processor_kwargs": {"fps": 2, "do_sample_frames": True}},
- )
- return response.choices[0].message.content
- def summarize_video(client: OpenAI, model_name: str, scene_descriptions: list) -> str:
- """
- 将所有场景描述汇总为整体视频摘要
- """
- prompt = "以下是视频中每个镜头的描述,请总结视频的主要内容与主题:\n\n"
- for i, d in enumerate(scene_descriptions):
- prompt += f"镜头{i+1}: {d}\n"
- prompt += "\n请生成简洁的整体视频摘要。"
-
- response = client.chat.completions.create(
- model=model_name,
- messages=[{"role": "user", "content": [{"type": "text", "text": prompt}]}],
- max_tokens=5120
- )
- return response.choices[0].message.content
- def detect_content_type(input_path: str) -> str:
- """
- 自动检测输入内容的类型
- """
- # 仅通过文件扩展名判断
- if os.path.isfile(input_path):
- ext = os.path.splitext(input_path)[1].lower()
- if ext in [".mp4", ".mov", ".avi", ".mkv"]:
- return "video"
- elif ext in [".jpg", ".jpeg", ".png", ".gif"]:
- return "image"
- else:
- return "text"
- # 通过URL判断
- elif input_path.startswith("http://") or input_path.startswith("https://"):
- if any(x in input_path.lower() for x in [".mp4", ".mov", ".avi", ".mkv"]):
- return "video"
- elif any(x in input_path.lower() for x in [".jpg", ".jpeg", ".png", ".gif"]):
- return "image"
- else:
- return "text"
- else:
- return "text"
- def process_content(
- content: str,
- content_type: Optional[str] = None,
- keyframes_per_scene: int = 3,
- extract_interval: int = 3,
- prompt_text: str = None,
- prompt_image: str = None,
- prompt_video: str = None
- ) -> dict:
- """
- 处理内容分析
- """
- try:
- logger.info(f"开始处理内容: {content[:100]}...")
- # 使用默认提示词(如果未提供)
- if not prompt_text:
- prompt_text = Config.PROMPT_TEXT
- if not prompt_image:
- prompt_image = Config.PROMPT_IMAGE
- if not prompt_video:
- prompt_video = Config.PROMPT_VIDEO
- logger.info("初始化OpenAI客户端...")
- client = OpenAI(api_key=Config.API_KEY, base_url=Config.BASE_URL, timeout=3600)
-
- # 自动检测内容类型
- if not content_type:
- content_type = detect_content_type(content)
-
- logger.info(f"检测到内容类型: {content_type}")
- print(f"内容类型: {content_type}")
- except Exception as e:
- logger.error(f"process_content初始化失败: {str(e)}")
- logger.error(traceback.format_exc())
- raise
-
- # 根据类型处理
- if content_type == "text":
- try:
- logger.info("处理文本内容...")
- messages = [{"role": "user", "content": [{"type": "text", "text": f"{prompt_text}\n\n{content}"}]}]
- resp = client.chat.completions.create(model=Config.MODEL_NAME, messages=messages, max_tokens=5120)
- logger.info("文本内容处理成功")
- return {
- "status": "success",
- "content_type": "text",
- "result": resp.choices[0].message.content,
- "scenes": None
- }
- except Exception as e:
- logger.error(f"文本处理失败: {str(e)}")
- raise
-
- elif content_type == "image":
- try:
- logger.info("处理图像内容...")
- content_obj = build_image_content(content)
- messages = [{"role": "user", "content": [content_obj, {"type": "text", "text": prompt_image}]}]
- resp = client.chat.completions.create(model=Config.MODEL_NAME, messages=messages, max_tokens=5120)
- logger.info("图像内容处理成功")
- return {
- "status": "success",
- "content_type": "image",
- "result": resp.choices[0].message.content,
- "scenes": None
- }
- except Exception as e:
- logger.error(f"图像处理失败: {str(e)}")
- raise
-
- elif content_type == "video":
- try:
- # 视频处理流程
- logger.info("开始视频处理...")
- print("开始镜头检测...")
- scenes = detect_scenes(content)
-
- # 没检测到镜头
- if not scenes:
- logger.info("未检测到镜头,使用时间间隔抽帧")
- print("未检测到镜头,使用时间间隔抽帧...")
- frames = extract_frames_every(content, seconds=extract_interval)
- keyframes = [[f] for f in frames]
- else:
- logger.info(f"检测到{len(scenes)}个镜头")
- print("提取关键帧...")
- keyframes = extract_keyframes(content, scenes, num_frames=keyframes_per_scene)
-
- logger.info("开始场景分析...")
- print("调用模型分析场景...")
- scene_descriptions = []
- for i, frames in enumerate(keyframes):
- print(f"分析镜头 {i+1}/{len(keyframes)}...")
- logger.info(f"分析镜头 {i+1}/{len(keyframes)}")
- desc = describe_scene(client, Config.MODEL_NAME, frames, i + 1, prompt_video)
- scene_descriptions.append(desc)
- print(f"镜头{i+1}: {desc}")
-
- logger.info("生成视频摘要...")
- print("生成视频摘要...")
- summary = summarize_video(client, Config.MODEL_NAME, scene_descriptions)
- logger.info("视频处理完成")
-
- return {
- "status": "success",
- "content_type": "video",
- "result": summary,
- "scenes": scene_descriptions
- }
- except Exception as e:
- logger.error(f"视频处理失败: {str(e)}")
- raise
-
- else:
- logger.error(f"不支持的内容类型: {content_type}")
- raise ValueError(f"不支持的内容类型: {content_type}")
- # Pydantic模型定义请求体
- class AnalyzeRequest(BaseModel):
- content: str
- content_type: Optional[str] = None
- keyframes_per_scene: Optional[int] = 3
- extract_interval: Optional[int] = 3
- prompt_text: Optional[str] = None
- prompt_image: Optional[str] = None
- prompt_video: Optional[str] = None
- @app.post("/analyze")
- async def analyze_content(request: AnalyzeRequest):
- """
- 分析文本、URL或已有路径的内容
- """
- try:
- logger.info(f"收到/analyze请求: content={request.content[:100] if len(request.content) > 100 else request.content}...")
- logger.info(f"请求参数: content_type={request.content_type}, keyframes_per_scene={request.keyframes_per_scene}, extract_interval={request.extract_interval}")
-
- # 验证content参数
- if not request.content:
- logger.error("请求缺少content参数")
- raise HTTPException(status_code=400, detail="缺少content参数")
-
- # 在线程池中执行处理
- loop = asyncio.get_running_loop()
- logger.info("开始异步处理内容...")
- result = await loop.run_in_executor(
- executor,
- process_content,
- request.content,
- request.content_type,
- request.keyframes_per_scene,
- request.extract_interval,
- request.prompt_text,
- request.prompt_image,
- request.prompt_video
- )
- logger.info(f"处理完成: {result['status']}")
- return result
- except HTTPException as he:
- logger.error(f"HTTP异常: {he.status_code} - {he.detail}")
- raise
- except Exception as e:
- logger.error(f"处理请求时发生异常: {str(e)}")
- logger.error(f"异常详情: {traceback.format_exc()}")
- raise HTTPException(status_code=500, detail=str(e))
- @app.get("/minio/files")
- async def list_minio_files():
- """
- 获取MinIO中的文件列表
- """
- try:
- objects = minio_client.list_objects(Config.MINIO_BUCKET, recursive=True)
- files = []
- for obj in objects:
- # 只获取图片和视频文件
- ext = os.path.splitext(obj.object_name)[1].lower()
- if ext in ['.jpg', '.jpeg', '.png', '.gif', '.mp4', '.mov', '.avi', '.mkv']:
- file_url = f"{Config.MINIO_URL}/{Config.MINIO_BUCKET}/{obj.object_name}"
- files.append({
- "name": obj.object_name,
- "size": obj.size,
- "url": file_url,
- "type": "image" if ext in ['.jpg', '.jpeg', '.png', '.gif'] else "video"
- })
- print(f"文件: {obj.object_name}, 大小: {obj.size}, URL: {file_url}")
- return {"files": files}
- except Exception as e:
- logger.error(f"获取MinIO文件列表时发生异常: {str(e)}")
- raise HTTPException(status_code=500, detail=str(e))
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- # @app.post("/upload")
- # async def upload_and_analyze(
- # file: UploadFile = File(...),
- # content_type: Optional[str] = Form(None),
- # keyframes_per_scene: Optional[int] = Form(3),
- # extract_interval: Optional[int] = Form(3),
- # prompt_text: Optional[str] = Form(None),
- # prompt_image: Optional[str] = Form(None),
- # prompt_video: Optional[str] = Form(None)
- # ):
- # """
- # 上传文件并进行分析
- # """
- # try:
- # # 生成唯一文件名
- # file_id = str(uuid.uuid4())
- # file_ext = os.path.splitext(file.filename)[1]
- # file_path = os.path.join(Config.UPLOAD_DIR, f"{file_id}{file_ext}")
- #
- # # 保存上传的文件
- # with open(file_path, "wb") as buffer:
- # shutil.copyfileobj(file.file, buffer)
- #
- # print(f"文件已保存: {file_path}")
- #
- # # 在线程池中执行处理
- # loop = asyncio.get_event_loop()
- # result = await loop.run_in_executor(
- # executor,
- # process_content,
- # file_path,
- # content_type,
- # keyframes_per_scene,
- # extract_interval,
- # prompt_text,
- # prompt_image,
- # prompt_video
- # )
- #
- # return JSONResponse(content=result)
- # except Exception as e:
- # raise HTTPException(status_code=500, detail=str(e))
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run(app, host="0.0.0.0", port=8000)
|