import json
import aiohttp
import asyncio
import re
from typing import List, Dict, Tuple
from datetime import datetime
import os


class RAGEvaluator:
    """Evaluate a RAG chat service against a labelled Q/A dataset.

    Workflow: for each test item, query the RAG endpoint (SSE stream),
    ask an LLM judge (DeepSeek) to score the actual answer against the
    expected answer on a 0-1 scale, then print a summary and persist a
    detailed report under ``logs/``.
    """

    def __init__(self, rag_url: str = "http://localhost:6000/rag/chat",
                 eval_url: str = "https://api.deepseek.com/chat/completions",
                 eval_key: str = "sk-72f9d0e5bc894e1d828d73bdcc50ff0a"):
        # SECURITY: the default eval_key looks like a live secret committed to
        # source control — rotate it and supply it via an environment variable
        # or secret store instead of a hardcoded default.
        self.rag_url = rag_url
        self.eval_url = eval_url
        self.eval_key = eval_key
        # Per-question log entries accumulated by evaluate_single().
        self.evaluation_logs: List[Dict] = []
        # RAG request template; the "query" field is filled in per call.
        self.rag_template = {
            # "appId": "2924812721300312064",
            "knowledgeIds": ["a2963496869283893248", "a2963501316240183296"],
            "knowledgeInfo": '{"param_desc":"strict","show_recall_result":true,"recall_method":"embedding","rerank_status":true,"rerank_model_name":"rerank","slice_config_type":"customized","rerank_index_type_list":[{"index_type_id":0,"knowledge_id":["a2963496869283893248","a2963501316240183296"]}],"recall_index_type_list":[{"index_type_id":0,"knowledge_id":["a2963496869283893248","a2963501316240183296"]}]}',
            "maxToken": 8192,
            "model": "Qwen3-30B",
            "temperature": "0.01",
            "topP": "0.5",
            "embeddingId": "multilingual-e5-large-instruct",
            "prompt": "你是一位知识检索助手,你必须并且只能从我发送的众多知识片段中寻找能够解决用户输入问题的最优答案,并且在执行任务的过程中严格执行规定的要求。\n\n知识片段如下:\n{{知识}}\n\n规定要求:\n- 找到答案就仅使用知识片段中的原文回答用户的提问;\n- 找不到答案就用自身知识并且告诉用户该信息不是来自文档;\n- 所引用的文本片段中所包含的示意图占位符必须进行返回,占位符格式参考:【示意图序号_编号】\n - 严禁输出任何知识片段中不存在的示意图占位符;\n - 输出的内容必须删除其中包含的任何图注、序号等信息。例如:\"进入登录页面(图1.1)\"需要从文字中删除图序,回复效果为:\"进入登录页面\";\"如图所示1.1\",回复效果为:\"如图所示\";\n- 格式规范\n - 文档中会出现包含表格的情况,表格是以图片标识符的形式呈现,表格中缺失数据时候返回空单元格;\n - 如果需要用到表格中的数据,以markdown格式输出表格中的数据;\n - 避免使用代码块语法回复信息;\n - 回复的开头语不要输出诸如:\"我想\",\"我认为\",\"think\"等相关语义的文本。\n\n严格执行规定要求,不要复述问题,直接开始回答。\n\n用户输入问题:\n{{用户}}"
        }
        # Minimum judge score for an answer to count as "correct".
        self.eval_threshold = 0.9

    def load_test_data(self, file_path: str = "val_data.json") -> List[Dict]:
        """Load the labelled test set (a JSON list of question/answer dicts)."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    async def query_rag(self, session: aiohttp.ClientSession, question: str) -> str:
        """POST *question* to the RAG endpoint and collect the SSE answer.

        Returns the final answer text, or "" on a non-200 response.
        """
        # Shallow copy is sufficient: we only add a top-level "query" key.
        payload = self.rag_template.copy()
        payload["query"] = question
        async with session.post(
            self.rag_url,
            json=payload,
            timeout=aiohttp.ClientTimeout(total=30),
            headers={'Accept': 'text/event-stream'}
        ) as response:
            if response.status != 200:
                return ""
            answer = ""
            async for line in response.content:
                line_str = line.decode('utf-8').strip()
                # SSE data lines are prefixed with "data: "; skip everything else.
                if not line_str.startswith('data: '):
                    continue
                data_str = line_str[6:]
                if data_str == '[DONE]':
                    break
                try:
                    event_data = json.loads(data_str)
                    if isinstance(event_data, dict):
                        if event_data.get('event') == 'add':
                            # NOTE(review): 'add' events overwrite rather than
                            # accumulate `answer` — this assumes the server sends
                            # the full answer-so-far in each event; confirm.
                            answer = event_data.get('data', '')
                        elif event_data.get('event') == 'finish':
                            break
                        elif 'content' in event_data:
                            answer = event_data['content']
                except json.JSONDecodeError:
                    # Non-JSON payloads are treated as raw answer text.
                    if data_str != '[DONE]':
                        answer = data_str
            return answer

    async def evaluate_answer(self, session: aiohttp.ClientSession,
                              actual: str, expected: str) -> float:
        """Ask the LLM judge to score *actual* against *expected*.

        Returns a score clamped to [0.0, 1.0]; 0.0 when the judge's reply
        contains no number or the response is malformed (e.g. an API error).
        """
        prompt = f"""评估答案相似度,返回0-1分数。
标准答案:{expected}
实际答案:{actual}
评估标准:内容准确性、信息完整性、语义相似性
只返回数字分数,如:0.85"""
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.eval_key}"
        }
        payload = {
            "model": "deepseek-chat",
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.1,
            "max_tokens": 50
        }
        async with session.post(
            self.eval_url, json=payload, headers=headers,
            # Explicit bound so a stuck judge call cannot stall the batch.
            timeout=aiohttp.ClientTimeout(total=60)
        ) as response:
            result = await response.json()
        try:
            score_text = result['choices'][0]['message']['content'].strip()
        except (KeyError, IndexError, TypeError):
            # Error/rate-limit responses lack the expected shape; score as 0.
            return 0.0
        numbers = re.findall(r'\d+\.\d+|\d+', score_text)
        if numbers:
            score = float(numbers[0])
            return min(max(score, 0.0), 1.0)
        return 0.0

    async def evaluate_single(self, session: aiohttp.ClientSession,
                              item: Dict, semaphore: asyncio.Semaphore) -> Tuple[bool, float]:
        """Evaluate one test item under the concurrency *semaphore*.

        Appends a log entry to ``self.evaluation_logs`` and returns
        ``(is_correct, score)``.
        """
        async with semaphore:
            question = item['question']
            expected = item['answer']
            actual = await self.query_rag(session, question)
            score = await self.evaluate_answer(session, actual, expected)
            is_correct = score > self.eval_threshold
            log_entry = {
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "question": question,
                "expected_answer": expected,
                "actual_answer": actual,
                "score": round(score, 2),
                "is_correct": is_correct
            }
            self.evaluation_logs.append(log_entry)
            print(f"问题: {question}")
            print(f"期望: {expected}")
            print(f"实际: {actual}")
            print(f"得分: {score:.2f}")
            print("-" * 50)
            return is_correct, score

    async def run_evaluation(self, data_file: str = "val_data.json",
                             max_concurrent: int = 3) -> float:
        """Run the full evaluation; returns the accuracy (0.0 on no valid results).

        Items that raise are counted separately and excluded from the averages.
        """
        data = self.load_test_data(data_file)
        print(f"开始评估 {len(data)} 个问题...")
        semaphore = asyncio.Semaphore(max_concurrent)
        connector = aiohttp.TCPConnector(limit=10, limit_per_host=5)
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [self.evaluate_single(session, item, semaphore) for item in data]
            # return_exceptions=True keeps one failed item from aborting the batch.
            results = await asyncio.gather(*tasks, return_exceptions=True)
        correct_count = 0
        total_score = 0.0
        valid_count = 0
        for result in results:
            if isinstance(result, Exception):
                print(f"评估异常: {result}")
                continue
            is_correct, score = result
            valid_count += 1
            total_score += score
            if is_correct:
                correct_count += 1
        if valid_count == 0:
            return 0.0
        accuracy = correct_count / valid_count
        avg_score = total_score / valid_count
        print("\n=== 评估结果 ===")
        print(f"正确率: {correct_count}/{valid_count} = {accuracy:.2%}")
        print(f"平均分: {avg_score:.2f}")
        self.save_evaluation_results(accuracy, avg_score, correct_count, valid_count)
        return accuracy

    def save_evaluation_results(self, accuracy: float, avg_score: float,
                                correct_count: int, valid_count: int):
        """Write the summary plus the per-question logs to a timestamped report."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"rag_evaluation_{timestamp}.txt"
        os.makedirs("logs", exist_ok=True)
        filepath = os.path.join("logs", filename)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write("=" * 60 + "\n")
            f.write("RAG系统评估结果报告\n")
            f.write("=" * 60 + "\n")
            f.write(f"评估时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"总问题数: {valid_count}\n")
            f.write(f"正确回答数: {correct_count}\n")
            f.write(f"正确率: {accuracy:.2%}\n")
            f.write(f"平均得分: {avg_score:.2f}\n")
            f.write(f"准确阈值: {self.eval_threshold}\n")
            f.write("=" * 60 + "\n\n")
            f.write("详细评估日志:\n")
            f.write("-" * 60 + "\n")
            for i, log in enumerate(self.evaluation_logs, 1):
                f.write(f"\n[{i}] 评估时间: {log['timestamp']}\n")
                f.write(f"问题: {log['question']}\n")
                f.write(f"期望答案: {log['expected_answer']}\n")
                f.write(f"实际答案: {log['actual_answer']}\n")
                f.write(f"评分: {log['score']}\n")
                # Previously stored but never reported; include it for completeness.
                f.write(f"是否正确: {log['is_correct']}\n")
                f.write("-" * 60 + "\n")
        print(f"\n评估结果已保存到: {filepath}")


async def main():
    """Entry point: run a full evaluation with default settings."""
    evaluator = RAGEvaluator()
    await evaluator.run_evaluation()


if __name__ == "__main__":
    asyncio.run(main())