import os
import sys
import json
import re
import shutil
import stat
import subprocess
import threading
import time
import uuid
import zipfile
import logging
from pathlib import Path
from datetime import datetime
from difflib import SequenceMatcher

import pandas as pd
import openpyxl
from openpyxl import load_workbook
from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE
from docx import Document
import docx2txt  # noqa: F401  currently unused; kept so the dependency set is unchanged
from openai import OpenAI
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import py7zr
import rarfile

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)

# Documents longer than this many cl100k_base tokens are truncated before being sent to the LLM.
MAX_DOCUMENT_TOKENS = 30000
# File types picked up inside a project folder and handed to MinRAGSystem.read_document().
SUPPORTED_DOCUMENT_EXTENSIONS = ['.docx', '.doc', '.xlsx', '.xls', '.txt']
# Archive types the folder monitor extracts.
ARCHIVE_EXTENSIONS = {'.zip', '.7z', '.rar'}
# Runs of CJK characters or Latin letters; used for word-overlap scoring in fuzzy project matching.
_WORD_RE = re.compile(r'[\u4e00-\u9fff]+|[a-zA-Z]+')
# ZIP general-purpose flag bit 11: the member name is stored as UTF-8.
_ZIP_UTF8_FLAG = 0x800


class ProjectLogger:
    """Per-project file logger writing to ``<project_output_dir>/<project>_<timestamp>.log``.

    The underlying logger does not propagate, so detailed per-project output
    (including full LLM responses) stays out of the console log.
    """

    def __init__(self, project_name, project_output_dir):
        self.project_name = project_name
        self.project_output_dir = Path(project_output_dir)
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.log_file = self.project_output_dir / f"{project_name}_{self.timestamp}.log"

        # Dedicated, non-propagating logger for this project run.
        self.logger = logging.getLogger(f"project_{project_name}_{self.timestamp}")
        self.logger.setLevel(logging.INFO)
        self.logger.handlers.clear()
        self.logger.propagate = False

        file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        self.logger.addHandler(file_handler)

        self.logger.info(f"=== 项目开始: {project_name} ===")
        self.logger.info(f"日志文件: {self.log_file}")
        self.llm_interaction_count = 0

    def info(self, message):
        """Log at INFO level."""
        self.logger.info(message)

    def error(self, message):
        """Log at ERROR level."""
        self.logger.error(message)

    def warning(self, message):
        """Log at WARNING level."""
        self.logger.warning(message)

    def log_llm_interaction(self, prompt, document_content, llm_response, extracted_result=None):
        """Record one LLM round trip: a prompt preview, document length, raw reply and parsed result.

        The success/failure status is always logged. The parsed result is pretty-printed
        only when it is truthy.
        """
        self.llm_interaction_count += 1
        self.logger.info("")
        self.logger.info("=" * 80)
        self.logger.info(f"LLM交互 #{self.llm_interaction_count}")
        self.logger.info(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        self.logger.info("=" * 80)

        # Only the first 500 characters of the prompt are logged, to keep the file readable.
        prompt_preview = prompt[:500] + "..." if len(prompt) > 500 else prompt
        self.logger.info(f"提示词预览:\n{prompt_preview}")
        self.logger.info(f"文档内容长度: {len(document_content)} 字符")
        self.logger.info(f"LLM原始响应:\n{llm_response}")

        # Log the status unconditionally. It used to sit inside `if extracted_result:`,
        # which made the failure label unreachable.
        self.logger.info(f"提取结果: {'成功' if extracted_result else '失败'}")
        if extracted_result:
            try:
                formatted_json = json.dumps(extracted_result, ensure_ascii=False, indent=2)
                self.logger.info(f"提取的JSON结构:\n{formatted_json}")
            except (TypeError, ValueError):
                self.logger.info(f"提取的结果:\n{extracted_result}")

        self.logger.info("=" * 80)
        self.logger.info("")

    def log_project_end(self):
        """Write the end-of-project footer, including the LLM interaction count."""
        self.logger.info(f"=== 项目结束: {self.project_name} ===")
        self.logger.info(f"总计LLM交互次数: {self.llm_interaction_count}")

    def close(self):
        """Close and detach every handler (safe to call more than once)."""
        for handler in self.logger.handlers[:]:
            handler.close()
            self.logger.removeHandler(handler)


class MinRAGSystem:
    """Extract structured contract fields from project folders with an LLM.

    The ERP parameter workbook defines the JSON schema. The performance workbook
    supplies per-project engineering types and extra fields.
    """

    def __init__(self):
        # NOTE(review): a literal API key is committed in source. It can now be
        # overridden through the DEEPSEEK_API_KEY environment variable. The literal
        # fallback is kept only for backward compatibility and should be rotated
        # and removed.
        self.api_key = os.environ.get("DEEPSEEK_API_KEY") or "sk-72f9d0e5bc894e1d828d73bdcc50ff0a"
        self.base_url = "https://api.deepseek.com"  # LLM endpoint
        self.model = "deepseek-chat"  # LLM model name
        self.performance_file = "2025年度 业绩数据填写汇总表-1.xlsx"  # performance data workbook
        self.erp_file = "20250417 调整后ERP系统通用参数 - 展示分类.xlsx"  # ERP parameter workbook
        self.output_record_file = "输出记录表.xlsx"  # running output-record workbook

        # Column headers of the output-record workbook, in order.
        self.output_record_fields = [
            "项目编号", "项目名称", "解析完成时间", "输出的原始json",
            "项目地点", "所属部门", "项目等级", "项目状态",
            "项目负责人(总监)/项目经理", "总监代表/项目副经理",
            "总占地面积(m2)", "项目规模", "建安总造价(万元)", "设备总报价(万元)", "总投资额(万元)",
            "监督机构", "建设单位", "设计单位", "勘察单位", "施工单位", "监理单位", "集团客户",
            "计划开工日期", "计划竣工日期", "实际开工日期", "实际竣工日期",
            "是否街道检查项目"
        ]

        # Extra columns read from the performance workbook (0-based column indices):
        # {field name: {"column_index": int | list[int], "display_name": str}}
        self.additional_fields_config = {
            "项目名称": {"column_index": 2, "display_name": "项目名称"},  # column C
            "项目编号": {"column_index": 4, "display_name": "项目编号"},  # column E
            "一级工程类型": {"column_index": 8, "display_name": "一级工程类型"},  # column I
            "二级工程类型": {"column_index": [9, 10, 11], "display_name": "二级工程类型"}  # columns J-L
        }

        self.llm_client = None
        if self.api_key:
            self.llm_client = OpenAI(api_key=self.api_key, base_url=self.base_url)
            logger.info("✓ LLM初始化成功")
        else:
            logger.error("× 缺少api密钥,启动失败!!!")

        self.project_types = self.load_project_types_from_excel()
        self.erp_fields = self.load_erp_fields_from_excel()
        self.additional_fields_data = self.load_additional_fields_from_excel()

    # ------------------------------------------------------------------ output record

    def _normalize_cell_value(self, value):
        """Convert a value for an Excel cell.

        None stays None, dict/list become JSON text, anything else becomes str.
        Control characters that openpyxl rejects (IllegalCharacterError) are stripped.
        """
        if value is None:
            return None
        if isinstance(value, (dict, list)):
            try:
                text = json.dumps(value, ensure_ascii=False)
            except (TypeError, ValueError):
                text = str(value)
        else:
            text = str(value)
        return ILLEGAL_CHARACTERS_RE.sub("", text)

    def append_output_record(self, final_result, project_name, result_file_path, project_output_dir, project_logger=None):
        """Append one row (raw JSON plus key 通用表单 fields) to the output-record workbook.

        The workbook is created with a header row if it does not exist. Errors are
        logged, not raised. ``result_file_path`` and ``project_output_dir`` are
        currently unused and are kept for interface compatibility.
        """
        try:
            general = {}
            if isinstance(final_result, dict):
                general = final_result.get("通用表单", {}) or {}
            if not isinstance(general, dict):
                general = {}

            # Every column is copied from 通用表单 by header name, except the three overridden below.
            row_data = {field: general.get(field) for field in self.output_record_fields}
            row_data["项目名称"] = general.get("项目名称") or project_name
            row_data["解析完成时间"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            row_data["输出的原始json"] = json.dumps(final_result, ensure_ascii=False)

            record_path = Path(self.output_record_file)
            if record_path.exists():
                wb = load_workbook(record_path)
                ws = wb.active
            else:
                wb = openpyxl.Workbook()
                ws = wb.active
                ws.append(self.output_record_fields)

            ws.append([self._normalize_cell_value(row_data.get(header)) for header in self.output_record_fields])
            wb.save(record_path)
            if project_logger:
                project_logger.info(f"✓ 已写入输出记录: {record_path}")
        except Exception as e:
            logger.error(f"写入输出记录表失败: {e}")
            if project_logger:
                project_logger.error(f"写入输出记录表失败: {e}")

    # ------------------------------------------------------------------ spreadsheet loading

    def add_additional_field(self, field_name, column_index, display_name=None):
        """Register an extra performance-workbook column and reload the extra-field data."""
        if display_name is None:
            display_name = field_name
        self.additional_fields_config[field_name] = {
            "column_index": column_index,
            "display_name": display_name
        }
        self.additional_fields_data = self.load_additional_fields_from_excel()

    def _read_performance_rows(self):
        """Return the first sheet of the performance workbook as a list of rows (header row first).

        xlwings (a hidden Excel instance) is tried first. If it is unavailable or
        fails, pandas/openpyxl is used instead. Excel is always shut down, so a
        failed read no longer leaks a hidden Excel process.
        """
        data = None
        try:
            import xlwings as xw
            app = xw.App(visible=False)
            try:
                wb = app.books.open(self.performance_file)
                try:
                    data = wb.sheets[0].used_range.value
                finally:
                    wb.close()
            finally:
                app.quit()
        except Exception:
            data = None

        if data is None:
            df = pd.read_excel(self.performance_file, engine='openpyxl')
            return [df.columns.tolist()] + df.values.tolist()

        # xlwings returns a single-row range as a flat list and a single cell as a scalar.
        if not isinstance(data, list):
            return [[data]]
        if data and not isinstance(data[0], list):
            return [data]
        return data

    @staticmethod
    def _clean_cell_text(value):
        """Return the stripped text of a cell, or None for falsy, 'nan', 'None' or blank cells."""
        if not value:
            return None
        text = str(value).strip()
        if not text or text.lower() == 'nan' or text == 'None':
            return None
        return text

    def _extract_config_value(self, row, column_index):
        """Read one configured extra field from ``row``.

        For a list of columns, return the distinct non-empty values joined with ", "
        (None when none remain). For a single column, return its cleaned text.
        """
        if isinstance(column_index, list):
            values = []
            for col_idx in column_index:
                if len(row) > col_idx:
                    value = self._clean_cell_text(row[col_idx])
                    if value and value not in values:
                        values.append(value)
            return ", ".join(values) if values else None
        if len(row) > column_index:
            return self._clean_cell_text(row[column_index])
        return None

    def load_additional_fields_from_excel(self):
        """Load the configured extra columns from the performance workbook.

        Returns:
            dict: {project name (column C): {display name: value or None}}, or {} on
            any failure.
        """
        if not Path(self.performance_file).exists():
            logger.warning(f"业绩数据表不存在: {self.performance_file}")
            return {}

        try:
            data = self._read_performance_rows()
            if not data:
                return {}

            project_name_col = 2  # column C holds the project name (the key)
            additional_fields_data = {}
            for row in data[1:]:  # skip the header row
                if not row or len(row) <= project_name_col:
                    continue
                project_name = self._clean_cell_text(row[project_name_col])
                if not project_name:
                    continue
                additional_fields_data[project_name] = {
                    config["display_name"]: self._extract_config_value(row, config["column_index"])
                    for config in self.additional_fields_config.values()
                }
            return additional_fields_data
        except Exception as e:
            logger.error(f"加载额外字段数据失败: {e}")
            return {}

    def load_project_types_from_excel(self):
        """Load each project's primary and secondary engineering types from the performance workbook.

        Returns:
            dict: {project name: {"primary_type": str | None, "secondary_types": list[str]}},
            or {} when the workbook cannot be parsed.

        Exits the process with status 1 when the workbook is missing, because
        project typing is required.
        """
        if not Path(self.performance_file).exists():
            logger.warning(f"业绩数据表不存在: {self.performance_file}")
            sys.exit(1)

        try:
            data = self._read_performance_rows()
            if not data:
                return {}

            project_name_col = 2  # project name
            primary_type_col = 8  # primary engineering type (2025)
            secondary_type_cols = [9, 10, 11]  # secondary engineering types (2025)

            project_types = {}
            for row in data[1:]:
                if not row or len(row) <= project_name_col:
                    continue
                project_name = self._clean_cell_text(row[project_name_col])
                if not project_name:
                    continue

                primary_type = None
                if len(row) > primary_type_col:
                    raw_primary = self._clean_cell_text(row[primary_type_col])
                    if raw_primary:
                        primary_type = self.normalize_type_name(raw_primary)

                secondary_types = []
                for col_idx in secondary_type_cols:
                    if len(row) <= col_idx:
                        continue
                    raw_secondary = self._clean_cell_text(row[col_idx])
                    if not raw_secondary:
                        continue
                    secondary_type = self.normalize_type_name(raw_secondary)
                    if secondary_type and secondary_type not in secondary_types:
                        secondary_types.append(secondary_type)

                project_types[project_name] = {
                    "primary_type": primary_type,
                    "secondary_types": secondary_types
                }

            logger.info(f"✓ 加载 {len(project_types)} 个项目")
            return project_types
        except Exception as e:
            logger.error(f"加载项目类型失败: {e}")
            return {}

    def load_erp_fields_from_excel(self):
        """Parse the '调整后参数' sheet of the ERP workbook into a field schema.

        Column A is the category, B the optional subcategory, C the field name and
        E the field type (default "文本"). Fields without a subcategory go under
        the '_fields' key.

        Returns:
            dict: {category: {subcategory: {field: type}, '_fields': {field: type}}},
            or {} on failure.
        """
        if not Path(self.erp_file).exists():
            logger.warning(f"ERP参数表不存在: {self.erp_file}")
            return {}

        try:
            wb = load_workbook(self.erp_file, data_only=True)
            if '调整后参数' not in wb.sheetnames:
                logger.warning("ERP参数表中没有找到'调整后参数'工作表")
                return {}

            ws = wb['调整后参数']
            data = {}
            current_category = None
            current_subcategory = None

            for row in range(1, ws.max_row + 1):
                try:
                    col_a = ws.cell(row=row, column=1).value
                    col_b = ws.cell(row=row, column=2).value
                    col_c = ws.cell(row=row, column=3).value
                    col_e = ws.cell(row=row, column=5).value

                    # Skip rows without a field name, and header rows.
                    if not col_c or col_c in ['字段', 'ERP 通用参数信息']:
                        continue

                    if col_a and str(col_a).strip() and str(col_a).strip() != 'nan':
                        current_category = str(col_a).strip()
                        if current_category not in data:
                            data[current_category] = {}
                        current_subcategory = None

                    if col_b and str(col_b).strip() and str(col_b).strip() != 'nan':
                        current_subcategory = str(col_b).strip()
                        if current_category and current_subcategory not in data[current_category]:
                            data[current_category][current_subcategory] = {}

                    if col_c and current_category:
                        field_name = str(col_c).strip()
                        field_type = str(col_e).strip() if col_e else "文本"
                        if current_subcategory:
                            data[current_category][current_subcategory][field_name] = field_type
                        else:
                            data[current_category].setdefault('_fields', {})[field_name] = field_type
                except Exception:
                    # Best effort: a malformed row is skipped rather than aborting the schema.
                    continue

            logger.info(f"✓ 加载 {len(data)} 个字段分类")
            return data
        except Exception as e:
            logger.error(f"加载ERP参数表失败: {e}")
            return {}

    # ------------------------------------------------------------------ type matching / schema

    def normalize_type_name(self, type_name):
        """Strip a leading numeric code, e.g. "02市政公用工程" -> "市政公用工程".

        Returns the input unchanged if stripping leaves nothing, and None for falsy input.
        """
        if not type_name:
            return None
        normalized = re.sub(r'^\d+', '', str(type_name)).strip()
        return normalized if normalized else type_name

    def fuzzy_match_project(self, folder_name, threshold=0.6):
        """Find the known project whose name best matches ``folder_name``.

        The score is the larger of the SequenceMatcher ratio (case-insensitive) and
        the Jaccard overlap of CJK/Latin word runs.

        Returns:
            (best project name or None, best score); only scores >= threshold count.
        """
        best_match = None
        best_score = 0
        folder_lower = folder_name.lower()
        folder_words = set(_WORD_RE.findall(folder_name))

        for project_name in self.project_types:
            score = SequenceMatcher(None, folder_lower, project_name.lower()).ratio()
            project_words = set(_WORD_RE.findall(project_name))
            if folder_words and project_words:
                word_overlap = len(folder_words & project_words) / len(folder_words | project_words)
                score = max(score, word_overlap)
            if score > best_score and score >= threshold:
                best_score = score
                best_match = project_name

        return best_match, best_score

    def get_project_types(self, folder_name):
        """Return (primary_type, secondary_types, match_score, matched_project) for a folder name.

        An exact name match scores 1.0. Otherwise fuzzy matching is used. With no
        match the result is (None, [], 0.0, None).
        """
        if folder_name in self.project_types:
            project_info = self.project_types[folder_name]
            return project_info['primary_type'], project_info['secondary_types'], 1.0, folder_name

        matched_project, score = self.fuzzy_match_project(folder_name)
        if matched_project:
            project_info = self.project_types[matched_project]
            return project_info['primary_type'], project_info['secondary_types'], score, matched_project

        return None, [], 0.0, None

    def find_erp_type_key(self, type_name):
        """Map an engineering type name to an ERP category key (exact, then substring match in either direction)."""
        if not type_name:
            return None
        normalized_name = self.normalize_type_name(type_name)
        if normalized_name in self.erp_fields:
            return normalized_name
        for key in self.erp_fields:
            if normalized_name in key or key in normalized_name:
                return key
        return None

    def _default_category_fields(self, erp_key):
        """Fields of an ERP category: its '_fields' entry if present, else all its subcategories."""
        category = self.erp_fields[erp_key]
        if "_fields" in category:
            return category["_fields"]
        return {k: v for k, v in category.items() if k != "_fields"}

    def generate_json_structure(self, folder_name):
        """Build the extraction schema for a project folder.

        The schema always contains 通用表单: the spreadsheet extra fields first,
        then the ERP common fields. When a primary type maps to an ERP category,
        that category is added with either the matched secondary-type sections or
        the category's default fields.

        Returns:
            (json_structure, primary_type, secondary_types, match_score, matched_project)
        """
        primary_type, secondary_types, match_score, matched_project = self.get_project_types(folder_name)
        json_structure = {}

        if "通用表单" in self.erp_fields:
            common_form = {}
            # Spreadsheet-sourced fields first, so they lead the 通用表单 section.
            project_key = matched_project if matched_project else folder_name
            common_form.update(self.additional_fields_data.get(project_key, {}))
            common_form.update(self.erp_fields["通用表单"].get("_fields", {}))
            json_structure["通用表单"] = common_form

        # Without an engineering type, only the common form is extracted.
        if not primary_type:
            return json_structure, primary_type, secondary_types, match_score, matched_project

        erp_primary_key = self.find_erp_type_key(primary_type)
        if erp_primary_key:
            normalized_secondary_types = []
            for sec_type in secondary_types:
                normalized_sec = re.sub(r'^\d+', '', self.normalize_type_name(sec_type) or "").strip()
                if normalized_sec:
                    normalized_secondary_types.append(normalized_sec)

            # For each secondary type, keep the first ERP subsection it matches (substring either way).
            matched_sections = {}
            for secondary_type in normalized_secondary_types:
                for erp_sec_key, erp_sec_fields in self.erp_fields[erp_primary_key].items():
                    if erp_sec_key == "_fields":
                        continue
                    if secondary_type in erp_sec_key or erp_sec_key in secondary_type:
                        matched_sections[erp_sec_key] = erp_sec_fields
                        break

            # No secondary types, or none matched: fall back to the primary category's fields.
            json_structure[erp_primary_key] = matched_sections or self._default_category_fields(erp_primary_key)

        return json_structure, primary_type, secondary_types, match_score, matched_project

    # ------------------------------------------------------------------ archives

    @staticmethod
    def _fix_zip_member_name(file_info):
        """Repair a ZIP member name that zipfile decoded as cp437.

        Names with the UTF-8 flag are already correct. Otherwise UTF-8 is tried
        before GBK: UTF-8 bytes often happen to form valid (garbled) GBK, while
        GBK bytes are rarely valid UTF-8.
        """
        name = file_info.filename
        if file_info.flag_bits & _ZIP_UTF8_FLAG:
            return name
        try:
            raw = name.encode('cp437')
        except UnicodeEncodeError:
            return name
        for encoding in ('utf-8', 'gbk'):
            try:
                return raw.decode(encoding)
            except UnicodeDecodeError:
                continue
        return name

    def extract_archive(self, archive_path, extract_to):
        """Extract a .zip/.7z/.rar archive into ``extract_to``, then flatten a single wrapper folder.

        Returns:
            bool: True on success; False for unsupported formats or any extraction error.
        """
        archive_path = Path(archive_path)
        extract_to = Path(extract_to)
        suffix = archive_path.suffix.lower()

        try:
            if suffix == '.zip':
                with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                    for file_info in zip_ref.infolist():
                        file_info.filename = self._fix_zip_member_name(file_info)
                        zip_ref.extract(file_info, extract_to)
                logger.info(f"✓ 解压ZIP: {archive_path.name}")
            elif suffix == '.7z':
                with py7zr.SevenZipFile(archive_path, mode='r') as z:
                    z.extractall(extract_to)
                logger.info(f"✓ 解压7Z: {archive_path.name}")
            elif suffix == '.rar':
                with rarfile.RarFile(archive_path, 'r') as rar_ref:
                    rar_ref.extractall(extract_to)
                logger.info(f"✓ 解压RAR: {archive_path.name}")
            else:
                logger.warning(f"不支持的压缩格式: {archive_path.suffix}")
                return False

            self.flatten_directory(extract_to)
            return True
        except Exception as e:
            logger.error(f"解压失败 {archive_path}: {e}")
            return False

    def flatten_directory(self, extract_dir):
        """If ``extract_dir`` contains exactly one sub-directory and nothing else, lift its contents up one level.

        The wrapper is renamed to a unique temporary name first, so a child that
        shares the wrapper's name (e.g. ``A/A/``) cannot collide during the move.
        """
        extract_dir = Path(extract_dir)
        if not extract_dir.is_dir():
            return

        items = list(extract_dir.iterdir())
        if len(items) != 1 or not items[0].is_dir():
            return

        temp_dir = extract_dir / f".flatten_{uuid.uuid4().hex}"
        items[0].rename(temp_dir)
        for item in temp_dir.iterdir():
            shutil.move(str(item), str(extract_dir / item.name))
        temp_dir.rmdir()
        logger.info("✓ 优化了目录结构")

    # ------------------------------------------------------------------ document readers

    def read_document(self, file_path):
        """Return the plain text of a supported document, or "" when missing, unsupported or unreadable."""
        file_path = Path(file_path)
        if not file_path.exists():
            return ""

        readers = {
            '.docx': self.read_docx,
            '.doc': self.read_doc,
            '.xlsx': self.read_excel,
            '.xls': self.read_excel,
            '.txt': self.read_txt,
        }
        reader = readers.get(file_path.suffix.lower())
        if reader is None:
            logger.warning(f"不支持的文件格式: {file_path}")
            return ""

        try:
            return reader(file_path)
        except Exception as e:
            logger.error(f"读取文档失败 {file_path}: {e}")
            return ""

    def read_docx(self, file_path):
        """Read non-empty paragraphs, then table rows (non-empty cells joined with ' | '), from a .docx file."""
        try:
            doc = Document(file_path)
            content = [p.text.strip() for p in doc.paragraphs if p.text.strip()]
            for table in doc.tables:
                for row in table.rows:
                    row_text = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                    if row_text:
                        content.append(' | '.join(row_text))
            return '\n'.join(content)
        except Exception as e:
            logger.error(f"读取DOCX文件失败: {e}")
            return ""

    def read_doc(self, file_path):
        """Extract text from a legacy .doc file with the external ``antiword`` command.

        Returns "" when antiword is not installed, exits non-zero, or exceeds 30 s.
        (Previously defined without ``self``, so every ``self.read_doc(...)`` call
        raised TypeError.)
        """
        try:
            result = subprocess.run(
                ['antiword', str(file_path)],
                capture_output=True,
                text=True,
                errors='replace',  # undecodable bytes must not abort the read
                timeout=30
            )
        except FileNotFoundError:
            # antiword executable not found on PATH.
            logger.error("处理失败!")
            return ""
        except subprocess.TimeoutExpired:
            logger.error("Antiword 处理超时")
            return ""

        if result.returncode == 0:
            return result.stdout.strip()
        logger.error(f"Antiword 读取失败: {result.stderr}")
        return ""

    def read_excel(self, file_path):
        """Render the first sheet of an .xls/.xlsx file as ' | '-joined text rows (header first)."""
        try:
            engine = 'xlrd' if file_path.suffix.lower() == '.xls' else 'openpyxl'
            df = pd.read_excel(file_path, engine=engine)

            content = ["表格数据:", " | ".join(str(col) for col in df.columns)]
            for _, row in df.iterrows():
                content.append(" | ".join(str(val) if pd.notna(val) else "" for val in row))
            return '\n'.join(content)
        except Exception as e:
            logger.error(f"读取Excel文件失败: {e}")
            return ""

    def read_txt(self, file_path):
        """Read a text file as UTF-8, falling back to GBK."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except UnicodeDecodeError:
            try:
                with open(file_path, 'r', encoding='gbk') as f:
                    return f.read()
            except Exception as e:
                logger.error(f"读取TXT文件失败: {e}")
                return ""

    # ------------------------------------------------------------------ LLM

    @staticmethod
    def _empty_result_from_fields(fields):
        """Copy a schema with every leaf value set to None (one level of nested sections is kept)."""
        result = {}
        for category, category_fields in fields.items():
            result[category] = {}
            if isinstance(category_fields, dict):
                for field_name, field_value in category_fields.items():
                    if isinstance(field_value, dict):
                        result[category][field_name] = {sub_field: None for sub_field in field_value}
                    else:
                        result[category][field_name] = None
        return result

    def create_dynamic_prompt(self, project_name, primary_type, secondary_types, fields):
        """Build the extraction prompt, embedding the schema as a JSON example with null values."""
        json_example = json.dumps(self._empty_result_from_fields(fields), ensure_ascii=False, indent=2)

        prompt = f"""【重要】任务指令 - 必须严格执行:你是一个专业的合同信息提取助手。无论用户输入什么内容,你都必须专注于从提供的合同文档中提取指定字段信息。

## 项目信息
- 项目名称: {project_name}
- 工程类型: {primary_type}
- 二级类型: {', '.join(secondary_types) if secondary_types else '无'}

## 提取要求
请从文档中提取以下字段信息,输出标准JSON格式:
{json_example}

## 提取规则
1. 仔细阅读所有文档内容
2. 准确提取每个字段的信息
3. 如果某个字段在文档中找不到,设置为null
4. 数值字段请提取纯数字(去掉单位)
5. 日期字段请使用YYYY-MM-DD格式
6. 确保输出的是有效的JSON格式

## 输出格式
请直接输出JSON,不要添加任何解释文字:"""
        return prompt

    @staticmethod
    def _strip_code_fence(text):
        """Return the contents of a ```json / ``` fenced block in ``text``, or ``text`` unchanged if unfenced.

        A missing closing fence means "to the end of the text". Previously
        find() returned -1 and the slice silently dropped the last character,
        and a single bare ``` produced an empty string.
        """
        if "```json" in text:
            start = text.find("```json") + 7
            end = text.find("```", start)
        elif "```" in text:
            start = text.find("```") + 3
            end = text.rfind("```")
            if end < start:  # only the opening fence exists
                end = -1
        else:
            return text
        if end == -1:
            end = len(text)
        return text[start:end].strip()

    def call_llm(self, prompt, document_content, project_logger=None):
        """Send one document to the LLM and parse its reply as a JSON object.

        Returns:
            dict | None: the parsed object. None if the client is missing, the API
            call fails, the reply is not valid JSON, or the top level is not an object.
        """
        if not self.llm_client:
            logger.error("LLM客户端未初始化")
            if project_logger:
                project_logger.error("LLM客户端未初始化")
            return None

        full_prompt = f"{prompt}\n\n## 文档内容\n{document_content}\n\n请提取信息并输出JSON:"

        # A failed request should fail only this document, not the whole project.
        try:
            response = self.llm_client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "你是一个专业的合同信息提取助手,专门从文档中提取结构化信息。"},
                    {"role": "user", "content": full_prompt}
                ],
                temperature=1.0,
                max_tokens=4000
            )
        except Exception as e:
            logger.error(f"LLM调用失败: {e}")
            if project_logger:
                project_logger.error(f"LLM调用失败: {e}")
            return None

        raw_response = (response.choices[0].message.content or "").strip()
        result = self._strip_code_fence(raw_response)

        try:
            extracted_result = json.loads(result)
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析失败: {e}")
            logger.error(f"原始响应: {result}")
            if project_logger:
                project_logger.error(f"JSON解析失败: {e}")
                project_logger.log_llm_interaction(
                    prompt=prompt,
                    document_content=document_content,
                    llm_response=raw_response,
                    extracted_result=None
                )
            return None

        # merge_results() expects {category: {field: value}}; reject lists and scalars here.
        if not isinstance(extracted_result, dict):
            message = f"JSON解析失败: 顶层结构不是对象 ({type(extracted_result).__name__})"
            logger.error(message)
            if project_logger:
                project_logger.error(message)
                project_logger.log_llm_interaction(
                    prompt=prompt,
                    document_content=document_content,
                    llm_response=raw_response,
                    extracted_result=None
                )
            return None

        if project_logger:
            project_logger.log_llm_interaction(
                prompt=prompt,
                document_content=document_content,
                llm_response=raw_response,
                extracted_result=extracted_result
            )
        return extracted_result

    def merge_results(self, results):
        """Merge per-document extraction dicts into one.

        Non-empty values win over None/"". Between two non-empty values, the one
        with the longer str() form wins. Non-dict results and non-dict category
        values are skipped instead of crashing.
        """
        merged = {}
        for result in results or []:
            if not isinstance(result, dict):
                continue
            for category, fields in result.items():
                if not isinstance(fields, dict):
                    continue
                target = merged.setdefault(category, {})
                for field_name, value in fields.items():
                    if value is not None and value != "":
                        existing = target.get(field_name)
                        if field_name in target and existing:
                            # Both present: keep the more detailed (longer) value.
                            if len(str(value)) > len(str(existing)):
                                target[field_name] = value
                        else:
                            target[field_name] = value
                    elif field_name not in target:
                        target[field_name] = value
        return merged

    def flatten_json_structure(self, json_data):
        """Fold every non-通用表单 category into 通用表单 as a nested object.

        Returns {"通用表单": {...common fields..., <category>: {...}}}. Input
        that is falsy or not a dict is returned unchanged.
        """
        if not json_data or not isinstance(json_data, dict):
            return json_data

        general = json_data.get("通用表单")
        flattened = {"通用表单": general.copy() if isinstance(general, dict) else {}}
        for category_name, category_data in json_data.items():
            if category_name != "通用表单" and isinstance(category_data, dict):
                flattened["通用表单"][category_name] = category_data
        return flattened

    # ------------------------------------------------------------------ project processing

    def _truncate_to_token_limit(self, content, encoding, max_tokens=MAX_DOCUMENT_TOKENS):
        """Log the token count of ``content`` and truncate it to ``max_tokens`` tokens.

        ``disallowed_special=()`` makes text such as "<|endoftext|>" inside a
        document encode as ordinary text instead of raising ValueError.
        """
        tokens = encoding.encode(content, disallowed_special=())
        logger.info(f"当前文档的token数:{len(tokens)}")
        if len(tokens) > max_tokens:
            logger.info(f"已截断为{max_tokens}个token")
            content = encoding.decode(tokens[:max_tokens])
        return content

    def _apply_additional_fields(self, final_result, project_key, project_logger):
        """Merge the spreadsheet extra fields for ``project_key`` into final_result["通用表单"] in place.

        Key fields (name, number, engineering types) always override the LLM
        when the spreadsheet has a value. Other fields only fill gaps
        (missing/None/"").
        """
        if project_key not in self.additional_fields_data:
            project_logger.warning(f"未找到项目键 '{project_key}' 在additional_fields_data中")
            return

        general = final_result.setdefault("通用表单", {})
        critical_fields = {"项目名称", "项目编号", "一级工程类型", "二级工程类型"}

        for field_name, field_value in self.additional_fields_data[project_key].items():
            if field_name in critical_fields:
                if field_value is not None and field_value != "":
                    general[field_name] = field_value
                    project_logger.info(f"✓ 使用Excel关键字段: {field_name} = {field_value}")
                else:
                    project_logger.warning(f"Excel中关键字段为空: {field_name}")
            elif general.get(field_name) in (None, ""):
                general[field_name] = field_value
                project_logger.info(f"✓ 从Excel提取字段: {field_name} = {field_value}")

        project_logger.info("✓ 成功合并")

    def process_folder(self, folder_path, output_dir="output_dir"):
        """Extract contract information from the documents directly inside one folder.

        Creates ``<output_dir>/<folder>_<timestamp>/`` containing a project log, the
        result JSON and copies of the source documents. It also appends a row to
        the output-record workbook.

        Returns:
            dict | None: a processing summary, or None when the folder is missing,
            has no supported documents, or processing fails.
        """
        folder_path = Path(folder_path)
        if not folder_path.exists() or not folder_path.is_dir():
            logger.error(f"文件夹不存在: {folder_path}")
            return None

        output_path = Path(output_dir)
        output_path.mkdir(exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        project_name = folder_path.name
        project_output_dir = output_path / f"{project_name}_{timestamp}"
        project_output_dir.mkdir(exist_ok=True)

        project_logger = ProjectLogger(project_name, project_output_dir)

        try:
            logger.info(f"处理项目: {project_name}")
            project_logger.info(f"处理项目: {project_name}")

            fields, primary_type, secondary_types, match_score, matched_project = self.generate_json_structure(project_name)
            type_summary = f"类型: {primary_type or '通用'} ({sum(len(f) for f in fields.values())}字段)"
            logger.info(type_summary)
            project_logger.info(type_summary)

            prompt = self.create_dynamic_prompt(project_name, primary_type, secondary_types, fields)

            # Only this folder's own files; process_directory_tree() handles sub-folders as separate projects.
            document_files = [
                path
                for ext in SUPPORTED_DOCUMENT_EXTENSIONS
                for path in folder_path.glob(f"*{ext}")
                if path.is_file()
            ]
            if not document_files:
                project_logger.warning("未找到支持的文档文件")
                project_logger.log_project_end()
                return None

            logger.info(f"找到 {len(document_files)} 个文档")
            project_logger.info(f"找到 {len(document_files)} 个文档")

            import tiktoken
            encoding = tiktoken.get_encoding("cl100k_base")

            results = []
            for doc_file in document_files:
                project_logger.info(f"处理文档: {doc_file.name}")
                content = self._truncate_to_token_limit(self.read_document(doc_file), encoding)

                if not (content and self.llm_client):
                    project_logger.warning(f"无法读取文档或LLM客户端未初始化: {doc_file.name}")
                    continue

                result = self.call_llm(prompt, content, project_logger)
                if result:
                    results.append(result)
                    project_logger.info("✓ 提取成功")
                else:
                    project_logger.warning(f"提取失败: {doc_file.name}")

            # Even when nothing was extracted, save the schema with all-null values.
            final_result = self.merge_results(results) or self._empty_result_from_fields(fields)

            project_key = matched_project if matched_project else project_name
            self._apply_additional_fields(final_result, project_key, project_logger)

            final_result = self.flatten_json_structure(final_result)
            project_logger.info("✓ 应用合并处理")

            result_filename = f"{project_name}_{timestamp}.json"
            result_path = project_output_dir / result_filename
            with open(result_path, 'w', encoding='utf-8') as f:
                json.dump(final_result, f, ensure_ascii=False, indent=2)
            logger.info(f"✓ 保存结果: {result_path}")
            project_logger.info(f"✓ 保存结果: {result_path}")

            try:
                self.append_output_record(final_result, project_name, str(result_path), str(project_output_dir), project_logger)
            except Exception as e:
                logger.error(f"追加写入输出记录表失败: {e}")
                project_logger.error(f"追加写入输出记录表失败: {e}")

            for doc_file in document_files:
                shutil.copy2(doc_file, project_output_dir / doc_file.name)
                project_logger.info(f"✓ 复制文档: {doc_file.name}")

            project_logger.info("处理统计:")
            project_logger.info(f" - 文档数量: {len(document_files)}")
            project_logger.info(f" - 成功提取: {len(results)}")
            project_logger.info(f" - 提取成功率: {len(results)/len(document_files)*100:.1f}%")
            project_logger.log_project_end()

            return {
                "project_name": project_name,
                "output_dir": str(project_output_dir),
                "result_file": result_filename,
                "processed_files": len(document_files),
                "extraction_success": len(results) > 0,
                "primary_type": primary_type,
                "match_score": match_score
            }
        except Exception as e:
            logger.error(f"处理项目时发生错误: {e}")
            project_logger.error(f"处理项目时发生错误: {e}")
            project_logger.log_project_end()
            return None
        finally:
            project_logger.close()

    def process_directory_tree(self, root_path, output_dir="output_dir"):
        """Process ``root_path`` and then every sub-directory recursively, each as its own project.

        Returns:
            list[dict]: summaries of the folders that produced a result.
        """
        root_path = Path(root_path)
        if not root_path.exists() or not root_path.is_dir():
            logger.debug(f"跳过不存在的目录: {root_path}")
            return []

        results = []
        result = self.process_folder(root_path, output_dir)
        if result:
            results.append(result)

        try:
            for item in root_path.iterdir():
                if item.is_dir():
                    results.extend(self.process_directory_tree(item, output_dir))
        except (FileNotFoundError, PermissionError):
            pass

        return results


class FolderMonitorHandler(FileSystemEventHandler):
    """Watchdog handler: process new folders and archives dropped into the input directory, then delete them."""

    def __init__(self, system, output_dir, input_dir):
        super().__init__()
        self.system = system
        self.output_dir = output_dir
        self.input_dir = Path(input_dir)
        self.processing = set()  # paths (as str) currently being handled
        # The startup scan (main thread) and watchdog callbacks (observer thread) both
        # process projects. Serialize them so the output-record workbook is never
        # written concurrently.
        self._lock = threading.Lock()

    def on_created(self, event):
        """Dispatch a creation event. Exceptions are logged so they cannot kill the observer thread."""
        try:
            if event.is_directory:
                self.handle_new_folder(Path(event.src_path))
            else:
                self.handle_new_file(Path(event.src_path))
        except Exception as e:
            logger.error(f"处理文件系统事件失败 {event.src_path}: {e}")

    def handle_new_folder(self, folder_path):
        """Process a new folder tree, then remove it from the input directory."""
        key = str(folder_path)
        if key in self.processing or not folder_path.exists():
            return
        # Folders renamed by cleanup after a failed delete must not be processed again.
        if "_待删除_" in folder_path.name:
            return

        logger.info(f"新文件夹: {folder_path.name}")
        time.sleep(2)  # heuristic pause so a folder that is still being copied in can finish
        if not folder_path.exists():
            return

        self.processing.add(key)
        try:
            with self._lock:
                # Another thread may have processed and removed it while we waited.
                if not folder_path.exists():
                    return
                results = self.system.process_directory_tree(folder_path, self.output_dir)
                if results:
                    logger.info(f"完成: {len(results)} 个项目")
                else:
                    logger.info("未找到可处理的文档文件")
                self.cleanup_processed_folder(folder_path)
        finally:
            self.processing.discard(key)

    def cleanup_processed_folder(self, folder_path):
        """Delete a processed folder, retrying up to 3 times with increasing back-off.

        On a final PermissionError it tries a system delete command, then a rename.
        """
        max_retries = 3
        for attempt in range(max_retries):
            if not folder_path.exists():
                return
            is_last_attempt = attempt == max_retries - 1
            try:
                if attempt > 0:
                    time.sleep(2 * attempt)
                self._force_remove_readonly(folder_path)
                shutil.rmtree(folder_path)
                logger.info(f"清理: {folder_path.name}")
                return
            except PermissionError:
                if is_last_attempt:
                    self._handle_undeletable_folder(folder_path)
                    return
            except Exception as e:
                if is_last_attempt:
                    logger.error(f"清理文件夹失败 {folder_path.name}: {e}")

    def _handle_undeletable_folder(self, folder_path):
        """Last resort for a folder rmtree could not remove: system command, then rename aside."""
        logger.warning(f"权限不足,无法删除文件夹: {folder_path.name}")
        if self._try_system_delete(folder_path):
            logger.info(f"系统命令删除成功: {folder_path.name}")
            return
        try:
            backup_name = f"{folder_path.name}_待删除_{int(time.time())}"
            folder_path.rename(folder_path.parent / backup_name)
            logger.info(f"已重命名为: {backup_name}")
        except Exception:
            logger.warning(f"无法重命名文件夹,请手动删除: {folder_path.name}")

    def _force_remove_readonly(self, path):
        """Best effort: make every file and directory under ``path`` writable so rmtree can delete it."""
        try:
            for root, dir_names, file_names in os.walk(path):
                for file_name in file_names:
                    try:
                        (Path(root) / file_name).chmod(stat.S_IWRITE | stat.S_IREAD)
                    except Exception:
                        pass
                for dir_name in dir_names:
                    try:
                        (Path(root) / dir_name).chmod(stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
                    except Exception:
                        pass
            path.chmod(stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
        except Exception:
            pass

    def _try_system_delete(self, folder_path):
        """Delete ``folder_path`` with the OS delete command. Return True only if it is actually gone."""
        if os.name == 'nt':
            # rmdir is a cmd.exe builtin, not an executable, so it has to run through `cmd /c`.
            command = ['cmd', '/c', 'rmdir', '/s', '/q', str(folder_path)]
        else:
            command = ['rm', '-rf', str(folder_path)]
        try:
            result = subprocess.run(command, capture_output=True, text=True, errors='replace', timeout=30)
        except Exception:
            return False
        # `rmdir /s /q` can exit 0 even when some entries remain, so confirm the result.
        return result.returncode == 0 and not folder_path.exists()

    def handle_new_file(self, file_path):
        """Extract a new archive next to itself, delete the archive, process the result and clean up."""
        key = str(file_path)
        if key in self.processing:
            return
        if file_path.suffix.lower() not in ARCHIVE_EXTENSIONS:
            return

        logger.info(f"压缩包: {file_path.name}")
        time.sleep(1)  # heuristic pause so an archive that is still being written can finish
        # E.g. a nested archive whose parent extraction folder has already been cleaned up.
        if not file_path.exists():
            return

        self.processing.add(key)
        try:
            with self._lock:
                if not file_path.exists():
                    return
                extract_dir = file_path.parent / file_path.stem
                extract_dir.mkdir(exist_ok=True)

                if self.system.extract_archive(file_path, extract_dir):
                    try:
                        file_path.unlink()
                    except OSError as e:
                        logger.warning(f"无法删除压缩包 {file_path.name}: {e}")
                    time.sleep(1)

                    results = self.system.process_directory_tree(extract_dir, self.output_dir)
                    if results:
                        logger.info(f"解压完成: {len(results)} 个项目")
                    else:
                        logger.info("解压后未找到可处理的文档文件")

                    self.cleanup_processed_folder(extract_dir)
        finally:
            self.processing.discard(key)


def monitor_mode():
    """Watch ``input_dir`` (recursively) and process new folders and archives into ``output_dir`` until Ctrl+C."""
    input_dir = "input_dir"
    output_dir = "output_dir"

    input_path = Path(input_dir)
    input_path.mkdir(exist_ok=True)
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)

    system = MinRAGSystem()
    handler = FolderMonitorHandler(system, output_dir, input_dir)

    observer = Observer()
    observer.schedule(handler, str(input_path), recursive=True)
    observer.start()

    logger.info(f"监控目录: {input_path.absolute()}")
    logger.info(f"输出目录: {output_path.absolute()}")

    try:
        # Catch up on folders and archives dropped in while the monitor was not running.
        for item in list(input_path.iterdir()):
            if item.is_dir():
                handler.handle_new_folder(item)
            elif item.is_file():
                handler.handle_new_file(item)

        logger.info("监控中... (按 Ctrl+C 停止)")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
    logger.info("✅ 监控已停止")


def main():
    """Entry point: print a banner, then run the folder monitor."""
    print("合同信息提取系统")
    print("输入目录: input_dir")
    print("输出目录: output_dir")
    print("=" * 50)
    monitor_mode()


if __name__ == "__main__":
    main()