database_delete_update_response_mapping = { "delete_query_error": ("根据id查询出错", 500), "delete_no_result": ("向量库中未查询到对应的数据", 200), "delete_error": ("根据id删除数据失败", 500), "delete_success": ("删除成功", 200), "update_query_error": ("更新数据出错", 500), "update_query_no_result": ("未查询到需要更新的数据", 200), "update_error": ("更新数据时出错", 500), "update_success": ("更新成功", 200), "delete_collection_error": ("删除数据库失败", 500), "delete_collection_success": ("删除数据库成功", 200), "create_collection_error": ("创建数据库失败", 500), "create_collection_success": ("创建数据库成功", 200), "insert_success": ("插入切片成功", 200), "insert_error": ("插入切片失败", 500), } def generate_message(result): message, code = database_delete_update_response_mapping.get(result) response_dict = { "code": code, "message": message } return response_dict def generate_response(result, page_num=0, page_size=10): if page_num: if not result: resp_dict = {"code": 200, "rows": [], "total": 0} elif "code" in result[0]: resp_dict = {} resp_dict.update(result[0]) resp_dict["rows"] = [] resp_dict["message"] = "查询向量库出错" resp_dict["total"] = 0 # return resp_dict else: rows = [] total_len = len(result) skip = int(page_num-1) * int(page_size) iter_result = result[skip:skip+page_size] for i in iter_result: d = {} d["slice_id"] = i.get("chunk_id") d["document_id"] = i.get("doc_id") d["slice_text"] = i.get("content") d["slice_char_len"] = i.get("metadata").get("chunk_len") rows.append(d) resp_dict = { "code": 200, "rows": rows, "total": total_len } return resp_dict else: if not result: resp_dict = {"code": 200, "data": {}} elif "code" in result[0]: resp_dict = {} resp_dict.update(result[0]) resp_dict["data"] = {} resp_dict["message"] = "查询向量库出错" # return resp_dict else: d = {} for i in result: d["slice_id"] = i.get("chunk_id") d["document_id"] = i.get("doc_id") d["slice_text"] = i.get("content") # rows.append(d) resp_dict = { "code": 200, "data": d, # "total": total_len } return resp_dict