#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""FastAPI service that parses a local PDF through MinerU's VLM http-client backend.

Endpoints:
    POST /parse  -- analyze the PDF at ``pdf_path`` and write Markdown, content
                    list, middle JSON and model output under ``output_dir``.
    GET  /health -- liveness probe.
"""
import os
import time
import asyncio
import traceback
from pathlib import Path
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import uvicorn

from mineru.cli.common import read_fn, prepare_env, _process_output
from mineru.data.data_reader_writer import FileBasedDataWriter
from mineru.backend.vlm.vlm_analyze import aio_doc_analyze, ModelSingleton
from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make
from mineru.utils.enum_class import MakeMode
from httpx import ReadError, ConnectError, ConnectTimeout

os.environ["CUDA_VISIBLE_DEVICES"] = "4"

app = FastAPI(title="MinerU Service")

# Global model instance
model_singleton = ModelSingleton()
# Most recently used predictor (kept as a module global for backward compatibility).
predictor = None
# Predictors already created, keyed by the server URL they were created for.
_predictors = {}

MAX_RETRY = 5
RETRY_INTERVAL = 2  # seconds; base of the exponential backoff

# Fallbacks used when a request explicitly sends ``null`` for an optional field.
DEFAULT_OUTPUT_DIR = "./tmp_file"
DEFAULT_SERVER_URL = "http://127.0.0.1:9999"


class ParseRequest(BaseModel):
    """Request body for ``POST /parse``."""

    # Path of the PDF; checked with os.path.exists on the machine running this service.
    pdf_path: str
    # Directory under which the per-document output folders are created.
    output_dir: Optional[str] = DEFAULT_OUTPUT_DIR
    # Base URL handed to the http-client backend.
    server_url: Optional[str] = DEFAULT_SERVER_URL


class ParseResponse(BaseModel):
    """Response envelope for ``POST /parse``."""

    # 200 on success, 500 when parsing failed.
    code: int
    message: str
    # On success: content_list, md_path and pdf_file_name; None on failure.
    data: Optional[dict] = None


async def get_predictor(server_url: str):
    """Return the MinerU predictor for ``server_url``, creating it on first use.

    Predictors are cached per server URL, so a request that names a different
    server does not silently reuse a predictor created for another one.

    Args:
        server_url: URL passed to ``ModelSingleton.get_model`` as ``server_url``.

    Returns:
        The object returned by ``model_singleton.get_model`` for this URL.
    """
    global predictor
    cached = _predictors.get(server_url)
    if cached is None:
        os.environ["MINERU_MODEL_SOURCE"] = "modelscope"
        print(f"正在连接 vLLM 服务器: {server_url}...")
        cached = model_singleton.get_model(
            backend="http-client",
            model_path=None,
            server_url=server_url,
        )
        _predictors[server_url] = cached
        print("服务器连接成功。")
    predictor = cached
    return cached


async def parse_pdf_with_retry(pdf_path: str, output_dir: str, server_url: str):
    """Analyze one PDF, retrying the VLM call on connection errors.

    Args:
        pdf_path: Path of the PDF to read.
        output_dir: Root directory handed to ``prepare_env``.
        server_url: URL used to obtain the predictor.

    Returns:
        A ``(content_list, path_md, pdf_file_name)`` tuple, where ``path_md`` is
        ``<local_md_dir>/<pdf_file_name>.md`` and ``pdf_file_name`` is the input
        file's stem plus a ``_YYYYmmddHHMMSS`` timestamp.

    Raises:
        RuntimeError: if every one of the ``MAX_RETRY`` attempts failed with a
            ``ReadError``, ``ConnectError`` or ``ConnectTimeout`` (the last such
            error is attached as ``__cause__``).
    """
    # NOTE(review): read_fn, union_make and _process_output are called
    # synchronously inside this coroutine; if they do blocking work they will
    # stall the event loop -- consider asyncio.to_thread after confirming they
    # are safe to run off-thread.
    pdf_bytes = read_fn(pdf_path)
    stem = os.path.splitext(os.path.basename(pdf_path))[0]
    # The timestamp suffix gives repeated parses of the same PDF separate names.
    pdf_file_name = stem + time.strftime("_%Y%m%d%H%M%S")
    local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, "vlm")
    image_writer = FileBasedDataWriter(local_image_dir)
    md_writer = FileBasedDataWriter(local_md_dir)

    print(f"正在解析文档: {pdf_path}")

    # Retry loop: only connection-level httpx errors are retried; anything
    # else propagates immediately.
    last_error = None
    for attempt in range(MAX_RETRY):
        try:
            pred = await get_predictor(server_url)
            middle_json, infer_result = await aio_doc_analyze(
                pdf_bytes,
                image_writer=image_writer,
                predictor=pred,
                backend="http-client",
            )
            break
        except (ReadError, ConnectError, ConnectTimeout) as e:
            last_error = e
            if attempt < MAX_RETRY - 1:
                print(f"VLM连接失败,重试 {attempt + 1}/{MAX_RETRY}")
                # Exponential backoff: RETRY_INTERVAL ** 0, ** 1, ** 2, ...
                await asyncio.sleep(RETRY_INTERVAL ** attempt)
    else:
        # Reached only when no attempt succeeded (including MAX_RETRY <= 0),
        # so middle_json is never used unbound below.
        raise RuntimeError(
            f"VLM 连接失败,重试 {MAX_RETRY} 次后仍失败"
        ) from last_error

    # Build the content list
    pdf_info = middle_json.get("pdf_info", {})
    content_list = union_make(
        pdf_info, MakeMode.CONTENT_LIST, img_buket_path=local_image_dir
    )

    # Persist the outputs
    _process_output(
        pdf_info=pdf_info,
        pdf_bytes=pdf_bytes,
        pdf_file_name=pdf_file_name,
        local_md_dir=local_md_dir,
        local_image_dir=local_image_dir,
        md_writer=md_writer,
        f_draw_layout_bbox=False,
        f_draw_span_bbox=False,
        f_dump_orig_pdf=False,
        f_dump_md=True,
        f_dump_content_list=True,
        f_dump_middle_json=True,
        f_dump_model_output=True,
        f_make_md_mode=MakeMode.MM_MD,
        middle_json=middle_json,
        model_output=infer_result,
        is_pipeline=False,
    )

    path_md = f"{local_md_dir}/{pdf_file_name}.md"
    print(f"文档解析完成。MD 文件已保存到: {path_md}")

    return content_list, path_md, pdf_file_name


@app.post("/parse", response_model=ParseResponse)
async def parse_pdf(request: ParseRequest):
    """
    Parse a PDF file.

    On success ``data`` contains:
    - content_list: the content list
    - md_path: path of the saved Markdown file
    - pdf_file_name: name used for the output files

    A missing ``pdf_path`` yields an HTTP 400. Any failure during parsing is
    reported as a ``ParseResponse`` with ``code=500``.
    """
    pdf_path = request.pdf_path
    # Validate outside the try block: inside it, the broad ``except Exception``
    # would swallow the HTTPException and turn the intended 400 into a 500 envelope.
    if not os.path.exists(pdf_path):
        raise HTTPException(status_code=400, detail=f"PDF 文件不存在: {pdf_path}")

    try:
        content_list, md_path, pdf_file_name = await parse_pdf_with_retry(
            pdf_path=pdf_path,
            # An explicit null in the request falls back to the declared default.
            output_dir=request.output_dir or DEFAULT_OUTPUT_DIR,
            server_url=request.server_url or DEFAULT_SERVER_URL,
        )
        return ParseResponse(
            code=200,
            message="解析成功",
            data={
                "content_list": content_list,
                "md_path": md_path,
                "pdf_file_name": pdf_file_name,
            },
        )
    except Exception as e:
        # Top-level boundary: log the traceback and report the failure in the envelope.
        traceback.print_exc()
        return ParseResponse(
            code=500,
            message=f"解析失败: {str(e)}",
            data=None,
        )


@app.get("/health")
async def health_check():
    """Health check."""
    return {"status": "ok"}


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--host", default="0.0.0.0", help="服务地址")
    parser.add_argument("--port", type=int, default=8120, help="服务端口")
    args = parser.parse_args()

    uvicorn.run(app, host=args.host, port=args.port)