import os
import json
import math
import uuid
import httpx
import pandas as pd
import requests
from typing import List, Dict, Any
from datasets import Dataset
from ragas import evaluate, aevaluate
from ragas.metrics import (
    Faithfulness,
    AnswerCorrectness,
    AnswerRelevancy,
    ContextPrecision,
    ContextRecall,
    NoiseSensitivity,
)
from ragas.llms import LangchainLLMWrapper
from ragas.embeddings import LangchainEmbeddingsWrapper
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from rag.chat_message import ChatRetrieverRag
from utils.get_logger import setup_logger
from utils.upload_file_to_oss import UploadMinio
from rag.db import MysqlOperate

logger = setup_logger(__name__)

# Temporary working directory for downloaded QA files and generated Excel reports.
TMP_EVAL_DIR = "./tmp_file/evaluation/"
os.makedirs(TMP_EVAL_DIR, exist_ok=True)

# Java-side callback endpoint that tracks evaluation-task status.
# Hoisted to a single constant (was duplicated four times in run_rag_evaluation).
EVALUATION_CALLBACK_URL = "http://10.1.14.17:9080/deepseek/evaluation/editEvaluationTaskByPython"

# Maps RAGAS metric / result-column names to the Chinese labels used in the report.
english_map_chinese = {
    "faithfulness": "忠实性",
    "answer_correctness": "答案正确性",
    "answer_relevancy": "答案相关性",
    "context_precision": "上下文精度",
    "context_recall": "上下文召回率",
    "noise_sensitivity(mode=relevant)": "噪声敏感性",
    "user_input": "用户问题",
    "reference": "标准答案",
    "response": "模型回答",
    "retrieved_contexts": "上下文",
}


def save_evaluation_to_excel(eval_result: Dict[str, Any], output_path: str = None,
                             tenant_id: str = "000000") -> Dict[str, Any]:
    """Save an evaluation result to an Excel report, upload it to MinIO and
    register the file in the OSS table.

    Args:
        eval_result: Evaluation result dict containing "metrics" and "details".
        output_path: Target file path; auto-generated from a timestamp when None.
        tenant_id: Tenant id stored with the OSS record, defaults to "000000".

    Returns:
        Dict with "file_path", "oss_id" and "full_url".  On upload/DB failure
        "oss_id" is None and "full_url" is "" (upload is best-effort).
    """
    if output_path is None:
        timestamp = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
        output_path = os.path.join(TMP_EVAL_DIR, f"evaluation_result_{timestamp}.xlsx")

    metrics = eval_result.get("metrics", {})
    details = eval_result.get("details", [])

    # Per-question detail rows, with columns renamed to their Chinese labels.
    df = pd.DataFrame(details)
    df.rename(columns=english_map_chinese, inplace=True)

    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='评估详情', index=False)
        worksheet = writer.sheets['评估详情']

        # The metric summary table starts 3 empty columns to the right of the data.
        summary_start_col = len(df.columns) + 4
        worksheet.cell(row=2, column=summary_start_col, value="指标汇总")
        worksheet.cell(row=2, column=summary_start_col + 1, value="平均值")

        row_idx = 3
        total_sum = 0
        valid_count = 0
        for metric_name, metric_value in metrics.items():
            worksheet.cell(row=row_idx, column=summary_start_col,
                           value=english_map_chinese.get(metric_name, metric_name))
            if metric_value is not None:
                worksheet.cell(row=row_idx, column=summary_start_col + 1, value=metric_value)
                total_sum += metric_value
                valid_count += 1
            else:
                # BUGFIX: "N/A" metrics are no longer counted in the denominator,
                # so the overall average only reflects metrics that produced a value.
                worksheet.cell(row=row_idx, column=summary_start_col + 1, value="N/A")
            row_idx += 1

        # Overall average across the metrics that had a value.
        if valid_count > 0:
            total_avg = round(total_sum / valid_count, 2)
            worksheet.cell(row=row_idx + 1, column=summary_start_col, value="总平均值")
            worksheet.cell(row=row_idx + 1, column=summary_start_col + 1, value=total_avg)

    logger.info(f"评估结果已保存到: {output_path}")

    # Upload the workbook and record it in the DB.  Failures are logged only.
    oss_id = None
    # BUGFIX: full_url was previously assigned only inside the success branch,
    # so the return statement raised UnboundLocalError whenever the upload failed.
    full_url = ""
    try:
        from config import minio_config

        file_name = os.path.basename(output_path)
        file_extension = os.path.splitext(file_name)[1]
        file_size = os.path.getsize(output_path)
        minio_file_path = f"evaluation/{file_name}"

        upload_client = UploadMinio()
        upload_success = upload_client.upload_file(output_path, minio_file_path)
        if upload_success:
            logger.info(f"文件已上传至MinIO: {minio_file_path}")
            minio_url = minio_config.get("minio_url")
            minio_bucket = minio_config.get("minio_bucket")
            full_url = f"{minio_url}/{minio_bucket}/{minio_file_path}"

            mysql_client = MysqlOperate()
            success, oss_id = mysql_client.insert_oss_record(
                file_name=file_name,
                url=full_url,
                tenant_id=tenant_id,
                file_extension=file_extension,
                file_size=file_size
            )
            if success:
                logger.info(f"OSS记录已插入数据库,oss_id: {oss_id}, URL: {full_url}")
            else:
                logger.error(f"插入OSS记录失败: {oss_id}")
        else:
            logger.error("文件上传至MinIO失败")
    except Exception as e:
        logger.error(f"上传文件或记录数据库时出错: {e}")

    return {
        "file_path": output_path,
        "oss_id": oss_id,
        "full_url": full_url
    }


async def download_file(url: str) -> str:
    """Download *url* into TMP_EVAL_DIR and return the local file path.

    The extension is derived from the URL path (query string stripped); any
    extension other than .json/.xlsx/.xls falls back to .json.

    Raises:
        httpx.HTTPError: On network failure or non-2xx response.
    """
    url_path = url.split('?')[0]
    file_ext = os.path.splitext(url_path)[1].lower()
    if file_ext not in ['.json', '.xlsx', '.xls']:
        file_ext = '.json'

    local_path = os.path.join(TMP_EVAL_DIR, f"{uuid.uuid4()}{file_ext}")

    async with httpx.AsyncClient(timeout=60) as client:
        response = await client.get(url)
        response.raise_for_status()
        with open(local_path, "wb") as f:
            f.write(response.content)

    logger.info(f"文件下载成功: {url} -> {local_path}")
    return local_path


def _clean_float_values(obj):
    """Recursively replace JSON-illegal floats (NaN, +/-Inf) with None."""
    if isinstance(obj, float):
        return None if math.isnan(obj) or math.isinf(obj) else obj
    if isinstance(obj, dict):
        return {k: _clean_float_values(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_clean_float_values(item) for item in obj]
    return obj


def parse_excel_to_qa_data(file_path: str) -> List[Dict[str, str]]:
    """Parse an Excel workbook into a list of QA records.

    Expected layout: a header row containing the columns 问题 (question) and
    答案 (ground truth), followed by one QA pair per row.  Rows with an empty
    question are skipped.

    Args:
        file_path: Path of the Excel file.

    Returns:
        [{"question": "...", "ground_truth": "..."}, ...]

    Raises:
        ValueError: When required columns are missing or the file is unreadable.
    """
    try:
        df = pd.read_excel(file_path)

        required_columns = ['问题', '答案']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise ValueError(f"Excel 文件缺少必需的列: {missing_columns}")

        qa_data = []
        for _, row in df.iterrows():
            question = str(row['问题']).strip() if pd.notna(row['问题']) else ""
            ground_truth = str(row['答案']).strip() if pd.notna(row['答案']) else ""
            if not question:
                continue
            qa_data.append({
                "question": question,
                "ground_truth": ground_truth
            })

        logger.info(f"从 Excel 文件解析出 {len(qa_data)} 条有效数据")
        return qa_data
    except Exception as e:
        logger.error(f"解析 Excel 文件失败: {e}")
        raise ValueError(f"Excel 文件解析失败: {str(e)}") from e


class RAGEvaluationService:
    """Runs a QA set through the RAG pipeline and scores it with RAGAS."""

    def __init__(
        self,
        knowledge_ids: List[str],
        embedding_id: str = "e5",
        temperature: float = 0.6,
        top_p: float = 0.7,
        max_tokens: int = 4096,
        model: str = "Qwen3-Coder-30B-loft",
        slice_count: int = 5,
        prompt: str | None = None,
        rerank_model_name: str = "Qwen3-Reranker-0.6B"
    ):
        self.knowledge_ids = knowledge_ids
        self.embedding_id = embedding_id
        self.temperature = temperature
        self.top_p = top_p
        self.max_tokens = max_tokens
        # "Qwen3-30B" is an alias for a locally deployed vLLM model path.
        if model == "Qwen3-30B":
            model = "/opt/vllm/models/Qwen/Qwen3-30B-A3B-Instruct-2507"
        self.model = model
        self.slice_count = slice_count
        # Request template for the RAG retriever; "query" is filled per question.
        self.rag_config = {
            "knowledgeIds": knowledge_ids,
            "embeddingId": embedding_id,
            "sliceCount": slice_count,
            "knowledgeInfo": json.dumps({
                "recall_method": "mixed",
                "rerank_status": True,
                "rerank_model_name": rerank_model_name
            }),
            "temperature": temperature,
            "topP": top_p,
            "maxToken": max_tokens,
            "enable_think": False,
            "prompt": prompt if prompt else """你是一位知识检索助手,你必须并且只能从我发送的众多知识片段中寻找能够解决用户输入问题的最优答案,并且在执行任务的过程中严格执行规定的要求。\n\n知识片段如下:\n{知识}\n\n规定要求:\n- 找到答案就仅使用知识片段中的原文回答用户的提问;\n- 找不到答案就用自身知识并且告诉用户该信息不是来自文档;\n- 所引用的文本片段中所包含的示意图占位符必须进行返回,占位符格式参考:【示意图序号_编号】\n - 严禁输出任何知识片段中不存在的示意图占位符;\n - 输出的内容必须删除其中包含的任何图注、序号等信息。例如:"进入登录页面(图1.1)"需要从文字中删除图序,回复效果为:"进入登录页面";"如图所示1.1",回复效果为:"如图所示";\n- 格式规范\n - 文档中会出现包含表格的情况,表格是以图片标识符的形式呈现,表格中缺失数据时候返回空单元格;\n - 如果需要用到表格中的数据,以markdown格式输出表格中的数据;\n - 避免使用代码块语法回复信息;\n - 回复的开头语不要输出诸如:"我想","我认为","think"等相关语义的文本。\n\n严格执行规定要求,不要复述问题,直接开始回答。\n\n用户输入问题:\n{用户}"""
        }

    async def get_rag_response(self, question: str) -> Dict[str, Any]:
        """Run one question through retrieval + generation.

        Returns:
            Dict with "answer" (generated text), "contexts" (retrieved chunk
            contents) and "retriever_count" (number of retrieved chunks).
        """
        chat_json = self.rag_config.copy()
        chat_json["query"] = question

        rag_retriever = ChatRetrieverRag(chat_json=chat_json)
        retriever_result_list, search_doc_id_to_knowledge_id_dict = \
            await rag_retriever.retriever_result(chat_json)
        logger.info(f"检索到的上下文:{retriever_result_list}")

        chunk_content, knowledge_info_dict = rag_retriever.parse_retriever_list(
            retriever_result_list, search_doc_id_to_knowledge_id_dict
        )
        contexts = [item["content"] for item in retriever_result_list]

        # Stream the generation; "add" events carry the accumulated answer so far,
        # so the final value of `answer` is the complete response.
        answer = ""
        try:
            async for event in rag_retriever.generate_rag_response(chat_json, chunk_content):
                if event.get("event") == "add":
                    answer = event.get("data", "")
                elif event.get("event") == "finish":
                    pass
        except GeneratorExit:
            pass

        return {
            "answer": answer,
            "contexts": contexts,
            "retriever_count": len(retriever_result_list)
        }

    async def process_qa_data(self, qa_data: List[Dict[str, str]]) -> List[Dict[str, Any]]:
        """Enrich QA records with RAG "contexts" and "answer".

        Per-question failures are logged and recorded with empty contexts/answer
        so that one bad question does not abort the whole run.
        """
        full_data = []
        for idx, item in enumerate(qa_data):
            # Keys may arrive with stray whitespace (e.g. hand-edited JSON).
            clean_item = {k.strip(): v for k, v in item.items()}
            question = clean_item.get("question", "").strip()
            ground_truth = clean_item.get("ground_truth", "").strip()

            logger.info(f"处理第 {idx + 1}/{len(qa_data)} 个问题: {question[:50]}...")
            try:
                rag_result = await self.get_rag_response(question)
                full_data.append({
                    "question": question,
                    "contexts": rag_result["contexts"],
                    "answer": rag_result["answer"],
                    "ground_truth": ground_truth,
                })
                logger.info(f"检索到 {rag_result['retriever_count']} 个上下文")
            except Exception as e:
                logger.error(f"处理失败: {e}")
                full_data.append({
                    "question": question,
                    "contexts": [],
                    "answer": "",
                    "ground_truth": ground_truth,
                })
        return full_data

    async def run_evaluation(self, dataset: Dataset) -> Dict[str, Any]:
        """Score *dataset* with RAGAS, export the report and return the results.

        Returns:
            Dict with "metrics" (per-metric averages), "details" (per-row scores),
            "metric_names", "count", and "oss_id"/"full_url" of the uploaded report.

        Raises:
            ValueError: When the configured embedding_id is not supported.
            Exception: Propagated from the RAGAS evaluation itself.
        """
        from config import model_name_vllm_url_dict
        from rag.load_model import bge_m3_base_url, bge_me_model, qwen_ed_base_url, qwen_ed_model

        embedding_mapping = {
            "bge-m3": (bge_m3_base_url, bge_me_model),
            "Qwen3-Embedding-0.6B": (qwen_ed_base_url, qwen_ed_model)
        }

        # Endpoints of the local vLLM / embedding services used as judges.
        VLLM_LLM_BASE = model_name_vllm_url_dict.get(self.model)
        VLLM_LLM_KEY = "vllm-dummy-key"
        # BUGFIX: .get() returned None for unknown ids (including the default
        # "e5"), which crashed with an opaque unpack TypeError; fail clearly.
        if self.embedding_id not in embedding_mapping:
            raise ValueError(f"不支持的 embedding_id: {self.embedding_id}")
        VLLM_EMBEDDING_BASE, VLLM_EMBEDDING_MODEL = embedding_mapping[self.embedding_id]

        vllm_generator = ChatOpenAI(
            model=self.model,
            base_url=VLLM_LLM_BASE,
            api_key=VLLM_LLM_KEY,
            temperature=0.1,
            max_tokens=51200,
            timeout=3000,
            request_timeout=3000,
            max_retries=5,
        )
        vllm_embeddings = OpenAIEmbeddings(
            model=VLLM_EMBEDDING_MODEL,
            base_url=VLLM_EMBEDDING_BASE,
            api_key=VLLM_LLM_KEY,
        )

        # Wrap into RAGAS-compatible interfaces.
        ragas_llm = LangchainLLMWrapper(vllm_generator)
        ragas_embeddings = LangchainEmbeddingsWrapper(vllm_embeddings)

        metrics_to_run = [
            Faithfulness(llm=ragas_llm),
            AnswerCorrectness(llm=ragas_llm, embeddings=ragas_embeddings),
            AnswerRelevancy(llm=ragas_llm, embeddings=ragas_embeddings),
            ContextPrecision(llm=ragas_llm),
            ContextRecall(llm=ragas_llm),
            NoiseSensitivity(llm=ragas_llm),
        ]

        logger.info("开始 RAGAS 评估...")
        final_metrics = list(metrics_to_run)
        if 'ground_truth' not in dataset.column_names:
            logger.warning("缺少 'ground_truth',将跳过 'context_recall' 指标")
            # BUGFIX: the old code removed the (commented-out) module-level
            # `context_recall` object, which raised NameError; filter by name.
            final_metrics = [m for m in final_metrics if m.name != "context_recall"]

        logger.info(f"使用指标: {[m.name for m in final_metrics]}, 数据集大小: {len(dataset)} 条")

        try:
            result = await aevaluate(
                dataset=dataset,
                metrics=final_metrics,
                llm=ragas_llm,
                embeddings=ragas_embeddings,
                raise_exceptions=False,
                show_progress=True,
            )
        except Exception as e:
            logger.error(f"评估过程中出现错误: {e}")
            raise

        # Normalize the result frame: drop infinities and turn NaN into None.
        result_df = result.to_pandas()
        result_df = result_df.replace([float('inf'), float('-inf')], None)
        result_df = result_df.where(pd.notna(result_df), None)

        metric_names = [m.name for m in final_metrics]

        # Per-metric averages over the numeric score columns.
        summary = {}
        for col in result_df.columns:
            if result_df[col].dtype not in ['float64', 'float32', 'int64', 'int32']:
                continue
            try:
                # Noise sensitivity is "lower is better"; invert it so every
                # reported metric reads "higher is better".
                if col == "noise_sensitivity(mode=relevant)":
                    result_df[col] = 1.00 - result_df[col]
                # Missing per-row scores are treated as 0 before averaging.
                mean_val = result_df[col].fillna(0).mean(skipna=True)
                if pd.notna(mean_val) and math.isfinite(mean_val):
                    summary[col] = round(float(mean_val), 2)
                else:
                    summary[col] = None
            except Exception:
                summary[col] = None

        logger.info("评估完成!")

        eval_result = {
            "metrics": _clean_float_values(summary),
            "details": _clean_float_values(result_df.to_dict(orient="records")),
            "metric_names": metric_names,
            "count": len(result_df),
            # BUGFIX: defaults so callers can rely on these keys even when the
            # Excel export / upload below fails.
            "oss_id": "",
            "full_url": "",
        }

        # Export the report and upload it; best-effort, failures are logged.
        try:
            save_result = save_evaluation_to_excel(eval_result)
            eval_result["oss_id"] = save_result.get("oss_id", "")
            eval_result["full_url"] = save_result.get("full_url", "")
        except Exception as e:
            logger.error(f"保存Excel文件或上传失败: {e}")

        return eval_result


def send_put_request_sync(url, json_data=None, headers=None, timeout=30):
    """Synchronously send a PUT request with a JSON body.

    Args:
        url: Target URL.
        json_data: JSON-serializable request body (dict).
        headers: Optional header dict; Content-Type defaults to application/json.
        timeout: Request timeout in seconds, default 30.

    Returns:
        Dict with "code" (200 on success, 500 on failure), plus "status_code"
        and "data" (response text) when the request completed.
    """
    try:
        if headers is None:
            headers = {}
        if 'Content-Type' not in headers:
            headers['Content-Type'] = 'application/json'
        # NOTE(security): verify=False disables TLS certificate validation —
        # acceptable only because the callback target is an internal service.
        resp = requests.put(url, json=json_data, headers=headers, timeout=timeout, verify=False)
        status_code = resp.status_code
        response_data = resp.text
        logger.info(f"同步PUT请求成功 [url={url}]: {response_data} json_data={json_data}")
        return {
            "code": 200,
            "status_code": status_code,
            "data": response_data,
            "message": "PUT请求成功"
        }
    except requests.exceptions.Timeout as e:
        logger.error(f"同步PUT请求超时 [url={url}]: {e}")
        return {
            "code": 500,
            "message": f"PUT请求超时: {str(e)}"
        }
    except requests.exceptions.RequestException as e:
        logger.error(f"同步PUT请求失败 [url={url}]: {e}")
        return {
            "code": 500,
            "message": f"PUT请求失败: {str(e)}"
        }


async def run_rag_evaluation(
    file_url: str,
    knowledge_ids: List[str],
    embedding_id: str = "e5",
    temperature: float = 0.6,
    top_p: float = 0.7,
    max_tokens: int = 4096,
    model: str = "Qwen3-Coder-30B-loft",
    slice_count: int = 5,
    task_id=None,
    tenant_id=None,
    prompt=None,
    rerank_model_name=None,
) -> Dict[str, Any]:
    """Main entry point: download a QA file, run the RAG evaluation and report
    the outcome to the task-tracking callback.

    Args:
        file_url: Remote file URL (.json, .xlsx or .xls).
        knowledge_ids: Knowledge-base ids to retrieve from.
        embedding_id: Embedding model id.
        temperature / top_p / max_tokens: Generation parameters.
        model: Generator model name.
        slice_count: Number of retrieval slices.
        task_id / tenant_id: Forwarded to the status callback.
        prompt: Optional custom RAG prompt template.
        rerank_model_name: Optional reranker model name.

    Returns:
        {"code": 200, "data": {...}} on success; {"code": 4xx/5xx, "message": ...}
        on failure (the callback is notified with status "3" in that case).
    """
    local_path = None
    try:
        # 1. Download the QA file.
        local_path = await download_file(file_url)

        # 2. Parse it according to its extension.
        file_ext = os.path.splitext(local_path)[1].lower()
        if file_ext in ['.xlsx', '.xls']:
            logger.info(f"检测到 Excel 文件格式: {file_ext}")
            qa_data = parse_excel_to_qa_data(local_path)
        elif file_ext == '.json':
            logger.info("检测到 JSON 文件格式")
            with open(local_path, "r", encoding="utf-8") as f:
                qa_data = json.load(f)
        else:
            raise ValueError(f"不支持的文件格式: {file_ext},仅支持 .json, .xlsx, .xls")

        if not isinstance(qa_data, list) or not qa_data:
            raise ValueError("数据必须是非空数组格式")

        # 3. Build the evaluation service.
        service = RAGEvaluationService(
            knowledge_ids=knowledge_ids,
            embedding_id=embedding_id,
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
            model=model,
            slice_count=slice_count,
            prompt=prompt,
            rerank_model_name=rerank_model_name
        )

        # 4. Run retrieval + generation for every question.
        logger.info(f"开始处理 {len(qa_data)} 条数据...")
        full_data = await service.process_qa_data(qa_data)

        # 5. Assemble the RAGAS dataset.
        dataset = Dataset.from_dict({
            "question": [item["question"] for item in full_data],
            "contexts": [item["contexts"] for item in full_data],
            "answer": [item["answer"] for item in full_data],
            "ground_truth": [item["ground_truth"] for item in full_data],
        })

        # 6. Evaluate.
        eval_result = await service.run_evaluation(dataset)
        logger.info(f"knowledge_ids: {knowledge_ids}:\n评估结果: {eval_result}")

        # Coerce missing metric values to 0 before averaging and reporting.
        metrics = eval_result.get("metrics", {})
        metrics = {k: (v if v is not None else 0) for k, v in metrics.items()}
        avg_score = round(sum(metrics.values()) / len(metrics), 2) if metrics else 0

        # Report success (status "2") with the individual metric scores.
        # BUGFIX: read scores from the None-coerced `metrics` dict (the raw
        # eval_result["metrics"] can hold None values) and use .get for oss keys.
        send_put_request_sync(
            url=EVALUATION_CALLBACK_URL,
            json_data={
                "taskId": task_id,
                "tenantId": tenant_id,
                "status": "2",
                "resultStore": avg_score,
                "answerLoyalty": metrics.get("faithfulness", 0),
                "correctAnswer": metrics.get("answer_correctness", 0),
                "answerRelevance": metrics.get("answer_relevancy", 0),
                "noiseSensitivity": metrics.get("noise_sensitivity(mode=relevant)", 0),
                "contextRecall": metrics.get("context_recall", 0),
                "contextRelevance": metrics.get("context_precision", 0),
                "ossId": eval_result.get("oss_id", ""),
                "fullUrl": eval_result.get("full_url", ""),
            })

        return {
            "code": 200,
            "message": "评估完成",
            "data": {
                "knowledge_ids": knowledge_ids,
                **eval_result
            }
        }
    except httpx.HTTPError as e:
        logger.error(f"文件下载失败: {e}")
        send_put_request_sync(url=EVALUATION_CALLBACK_URL,
                              json_data={"taskId": task_id, "tenantId": tenant_id, "status": "3"})
        return {"code": 400, "message": f"文件下载失败: {str(e)}"}
    except json.JSONDecodeError as e:
        logger.error(f"JSON解析失败: {e}")
        send_put_request_sync(url=EVALUATION_CALLBACK_URL,
                              json_data={"taskId": task_id, "tenantId": tenant_id, "status": "3"})
        return {"code": 400, "message": f"JSON解析失败: {str(e)}"}
    except Exception as e:
        logger.error(f"评估失败: {e}")
        send_put_request_sync(url=EVALUATION_CALLBACK_URL,
                              json_data={"taskId": task_id, "tenantId": tenant_id, "status": "3"})
        return {"code": 500, "message": f"评估失败: {str(e)}"}
    finally:
        # BUGFIX: remove the downloaded temp file (was written but commented
        # out, leaking one file per run).  Best-effort: ignore removal errors.
        if local_path and os.path.exists(local_path):
            try:
                os.remove(local_path)
            except Exception:
                pass