| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420 |
- """
- RAGAS评估脚本 - 集成RAG检索功能
- 输入:只需要 question 和 ground_truth 的 JSON 文件
- 功能:通过 RAG 系统自动获取 contexts 和 answer,然后进行评估
- 说明:
- - 使用全部 5 个 RAGAS 指标进行评估
- - 通过增加超时时间和重试机制来减少 NaN 值
- - NaN 值通常由超时或网络问题导致,而非指标本身问题
- """
- import os
- import sys
- import json
- import pandas as pd
- import asyncio
- from datasets import Dataset
- from typing import List, Dict, Any
- # 添加项目根目录到 Python 路径
- project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.insert(0, project_root)
- # RAGAS 评估相关
- from ragas import evaluate
- from ragas.metrics import (
- faithfulness,
- answer_correctness,
- answer_relevancy,
- context_precision,
- context_recall,
- )
- from langchain_openai import ChatOpenAI, OpenAIEmbeddings
- # RAG 相关导入
- from rag.chat_message import ChatRetrieverRag
- from rag.llm import VllmApi
- # 配置 VLLM 和 Embedding 服务
- VLLM_LLM_BASE = "http://xia0miduo.gicp.net:8102/v1"
- VLLM_LLM_KEY = "vllm-dummy-key"
- VLLM_EMBEDDING_BASE = "http://10.168.100.17:8787/v1/"
- # 初始化 RAGAS 评估使用的模型
- vllm_generator = ChatOpenAI(
- model="Qwen3-Coder-30B-loft",
- base_url=VLLM_LLM_BASE,
- api_key=VLLM_LLM_KEY,
- temperature=0.1, # 降低温度提高稳定性
- max_tokens=51200,
- timeout=3000, # 增加超时到 300 秒
- request_timeout=3000,
- max_retries=5, # 增加重试次数
- )
- vllm_embeddings = OpenAIEmbeddings(
- model="",
- base_url=VLLM_EMBEDDING_BASE,
- api_key=VLLM_LLM_KEY,
- )
- # 定义评估指标 - 使用全部 5 个指标
- metrics_to_run = [
- faithfulness, # 忠实度:答案是否基于上下文
- answer_correctness, # 答案正确性:答案与标准答案的相似度
- answer_relevancy, # 答案相关性:答案是否回答了问题
- context_precision, # 上下文精确度:检索上下文的精确度
- context_recall, # 上下文召回:上下文是否包含答案信息
- ]
- class RAGEvaluator:
- """集成 RAG 功能的评估器"""
-
- def __init__(self, knowledge_ids: List[str], embedding_id: str = "e5"):
- """
- 初始化评估器
-
- Args:
- knowledge_ids: 知识库 ID 列表
- embedding_id: 嵌入模型 ID (可选值: "e5", "multilingual-e5-large-instruct")
- """
- self.knowledge_ids = knowledge_ids
- self.embedding_id = embedding_id
-
- # RAG 配置参数
- self.rag_config = {
- "knowledgeIds": knowledge_ids,
- "embeddingId": embedding_id,
- "sliceCount": 5, # 检索的切片数量
- "knowledgeInfo": json.dumps({
- "recall_method": "mixed", # 检索模式:embedding/keyword/mixed
- "rerank_status": True, # 是否启用重排序
- "rerank_model_name": "bce_rerank_model"
- }),
- "temperature": 0.6,
- "topP": 0.7,
- "maxToken": 4096,
- "enable_think": False,
- "prompt":
- """
- 你是一位知识检索助手,你必须并且只能从我发送的众多知识片段中寻找能够解决用户输入问题的最优答案,并且在执行任务的过程中严格执行规定的要求。\n\n知识片段如下:\n{知识}\n\n规定要求:\n- 找到答案就仅使用知识片段中的原文回答用户的提问;\n- 找不到答案就用自身知识并且告诉用户该信息不是来自文档;\n- 所引用的文本片段中所包含的示意图占位符必须进行返回,占位符格式参考:【示意图序号_编号】\n - 严禁输出任何知识片段中不存在的示意图占位符;\n - 输出的内容必须删除其中包含的任何图注、序号等信息。例如:“进入登录页面(图1.1)”需要从文字中删除图序,回复效果为:“进入登录页面”;“如图所示1.1”,回复效果为:“如图所示”;\n- 格式规范\n - 文档中会出现包含表格的情况,表格是以图片标识符的形式呈现,表格中缺失数据时候返回空单元格;\n - 如果需要用到表格中的数据,以markdown格式输出表格中的数据;\n - 避免使用代码块语法回复信息;\n - 回复的开头语不要输出诸如:“我想”,“我认为”,“think”等相关语义的文本。\n\n严格执行规定要求,不要复述问题,直接开始回答。\n\n用户输入问题:\n{用户}
- """
- # "请根据以下知识内容回答用户的问题。\n\n知识内容:\n{知识}\n\n用户问题:\n{用户}\n\n请给出准确、详细的回答:",
- }
-
- async def get_rag_response(self, question: str) -> Dict[str, Any]:
- """
- 通过 RAG 系统获取问题的答案和上下文
-
- Args:
- question: 用户问题
-
- Returns:
- 包含 answer 和 contexts 的字典
- """
- # 构建 RAG 请求参数
- chat_json = self.rag_config.copy()
- chat_json["query"] = question
-
- # 创建 RAG 检索器
- rag_retriever = ChatRetrieverRag(chat_json=chat_json)
-
- # 获取检索结果
- retriever_result_list, search_doc_id_to_knowledge_id_dict = rag_retriever.retriever_result(chat_json)
-
- # 解析检索结果,获取上下文内容
- chunk_content, knowledge_info_dict = rag_retriever.parse_retriever_list(
- retriever_result_list,
- search_doc_id_to_knowledge_id_dict
- )
-
- # 提取 contexts(每个检索到的切片内容)
- contexts = [item["content"] for item in retriever_result_list]
-
- # 生成答案
- answer = ""
- async for event in rag_retriever.generate_rag_response(chat_json, chunk_content):
- if event.get("event") == "add":
- answer = event.get("data", "")
- elif event.get("event") == "finish":
- break
-
- return {
- "answer": answer,
- "contexts": contexts,
- "retriever_count": len(retriever_result_list)
- }
-
- async def process_qa_data(self, qa_data: List[Dict[str, str]]) -> List[Dict[str, Any]]:
- """
- 处理问答数据,通过 RAG 获取 contexts 和 answer
-
- Args:
- qa_data: 包含 question 和 ground_truth 的列表
-
- Returns:
- 完整的评估数据集(包含 question, contexts, answer, ground_truth)
- """
- full_data = []
-
- for idx, item in enumerate(qa_data):
- question = item.get("question", "")
- ground_truth = item.get("ground_truth", "")
-
- print(f"\n处理第 {idx + 1}/{len(qa_data)} 个问题: {question[:50]}...")
-
- try:
- # 通过 RAG 获取答案和上下文
- rag_result = await self.get_rag_response(question)
-
- full_item = {
- "question": question,
- "contexts": rag_result["contexts"],
- "answer": rag_result["answer"],
- "ground_truth": ground_truth,
- }
-
- full_data.append(full_item)
-
- print(f" ✓ 检索到 {rag_result['retriever_count']} 个上下文")
- print(f" ✓ 生成答案长度: {len(rag_result['answer'])} 字符")
-
- except Exception as e:
- print(f" ✗ 处理失败: {e}")
- # 失败时使用空值
- full_data.append({
- "question": question,
- "contexts": [],
- "answer": "",
- "ground_truth": ground_truth,
- })
-
- return full_data
- def load_qa_json(json_path: str) -> List[Dict[str, str]]:
- """
- 加载只包含 question 和 ground_truth 的 JSON 文件
-
- Args:
- json_path: JSON 文件路径
-
- Returns:
- 问答数据列表
- """
- try:
- with open(json_path, 'r', encoding='utf-8') as f:
- data_list = json.load(f)
-
- # 验证数据格式
- if not isinstance(data_list, list):
- raise ValueError("JSON 文件必须是一个数组")
-
- for item in data_list:
- if "question" not in item or "ground_truth" not in item:
- raise ValueError("每个数据项必须包含 'question' 和 'ground_truth' 字段")
-
- return data_list
-
- except Exception as e:
- print(f"加载 JSON 文件失败: {e}")
- raise
- def save_full_dataset(data: List[Dict[str, Any]], output_path: str):
- """
- 保存完整的数据集(包含 RAG 获取的 contexts 和 answer)
-
- Args:
- data: 完整数据集
- output_path: 输出文件路径
- """
- try:
- with open(output_path, 'w', encoding='utf-8') as f:
- json.dump(data, f, ensure_ascii=False, indent=2)
- print(f"\n完整数据集已保存到: {output_path}")
- except Exception as e:
- print(f"保存数据集失败: {e}")
- def run_evaluation(dataset: Dataset) -> Dict[str, Any]:
- """
- 执行 RAGAS 评估
-
- Args:
- dataset: 评估数据集
-
- Returns:
- 评估结果
- """
- print("\n" + "="*60)
- print("开始 RAGAS 评估...")
- print("="*60)
-
- # 确定要使用的指标
- final_metrics = list(metrics_to_run)
- if 'ground_truth' not in dataset.column_names:
- print("警告: 缺少 'ground_truth',将跳过 'context_recall' 指标")
- final_metrics.remove(context_recall)
-
- # 执行评估
- print(f"使用指标: {[m.name for m in final_metrics]}")
- print(f"数据集大小: {len(dataset)} 条")
- print(f"超时设置: 300 秒")
- print(f"重试次数: 3 次")
- print()
-
- try:
- result = evaluate(
- dataset=dataset,
- metrics=final_metrics,
- llm=vllm_generator,
- embeddings=vllm_embeddings,
- raise_exceptions=False, # 不因单个样本失败而中断
- show_progress=True, # 显示进度
- )
- except Exception as e:
- print(f"\n评估过程中出现错误: {e}")
- print("建议:检查 VLLM 服务是否正常运行")
- raise
-
- # 转换为 DataFrame
- result_df = result.to_pandas()
-
- # 计算平均指标(跳过 NaN 值)并统计 NaN 情况
- summary = {}
- nan_stats = {}
-
- print("\n" + "="*60)
- print("NaN 值统计:")
- print("="*60)
-
- for col in result_df.columns:
- if col not in ['question', 'contexts', 'answer', 'ground_truth', 'user_input', 'retrieved_contexts', 'response', 'reference']:
- try:
- # # 统计 NaN 数量
- # nan_count = result_df[col].isna().sum()
- # total_count = len(result_df)
- # nan_stats[col] = {
- # 'nan_count': int(nan_count),
- # 'total_count': total_count,
- # 'nan_ratio': float(nan_count / total_count) if total_count > 0 else 0
- # }
-
- # # 打印统计信息
- # if nan_count > 0:
- # print(f"⚠️ {col}: {nan_count}/{total_count} 个 NaN ({nan_count/total_count*100:.1f}%)")
- # else:
- # print(f"✅ {col}: 无 NaN 值")
-
- # 使用 nanmean 跳过 NaN 值
- mean_val = result_df[col].mean(skipna=True)
- if pd.notna(mean_val):
- summary[col] = float(mean_val)
- else:
- summary[col] = None
- print(f" 警告: 指标 '{col}' 全部为 NaN,无法计算平均值")
- except Exception as e:
- print(f" 错误: 无法处理指标 '{col}': {e}")
- pass
-
- print("\n" + "="*60)
- print("评估完成!")
- print("="*60)
-
- return {
- "rows": result_df.to_dict(orient="records"),
- "summary": summary,
- "metrics": [m.name for m in final_metrics],
- "count": len(result_df),
- }
- async def main_async(input_json: str, knowledge_ids: List[str], save_full_data: bool = True):
- """
- 主函数(异步)
-
- Args:
- input_json: 输入 JSON 文件路径(只包含 question 和 ground_truth)
- knowledge_ids: 知识库 ID 列表
- save_full_data: 是否保存完整数据集
- """
- print("="*60)
- print("RAGAS 评估脚本 - 集成 RAG 检索")
- print("="*60)
-
- # 1. 加载问答数据
- print(f"\n步骤 1: 加载问答数据从 {input_json}")
- qa_data = load_qa_json(input_json)
- print(f" ✓ 成功加载 {len(qa_data)} 条问答数据")
-
- # 2. 通过 RAG 获取 contexts 和 answer
- print(f"\n步骤 2: 通过 RAG 系统获取上下文和答案")
- print(f" 使用知识库: {knowledge_ids}")
-
- evaluator = RAGEvaluator(knowledge_ids=knowledge_ids)
- full_data = await evaluator.process_qa_data(qa_data)
-
- # 3. 保存完整数据集(可选)
- if save_full_data:
- output_path = input_json.replace(".json", "_full.json")
- save_full_dataset(full_data, output_path)
-
- # 4. 转换为 Dataset 格式
- print(f"\n步骤 3: 准备评估数据集")
- df = pd.DataFrame(full_data)
- dataset = Dataset.from_pandas(df)
- print(f" ✓ 数据集准备完成,共 {len(dataset)} 条数据")
-
- # 5. 执行评估
- print(f"\n步骤 4: 执行 RAGAS 评估")
- result = run_evaluation(dataset)
-
- # 6. 输出结果
- print("\n" + "="*60)
- print("评估结果汇总")
- print("="*60)
- for metric, value in result["summary"].items():
- print(f" {metric}: {value:.4f}")
-
- # 保存评估结果
- result_path = input_json.replace(".json", "_result.json")
- with open(result_path, 'w', encoding='utf-8') as f:
- json.dump(result, f, ensure_ascii=False, indent=2)
- print(f"\n评估结果已保存到: {result_path}")
-
- return result
- def main():
- """主函数入口"""
- # 配置参数
- input_json_file = "dataset_qa.json" # 输入文件(只包含 question 和 ground_truth)
- knowledge_ids = ["a2963496869283893248"] # 需要配置实际的知识库 ID
-
- # 检查输入文件
- if not os.path.exists(input_json_file):
- print(f"错误: 找不到输入文件 {input_json_file}")
- print("\n请创建一个 JSON 文件,格式如下:")
- print("""
- [
- {
- "question": "谁发明了电话?",
- "ground_truth": "电话的主要发明者是亚历山大·格拉汉姆·贝尔,他于1876年获得了电话的发明专利。"
- },
- {
- "question": "地球上最大的哺乳动物是什么?",
- "ground_truth": "地球上最大的哺乳动物是蓝鲸,成年蓝鲸体长可达30米,重量可达180吨。"
- }
- ]
- """)
- return
-
- # 运行异步主函数
- asyncio.run(main_async(input_json_file, knowledge_ids, save_full_data=True))
- if __name__ == "__main__":
- main()
|