| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import os
- import time
- import asyncio
- import traceback
- from pathlib import Path
- from fastapi import FastAPI, HTTPException
- from pydantic import BaseModel
- from typing import Optional
- import uvicorn
- from mineru.cli.common import read_fn, prepare_env, _process_output
- from mineru.data.data_reader_writer import FileBasedDataWriter
- from mineru.backend.vlm.vlm_analyze import aio_doc_analyze, ModelSingleton
- from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make
- from mineru.utils.enum_class import MakeMode
- from httpx import ReadError, ConnectError, ConnectTimeout
# Retry policy used by parse_pdf_with_retry when the VLM server is unreachable.
MAX_RETRY = 5
RETRY_INTERVAL = 2  # seconds; used as the base of the backoff delay

# Expose only GPU index 4 to CUDA for this process.
os.environ["CUDA_VISIBLE_DEVICES"] = "4"

app = FastAPI(title="MinerU Service")

# Global model instances: the shared singleton and the lazily created predictor.
model_singleton = ModelSingleton()
predictor = None
# Request body accepted by POST /parse.
class ParseRequest(BaseModel):
    # Path of the PDF to parse; the /parse handler checks it with os.path.exists.
    pdf_path: str

    # Base directory under which the parse outputs are written.
    output_dir: Optional[str] = "./tmp_file"

    # URL of the vLLM server used by the "http-client" backend.
    server_url: Optional[str] = "http://127.0.0.1:9999"
# Response envelope returned by POST /parse.
class ParseResponse(BaseModel):
    # Application-level status: the handler uses 200 on success and 500 on failure.
    code: int

    # Human-readable outcome message.
    message: str

    # Payload on success (content_list, md_path, pdf_file_name); None on failure.
    data: Optional[dict] = None
# Predictors already created, keyed by the server URL they were built for.
_predictors_by_url = {}


async def get_predictor(server_url: str):
    """Return the MinerU predictor for ``server_url``, creating it on first use.

    A predictor is created once per distinct ``server_url`` and reused for
    later calls with the same URL.  Previously a single global predictor was
    created for the first URL seen and then returned for every request, so a
    different ``server_url`` in a later request was silently ignored.

    Args:
        server_url: URL of the vLLM server passed to
            ``model_singleton.get_model`` with the ``http-client`` backend.

    Returns:
        The predictor object returned by ``model_singleton.get_model``.
    """
    global predictor

    cached = _predictors_by_url.get(server_url)
    if cached is None:
        os.environ["MINERU_MODEL_SOURCE"] = "modelscope"
        print(f"正在连接 vLLM 服务器: {server_url}...")
        cached = model_singleton.get_model(
            backend="http-client",
            model_path=None,
            server_url=server_url,
        )
        print("服务器连接成功。")
        _predictors_by_url[server_url] = cached

    # Keep the module-level handle in sync with the most recently used
    # predictor for any code that still reads the global directly.
    predictor = cached
    return cached
async def parse_pdf_with_retry(pdf_path: str, output_dir: str, server_url: str):
    """Parse a PDF through the VLM backend, retrying on connection errors.

    The file is read with ``read_fn``, analysed with ``aio_doc_analyze`` using
    the ``http-client`` backend, and the results (markdown, content list,
    middle JSON, model output) are written by ``_process_output`` into the
    directories returned by ``prepare_env``.

    Args:
        pdf_path: Path of the PDF to parse.
        output_dir: Base output directory handed to ``prepare_env``.
        server_url: URL of the vLLM server handed to ``get_predictor``.

    Returns:
        A tuple ``(content_list, path_md, pdf_file_name)``: the result of
        ``union_make`` in ``CONTENT_LIST`` mode, the path of the markdown
        file, and the timestamp-suffixed document name used for the outputs.

    Raises:
        RuntimeError: If the analysis still fails with ``ReadError``,
            ``ConnectError`` or ``ConnectTimeout`` on the last of
            ``MAX_RETRY`` attempts.  The original error is attached as
            ``__cause__``.
    """
    pdf_bytes = read_fn(pdf_path)

    # Suffix the base name with a timestamp so repeated runs on the same PDF
    # get distinct output names (one-second resolution).
    base_name = os.path.splitext(os.path.basename(pdf_path))[0]
    pdf_file_name = base_name + time.strftime("_%Y%m%d%H%M%S")
    local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, "vlm")

    image_writer = FileBasedDataWriter(local_image_dir)
    md_writer = FileBasedDataWriter(local_md_dir)

    print(f"正在解析文档: {pdf_path}")

    # Retry loop: only the listed httpx connection errors are retried; any
    # other exception propagates immediately.
    for attempt in range(MAX_RETRY):
        try:
            pred = await get_predictor(server_url)
            middle_json, infer_result = await aio_doc_analyze(
                pdf_bytes,
                image_writer=image_writer,
                predictor=pred,
                backend="http-client",
            )
            break
        except (ReadError, ConnectError, ConnectTimeout) as exc:
            if attempt == MAX_RETRY - 1:
                # Chain explicitly so the underlying connection error is
                # recorded as the direct cause of the failure.
                raise RuntimeError(
                    f"VLM 连接失败,重试 {MAX_RETRY} 次后仍失败"
                ) from exc
            print(f"VLM连接失败,重试 {attempt + 1}/{MAX_RETRY}")
            # Exponential backoff: RETRY_INTERVAL is the base, attempt the exponent.
            await asyncio.sleep(RETRY_INTERVAL ** attempt)

    # Build the content list from the analysed pages.
    pdf_info = middle_json.get("pdf_info", {})
    content_list = union_make(pdf_info, MakeMode.CONTENT_LIST, img_buket_path=local_image_dir)

    # Persist the outputs.
    _process_output(
        pdf_info=pdf_info,
        pdf_bytes=pdf_bytes,
        pdf_file_name=pdf_file_name,
        local_md_dir=local_md_dir,
        local_image_dir=local_image_dir,
        md_writer=md_writer,
        f_draw_layout_bbox=False,
        f_draw_span_bbox=False,
        f_dump_orig_pdf=False,
        f_dump_md=True,
        f_dump_content_list=True,
        f_dump_middle_json=True,
        f_dump_model_output=True,
        f_make_md_mode=MakeMode.MM_MD,
        middle_json=middle_json,
        model_output=infer_result,
        is_pipeline=False,
    )

    path_md = f"{local_md_dir}/{pdf_file_name}.md"
    print(f"文档解析完成。MD 文件已保存到: {path_md}")

    return content_list, path_md, pdf_file_name
@app.post("/parse", response_model=ParseResponse)
async def parse_pdf(request: ParseRequest):
    """
    Parse a PDF file.

    On success the response ``data`` contains:
    - content_list: the content list
    - md_path: path where the markdown file was saved
    - pdf_file_name: name of the PDF document

    A missing file is rejected with HTTP 400.  Any failure during parsing is
    reported as a response with ``code`` 500 and the error text in ``message``.
    """
    pdf_path = request.pdf_path

    # Validate outside the try block: HTTPException is an Exception subclass,
    # so raising it inside would be caught by the generic handler below and
    # turned into a code=500 envelope instead of an HTTP 400.
    if not os.path.exists(pdf_path):
        raise HTTPException(status_code=400, detail=f"PDF 文件不存在: {pdf_path}")

    try:
        content_list, md_path, pdf_file_name = await parse_pdf_with_retry(
            pdf_path=pdf_path,
            output_dir=request.output_dir,
            server_url=request.server_url,
        )

        return ParseResponse(
            code=200,
            message="解析成功",
            data={
                "content_list": content_list,
                "md_path": md_path,
                "pdf_file_name": pdf_file_name,
            },
        )

    except Exception as e:
        # Top-level boundary: log the full traceback and report the failure
        # in the response envelope rather than letting it propagate.
        traceback.print_exc()
        return ParseResponse(
            code=500,
            message=f"解析失败: {str(e)}",
            data=None,
        )
@app.get("/health")
async def health_check():
    """Health check: always reports a fixed ``ok`` status."""
    payload = {"status": "ok"}
    return payload
if __name__ == "__main__":
    import argparse

    # Command-line options: bind address and port for the uvicorn server.
    cli = argparse.ArgumentParser()
    cli.add_argument("--host", default="0.0.0.0", help="服务地址")
    cli.add_argument("--port", type=int, default=8120, help="服务端口")
    options = cli.parse_args()

    uvicorn.run(app, host=options.host, port=options.port)
|