Browse Source

停止聊天修改

S0025136190 7 months ago
parent
commit
3980a27f74

+ 47 - 0
takai-ai/src/main/java/com/takai/ai/service/impl/TakaiAiServiceImpl.java

@@ -27,6 +27,7 @@ import okhttp3.sse.EventSources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 import org.springframework.web.multipart.MultipartFile;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -38,7 +39,10 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.temporal.ChronoUnit;
 import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 /**
@@ -90,6 +94,9 @@ public class TakaiAiServiceImpl implements ITakaiAiService {
     @Autowired
     private RedisCache redisCache;
 
+    @Autowired
+    public RedisTemplate redisTemplate;
+
     public static final String START_SIGN = "【";
     public static final String END_SIGN = "】";
     public static final String SYMBOL = "【示意图序号";
@@ -144,6 +151,33 @@ public class TakaiAiServiceImpl implements ITakaiAiService {
             TakaiKnowledge knowledge = takaiKnowledgeMapper.selectTargetKnowledge(TakaiKnowledge.builder().knowledgeId(appInfo.getKnowledgeIds()).build());
             if (knowledge != null) {
                 SseEmitter sseEmitter = new SseEmitter(0L);
+                // 生成唯一会话ID(可从请求参数获取或自动生成)
+                String sessionId = UUID.randomUUID().toString();
+
+                redisTemplate.opsForValue().set(
+                        "sse:terminate:" + sessionId,
+                        "false",
+                        1, TimeUnit.HOURS
+                );
+
+                // 定时检查终止信号(每2秒检查一次)
+                ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+                scheduler.scheduleAtFixedRate(() -> {
+                    try {
+                        String flag = redisTemplate.opsForValue()
+                                .get("sse:terminate:" + sessionId).toString();
+
+                        if ("true".equals(flag)) {
+                            // 主动终止流程
+                            sseEmitter.complete();
+                            scheduler.shutdown();
+                            redisTemplate.delete("sse:terminate:" + sessionId);
+                        }
+                    } catch (Exception e) {
+                        log.error("终止检查失败", e);
+                    }
+                }, 0, 2, TimeUnit.SECONDS);
+
                 String url = deepseekConfig.getBaseurl() + deepseekConfig.getChat();
                 TakaiAppInfo info = takaiAppInfoMapper.selectAppInfoByAppId(sseParams.getAppId());
                 JSONObject json = JSONObject.parseObject(info.getAppInfo());
@@ -174,6 +208,11 @@ public class TakaiAiServiceImpl implements ITakaiAiService {
 
                     @Override
                     public void onEvent(@NotNull EventSource eventSource, String id, String type, @NotNull String data) {
+                        // 检查Redis终止标记(双重保障)
+                        if ("true".equals(redisTemplate.opsForValue().get("sse:terminate:" + sessionId))) {
+                            eventSource.cancel();
+                            return;
+                        }
                         if (!StringUtils.isEmpty(data)) {
                             String newData = data.substring(preData.length());
                             if (com.takai.common.utils.StringUtils.isNotEmpty(type) && "finish".equals(type)) {
@@ -232,9 +271,17 @@ public class TakaiAiServiceImpl implements ITakaiAiService {
                             sseEmitter.send(obj);
                         } catch (IOException e) {
                             log.error("deepseek 推送数据失败", e);
+                            throw new RuntimeException(e);
                         }
                     }
                 };
+
+                // 资源清理
+                sseEmitter.onCompletion(() -> {
+                    scheduler.shutdown();
+                    redisTemplate.delete("sse:terminate:" + sessionId);
+                });
+
                 OkHttpClient client = buildOkHttpClient();
                 EventSource.Factory factory = EventSources.createFactory(client);
                 factory.newEventSource(request, listener);

+ 0 - 1
takai-bigmodel/src/main/java/com/takai/bigmodel/service/IBigModelService.java

@@ -1,6 +1,5 @@
 package com.takai.bigmodel.service;
 
-import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.takai.bigmodel.domain.dto.DialogReqDTO;
 import com.takai.bigmodel.domain.dto.DialogRespDTO;

+ 49 - 1
takai-bigmodel/src/main/java/com/takai/bigmodel/service/impl/BigModelServiceImpl.java

@@ -22,6 +22,7 @@ import okhttp3.sse.EventSources;
 import org.apache.commons.io.FileUtils;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
@@ -35,6 +36,8 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.temporal.ChronoUnit;
 import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -78,6 +81,9 @@ public class BigModelServiceImpl implements IBigModelService
     @Autowired
     private FileInfoMapper fileInfoMapper;
 
+    @Autowired
+    public RedisTemplate redisTemplate;
+
 
     @Override
     public List<BmMediaReplacement> selectMediaList(BmMediaReplacement mData) {
@@ -117,6 +123,34 @@ public class BigModelServiceImpl implements IBigModelService
     @Override
     public SseEmitter sseInvoke(SseParams sseParams) {
         SseEmitter sseEmitter = new SseEmitter(0L);
+
+        // 生成唯一会话ID(可从请求参数获取或自动生成)
+        String sessionId = UUID.randomUUID().toString();
+
+        redisTemplate.opsForValue().set(
+                "sse:terminate:" + sessionId,
+                "false",
+                1, TimeUnit.HOURS
+        );
+
+        // 定时检查终止信号(每2秒检查一次)
+        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+        scheduler.scheduleAtFixedRate(() -> {
+            try {
+                String flag = redisTemplate.opsForValue()
+                        .get("sse:terminate:" + sessionId).toString();
+
+                if ("true".equals(flag)) {
+                    // 主动终止流程
+                    sseEmitter.complete();
+                    scheduler.shutdown();
+                    redisTemplate.delete("sse:terminate:" + sessionId);
+                }
+            } catch (Exception e) {
+                log.error("终止检查失败", e);
+            }
+        }, 0, 2, TimeUnit.SECONDS);
+
         String url = bigModelConfig.getBaseurl() + bigModelConfig.getSse().replace("{id}",sseParams.getAppId());
         JSONObject json = new JSONObject();
         List<PromptObject> list = sseParams.getPrompt();
@@ -152,6 +186,11 @@ public class BigModelServiceImpl implements IBigModelService
             @Override
             public void onEvent(@NotNull EventSource eventSource,  String id,  String type, @NotNull String data) {
                 try {
+                    // 检查Redis终止标记(双重保障)
+                    if ("true".equals(redisTemplate.opsForValue().get("sse:terminate:" + sessionId))) {
+                        eventSource.cancel();
+                        return;
+                    }
                     String newData = data.substring(preData.length());
                     preData = data;
                     if(newData.indexOf(START_SIGN) > -1 || symbolData.length() > 0) {
@@ -203,6 +242,7 @@ public class BigModelServiceImpl implements IBigModelService
                     //sseEmitter.send(json);
                 } catch (Exception e) {
                     log.error("智谱 推送数据失败", e);
+                    throw new RuntimeException(e);
                 }
             }
             @Override
@@ -211,14 +251,22 @@ public class BigModelServiceImpl implements IBigModelService
                 sseEmitter.completeWithError(t);
             }
 
-            private void send(SseEmitter sseEmitter, Object obj) {
+            private void send(SseEmitter sseEmitter, Object obj) throws IOException {
                 try {
                     sseEmitter.send(obj);
                 } catch (IOException e) {
                     log.error("智谱 推送数据失败", e);
+                    throw new RuntimeException(e);
                 }
             }
         };
+
+        // 资源清理
+        sseEmitter.onCompletion(() -> {
+            scheduler.shutdown();
+            redisTemplate.delete("sse:terminate:" + sessionId);
+        });
+
         OkHttpClient client = buildOkHttpClient();
         EventSource.Factory factory = EventSources.createFactory(client);
         factory.newEventSource(request, listener);