| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458 |
- import os
- import sys
- import json
- import re
- import shutil
- import time
- import zipfile
- from pathlib import Path
- from datetime import datetime
- from difflib import SequenceMatcher
- import logging
- import pandas as pd
- import openpyxl
- from openpyxl import load_workbook
- from docx import Document
- import docx2txt
- from openai import OpenAI
- from watchdog.observers import Observer
- from watchdog.events import FileSystemEventHandler
- import py7zr
- import rarfile
# Root logging configuration for the whole module: INFO level, each record
# rendered as "<timestamp> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
)

# Module-level logger used by every class and function in this file.
logger = logging.getLogger(__name__)
class ProjectLogger:
    """Per-project file logger that also counts LLM interactions.

    Each instance owns a dedicated ``logging.Logger`` that writes to
    ``<project_output_dir>/<project_name>_<timestamp>.log`` and does not
    propagate records to the root logger.
    """

    def __init__(self, project_name, project_output_dir):
        """Create the log file and write the "project started" header.

        Args:
            project_name: Name used in the log file name and header lines.
            project_output_dir: Directory that receives the log file; it is
                created when missing.
        """
        self.project_name = project_name
        self.project_output_dir = Path(project_output_dir)
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # logging.FileHandler does not create missing directories.
        self.project_output_dir.mkdir(parents=True, exist_ok=True)
        self.log_file = self.project_output_dir / f"{project_name}_{self.timestamp}.log"

        # Dedicated logger for this project run.
        self.logger = logging.getLogger(f"project_{project_name}_{self.timestamp}")
        self.logger.setLevel(logging.INFO)
        # Loggers are cached by name: close handlers left over from an earlier
        # instance with the same name instead of just dropping them, so their
        # file descriptors are released.
        for stale_handler in self.logger.handlers[:]:
            stale_handler.close()
            self.logger.removeHandler(stale_handler)
        self.logger.propagate = False

        file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        )
        self.logger.addHandler(file_handler)

        self.logger.info(f"=== 项目开始: {project_name} ===")
        self.logger.info(f"日志文件: {self.log_file}")
        self.llm_interaction_count = 0

    def info(self, message):
        """Write an INFO record to the project log."""
        self.logger.info(message)

    def error(self, message):
        """Write an ERROR record to the project log."""
        self.logger.error(message)

    def warning(self, message):
        """Write a WARNING record to the project log."""
        self.logger.warning(message)

    def log_llm_interaction(self, prompt, document_content, llm_response, extracted_result=None):
        """Record one LLM round trip in the project log.

        Args:
            prompt: Prompt text; only the first 500 characters are logged.
            document_content: Document text sent to the LLM; only its length
                is logged.
            llm_response: Raw LLM response, logged in full.
            extracted_result: Parsed result; a falsy value (``None`` or an
                empty container) is reported as a failed extraction.
        """
        self.llm_interaction_count += 1
        self.logger.info("")
        self.logger.info("=" * 80)
        self.logger.info(f"LLM交互 #{self.llm_interaction_count}")
        self.logger.info(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        self.logger.info("=" * 80)

        prompt_preview = prompt[:500] + "..." if len(prompt) > 500 else prompt
        self.logger.info(f"提示词预览:\n{prompt_preview}")
        self.logger.info(f"文档内容长度: {len(document_content)} 字符")
        self.logger.info(f"LLM原始响应:\n{llm_response}")

        # The status line is always written; previously it sat behind
        # `if extracted_result:` so a failed extraction was never reported.
        self.logger.info(f"提取结果: {'成功' if extracted_result else '失败'}")
        if extracted_result:
            try:
                formatted_json = json.dumps(extracted_result, ensure_ascii=False, indent=2)
                self.logger.info(f"提取的JSON结构:\n{formatted_json}")
            except (TypeError, ValueError):
                # Not JSON-serialisable: fall back to the plain representation.
                self.logger.info(f"提取的结果:\n{extracted_result}")

        self.logger.info("=" * 80)
        self.logger.info("")

    def log_project_end(self):
        """Write the "project finished" footer with the interaction count."""
        self.logger.info(f"=== 项目结束: {self.project_name} ===")
        self.logger.info(f"总计LLM交互次数: {self.llm_interaction_count}")

    def close(self):
        """Close and detach every handler so the log file is released."""
        for handler in self.logger.handlers[:]:
            handler.close()
            self.logger.removeHandler(handler)
- class MinRAGSystem:
-
def __init__(self):
    """Set up configuration, the LLM client and the Excel-backed lookup data.

    LLM settings are read from the environment:

    * ``DEEPSEEK_API_KEY`` - API key (required for LLM calls)
    * ``DEEPSEEK_BASE_URL`` - endpoint, defaults to ``https://api.deepseek.com``
    * ``DEEPSEEK_MODEL`` - model name, defaults to ``deepseek-chat``
    """
    # SECURITY: the API key used to be hard-coded on this line. Secrets must
    # not live in source control, so it is now read from the environment.
    # The previously committed key should be treated as leaked and rotated.
    self.api_key = os.environ.get("DEEPSEEK_API_KEY", "")  # LLM API key
    self.base_url = os.environ.get("DEEPSEEK_BASE_URL", "https://api.deepseek.com")  # LLM endpoint
    self.model = os.environ.get("DEEPSEEK_MODEL", "deepseek-chat")  # LLM model name
    self.performance_file = "2025年度 业绩数据填写汇总表-1.xlsx"  # performance data workbook
    self.erp_file = "20250417 调整后ERP系统通用参数 - 展示分类.xlsx"  # ERP parameter workbook
    self.output_record_file = "输出记录表.xlsx"  # output record workbook

    # Column headers of the output record workbook, in column order.
    self.output_record_fields = [
        "项目编号",
        "项目名称",
        "解析完成时间",
        "输出的原始json",
        "项目地点",
        "所属部门",
        "项目等级",
        "项目状态",
        "项目负责人(总监)/项目经理",
        "总监代表/项目副经理",
        "总占地面积(m2)",
        "项目规模",
        "建安总造价(万元)",
        "设备总报价(万元)",
        "总投资额(万元)",
        "监督机构",
        "建设单位",
        "设计单位",
        "勘察单位",
        "施工单位",
        "监理单位",
        "集团客户",
        "计划开工日期",
        "计划竣工日期",
        "实际开工日期",
        "实际竣工日期",
        "是否街道检查项目",
    ]

    # Extra fields copied from the performance workbook.
    # Format: {field name: {"column_index": int or list of int (0-based),
    #                       "display_name": str}}
    self.additional_fields_config = {
        "项目名称": {"column_index": 2, "display_name": "项目名称"},  # column C
        "项目编号": {"column_index": 4, "display_name": "项目编号"},  # column E
        "一级工程类型": {"column_index": 8, "display_name": "一级工程类型"},  # column I
        "二级工程类型": {"column_index": [9, 10, 11], "display_name": "二级工程类型"},  # columns J-L
    }

    # LLM client; stays None when no key is configured, in which case
    # call_llm refuses to run.
    self.llm_client = None
    if self.api_key:
        self.llm_client = OpenAI(api_key=self.api_key, base_url=self.base_url)
        logger.info("✓ LLM初始化成功")
    else:
        logger.error("× 缺少api密钥,启动失败!!!")

    # Lookup data loaded from the Excel workbooks.
    self.project_types = self.load_project_types_from_excel()
    self.erp_fields = self.load_erp_fields_from_excel()
    self.additional_fields_data = self.load_additional_fields_from_excel()
def _normalize_cell_value(self, value):
    """Convert a value into something a worksheet cell can hold.

    ``None`` stays ``None``; dicts and lists become JSON text (non-ASCII
    characters kept as-is); every other value goes through ``str``. If the
    conversion raises, the plain ``str`` form is used instead.
    """
    if value is None:
        return None
    is_container = isinstance(value, (dict, list))
    try:
        return json.dumps(value, ensure_ascii=False) if is_container else str(value)
    except Exception:
        return str(value)
def append_output_record(self, final_result, project_name, result_file_path, project_output_dir, project_logger=None):
    """Append one row describing this project to the output record workbook.

    The row holds the raw result JSON plus the fixed set of columns listed in
    ``self.output_record_fields``; column values are taken from the "通用表单"
    section of ``final_result``. The workbook is created (with a header row)
    when it does not exist yet. Failures are logged, never raised.

    Args:
        final_result: Extraction result; expected to be a dict with a
            "通用表单" dict, anything else yields empty field columns.
        project_name: Fallback for the "项目名称" column.
        result_file_path: Unused; kept for interface compatibility.
        project_output_dir: Unused; kept for interface compatibility.
        project_logger: Optional project logger that mirrors the outcome.
    """
    try:
        general = final_result.get("通用表单") if isinstance(final_result, dict) else None
        # The LLM may return a non-dict here (list, string, null); treat that
        # as "no fields" instead of losing the whole record to an exception.
        if not isinstance(general, dict):
            general = {}

        # Derive the row from the header list so both always stay in sync.
        row_data = {field: general.get(field) for field in self.output_record_fields}
        row_data["项目名称"] = general.get("项目名称") or project_name
        row_data["解析完成时间"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        row_data["输出的原始json"] = json.dumps(final_result, ensure_ascii=False)

        # Open the existing workbook, or create one with the header row.
        record_path = Path(self.output_record_file)
        if record_path.exists():
            wb = load_workbook(record_path)
            ws = wb.active
        else:
            wb = openpyxl.Workbook()
            ws = wb.active
            ws.append(self.output_record_fields)

        ws.append([
            self._normalize_cell_value(row_data.get(header))
            for header in self.output_record_fields
        ])
        wb.save(record_path)
        if project_logger:
            project_logger.info(f"✓ 已写入输出记录: {record_path}")
    except Exception as e:
        logger.error(f"写入输出记录表失败: {e}")
        if project_logger:
            project_logger.error(f"写入输出记录表失败: {e}")
def add_additional_field(self, field_name, column_index, display_name=None):
    """Register (or replace) an extra Excel field and reload the field data.

    Args:
        field_name: Key under which the field is stored in the configuration.
        column_index: Column index, or list of column indexes, to read.
        display_name: Name used in the output; defaults to ``field_name``.
    """
    shown_as = field_name if display_name is None else display_name
    self.additional_fields_config[field_name] = dict(
        column_index=column_index,
        display_name=shown_as,
    )
    # Re-read the workbook so the new field is available immediately.
    self.additional_fields_data = self.load_additional_fields_from_excel()
def load_additional_fields_from_excel(self):
    """Load the configured extra fields from the performance workbook.

    The first sheet is read through xlwings when available, otherwise through
    pandas/openpyxl. The first row is treated as the header and skipped;
    column C (index 2) holds the project name used as key.

    Returns:
        dict: ``{project name: {display name: value or None}}``. Empty when
        the workbook is missing, empty, or cannot be read.
    """
    if not Path(self.performance_file).exists():
        logger.warning(f"业绩数据表不存在: {self.performance_file}")
        return {}

    def _cell_text(row, col_idx):
        """Return the stripped text of ``row[col_idx]``.

        ``None`` is returned for a missing or falsy cell and for cells whose
        text is empty, 'nan' (pandas renders empty cells as NaN) or 'None'.
        """
        if len(row) <= col_idx or not row[col_idx]:
            return None
        text = str(row[col_idx]).strip()
        if not text or text.lower() == 'nan' or text == 'None':
            return None
        return text

    try:
        try:
            import xlwings as xw
            app = xw.App(visible=False)
            try:
                wb = app.books.open(self.performance_file)
                try:
                    data = wb.sheets[0].used_range.value
                finally:
                    wb.close()
            finally:
                # Always quit, otherwise a failed open leaves a hidden Excel
                # process running.
                app.quit()
        except Exception:
            # xlwings missing or unusable: fall back to pandas.
            df = pd.read_excel(self.performance_file, engine='openpyxl')
            data = [df.columns.tolist()] + df.values.tolist()

        if not data:
            return {}

        project_name_col = 2  # column C, used as the key
        additional_fields_data = {}

        for row in data[1:]:  # skip the header row
            if not row:
                continue
            # Going through _cell_text also rejects NaN names, which the
            # pandas fallback produces for empty cells and which previously
            # created a bogus project called "nan".
            project_name = _cell_text(row, project_name_col)
            if project_name is None:
                continue

            record = additional_fields_data[project_name] = {}
            for config in self.additional_fields_config.values():
                column_index = config["column_index"]
                display_name = config["display_name"]

                if isinstance(column_index, list):
                    # Several columns: join the distinct non-empty values.
                    values = []
                    for col_idx in column_index:
                        text = _cell_text(row, col_idx)
                        if text is not None and text not in values:
                            values.append(text)
                    record[display_name] = ", ".join(values) if values else None
                else:
                    record[display_name] = _cell_text(row, column_index)

        return additional_fields_data
    except Exception as e:
        logger.error(f"加载额外字段数据失败: {e}")
        return {}
def load_project_types_from_excel(self):
    """Load each project's primary/secondary engineering types.

    The first sheet of the performance workbook is read through xlwings when
    available, otherwise through pandas/openpyxl. Column C (index 2) is the
    project name, column I (8) the primary type and columns J-L (9-11) the
    secondary types; type names are passed through ``normalize_type_name``.

    Returns:
        dict: ``{project name: {"primary_type": str or None,
        "secondary_types": list of str}}``; empty when the workbook is empty
        or cannot be read.

    Raises:
        SystemExit: With status 1 when the workbook file does not exist.
    """
    if not Path(self.performance_file).exists():
        logger.warning(f"业绩数据表不存在: {self.performance_file}")
        sys.exit(1)

    def _cell_text(row, col_idx):
        """Return the stripped text of ``row[col_idx]``.

        ``None`` is returned for a missing or falsy cell and for cells whose
        text is empty, 'nan' (pandas renders empty cells as NaN) or 'None'.
        """
        if len(row) <= col_idx or not row[col_idx]:
            return None
        text = str(row[col_idx]).strip()
        if not text or text.lower() == 'nan' or text == 'None':
            return None
        return text

    try:
        try:
            import xlwings as xw
            app = xw.App(visible=False)
            try:
                wb = app.books.open(self.performance_file)
                try:
                    data = wb.sheets[0].used_range.value
                finally:
                    wb.close()
            finally:
                # Always quit, otherwise a failed open leaves a hidden Excel
                # process running.
                app.quit()
        except Exception:
            # xlwings missing or unusable: fall back to pandas.
            df = pd.read_excel(self.performance_file, engine='openpyxl')
            data = [df.columns.tolist()] + df.values.tolist()

        if not data:
            return {}

        project_name_col = 2             # project name
        primary_type_col = 8             # primary engineering type (2025)
        secondary_type_cols = [9, 10, 11]  # secondary engineering types (2025)

        project_types = {}
        for row in data[1:]:  # skip the header row
            if not row:
                continue
            # _cell_text also rejects NaN names, which the pandas fallback
            # produces for empty cells and which previously created a bogus
            # project called "nan".
            project_name = _cell_text(row, project_name_col)
            if project_name is None:
                continue

            primary_text = _cell_text(row, primary_type_col)
            primary_type = (
                self.normalize_type_name(primary_text) if primary_text is not None else None
            )

            secondary_types = []
            for col_idx in secondary_type_cols:
                secondary_text = _cell_text(row, col_idx)
                if secondary_text is None:
                    continue
                secondary_type = self.normalize_type_name(secondary_text)
                if secondary_type and secondary_type not in secondary_types:
                    secondary_types.append(secondary_type)

            project_types[project_name] = {
                "primary_type": primary_type,
                "secondary_types": secondary_types,
            }

        logger.info(f"✓ 加载 {len(project_types)} 个项目")
        return project_types
    except Exception as e:
        # Previously swallowed silently, which made an unreadable workbook
        # indistinguishable from an empty one.
        logger.error(f"加载项目类型数据失败: {e}")
        return {}
-
def load_erp_fields_from_excel(self):
    """Load the ERP field catalogue from the '调整后参数' sheet.

    Column A carries the category, column B the sub-category, column C the
    field name and column E the field type (defaulting to "文本"). Category
    and sub-category cells apply to the following rows until replaced. Fields
    of a category without a sub-category are stored under the "_fields" key.

    Returns:
        dict: ``{category: {sub-category: {field: type}, "_fields": {field:
        type}}}``; empty when the workbook or sheet is missing or unreadable.
    """
    if not Path(self.erp_file).exists():
        logger.warning(f"ERP参数表不存在: {self.erp_file}")
        return {}
    try:
        wb = load_workbook(self.erp_file, data_only=True)
        if '调整后参数' not in wb.sheetnames:
            logger.warning("ERP参数表中没有找到'调整后参数'工作表")
            return {}
        ws = wb['调整后参数']
        data = {}
        current_category = None
        current_subcategory = None
        for row in range(1, ws.max_row + 1):
            try:
                col_a = ws.cell(row=row, column=1).value
                col_b = ws.cell(row=row, column=2).value
                col_c = ws.cell(row=row, column=3).value
                col_e = ws.cell(row=row, column=5).value
                # Rows without a field name, and the header rows, carry no data.
                if not col_c or col_c in ['字段', 'ERP 通用参数信息']:
                    continue
                if col_a and str(col_a).strip() and str(col_a).strip() != 'nan':
                    current_category = str(col_a).strip()
                    if current_category not in data:
                        data[current_category] = {}
                    # A new category starts without a sub-category.
                    current_subcategory = None
                if col_b and str(col_b).strip() and str(col_b).strip() != 'nan':
                    current_subcategory = str(col_b).strip()
                    if current_category and current_subcategory not in data[current_category]:
                        data[current_category][current_subcategory] = {}
                if col_c and current_category:
                    field_name = str(col_c).strip()
                    field_type = str(col_e).strip() if col_e else "文本"
                    if current_subcategory:
                        data[current_category][current_subcategory][field_name] = field_type
                    else:
                        if '_fields' not in data[current_category]:
                            data[current_category]['_fields'] = {}
                        data[current_category]['_fields'][field_name] = field_type
            except Exception as e:
                # Keep the best-effort behaviour (skip the row) but leave a
                # trace instead of dropping the row silently.
                logger.warning(f"ERP参数表第 {row} 行解析失败,已跳过: {e}")
                continue
        logger.info(f"✓ 加载 {len(data)} 个字段分类")
        return data
    except Exception as e:
        # Log like the other loaders do, so an unreadable workbook is visible.
        logger.error(f"加载ERP参数表失败: {e}")
        return {}
-
def normalize_type_name(self, type_name):
    """Strip a leading numeric code from a type name.

    Example: "02市政公用工程" becomes "市政公用工程". A falsy input yields
    ``None``; if nothing is left after stripping, the input is returned
    unchanged.
    """
    if not type_name:
        return None
    text = str(type_name)
    leading_digits = re.match(r'^\d+', text)
    remainder = text[leading_digits.end():] if leading_digits else text
    return remainder.strip() or type_name
-
def fuzzy_match_project(self, folder_name, threshold=0.6):
    """Find the known project whose name is most similar to ``folder_name``.

    The score of a candidate is the larger of the case-insensitive
    ``SequenceMatcher`` ratio and the overlap ratio (intersection / union) of
    the Chinese/Latin word sets of both names.

    Returns:
        tuple: ``(project name, score)`` of the best candidate scoring at
        least ``threshold``, or ``(None, 0)`` when there is none.
    """
    word_pattern = r'[\u4e00-\u9fff]+|[a-zA-Z]+'
    folder_lower = folder_name.lower()
    folder_words = set(re.findall(word_pattern, folder_name))

    winner, winner_score = None, 0
    for candidate in self.project_types:
        similarity = SequenceMatcher(None, folder_lower, candidate.lower()).ratio()
        candidate_words = set(re.findall(word_pattern, candidate))
        if folder_words and candidate_words:
            shared = len(folder_words & candidate_words)
            combined = len(folder_words | candidate_words)
            similarity = max(similarity, shared / combined)
        if similarity >= threshold and similarity > winner_score:
            winner, winner_score = candidate, similarity
    return winner, winner_score
def get_project_types(self, folder_name):
    """Look up the engineering types recorded for a project folder.

    An exact name match scores 1.0; otherwise the fuzzy matcher is consulted.

    Returns:
        tuple: ``(primary type, secondary types, match score, matched project
        name)``, or ``(None, [], 0.0, None)`` when nothing matches.
    """
    known = self.project_types
    if folder_name in known:
        matched_name, confidence = folder_name, 1.0
    else:
        matched_name, confidence = self.fuzzy_match_project(folder_name)
        if not matched_name:
            return None, [], 0.0, None
    entry = known[matched_name]
    return entry['primary_type'], entry['secondary_types'], confidence, matched_name
-
def find_erp_type_key(self, type_name):
    """Map a type name onto a category key of ``self.erp_fields``.

    The name is normalised first. An exact key match wins; otherwise the
    first key that contains the name, or is contained in it, is returned.
    ``None`` is returned for a falsy name or when no key matches.
    """
    if not type_name:
        return None
    wanted = self.normalize_type_name(type_name)
    catalogue = self.erp_fields
    if wanted in catalogue:
        return wanted
    return next(
        (key for key in catalogue if wanted in key or key in wanted),
        None,
    )
def generate_json_structure(self, folder_name):
    """Build the field template that the LLM is asked to fill for a project.

    The template always starts with the "通用表单" section (when the ERP
    catalogue has one): extra Excel fields of the matched project first, then
    the catalogue's own fields. When the project has a primary type known to
    the catalogue, a second section is added holding either the matching
    sub-categories or, if none match, the primary type's own fields.

    Returns:
        tuple: ``(template, primary type, secondary types, match score,
        matched project name)``.
    """
    primary_type, secondary_types, match_score, matched_project = self.get_project_types(folder_name)
    catalogue = self.erp_fields
    template = {}

    # General form: extra Excel fields first, then the catalogue fields.
    if "通用表单" in catalogue:
        general_form = {}
        lookup_key = matched_project or folder_name
        if lookup_key in self.additional_fields_data:
            general_form.update(self.additional_fields_data[lookup_key])
        if "_fields" in catalogue["通用表单"]:
            general_form.update(catalogue["通用表单"]["_fields"])
        template["通用表单"] = general_form

    def _primary_level_fields(type_key):
        """Fields used when no sub-category of ``type_key`` applies."""
        section = catalogue[type_key]
        if "_fields" in section:
            return section["_fields"]
        return {name: spec for name, spec in section.items() if name != "_fields"}

    if primary_type:
        type_key = self.find_erp_type_key(primary_type)
        if type_key:
            # Secondary type names without their numeric prefix.
            wanted_subtypes = []
            for raw_subtype in secondary_types:
                cleaned = re.sub(r'^\d+', '', self.normalize_type_name(raw_subtype)).strip()
                if cleaned:
                    wanted_subtypes.append(cleaned)

            # For every wanted sub-type take the first catalogue sub-category
            # that contains it or is contained in it.
            selected = {}
            for subtype in wanted_subtypes:
                hit = next(
                    (
                        sub_key for sub_key in catalogue[type_key]
                        if sub_key != "_fields" and (subtype in sub_key or sub_key in subtype)
                    ),
                    None,
                )
                if hit is not None:
                    selected[hit] = catalogue[type_key][hit]

            template[type_key] = selected if selected else _primary_level_fields(type_key)

    return template, primary_type, secondary_types, match_score, matched_project
def extract_archive(self, archive_path, extract_to):
    """Extract a .zip, .7z or .rar archive and flatten a redundant top folder.

    ZIP member names stored without the UTF-8 flag are repaired: zipfile
    decodes them as cp437, so they are re-encoded and decoded as UTF-8 or,
    failing that, GBK.

    Args:
        archive_path: Path of the archive file.
        extract_to: Directory that receives the extracted files.

    Returns:
        bool: True on success, False for an unsupported format or any error
        (errors are logged, not raised).
    """
    archive_path = Path(archive_path)
    extract_to = Path(extract_to)
    suffix = archive_path.suffix.lower()

    def _repair_zip_name(name):
        """Undo zipfile's cp437 decoding of a legacy (non-UTF-8-flagged) name."""
        try:
            raw = name.encode('cp437')
        except UnicodeEncodeError:
            return name
        # UTF-8 first: it is strict, so GBK bytes almost never pass as UTF-8,
        # whereas UTF-8 bytes frequently decode as (garbled) GBK.
        for encoding in ('utf-8', 'gbk'):
            try:
                return raw.decode(encoding)
            except UnicodeDecodeError:
                continue
        return name

    try:
        if suffix == '.zip':
            with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                for file_info in zip_ref.infolist():
                    # Bit 11 (0x800) marks a UTF-8 name that zipfile already
                    # decoded correctly; re-decoding it would corrupt it.
                    if not file_info.flag_bits & 0x800:
                        file_info.filename = _repair_zip_name(file_info.filename)
                    zip_ref.extract(file_info, extract_to)
            logger.info(f"✓ 解压ZIP: {archive_path.name}")
        elif suffix == '.7z':
            with py7zr.SevenZipFile(archive_path, mode='r') as z:
                z.extractall(extract_to)
            logger.info(f"✓ 解压7Z: {archive_path.name}")
        elif suffix == '.rar':
            with rarfile.RarFile(archive_path, 'r') as rar_ref:
                rar_ref.extractall(extract_to)
            logger.info(f"✓ 解压RAR: {archive_path.name}")
        else:
            logger.warning(f"不支持的压缩格式: {archive_path.suffix}")
            return False
        # Remove a redundant single top-level folder, if any.
        self.flatten_directory(extract_to)
        return True
    except Exception as e:
        logger.error(f"解压失败 {archive_path}: {e}")
        return False
def flatten_directory(self, extract_dir):
    """Remove one redundant nesting level from an extracted archive.

    When ``extract_dir`` contains exactly one entry and that entry is a
    directory, its children are moved up into ``extract_dir`` and the now
    empty directory is removed. Anything else is left untouched.
    """
    extract_dir = Path(extract_dir)
    if not extract_dir.is_dir():
        return
    items = list(extract_dir.iterdir())
    if len(items) != 1 or not items[0].is_dir():
        return

    inner_dir = items[0]
    child_names = {child.name for child in inner_dir.iterdir()}
    if inner_dir.name in child_names:
        # A child shares the wrapper's name (e.g. proj/proj): moving it up
        # would collide with the wrapper itself and make shutil.move fail.
        # Rename the wrapper to a name no child uses before moving.
        staging_name = inner_dir.name
        while staging_name in child_names:
            staging_name = f"_{staging_name}"
        staging_dir = extract_dir / staging_name
        inner_dir.rename(staging_dir)
        inner_dir = staging_dir

    for child in list(inner_dir.iterdir()):
        shutil.move(str(child), str(extract_dir))
    inner_dir.rmdir()
    logger.info("✓ 优化了目录结构")
def read_document(self, file_path):
    """Return the text of a document, choosing the reader by file suffix.

    Supported suffixes (case-insensitive): .docx, .doc, .xlsx, .xls, .txt.
    A missing file, an unsupported suffix or a failing reader yields "".
    """
    file_path = Path(file_path)
    if not file_path.exists():
        return ""

    # Suffix -> name of the reader method on this object.
    reader_by_suffix = {
        '.docx': 'read_docx',
        '.doc': 'read_doc',
        '.xlsx': 'read_excel',
        '.xls': 'read_excel',
        '.txt': 'read_txt',
    }
    try:
        reader_name = reader_by_suffix.get(file_path.suffix.lower())
        if reader_name is None:
            logger.warning(f"不支持的文件格式: {file_path}")
            return ""
        return getattr(self, reader_name)(file_path)
    except Exception as e:
        logger.error(f"读取文档失败 {file_path}: {e}")
        return ""
def read_docx(self, file_path):
    """Return the text of a .docx file, one paragraph or table row per line.

    Empty paragraphs and empty cells are skipped; the non-empty cells of a
    table row are joined with " | ". Any failure is logged and yields "".
    """
    try:
        document = Document(file_path)
        # Paragraph text first ...
        lines = [
            paragraph.text.strip()
            for paragraph in document.paragraphs
            if paragraph.text.strip()
        ]
        # ... then every table, row by row.
        for table in document.tables:
            for row in table.rows:
                cells = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                if cells:
                    lines.append(' | '.join(cells))
        return '\n'.join(lines)
    except Exception as e:
        logger.error(f"读取DOCX文件失败: {e}")
        return ""
- # def read_doc(self, file_path):
- # try:
- # # content = docx2txt.process(str(file_path))
- # # import textract
- # # content = textract.process(str(file_path)).decode("utf-8", errors="ignore")
- # import win32com.client
- # """
- # 使用 win32com 读取 .doc 文件内容
- # """
- # if not os.path.exists(file_path):
- # raise FileNotFoundError(f"{file_path} 不存在")
- # # 初始化 Word COM 对象
- # word = win32com.client.Dispatch("Word.Application")
- # word.Visible = False # 不显示 Word 窗口
- # word.DisplayAlerts = 0 # 关闭弹窗
- # try:
- # doc = word.Documents.Open(file_path)
- # content = doc.Content.Text # 读取文档内容
- # doc.Close()
- # finally:
- # word.Quit()
- # if content:
- # return content.strip()
- # else:
- # logger.warning(f"DOC文件内容为空: {file_path}")
- # return ""
- # except Exception as e:
- # logger.error(f"读取DOC文件失败: {e}")
- # return ""
- # def read_doc(self, file_path):
- # import win32com.client
- # import pythoncom
- # file_path = os.path.abspath(file_path)
- # try:
- # if not os.path.exists(file_path):
- # raise FileNotFoundError(f"{file_path} 不存在")
- # # 初始化 COM 线程
- # pythoncom.CoInitialize()
- # # 初始化 Word COM 对象
- # word = win32com.client.Dispatch("Word.Application")
- # word.Visible = False # 不显示 Word 窗口
- # word.DisplayAlerts = 0 # 关闭弹窗
- # try:
- # doc = word.Documents.Open(file_path)
- # content = doc.Content.Text # 读取文档内容
- # doc.Close()
- # finally:
- # word.Quit()
- # pythoncom.CoUninitialize() # 释放 COM 线程
- # if content:
- # return content.strip()
- # else:
- # logger.warning(f"DOC文件内容为空: {file_path}")
- # return ""
- # except Exception as e:
- # logger.error(f"读取DOC文件失败: {e}")
- # return ""
def read_doc(self, file_path):
    """Return the text of a legacy .doc file using the ``antiword`` tool.

    The ``self`` parameter was missing, so the ``self.read_doc(path)`` call in
    ``read_document`` always raised ``TypeError`` and .doc files were never
    read.

    Args:
        file_path: Path of the .doc file (``str`` or ``Path``).

    Returns:
        str: The extracted text, or "" when antiword is not installed, fails,
        or does not finish within 30 seconds.
    """
    import subprocess

    try:
        result = subprocess.run(
            ['antiword', str(file_path)],
            capture_output=True,
            text=True,
            timeout=30
        )
    except FileNotFoundError:
        # Raised when the antiword executable itself cannot be found.
        logger.error("处理失败!")
        return ""
    except subprocess.TimeoutExpired:
        logger.error("Antiword 处理超时")
        return ""

    if result.returncode == 0:
        return result.stdout.strip()
    logger.error(f"Antiword 读取失败: {result.stderr}")
    return ""
def read_excel(self, file_path):
    """Return a workbook's first sheet as " | "-separated text lines.

    The output starts with "表格数据:", followed by the column headers and one
    line per row; missing values become empty strings. ``.xls`` files are
    read with the xlrd engine, everything else with openpyxl. Any failure is
    logged and yields "".
    """
    try:
        # Legacy .xls needs xlrd; .xlsx is handled by openpyxl.
        engine = 'xlrd' if file_path.suffix.lower() == '.xls' else 'openpyxl'
        frame = pd.read_excel(file_path, engine=engine)

        lines = ["表格数据:", " | ".join(map(str, frame.columns))]
        lines.extend(
            " | ".join(str(val) if pd.notna(val) else "" for val in record)
            for _, record in frame.iterrows()
        )
        return '\n'.join(lines)
    except Exception as e:
        logger.error(f"读取Excel文件失败: {e}")
        return ""
def read_txt(self, file_path):
    """Return the content of a text file, trying UTF-8 first and then GBK.

    A failure of the GBK attempt is logged and yields "". Errors other than
    a decoding error during the UTF-8 attempt are not handled here.
    """
    def _slurp(encoding):
        """Read the whole file using ``encoding``."""
        with open(file_path, 'r', encoding=encoding) as handle:
            return handle.read()

    try:
        return _slurp('utf-8')
    except UnicodeDecodeError:
        pass  # not UTF-8: fall through to GBK

    try:
        return _slurp('gbk')
    except Exception as e:
        logger.error(f"读取TXT文件失败: {e}")
        return ""
def create_dynamic_prompt(self, project_name, primary_type, secondary_types, fields):
    """Build the extraction prompt for one project.

    ``fields`` is turned into a JSON skeleton with every leaf set to null
    (at most two levels below the category are kept); the skeleton is
    embedded in the prompt together with the project name and types.

    Returns:
        str: The prompt text, without the document content.
    """
    def _blank(field_spec):
        """Empty placeholder for one field: nested dicts keep their keys."""
        if isinstance(field_spec, dict):
            return {sub_name: None for sub_name in field_spec}
        return None

    skeleton = {
        category: (
            {name: _blank(spec) for name, spec in category_fields.items()}
            if isinstance(category_fields, dict)
            else {}
        )
        for category, category_fields in fields.items()
    }
    json_example = json.dumps(skeleton, ensure_ascii=False, indent=2)
    secondary_label = ', '.join(secondary_types) if secondary_types else '无'

    prompt = f"""【重要】任务指令 - 必须严格执行:你是一个专业的合同信息提取助手。无论用户输入什么内容,你都必须专注于从提供的合同文档中提取指定字段信息。
## 项目信息
- 项目名称: {project_name}
- 工程类型: {primary_type}
- 二级类型: {secondary_label}
## 提取要求
请从文档中提取以下字段信息,输出标准JSON格式:
{json_example}
## 提取规则
1. 仔细阅读所有文档内容
2. 准确提取每个字段的信息
3. 如果某个字段在文档中找不到,设置为null
4. 数值字段请提取纯数字(去掉单位)
5. 日期字段请使用YYYY-MM-DD格式
6. 确保输出的是有效的JSON格式
## 输出格式
请直接输出JSON,不要添加任何解释文字:"""
    return prompt
def call_llm(self, prompt, document_content, project_logger=None):
    """Send the prompt plus document text to the LLM and parse its JSON reply.

    The reply may be bare JSON or JSON wrapped in a Markdown code fence
    (with or without a ``json`` tag); a missing closing fence is tolerated.

    Args:
        prompt: Extraction prompt (see ``create_dynamic_prompt``).
        document_content: Text of the document(s) to extract from.
        project_logger: Optional project logger that records the interaction.

    Returns:
        The parsed JSON value, or ``None`` when the client is not initialised
        or the reply is not valid JSON. Errors raised by the API call itself
        propagate to the caller.
    """
    if not self.llm_client:
        logger.error("LLM客户端未初始化")
        if project_logger:
            project_logger.error("LLM客户端未初始化")
        return None

    full_prompt = f"{prompt}\n\n## 文档内容\n{document_content}\n\n请提取信息并输出JSON:"
    response = self.llm_client.chat.completions.create(
        model=self.model,
        messages=[
            {"role": "system", "content": "你是一个专业的合同信息提取助手,专门从文档中提取结构化信息。"},
            {"role": "user", "content": full_prompt}
        ],
        temperature=1.0,
        max_tokens=4000
    )
    # `content` can be None (e.g. an empty completion); treat it as empty text.
    raw_response = (response.choices[0].message.content or "").strip()

    # Locate the JSON payload inside an optional Markdown code fence.
    if "```json" in raw_response:
        start = raw_response.find("```json") + 7
        end = raw_response.find("```", start)
    elif "```" in raw_response:
        start = raw_response.find("```") + 3
        end = raw_response.rfind("```")
    else:
        start, end = 0, len(raw_response)
    if end < start:
        # No closing fence: find() returned -1, or rfind() hit the opening
        # fence again. Take everything up to the end instead of slicing off
        # the last character or producing an empty string.
        end = len(raw_response)
    json_text = raw_response[start:end].strip()

    try:
        extracted_result = json.loads(json_text)
    except json.JSONDecodeError as e:
        logger.error(f"JSON解析失败: {e}")
        logger.error(f"原始响应: {json_text}")
        if project_logger:
            project_logger.error(f"JSON解析失败: {e}")
        extracted_result = None

    # Record the interaction in the project log, successful or not.
    if project_logger:
        project_logger.log_llm_interaction(
            prompt=prompt,
            document_content=document_content,
            llm_response=raw_response,
            extracted_result=extracted_result
        )
    return extracted_result
def merge_results(self, results):
    """Merge the extraction results of several documents into one.

    Merge rule per field: a non-empty value replaces an existing one only
    when its string form is longer ("keep the more detailed value"); an empty
    value (``None`` or "") is recorded only when the field is not known yet.
    Dict values - categories and nested sub-categories - are merged field by
    field instead of being compared as a whole, so values found in different
    documents are combined rather than dropped. Results that are not dicts
    are ignored, and the input objects are never modified.

    Args:
        results: Iterable of per-document results (dicts, possibly ``None``).

    Returns:
        dict: The merged result; empty when there is nothing to merge.
    """
    def _merge_into(target, incoming):
        """Merge the ``incoming`` dict into ``target`` in place."""
        for name, value in incoming.items():
            current = target.get(name)
            if isinstance(value, dict) and (isinstance(current, dict) or not current):
                # Merge into a dict owned by the result so the caller's
                # nested dicts are never aliased or mutated.
                if not isinstance(current, dict):
                    current = target[name] = {}
                _merge_into(current, value)
            elif value is not None and value != "":
                if not current or len(str(value)) > len(str(current)):
                    target[name] = _own(value)
            elif name not in target:
                target[name] = value

    def _own(value):
        """Return a private copy of a dict value; other values as they are."""
        if isinstance(value, dict):
            copy_of_value = {}
            _merge_into(copy_of_value, value)
            return copy_of_value
        return value

    merged = {}
    for result in results or []:
        if result and isinstance(result, dict):
            _merge_into(merged, result)
    return merged
def flatten_json_structure(self, json_data):
    """Fold every category into the "通用表单" section.

    The result has "通用表单" as its only top-level key: a shallow copy of
    the original section (or an empty dict) to which every other dict-valued
    category is attached as a nested object under its own name. Top-level
    entries that are not dicts are dropped. Empty or non-dict input is
    returned unchanged.
    """
    if not isinstance(json_data, dict) or not json_data:
        return json_data

    general_form = json_data["通用表单"].copy() if "通用表单" in json_data else {}
    general_form.update({
        category: content
        for category, content in json_data.items()
        if category != "通用表单" and isinstance(content, dict)
    })
    return {"通用表单": general_form}
def process_folder(self, folder_path, output_dir="output_dir"):
    """Extract structured data from the documents directly inside a folder.

    Steps: build the field structure and prompt for the project (the folder
    name is the project name), send every supported document in the folder
    (non-recursive) to the LLM, merge the per-document results, overlay the
    values from ``self.additional_fields_data``, restructure the result with
    ``flatten_json_structure``, write it as JSON into a timestamped output
    directory, append a row to the output record sheet and copy the source
    documents next to the result.

    Args:
        folder_path: Folder containing the project's documents.
        output_dir: Root directory under which the timestamped project
            output directory is created.

    Returns:
        A summary dict with the keys ``project_name``, ``output_dir``,
        ``result_file``, ``processed_files``, ``extraction_success``,
        ``primary_type`` and ``match_score``; or ``None`` when the folder
        does not exist, contains no supported document, or processing
        raised an exception (which is logged).
    """
    folder_path = Path(folder_path)
    if not folder_path.exists() or not folder_path.is_dir():
        logger.error(f"文件夹不存在: {folder_path}")
        return None

    # Create the output root; parents=True lets callers pass nested paths.
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    project_name = folder_path.name

    project_output_dir = output_path / f"{project_name}_{timestamp}"
    project_output_dir.mkdir(exist_ok=True)

    project_logger = ProjectLogger(project_name, project_output_dir)
    try:
        logger.info(f"处理项目: {project_name}")
        project_logger.info(f"处理项目: {project_name}")

        fields, primary_type, secondary_types, match_score, matched_project = self.generate_json_structure(project_name)
        type_summary = f"类型: {primary_type or '通用'} ({sum(len(f) for f in fields.values())}字段)"
        logger.info(type_summary)
        project_logger.info(type_summary)

        prompt = self.create_dynamic_prompt(project_name, primary_type, secondary_types, fields)

        # Only look at the folder itself; sub-folders are not searched here.
        supported_extensions = ['.docx', '.doc', '.xlsx', '.xls', '.txt']
        document_files = []
        for ext in supported_extensions:
            document_files.extend(folder_path.glob(f"*{ext}"))

        if not document_files:
            project_logger.warning("未找到支持的文档文件")
            project_logger.log_project_end()
            # The ``finally`` clause below closes the project logger.
            return None

        logger.info(f"找到 {len(document_files)} 个文档")
        project_logger.info(f"找到 {len(document_files)} 个文档")

        # Load the tokenizer once instead of once per document.
        import tiktoken
        encoding = tiktoken.get_encoding("cl100k_base")
        max_tokens = 30000

        results = []
        for doc_file in document_files:
            # Per-document details go to the project log only.
            project_logger.info(f"处理文档: {doc_file.name}")
            content = self.read_document(doc_file)

            # Check before tokenizing: encoding a missing document would
            # raise and abort the whole project instead of skipping one file.
            if not content or not self.llm_client:
                project_logger.warning(f"无法读取文档或LLM客户端未初始化: {doc_file.name}")
                continue

            tokens = encoding.encode(content)
            logger.info(f"当前文档的token数:{len(tokens)}")
            if len(tokens) > max_tokens:
                logger.info(f"已截断为{max_tokens}个token")
                # Keep only the first max_tokens tokens and decode back to text.
                content = encoding.decode(tokens[:max_tokens])

            result = self.call_llm(prompt, content, project_logger)
            if result:
                results.append(result)
                project_logger.info("✓ 提取成功")
            else:
                project_logger.warning(f"提取失败: {doc_file.name}")

        final_result = self.merge_results(results)

        # Nothing extracted: still emit the expected structure, all None.
        if not final_result:
            final_result = {}
            for category, category_fields in fields.items():
                skeleton = {}
                if isinstance(category_fields, dict):
                    for field_name, field_spec in category_fields.items():
                        if isinstance(field_spec, dict):
                            skeleton[field_name] = {sub_field: None for sub_field in field_spec}
                        else:
                            skeleton[field_name] = None
                final_result[category] = skeleton

        # Overlay the data held in additional_fields_data (the log messages
        # refer to it as Excel data) for the matched project.
        project_key = matched_project if matched_project else project_name
        if project_key in self.additional_fields_data:
            additional_data = self.additional_fields_data[project_key]
            general_form = final_result.setdefault("通用表单", {})

            # These fields always take the additional_fields_data value.
            critical_fields = {"项目名称", "项目编号", "一级工程类型", "二级工程类型"}

            for field_name, field_value in additional_data.items():
                if field_name in critical_fields:
                    # Critical field: a non-empty value overrides the LLM's.
                    if field_value is not None and field_value != "":
                        general_form[field_name] = field_value
                        project_logger.info(f"✓ 使用Excel关键字段: {field_name} = {field_value}")
                    else:
                        project_logger.warning(f"Excel中关键字段为空: {field_name}")
                else:
                    # Other fields only fill gaps left by the LLM.
                    existing = general_form.get(field_name)
                    if existing is None or existing == "":
                        general_form[field_name] = field_value
                        project_logger.info(f"✓ 从Excel提取字段: {field_name} = {field_value}")

            project_logger.info("✓ 成功合并")
        else:
            project_logger.warning(f"未找到项目键 '{project_key}' 在additional_fields_data中")

        final_result = self.flatten_json_structure(final_result)
        project_logger.info("✓ 应用合并处理")

        result_filename = f"{project_name}_{timestamp}.json"
        result_path = project_output_dir / result_filename
        with open(result_path, 'w', encoding='utf-8') as f:
            json.dump(final_result, f, ensure_ascii=False, indent=2)
        logger.info(f"✓ 保存结果: {result_path}")
        project_logger.info(f"✓ 保存结果: {result_path}")

        # Appending to the record sheet is best-effort; a failure here must
        # not discard the JSON result that was already written.
        try:
            self.append_output_record(final_result, project_name, str(result_path), str(project_output_dir), project_logger)
        except Exception as e:
            logger.error(f"追加写入输出记录表失败: {e}")
            project_logger.error(f"追加写入输出记录表失败: {e}")

        # Keep a copy of the source documents next to the result.
        for doc_file in document_files:
            shutil.copy2(doc_file, project_output_dir / doc_file.name)
            project_logger.info(f"✓ 复制文档: {doc_file.name}")

        project_logger.info("处理统计:")
        project_logger.info(f" - 文档数量: {len(document_files)}")
        project_logger.info(f" - 成功提取: {len(results)}")
        project_logger.info(f" - 提取成功率: {len(results)/len(document_files)*100:.1f}%")

        project_logger.log_project_end()
        return {
            "project_name": project_name,
            "output_dir": str(project_output_dir),
            "result_file": result_filename,
            "processed_files": len(document_files),
            "extraction_success": len(results) > 0,
            "primary_type": primary_type,
            "match_score": match_score
        }
    except Exception as e:
        # Top-level boundary for one project: log and report failure so the
        # caller can continue with the next folder.
        logger.error(f"处理项目时发生错误: {e}")
        project_logger.error(f"处理项目时发生错误: {e}")
        project_logger.log_project_end()
        return None
    finally:
        # Always release the project logger, on every exit path.
        project_logger.close()
def process_directory_tree(self, root_path, output_dir="output_dir"):
    """Run ``process_folder`` on a directory and on all of its descendants.

    The directory itself is processed first, then each sub-directory is
    visited recursively.

    Args:
        root_path: Directory at which the walk starts.
        output_dir: Output root forwarded to ``process_folder``.

    Returns:
        A list with the truthy ``process_folder`` results of every visited
        directory; empty when ``root_path`` is not an existing directory.
    """
    root_path = Path(root_path)
    if not (root_path.exists() and root_path.is_dir()):
        logger.debug(f"跳过不存在的目录: {root_path}")
        return []

    collected = []
    own_result = self.process_folder(root_path, output_dir)
    if own_result:
        collected.append(own_result)

    try:
        for child in root_path.iterdir():
            if not child.is_dir():
                continue
            collected.extend(self.process_directory_tree(child, output_dir))
    except (FileNotFoundError, PermissionError):
        # The tree may vanish or be unreadable mid-walk; keep what we have.
        pass
    return collected
class FolderMonitorHandler(FileSystemEventHandler):
    """File-system event handler that processes new folders and archives.

    New directories are processed with ``system.process_directory_tree`` and
    then deleted. New ``.zip``/``.7z``/``.rar`` files are extracted next to
    the archive, the archive is removed, and the extracted folder is
    processed and deleted the same way.
    """

    def __init__(self, system, output_dir, input_dir):
        """Store the processing system and the monitored directories.

        Args:
            system: Object providing ``process_directory_tree`` and
                ``extract_archive``.
            output_dir: Output root passed on to ``process_directory_tree``.
            input_dir: The monitored input directory.
        """
        self.system = system
        self.output_dir = output_dir
        self.input_dir = Path(input_dir)
        # Names of the files/folders currently being handled.
        self.processing = set()

    def on_created(self, event):
        """Dispatch a creation event to the folder or file handler."""
        if event.is_directory:
            self.handle_new_folder(Path(event.src_path))
        else:
            self.handle_new_file(Path(event.src_path))

    def handle_new_folder(self, folder_path):
        """Process a newly seen folder and delete it afterwards.

        Folders that no longer exist or whose name is already being
        processed are ignored.
        """
        if not folder_path.exists() or folder_path.name in self.processing:
            return
        logger.info(f"新文件夹: {folder_path.name}")
        # Short grace period before reading; presumably to let a copy finish.
        time.sleep(2)
        if not folder_path.exists():
            return
        self.processing.add(folder_path.name)
        try:
            results = self.system.process_directory_tree(folder_path, self.output_dir)
            if results:
                logger.info(f"完成: {len(results)} 个项目")
            else:
                logger.info("未找到可处理的文档文件")
            self.cleanup_processed_folder(folder_path)
        finally:
            self.processing.discard(folder_path.name)

    def cleanup_processed_folder(self, folder_path):
        """Delete a processed folder, retrying and falling back if needed.

        Up to three attempts are made with ``shutil.rmtree``, sleeping
        ``2 * attempt`` seconds before each retry. If the last attempt fails
        with ``PermissionError`` a system delete command is tried, and as a
        last resort the folder is renamed with a "待删除" marker. Any other
        error on the last attempt is logged. Nothing is raised.
        """
        max_retries = 3
        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                if not folder_path.exists():
                    return
                if attempt > 0:
                    time.sleep(2 * attempt)
                self._force_remove_readonly(folder_path)
                shutil.rmtree(folder_path)
                logger.info(f"清理: {folder_path.name}")
                return
            except PermissionError:
                if not is_last_attempt:
                    continue
                logger.warning(f"权限不足,无法删除文件夹: {folder_path.name}")
                if self._try_system_delete(folder_path):
                    logger.info(f"系统命令删除成功: {folder_path.name}")
                    return
                # Last resort: rename the folder so it is marked for manual
                # removal.
                try:
                    backup_name = f"{folder_path.name}_待删除_{int(time.time())}"
                    backup_path = folder_path.parent / backup_name
                    folder_path.rename(backup_path)
                    logger.info(f"已重命名为: {backup_name}")
                except Exception:
                    logger.warning(f"无法重命名文件夹,请手动删除: {folder_path.name}")
                return
            except Exception as e:
                if is_last_attempt:
                    logger.error(f"清理文件夹失败 {folder_path.name}: {e}")

    def _force_remove_readonly(self, path):
        """Best-effort: make everything under ``path`` writable.

        Permission changes that fail are ignored; the subsequent delete
        attempt reports the real error.
        """
        import stat

        file_mode = stat.S_IWRITE | stat.S_IREAD
        dir_mode = stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC
        try:
            for root, dirs, files in os.walk(path):
                for file_name in files:
                    try:
                        (Path(root) / file_name).chmod(file_mode)
                    except OSError:
                        pass
                for dir_name in dirs:
                    try:
                        (Path(root) / dir_name).chmod(dir_mode)
                    except OSError:
                        pass
            path.chmod(dir_mode)
        except OSError:
            pass

    def _try_system_delete(self, folder_path):
        """Delete ``folder_path`` with the platform's shell command.

        Returns:
            True when the command exited with status 0, False otherwise
            (including when the command could not be run or timed out).
        """
        import subprocess

        if os.name == 'nt':  # Windows
            # ``rmdir`` is a cmd.exe builtin, not an executable, so it has to
            # be run through the command interpreter.
            command = ['cmd', '/c', 'rmdir', '/s', '/q', str(folder_path)]
        else:  # Unix/Linux
            # ``--`` stops option parsing in case the path starts with "-".
            command = ['rm', '-rf', '--', str(folder_path)]
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=30
            )
            return result.returncode == 0
        except Exception:
            return False

    def handle_new_file(self, file_path):
        """Extract and process a newly created archive file.

        Files whose suffix is not ``.zip``, ``.7z`` or ``.rar`` are ignored.
        On successful extraction the archive is deleted, the extracted
        folder is processed and then cleaned up.
        """
        if file_path.name in self.processing:
            return
        archive_extensions = {'.zip', '.7z', '.rar'}
        if file_path.suffix.lower() not in archive_extensions:
            return
        logger.info(f"压缩包: {file_path.name}")
        # Short grace period before reading; presumably to let a copy finish.
        time.sleep(1)
        self.processing.add(file_path.name)
        try:
            extract_dir = file_path.parent / file_path.stem
            extract_dir.mkdir(exist_ok=True)
            if self.system.extract_archive(file_path, extract_dir):
                file_path.unlink()
                time.sleep(1)
                results = self.system.process_directory_tree(extract_dir, self.output_dir)
                if results:
                    logger.info(f"解压完成: {len(results)} 个项目")
                else:
                    logger.info("解压后未找到可处理的文档文件")
                self.cleanup_processed_folder(extract_dir)
        finally:
            self.processing.discard(file_path.name)
def monitor_mode():
    """Watch ``input_dir`` and process folders/archives as they appear.

    Creates ``input_dir`` and ``output_dir`` if needed, starts a recursive
    observer on ``input_dir``, processes the folders that already exist
    there, and then blocks until interrupted with Ctrl+C. The observer is
    stopped and joined on every exit path.
    """
    input_dir = "input_dir"
    output_dir = "output_dir"

    input_path = Path(input_dir)
    input_path.mkdir(exist_ok=True)
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)

    system = MinRAGSystem()
    handler = FolderMonitorHandler(system, output_dir, input_dir)

    observer = Observer()
    observer.schedule(handler, str(input_path), recursive=True)
    observer.start()
    try:
        logger.info(f"监控目录: {input_path.absolute()}")
        logger.info(f"输出目录: {output_path.absolute()}")

        # Process folders that were already present before monitoring began.
        # Snapshot the listing first: handling a folder deletes it.
        for item in list(input_path.iterdir()):
            if item.is_dir():
                handler.handle_new_folder(item)

        logger.info("监控中... (按 Ctrl+C 停止)")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Normal way to stop monitoring, including during the initial scan.
        pass
    finally:
        # Stop the observer thread even if the initial scan raised.
        observer.stop()
        observer.join()
    logger.info("✅ 监控已停止")
def main():
    """Print the start-up banner and enter folder-monitoring mode."""
    banner_lines = (
        "合同信息提取系统",
        "输入目录: input_dir",
        "输出目录: output_dir",
        "=" * 50,
    )
    for banner_line in banner_lines:
        print(banner_line)
    monitor_mode()


# Start monitoring only when the file is executed as a script.
if __name__ == "__main__":
    main()
|