#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Simplified RAG evaluation script.
"""
import asyncio
import json
import os
import re
import time
from typing import Dict, List, Optional

import aiohttp
class SimpleRAGEvaluator:
    """Evaluate a RAG chat endpoint against a labelled question/answer set.

    Each question is posted to the RAG service, whose reply is read as a
    server-sent-events stream. The final answer is then scored against the
    expected answer by an OpenAI-compatible "judge" model, and an item counts
    as correct when its score is greater than ``PASS_THRESHOLD``.
    """

    # Judge (OpenAI-compatible) endpoint settings.
    JUDGE_API_URL = "https://api.deepseek.com/chat/completions"
    JUDGE_MODEL = "deepseek-chat"
    # The judge API key is a secret, so it is taken from this environment
    # variable (or the constructor) instead of being committed to source.
    JUDGE_API_KEY_ENV = "DEEPSEEK_API_KEY"

    # Total timeout, in seconds, applied to every HTTP request.
    REQUEST_TIMEOUT_SECONDS = 30
    # An item is counted as correct when its judge score exceeds this value.
    PASS_THRESHOLD = 0.5
    # First integer or decimal number in the judge's reply.
    _SCORE_PATTERN = re.compile(r'\d+\.\d+|\d+')

    def __init__(
        self,
        api_url: str = "http://localhost:6000/rag/chat",
        judge_api_key: Optional[str] = None,
    ):
        """Set up the RAG endpoint, the judge credentials and the request body.

        Args:
            api_url: URL of the RAG chat endpoint under evaluation.
            judge_api_key: API key for the judge model. When omitted, the
                value of the ``DEEPSEEK_API_KEY`` environment variable is used.
        """
        self.api_url = api_url
        self.judge_api_key = judge_api_key or os.environ.get(self.JUDGE_API_KEY_ENV)
        # Fixed request body sent with every question; ``query`` is added per call.
        self.request_data = {
            "appId": "2924812721300312064",
            "desc": "高井信息员工手册,出差管理制度等",
            "isDeepThink": "N",
            "knowledgeIds": [
                "a2963496869283893248",
                "a2963501316240183296"
            ],
            "knowledgeInfo": '{"param_desc":"strict","show_recall_result":true,"recall_method":"embedding","rerank_status":true,"rerank_model_name":"rerank","slice_config_type":"customized","rerank_index_type_list":[{"index_type_id":0,"knowledge_id":["a2963496869283893248","a2963501316240183296"]}],"recall_index_type_list":[{"index_type_id":0,"knowledge_id":["a2963496869283893248","a2963501316240183296"]}]}',
            "maxToken": 8192,
            "model": "Qwen3-30B",
            "name": "高井信息公司管理制度",
            "params": {},
            "prompt": "你是一位知识检索助手,你必须并且只能从我发送的众多知识片段中寻找能够解决用户输入问题的最优答案,并且在执行任务的过程中严格执行规定的要求。\n\n知识片段如下:\n{{知识}}\n\n规定要求:\n- 找到答案就仅使用知识片段中的原文回答用户的提问;\n- 找不到答案就用自身知识并且告诉用户该信息不是来自文档;\n- 所引用的文本片段中所包含的示意图占位符必须进行返回,占位符格式参考:【示意图序号_编号】\n - 严禁输出任何知识片段中不存在的示意图占位符;\n - 输出的内容必须删除其中包含的任何图注、序号等信息。例如:\"进入登录页面(图1.1)\"需要从文字中删除图序,回复效果为:\"进入登录页面\";\"如图所示1.1\",回复效果为:\"如图所示\";\n- 格式规范\n - 文档中会出现包含表格的情况,表格是以图片标识符的形式呈现,表格中缺失数据时候返回空单元格;\n - 如果需要用到表格中的数据,以markdown格式输出表格中的数据;\n - 避免使用代码块语法回复信息;\n - 回复的开头语不要输出诸如:\"我想\",\"我认为\",\"think\"等相关语义的文本。\n\n严格执行规定要求,不要复述问题,直接开始回答。\n\n用户输入问题:\n{{用户}}",
            "status": "3",
            "temperature": "0.01",
            "topP": "0.5",
            "typeId": 40,
            "updateBy": "1",
            "visible": "0",
            "embeddingId": "multilingual-e5-large-instruct",
            "enable_think": False
        }

    def load_data(self, file_path: str = "val_data.json") -> List[Dict]:
        """Load the evaluation items from a UTF-8 JSON file and return them."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _require_judge_api_key(self) -> str:
        """Return the judge API key, raising ``RuntimeError`` when none is set."""
        if not self.judge_api_key:
            raise RuntimeError(
                f"Judge API key is not configured: pass judge_api_key or set "
                f"the {self.JUDGE_API_KEY_ENV} environment variable."
            )
        return self.judge_api_key

    @staticmethod
    def _parse_sse_payload(payload: str) -> tuple:
        """Interpret the text that follows ``data:`` on one stream line.

        Returns:
            ``(answer, finished)``. ``answer`` is the new full answer text, or
            ``None`` when the payload carries no answer; ``finished`` is True
            when the stream signals its end.
        """
        if payload == '[DONE]':
            return None, True

        try:
            event_data = json.loads(payload)
        except json.JSONDecodeError:
            # Not JSON: treat a non-empty payload as the answer text itself.
            return (payload or None), False

        # Only JSON objects are meaningful; bare numbers/strings/lists are ignored.
        if not isinstance(event_data, dict):
            return None, False

        event = event_data.get('event')
        if event == 'add':
            # The answer is replaced, not appended: the original author notes
            # that every 'add' chunk already contains the complete text so far.
            chunk = event_data.get('data', '')
            return (chunk if isinstance(chunk, str) else None), False
        if event == 'finish':
            return None, True

        # 'content' is consulted only when the object has no 'event' field.
        if 'content' in event_data and 'event' not in event_data:
            content = event_data['content']
            return (content if isinstance(content, str) else None), False

        return None, False

    @classmethod
    def _extract_score(cls, text: str) -> Optional[float]:
        """Return the first number in *text* clamped to [0, 1], or None if absent."""
        match = cls._SCORE_PATTERN.search(text)
        if match is None:
            return None
        return min(max(float(match.group()), 0.0), 1.0)

    async def call_api(self, session: "aiohttp.ClientSession", question: str) -> str:
        """Ask the RAG endpoint one question and return its final answer.

        Args:
            session: Open aiohttp client session used for the request.
            question: Question text, sent as the ``query`` field.

        Returns:
            The last answer text received from the stream, or ``''`` when the
            request fails, times out, or returns a non-200 status.
        """
        payload = {**self.request_data, "query": question}

        try:
            async with session.post(
                self.api_url,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS),
                headers={'Accept': 'text/event-stream'}
            ) as response:
                if response.status != 200:
                    print(f"API调用失败: HTTP {response.status}")
                    return ''

                full_answer = ""
                async for raw_line in response.content:
                    line = raw_line.decode('utf-8').strip()
                    # Blank lines, ':' comments and other SSE fields carry no answer.
                    if not line.startswith('data:'):
                        continue

                    # The space after 'data:' is optional in the SSE format.
                    answer, finished = self._parse_sse_payload(line[len('data:'):].lstrip())
                    if answer is not None:
                        full_answer = answer
                    if finished:
                        break

                return full_answer
        except Exception as e:
            # Best-effort boundary: one failed question must not abort the run.
            print(f"API调用失败: {e}")
            return ''

    async def call_openai_compatible_api(self, session: "aiohttp.ClientSession", prompt: str) -> str:
        """Send *prompt* to the judge model and return its reply text.

        Args:
            session: Open aiohttp client session used for the request.
            prompt: Full user message for the judge model.

        Returns:
            The stripped content of the first choice in the response.

        Raises:
            RuntimeError: If no judge API key is configured.
            aiohttp.ClientResponseError: If the judge endpoint returns an
                HTTP error status.
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self._require_judge_api_key()}"
        }

        data = {
            "model": self.JUDGE_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.1,
            "max_tokens": 100,
            "stream": False
        }

        async with session.post(
            self.JUDGE_API_URL,
            json=data,
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)
        ) as response:
            # Surface HTTP failures directly instead of as a KeyError below.
            response.raise_for_status()
            result = await response.json()

        return result['choices'][0]['message']['content'].strip()

    async def check_accuracy(self, session: "aiohttp.ClientSession", actual: str, expected: str) -> float:
        """Score *actual* against *expected* using the judge model.

        Returns:
            The judge's score clamped to the range [0.0, 1.0].

        Raises:
            ValueError: If the judge's reply contains no number.
        """
        prompt = (
            "请评估以下两个答案的相似度和准确性,给出0到1之间的分数(保留2位小数)。\n"
            f"标准答案:{expected}\n"
            f"实际答案:{actual}\n"
            "评估标准:\n"
            "1. 内容准确性:实际答案是否正确回答了问题\n"
            "2. 信息完整性:实际答案是否包含了标准答案的关键信息\n"
            "3. 语义相似性:两个答案在语义上的相似程度\n"
            "请只返回一个0到1之间的数字分数,不要包含其他文字说明。例如:0.85"
        )

        score_str = await self.call_openai_compatible_api(session, prompt)

        score = self._extract_score(score_str)
        if score is None:
            raise ValueError(f"Judge reply contains no numeric score: {score_str!r}")
        return score

    async def evaluate_single_item(self, session: "aiohttp.ClientSession", item: Dict, semaphore: asyncio.Semaphore) -> tuple:
        """Evaluate one item and return ``(is_correct, accuracy_score)``.

        Args:
            session: Open aiohttp client session shared by all items.
            item: Mapping with the ``question`` and ``answer`` keys.
            semaphore: Limits how many items are processed at the same time.
        """
        async with semaphore:
            question = item['question']
            expected = item['answer']

            # The two calls are sequential: the judge needs the RAG answer.
            actual = await self.call_api(session, question)
            accuracy_score = await self.check_accuracy(session, actual, expected)

            is_correct = accuracy_score > self.PASS_THRESHOLD

            print(f"问题: {question}")
            print(f"期望: {expected}")
            print(f"实际: {actual}")
            print(f"准确率: {accuracy_score:.2%}")
            print("-" * 50)

            return is_correct, accuracy_score

    async def evaluate(self, data_file: str = "val_data.json", max_concurrent: int = 5):
        """Evaluate every item in *data_file* and return the overall accuracy.

        Args:
            data_file: Path of the JSON file holding the evaluation items.
            max_concurrent: Maximum number of items processed at once; values
                below 1 are treated as 1.

        Returns:
            Fraction of successfully evaluated items judged correct, or 0 when
            no item could be evaluated.

        Raises:
            RuntimeError: If no judge API key is configured.
        """
        # Fail fast instead of letting every single item fail on the judge call.
        self._require_judge_api_key()

        data = self.load_data(data_file)

        # A semaphore of 0 would block forever and a negative one raises.
        semaphore = asyncio.Semaphore(max(1, max_concurrent))

        connector = aiohttp.TCPConnector(limit=20, limit_per_host=10)
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [self.evaluate_single_item(session, item, semaphore) for item in data]
            # Failures are returned in place so one bad item cannot cancel the rest.
            results = await asyncio.gather(*tasks, return_exceptions=True)

        correct = 0
        valid_results = 0

        for result in results:
            # BaseException also covers asyncio.CancelledError.
            if isinstance(result, BaseException):
                print(f"评估出错: {result}")
                continue

            is_correct, _ = result
            valid_results += 1
            if is_correct:
                correct += 1

        accuracy = correct / valid_results if valid_results > 0 else 0
        print(f"\n总体准确率: {correct}/{valid_results} = {accuracy:.2%}")
        return accuracy
async def main():
    """Run one evaluation pass using the evaluator's default settings."""
    await SimpleRAGEvaluator().evaluate()
# Start the event loop only when this file is executed as a script, so that
# importing the module does not trigger an evaluation run.
if __name__ == "__main__":
    asyncio.run(main())