瀏覽代碼

sse接入 上传文档修改

yangkaixuan 1 月之前
父節點
當前提交
20a81579c0
共有 21 個文件被更改,包括 1624 次插入328 次删除
  1. 28 6
      takai-admin/src/main/java/com/takai/web/controller/takaiai/TakaiAiController.java
  2. 9 60
      takai-admin/src/main/resources/application-dev.yml
  3. 28 3
      takai-admin/src/main/resources/application.yml
  4. 21 0
      takai-ai/src/main/java/com/takai/ai/config/ThreadPoolConfig.java
  5. 27 0
      takai-ai/src/main/java/com/takai/ai/domain/dto/JkDocumentPythonParams.java
  6. 13 1
      takai-ai/src/main/java/com/takai/ai/domain/entity/TakaiDocument.java
  7. 10 0
      takai-ai/src/main/java/com/takai/ai/domain/entity/TakaiDocumentParams.java
  8. 5 5
      takai-ai/src/main/java/com/takai/ai/service/ITakaiAiService.java
  9. 139 68
      takai-ai/src/main/java/com/takai/ai/service/impl/TakaiAiServiceImpl.java
  10. 17 7
      takai-ai/src/main/resources/mapper/takaiai/TakaiDocumentMapper.xml
  11. 28 3
      takai-common/pom.xml
  12. 37 0
      takai-common/src/main/java/com/takai/common/config/SseAutoConfiguration.java
  13. 21 0
      takai-common/src/main/java/com/takai/common/config/SseProperties.java
  14. 91 0
      takai-common/src/main/java/com/takai/common/core/controller/SseController.java
  15. 28 0
      takai-common/src/main/java/com/takai/common/core/domain/SseMessageDto.java
  16. 169 174
      takai-common/src/main/java/com/takai/common/core/redis/RedisCache.java
  17. 50 0
      takai-common/src/main/java/com/takai/common/listener/SseTopicListener.java
  18. 585 0
      takai-common/src/main/java/com/takai/common/utils/RedisUtils.java
  19. 234 0
      takai-common/src/main/java/com/takai/common/utils/SseEmitterManager.java
  20. 83 0
      takai-common/src/main/java/com/takai/common/utils/SseMessageUtils.java
  21. 1 1
      takai-framework/src/main/java/com/takai/framework/config/SecurityConfig.java

+ 28 - 6
takai-admin/src/main/java/com/takai/web/controller/takaiai/TakaiAiController.java

@@ -3,9 +3,7 @@ package com.takai.web.controller.takaiai;
 import com.alibaba.fastjson2.JSONObject;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
-import com.takai.ai.domain.dto.TakaiDialogReqDTO;
-import com.takai.ai.domain.dto.TakaiQuestionDTO;
-import com.takai.ai.domain.dto.TakaiSliceImage;
+import com.takai.ai.domain.dto.*;
 import com.takai.ai.domain.entity.*;
 import com.takai.ai.service.ITakaiAiService;
 import com.takai.bigmodel.domain.entity.PageParams;
@@ -13,12 +11,12 @@ import com.takai.common.annotation.Log;
 import com.takai.common.constant.HttpStatus;
 import com.takai.common.core.controller.BaseController;
 import com.takai.common.core.domain.AjaxResult;
+import com.takai.common.core.domain.R;
 import com.takai.common.core.domain.TreeStruct;
 import com.takai.common.core.domain.entity.SysDictData;
 import com.takai.common.core.domain.entity.SysUser;
 import com.takai.common.core.domain.model.LoginUser;
 import com.takai.common.core.page.TableDataInfo;
-import com.takai.ai.domain.dto.TakaiDialogRespDTO;
 import com.takai.common.enums.BusinessType;
 import com.takai.common.enums.ProjectTypeEnum;
 import com.takai.common.exception.ServiceException;
@@ -40,6 +38,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -344,13 +343,36 @@ public class TakaiAiController extends BaseController {
     public AjaxResult uploadDocument(@RequestParam("files") MultipartFile[] files,
                                      @PathVariable String knowledgeId) throws Exception {
         try {
-            int i = takaiAisService.uploadDocument(files, knowledgeId);
-            return success(i);
+            // 调用异步方法
+            CompletableFuture<Integer> future = takaiAisService.uploadDocument(files, knowledgeId);
+            // 处理结果(非阻塞)
+            future.whenComplete((successCount, ex) -> {
+                if (ex != null) {
+                    log.error("上传文档异步任务失败", ex);
+                } else {
+                    log.info("上传文档完成,成功处理{}个文件", successCount);
+                    // 可触发后续逻辑,比如通知前端
+                }
+            });
+            return success(1);
         } catch (Exception e) {
             return error("0");
         }
     }
 
+    /**
+     * python文档解析回调接口
+     *
+     * @param params
+     * @return
+     * @throws Exception
+     */
+    @PostMapping("/updateDocumentByPython") //@RequestPart DocumentParams documentParams,
+    public R<Void> updateDocumentByPython(@RequestBody JkDocumentPythonParams params){
+        takaiAisService.updateDocumentByPython(params);
+        return R.ok();
+    }
+
 
     /**
      * 上传切片图片

+ 9 - 60
takai-admin/src/main/resources/application-dev.yml

@@ -6,15 +6,21 @@ spring:
         druid:
             # 主库数据源
             master:
-                url: jdbc:mysql://localhost:3306/chat_deepseek?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
-                username: root
+                #                url: jdbc:mysql://localhost:3306/ai_master?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+                #                username: root
+                #                password: password
+                url: jdbc:mysql://10.168.100.10:3306/deepseek_local?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+                username: takai
                 password: T@kai2025
                 hikari:
                     pool-name: master-Pool
             # 从库数据源
             slave:
                 # 从数据源开关/默认关闭
-                url: jdbc:mysql://localhost:3306/chat_zhipu?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+                #                url: jdbc:mysql://localhost:3306/tk-vue?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+                #                username: root
+                #                password: password
+                url: jdbc:mysql://xia0miduo.gicp.net:3336/chat_zhipu?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
                 username: root
                 password: T@kai2025
                 hikari:
@@ -70,60 +76,3 @@ spring:
             hibernate:
                 dialect: org.hibernate.dialect.MySQL8Dialect
                 format_sql: true
-deepseek:
-    #base url
-    baseurl: http://xia0miduo.gicp.net:6001
-#    baseurl: http://192.168.3.209:18079
-    #创建知识库
-    createKnowledge: /rag/create_collection
-    #删除知识库
-    deleteKnowledge: /rag/delete_knowledge
-    #获取切片列表
-    slicePage: /rag/search_word
-    #删除切片
-    deleteSlice: /rag/delete_slice
-    #切片详情
-    sliceDetail: /rag/search
-    #编辑切片信息
-    updateSlice: /rag/update_slice
-    #上传文件
-    uploadKnowledge: /upload_knowledge
-    #聊天
-    chat: /rag/chat
-    #异步提示词(还想问)
-    asyncCompletions: /rag/query
-    #删除知识文件
-    deleteDoc: /rag/delete_doc
-    #新增切片
-    addSlice: /rag/insert_slice
-    #获取排序切片列表
-    searchSlice: /rag/slice/search
-    #联网搜索结果
-    webSearch: /web/search
-    #deepseek单独聊天
-    baseChat: http://xia0miduo.gicp.net:6001
-    #提示词
-    prompt: 你是总结和提问大师。你只根据用户的对话记录,根据对话记录生成可能会问的问题,不要杜撰问题。问题的答案必须来自上面的对话记录。 你必须遵守以下要求:1. 不要输出用户问过的问题;2. 你需要输出3个问题供用户选择。3. 你只需要输出问题,不需要解释,不需要提问。4. 你的问题可以是空的,但你不能杜撰问题。5. 问题需要站在使用这个应用的人的视角提出,因此你要注意提问的语气和人称代词。不要用您这个字。6.生成的问题,必须能从提供的对话记录中找到答案。你一定要按照以下格式输出:{'问题':['xxx','xxx','xxx']}
-#jk:
-#    #Appid
-#    iamAppid: e971e84b574c40b2
-#    #Appsecret
-#    iamAppsecret: 51e7f274f71f4887b25015fc0743e871
-#    #clientID
-#    iamClientID: e97f94cf93761f4d69e8
-#    #clientSecret
-#    iamClientSecret: 13845f824b5d9f4f2a58159109aac317263d
-#    #用户接口url
-#    iamUserUrl: https://esctest.sribs.com.cn/esc-idm/api/v1/account/list
-#    #用户回调接口,每次用户接口调用成功后都要回调一次接口
-#    iamCallbackUrl: https://esctest.sribs.com.cn/esc-idm/api/v1/account/callback
-#    #部门接口url
-#    iamDeptUrl: https://esctest.sribs.com.cn/esc-idm/api/v1/org/listAll
-#    #岗位接口url
-#    iamPostUrl: https://esctest.sribs.com.cn/esc-idm/api/v1/job/listAll
-#    #code换取tokenurl
-#    iamTokenUrl: https://esctest.sribs.com.cn/esc-sso/oauth2.0/accessToken
-#    #token换取用户信息
-#    iamProfileUrl: https://esctest.sribs.com.cn/esc-sso/oauth2.0/profile
-#    #虚拟密码
-#    jkPsw: GybjcxhDe8XQtabXVP1A

+ 28 - 3
takai-admin/src/main/resources/application.yml

@@ -18,7 +18,7 @@ takai:
 # 开发环境配置
 server:
   # 服务器的HTTP端口,默认为8080
-  port: 8090
+  port: 8091
   servlet:
     # 应用的访问路径
     context-path: /
@@ -54,7 +54,7 @@ spring:
     # 国际化资源文件路径
     basename: i18n/messages
   profiles:
-    active: takaitest
+    active: dev
   # 文件上传
   servlet:
     multipart:
@@ -164,7 +164,8 @@ bigmodel:
   prompt: 你是总结和提问大师。 """应用的名称:{{应用名称}} 这是用户的历史对话记录:{{历史对话}}""" 你只根据用户的对话记录,推演出用户接下来可能提出的问题,不要杜撰问题。可以参考应用的名称和应用的简介。 你必须遵守以下要求:1. 不要输出用户问过的问题;2. 你需要输出3个问题供用户选择。3. 你只需要输出问题,不需要解释,不需要提问。4. 你的问题可以是空的,但你不能杜撰问题。5. 问题需要站在使用这个应用的人的视角提出,因此你要注意提问的语气和人称代词。不要用您这个字。你一定要按照以下格式输出:{"问题":["xxx","xxx","xxx"]}
 deepseek:
   #base url
-  baseurl: http://10.1.27.4:18079
+  baseurl: http://10.168.100.17:6666
+  #baseurl: http://10.1.27.4:18079
 #  baseurl: http://xia0miduo.gicp.net:6001
 #  baseurl: http://192.168.3.209:18078
   #创建知识库
@@ -221,3 +222,27 @@ jk:
   #虚拟密码
   jkPsw: GybjcxhDe8XQtabXVP1A
 
+# redisson 配置
+redisson:
+  # redis key前缀
+  keyPrefix:
+  # 线程池数量
+  threads: 4
+  # Netty线程池数量
+  nettyThreads: 8
+  # 单节点配置
+  singleServerConfig:
+  # 客户端名称 不能用中文
+  clientName: CHAT-BACKEND
+  # 最小空闲连接数
+  connectionMinimumIdleSize: 8
+  # 连接池大小
+  connectionPoolSize: 32
+  # 连接空闲超时,单位:毫秒
+  idleConnectionTimeout: 10000
+  # 命令等待超时,单位:毫秒
+  timeout: 3000
+  # 发布和订阅连接池大小
+  subscriptionConnectionPoolSize: 50
+
+

+ 21 - 0
takai-ai/src/main/java/com/takai/ai/config/ThreadPoolConfig.java

@@ -0,0 +1,21 @@
+package com.takai.ai.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+// 显式指定配置类的Bean名称为aiThreadPoolConfig,避免和framework的冲突
+@Configuration("aiThreadPoolConfig")
+public class ThreadPoolConfig {
+
+    @Bean("analysisFileExecutor") // 线程池Bean名称仍用这个
+    public ExecutorService analysisFileExecutor() {
+        return Executors.newFixedThreadPool(5, r -> {
+            Thread thread = new Thread(r);
+            thread.setName("analysis-file-pool-" + thread.getId());
+            thread.setDaemon(true);
+            return thread;
+        });
+    }
+}

+ 27 - 0
takai-ai/src/main/java/com/takai/ai/domain/dto/JkDocumentPythonParams.java

@@ -0,0 +1,27 @@
+package com.takai.ai.domain.dto;
+
+
+import lombok.Data;
+
+@Data
+public class JkDocumentPythonParams {
+
+    private String documentId;
+
+    private String knowledgeId;
+
+    private Integer length;
+
+    private Integer wordNum;
+
+    private Integer sliceTotal;
+
+    private String ossId;
+
+    private String mdMinIOUrl;
+
+    private String pdfUrl;
+
+    private String status;
+
+}

+ 13 - 1
takai-ai/src/main/java/com/takai/ai/domain/entity/TakaiDocument.java

@@ -5,9 +5,11 @@ import lombok.Builder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
+import java.util.List;
+
 /**
  * 智谱知识列表 bm_document
- * 
+ *
  * @author takai
  */
 @Builder
@@ -46,6 +48,16 @@ public class TakaiDocument extends BaseEntity
 
     private Integer sliceTotal;
 
+    private String status;
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
     public String getDocumentId() {
         return documentId;
     }

+ 10 - 0
takai-ai/src/main/java/com/takai/ai/domain/entity/TakaiDocumentParams.java

@@ -34,6 +34,16 @@ public class TakaiDocumentParams {
 
     private String url;
 
+    private String status;
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
     public MultipartFile[] getFiles() {
         return files;
     }

+ 5 - 5
takai-ai/src/main/java/com/takai/ai/service/ITakaiAiService.java

@@ -1,16 +1,14 @@
 package com.takai.ai.service;
 
 import com.alibaba.fastjson2.JSONObject;
-import com.takai.ai.domain.dto.TakaiDialogReqDTO;
-import com.takai.ai.domain.dto.TakaiDialogRespDTO;
-import com.takai.ai.domain.dto.TakaiQuestionDTO;
-import com.takai.ai.domain.dto.TakaiSliceImage;
+import com.takai.ai.domain.dto.*;
 import com.takai.ai.domain.entity.*;
 import com.takai.common.core.domain.entity.SysDictData;
 import org.springframework.web.multipart.MultipartFile;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * 高井 业务层
@@ -46,7 +44,9 @@ public interface ITakaiAiService
 
     int delKnowledge(String knowledgeId);
 
-    int uploadDocument(MultipartFile[] files, String knowledgeId) throws Exception;
+    CompletableFuture<Integer> uploadDocument(MultipartFile[] files, String knowledgeId) throws Exception;
+
+    void updateDocumentByPython(JkDocumentPythonParams parms);
 
     int updateDocument(TakaiDocumentParams documentParams, String documentId);
 

+ 139 - 68
takai-ai/src/main/java/com/takai/ai/service/impl/TakaiAiServiceImpl.java

@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.web.multipart.MultipartFile;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -49,7 +50,10 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.temporal.ChronoUnit;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /**
@@ -134,6 +138,10 @@ public class TakaiAiServiceImpl implements ITakaiAiService {
     @Autowired
     private Base64DecodeUtil base64DecodeUtil;
 
+    // 注入步骤1配置的线程池
+    private final ExecutorService analysisFileExecutor;
+
+
     public static final String START_SIGN = "【";
     public static final String END_SIGN = "】";
     public static final String SYMBOL = "【示意图序号";
@@ -152,6 +160,10 @@ public class TakaiAiServiceImpl implements ITakaiAiService {
     @Autowired
     private ISysUserService userService;
 
+    public TakaiAiServiceImpl(ExecutorService analysisFileExecutor) {
+        this.analysisFileExecutor = analysisFileExecutor;
+    }
+
     @Override
     public void initDeepseekInfo() {
         //获取应用列表
@@ -657,79 +669,131 @@ public class TakaiAiServiceImpl implements ITakaiAiService {
     }
 
     @Override
-    public int uploadDocument(MultipartFile[] files, String knowledgeId) {
-        List<TakaiSysOss> result = null;
-        try {
-            result = minioUtil.uploadMultiple(files);
-        } catch (Exception e) {
-            logger.error("上传文件失败", e.getMessage());
-        }
-        if (result != null && result.size() > 0) {
-            List<UploadDocumentParams> params = new ArrayList<>();
-            for (int i = 0; i<result.size(); i++) {
-                SnowflakeDigitGenerator documentIdGenerator = new SnowflakeDigitGenerator(i, i);
-                long documentId = documentIdGenerator.nextId();
-                UploadDocumentParams param = new UploadDocumentParams();
-                param.setDocument_id("a" + documentId);
-                param.setName(result.get(i).getOriginalName());
-                param.setUrl(result.get(i).getUrl());
-                params.add(param);
+    /**
+     * 异步上传文档,analysisFile最大并发5个
+     * @return 成功处理的文件数量
+     */
+    public CompletableFuture<Integer> uploadDocument(MultipartFile[] files, String knowledgeId) {
+        // 整体逻辑异步执行(默认使用ForkJoinPool,也可指定自定义线程池)
+        return CompletableFuture.supplyAsync(() -> {
+            // 记录成功处理的文件数
+            AtomicInteger successCount = new AtomicInteger(0);
+            // 收集analysisFile的异步任务,用于等待全部完成
+            List<CompletableFuture<Void>> analysisTasks = new ArrayList<>();
+
+            // 1. 先同步上传文件到MinIO(IO操作,建议后续也异步,这里先保留)
+            List<TakaiSysOss> ossList;
+            try {
+                ossList = minioUtil.uploadMultiple(files);
+                if (ossList == null || ossList.isEmpty()) {
+                    log.error("MinIO上传文件为空,直接返回");
+                    return 0;
+                }
+            } catch (Exception e) {
+                log.error("MinIO上传文件失败", e);
+                return 0;
             }
 
-            TakaiDocumentSettings settings = new TakaiDocumentSettings();
-            settings.setKnowledgeId(knowledgeId);
-            JSONObject jsonObject = analysisFile(params, settings, "upload");
-            String updateBy = SecurityUtils.getLoginUser() == null ? "":SecurityUtils.getLoginUser().getUserId();
-            if (jsonObject != null && jsonObject.containsKey("code") && jsonObject.getInteger("code") == 200) {
-                for (UploadDocumentParams vo : params) {
-                    // 更新知识库文件大小,总字符数, 文件总数
-                    JSONObject docInfo = jsonObject.getJSONObject("doc_info");
-                    if(docInfo != null){
-                        Integer fileLen = docInfo.getInteger("file_size");
-                        Integer wordNum = docInfo.getInteger("total_char_len");
-                        Integer sliceTotal = docInfo.getInteger("slice_num");
-
-                        for (TakaiSysOss oss : result) {
-                            // oss保存到数据库
-                            takaiSysOssMapper.insertSysOss(oss);
-                        }
+            // 2. 循环处理每个文件
+            String updateBy = SecurityUtils.getLoginUser() == null ? "" : SecurityUtils.getLoginUser().getUserId();
+            for (int i = 0; i < ossList.size(); i++) {
+                TakaiSysOss oss = ossList.get(i);
+                try {
+                    // 生成文档ID
+                    SnowflakeDigitGenerator documentIdGenerator = new SnowflakeDigitGenerator(i, i);
+                    long documentId = documentIdGenerator.nextId();
+                    String docId = "a" + documentId;
+
+                    // 构建上传参数
+                    UploadDocumentParams param = new UploadDocumentParams();
+                    param.setDocument_id(docId);
+                    param.setName(oss.getOriginalName());
+                    param.setUrl(oss.getUrl());
+                    List<UploadDocumentParams> params = new ArrayList<>();
+                    params.add(param);
+
+                    // 3. 同步保存OSS到数据库
+                    takaiSysOssMapper.insertSysOss(oss);
+
+                    // 4. 保存文档信息到数据库
+                    TakaiDocument document = TakaiDocument.builder()
+                            .documentId(docId)
+                            .knowledgeId(knowledgeId)
+                            .customSeparator(String.format("[\"%s\"", "\\n") + "]")
+                            .sentenceSize("300")
+                            .name(oss.getOriginalName())
+                            .url(oss.getUrl())
+                            .status("0")
+                            .build();
+                    document.setCreateBy(updateBy);
+                    int insertResult = takaiDocumentMapper.insertDocument(document);
+
+                    // 5. 保存文档设置
+                    if (insertResult > 0) {
+                        SnowflakeDigitGenerator settingsIdGenerator = new SnowflakeDigitGenerator(1, 1);
+                        long settingsId = settingsIdGenerator.nextId();
+                        TakaiDocumentSettings dSettings = new TakaiDocumentSettings();
+                        dSettings.setId(String.valueOf(settingsId));
+                        dSettings.setKnowledgeId(knowledgeId);
+                        dSettings.setDocumentId(docId);
+                        dSettings.setSetSlice("0");   // 默认 按标题段落切片
+                        dSettings.setSetAnalyze("1"); // 默认 图片转换成标识符
+                        dSettings.setSetTable("0");   // 默认 table转图片
+                        takaiDocumentSettingsMapper.insertDocumentSettings(dSettings);
+                    }
 
-                        // 保存知识信息
-                        TakaiDocument document = TakaiDocument.builder()
-                                .documentId(vo.getDocument_id())
-                                .knowledgeId(knowledgeId)
-                                .customSeparator(String.format("[\"%s\"", "\\n") + "]")
-                                .sentenceSize("300")
-                                .name(vo.getName())
-                                .url(vo.getUrl())
-                                .sliceTotal(sliceTotal)
-                                .length(fileLen)
-                                .wordNum(wordNum)
-                                .build();
-                        document.setCreateBy(updateBy);
-                        int documentIdInsert = takaiDocumentMapper.insertDocument(document);
-
-                        if (documentIdInsert > 0) {
-                            // 保存知识设置信息
-                            SnowflakeDigitGenerator snowflakeDigitGenerator = new SnowflakeDigitGenerator(1, 1);
-                            long id = snowflakeDigitGenerator.nextId();
-                            TakaiDocumentSettings dSettings = new TakaiDocumentSettings();
-                            dSettings.setId(String.valueOf(id));
-                            dSettings.setKnowledgeId(knowledgeId);
-                            dSettings.setDocumentId(vo.getDocument_id());
-                            dSettings.setSetSlice("0");   // 默认 按标题段落切片
-                            dSettings.setSetAnalyze("1"); // 默认 图片转换成标识符
-                            dSettings.setSetTable("0");   // 默认 ttable转图片
-                            takaiDocumentSettingsMapper.insertDocumentSettings(dSettings);
+                    // 6. 异步调用analysisFile(限制5个并发)
+                    TakaiDocumentSettings settings = new TakaiDocumentSettings();
+                    settings.setKnowledgeId(knowledgeId);
+                    // 注意:捕获循环变量,避免闭包陷阱
+                    List<UploadDocumentParams> finalParams = new ArrayList<>(params);
+                    CompletableFuture<Void> analysisTask = CompletableFuture.runAsync(() -> {
+                        try {
+                            // 执行文件解析逻辑
+                            analysisFile(finalParams, settings, "upload");
+                            log.info("文件解析完成:{}", oss.getOriginalName());
+                        } catch (Exception e) {
+                            log.error("文件解析失败:{}", oss.getOriginalName(), e);
                         }
-                    }
+                    }, analysisFileExecutor); // 使用5线程池执行
+
+                    // 添加到任务列表
+                    analysisTasks.add(analysisTask);
+                    successCount.incrementAndGet();
+
+                } catch (Exception e) {
+                    log.error("处理文件失败:{}", oss.getOriginalName(), e);
+                    // 单个文件失败不影响其他文件,继续处理
+                    continue;
                 }
-                updateKnowledgeSize(knowledgeId);
-                setRedisCache(knowledgeId);
-                return 1;
             }
-        }
-        return 0;
+
+            // 7. 等待所有analysisFile异步任务完成(不阻塞主线程,仅等待任务结束)
+            try {
+                CompletableFuture.allOf(analysisTasks.toArray(new CompletableFuture[0])).join();
+                log.info("所有文件解析任务已完成,成功处理文件数:{}", successCount.get());
+            } catch (Exception e) {
+                log.error("等待解析任务完成失败", e);
+            }
+
+            // 返回成功处理的文件数
+            return successCount.get();
+        });
+    }
+
+    @Override
+    public void updateDocumentByPython(JkDocumentPythonParams parms) {
+        TakaiDocument document = TakaiDocument.builder()
+                .documentId(parms.getDocumentId())
+                .knowledgeId(parms.getKnowledgeId())
+                .status(parms.getStatus())
+                .sliceTotal(parms.getSliceTotal())
+                .length(parms.getLength())
+                .wordNum(parms.getWordNum())
+                .build();
+        takaiDocumentMapper.updateDocument(document);
+        updateKnowledgeSize(parms.getKnowledgeId());
+        setRedisCache(parms.getKnowledgeId());
     }
 
     private void updateKnowledgeSize(String knowledgeId) {
@@ -787,7 +851,14 @@ public class TakaiAiServiceImpl implements ITakaiAiService {
 
     @Override
     public List<TakaiDocument> documentList(TakaiDocumentParams documentParams) {
-        List<TakaiDocument> documentList = takaiDocumentMapper.selectDocumentList(TakaiDocument.builder().knowledgeId(documentParams.getKnowledge_id()).build());
+        List<String> statusList = new ArrayList<>();
+        if(null != documentParams.getStatus()) {
+            statusList.add(documentParams.getStatus());
+            if(documentParams.getStatus().equals("0")) {
+                statusList.add("2");
+            }
+        }
+        List<TakaiDocument> documentList = takaiDocumentMapper.selectDocumentList(TakaiDocument.builder().knowledgeId(documentParams.getKnowledge_id()).status(documentParams.getStatus()).build());
         replaceOssUrl(documentList);
         return documentList;
     }

+ 17 - 7
takai-ai/src/main/resources/mapper/takaiai/TakaiDocumentMapper.xml

@@ -3,7 +3,7 @@
 PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
 "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="com.takai.ai.mapper.TakaiDocumentMapper">
-	
+
 	<resultMap type="com.takai.ai.domain.entity.TakaiDocument" id="BmDocumentResult">
 		<id property="documentId"   column="document_id"   />
 		<result property="knowledgeId"  column="knowledge_id"  />
@@ -21,11 +21,12 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
 		<result property="updateBy"   column="update_by"   />
 		<result property="updateTime" column="update_time" />
 		<result property="sliceTotal" column="slice_total" />
+		<result property="status" column="status" />
 	</resultMap>
-	
+
 	<sql id="selectSql">
 		select document_id, knowledge_id,custom_separator, knowledge_type, sentence_size, length, word_num, name,url,
-			   parse_image, remark, create_by, create_time, update_by, update_time, slice_total
+			   parse_image, remark, create_by, create_time, update_by, update_time, slice_total,status
 		from bm_document
     </sql>
 
@@ -38,15 +39,21 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
 			<if test="knowledgeId != null and knowledgeId != ''">
 				AND knowledge_id = #{knowledgeId}
 			</if>
+			<if test="status != null and status == 0">
+				AND status in ('0','2')
+			</if>
+			<if test="status != null and status != '' and status != 0">
+				AND status = '1'
+			</if>
 		</where>
 		order by create_time asc
 	</select>
-	
+
 	<select id="selectTargetDocument" parameterType="BmDocument" resultMap="BmDocumentResult">
 		<include refid="selectSql"/>
 		where document_id = #{documentId}
 	</select>
- 	
+
  	<insert id="insertDocument" parameterType="BmDocument">
  		insert into bm_document(
  			<if test="documentId != null  and documentId != ''">document_id,</if>
@@ -62,6 +69,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
  			<if test="remark != null and remark != ''">remark,</if>
  			<if test="createBy != null and createBy != ''">create_by,</if>
 		    <if test="sliceTotal != null and sliceTotal != ''">slice_total,</if>
+			<if test="status != null and status != ''">status,</if>
  			create_time
  		)values(
 			<if test="documentId != null  and documentId != ''">#{documentId},</if>
@@ -77,6 +85,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
  			<if test="remark != null and remark != ''">#{remark},</if>
  			<if test="createBy != null and createBy != ''">#{createBy},</if>
 		    <if test="sliceTotal != null and sliceTotal != ''">#{sliceTotal},</if>
+			<if test="status != null and status != ''">status,</if>
  			sysdate()
  		)
 	</insert>
@@ -96,6 +105,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
 			<if test="updateBy != null and updateBy != ''">update_by = #{updateBy},</if>
 			<if test="remark != null">remark = #{remark},</if>
 			<if test="sliceTotal != null">slice_total = #{sliceTotal},</if>
+			<if test="status != null and status != ''">status = #{status},</if>
 			update_time = sysdate()
 		</set>
 		where document_id = #{documentId}
@@ -123,5 +133,5 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
 		</where>
 		order by create_time asc
 	</select>
-	
-</mapper> 
+
+</mapper>

+ 28 - 3
takai-common/pom.xml

@@ -52,13 +52,13 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
-  
+
         <!-- JSON工具类 -->
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
-        
+
         <!-- 动态数据源 -->
 		<dependency>
 			<groupId>com.baomidou</groupId>
@@ -129,7 +129,32 @@
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>5.8.9</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-webmvc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>cn.dev33</groupId>
+            <artifactId>sa-token-core</artifactId>
+            <version>1.44.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.redisson</groupId>
+            <artifactId>redisson</artifactId>
+            <version>3.51.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-autoconfigure</artifactId>
+            <version>3.5.7</version>
+        </dependency>
 
     </dependencies>
 
-</project>
+</project>

+ 37 - 0
takai-common/src/main/java/com/takai/common/config/SseAutoConfiguration.java

@@ -0,0 +1,37 @@
+package com.takai.common.config;
+
+
+import com.takai.common.core.controller.SseController;
+import com.takai.common.listener.SseTopicListener;
+import com.takai.common.utils.SseEmitterManager;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+
+/**
+ * SSE 自动装配
+ *
+ * @author jk
+ */
+@AutoConfiguration
+@ConditionalOnProperty(value = "true", havingValue = "true")
+@EnableConfigurationProperties(SseProperties.class)
+public class SseAutoConfiguration {
+
+    @Bean
+    public SseEmitterManager sseEmitterManager() {
+        return new SseEmitterManager();
+    }
+
+    @Bean
+    public SseTopicListener sseTopicListener() {
+        return new SseTopicListener();
+    }
+
+    @Bean
+    public SseController sseController(SseEmitterManager sseEmitterManager) {
+        return new SseController(sseEmitterManager);
+    }
+
+}

+ 21 - 0
takai-common/src/main/java/com/takai/common/config/SseProperties.java

@@ -0,0 +1,21 @@
+package com.takai.common.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * SSE 配置项
+ *
+ * @author jk
+ */
+@Data
+@ConfigurationProperties("sse")
+public class SseProperties {
+
+    private Boolean enabled;
+
+    /**
+     * 路径
+     */
+    private String path;
+}

+ 91 - 0
takai-common/src/main/java/com/takai/common/core/controller/SseController.java

@@ -0,0 +1,91 @@
+package com.takai.common.core.controller;
+
+import cn.dev33.satoken.annotation.SaIgnore;
+import com.takai.common.core.domain.R;
+import com.takai.common.core.domain.SseMessageDto;
+import com.takai.common.core.domain.model.LoginUser;
+import com.takai.common.utils.SecurityUtils;
+import com.takai.common.utils.SseEmitterManager;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+/**
+ * SSE 控制器
+ *
+ * @author jk
+ */
+@RestController
+public class SseController implements DisposableBean {
+
+    @Autowired
+    private SseEmitterManager sseEmitterManager;
+
+    public SseController(SseEmitterManager sseEmitterManager) {
+    }
+
+    /**
+     * 建立 SSE 连接
+     */
+    @GetMapping(value = "/resource/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+    public SseEmitter connect() {
+        LoginUser loginUser = SecurityUtils.getLoginUser();
+        if (loginUser == null) {
+            return null;
+        }
+        String tokenValue = loginUser.getToken();
+        String userId = loginUser.getUserId();
+        return sseEmitterManager.connect(Long.parseLong(userId), tokenValue);
+    }
+
+    /**
+     * 关闭 SSE 连接
+     */
+    @SaIgnore
+    @GetMapping(value = "/resource/sse/close")
+    public R<Void> close() {
+        LoginUser loginUser = SecurityUtils.getLoginUser();
+        String tokenValue = loginUser.getToken();
+        String userId = loginUser.getUserId();
+        sseEmitterManager.disconnect(Long.parseLong(userId), tokenValue);
+        return R.ok();
+    }
+
+    // 以下为demo仅供参考 禁止使用 请在业务逻辑中使用工具发送而不是用接口发送
+    /**
+     * 向特定用户发送消息
+     *
+     * @param sseMessageDto
+     */
+    @SaIgnore
+    @PostMapping(value = "/resource/sse/send")
+    public R<Void> send(@RequestBody SseMessageDto sseMessageDto) {
+        sseEmitterManager.publishMessage(sseMessageDto);
+        return R.ok();
+    }
+
+//    /**
+//     * 向所有用户发送消息
+//     *
+//     * @param msg 要发送的消息内容
+//     */
+//    @GetMapping(value = "${sse.path}/sendAll")
+//    public R<Void> send(String msg) {
+//        sseEmitterManager.publishAll(msg);
+//        return R.ok();
+//    }
+
+    /**
+     * 清理资源。此方法目前不执行任何操作,但避免因未实现而导致错误
+     */
+    @Override
+    public void destroy() throws Exception {
+        // 销毁时不需要做什么 此方法避免无用操作报错
+    }
+
+}

+ 28 - 0
takai-common/src/main/java/com/takai/common/core/domain/SseMessageDto.java

@@ -0,0 +1,28 @@
+package com.takai.common.core.domain;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * 消息的dto
+ *
+ * @author jk
+ */
+@Data
+public class SseMessageDto implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 需要推送到的session key 列表
+     */
+    private List<Long> userIds;
+
+    /**
+     * 需要发送的消息
+     */
+    private String message;
+
+}

+ 169 - 174
takai-common/src/main/java/com/takai/common/core/redis/RedisCache.java

@@ -1,17 +1,24 @@
 package com.takai.common.core.redis;
 
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.core.BoundSetOperations;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.listener.ChannelTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.core.BoundSetOperations;
-import org.springframework.data.redis.core.HashOperations;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.ValueOperations;
-import org.springframework.stereotype.Component;
+import java.util.function.Consumer;
 
 /**
  * spring redis 工具类
@@ -25,244 +32,232 @@ public class RedisCache
     @Autowired
     public RedisTemplate redisTemplate;
 
+    // 手动创建的监听器容器
+    private RedisMessageListenerContainer listenerContainer;
+
+    // 存储订阅关系
+    private final Map<String, MessageListener> listeners = new ConcurrentHashMap<>();
+
+    // 订阅线程池(核心:解决subscriptionExecutor为null问题)
+    private ThreadPoolTaskExecutor subscriptionExecutor;
+
     /**
-     * 缓存基本的对象,Integer、String、实体类等
-     *
-     * @param key 缓存的键值
-     * @param value 缓存的值
+     * 初始化订阅线程池(Spring风格,避免硬编码)
      */
-    public <T> void setCacheObject(final String key, final T value)
-    {
-        redisTemplate.opsForValue().set(key, value);
+    private void initSubscriptionExecutor() {
+        if (subscriptionExecutor == null) {
+            subscriptionExecutor = new ThreadPoolTaskExecutor();
+            // 线程池核心参数(可根据业务调优)
+            subscriptionExecutor.setCorePoolSize(5); // 核心线程数
+            subscriptionExecutor.setMaxPoolSize(10); // 最大线程数
+            subscriptionExecutor.setQueueCapacity(20); // 队列容量
+            subscriptionExecutor.setKeepAliveSeconds(60); // 空闲线程存活时间
+            subscriptionExecutor.setThreadNamePrefix("redis-sub-"); // 线程名前缀
+            subscriptionExecutor.setRejectedExecutionHandler((r, executor) -> {
+                // 拒绝策略:打印日志 + 重试提交
+                System.err.println("Redis订阅任务被拒绝,任务:" + r.toString());
+                try {
+                    executor.getQueue().offer(r, 1, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            });
+            // 初始化线程池(必须调用,否则线程池未启动)
+            subscriptionExecutor.initialize();
+        }
     }
 
     /**
-     * 缓存基本的对象,Integer、String、实体类等
-     *
-     * @param key 缓存的键值
-     * @param value 缓存的值
-     * @param timeout 时间
-     * @param timeUnit 时间颗粒度
+     * 发布消息(使用 Redis 发布订阅)
      */
-    public <T> void setCacheObject(final String key, final T value, final Integer timeout, final TimeUnit timeUnit)
-    {
-        redisTemplate.opsForValue().set(key, value, timeout, timeUnit);
+    public <T> void publish(String channelKey, T msg) {
+        redisTemplate.convertAndSend(channelKey, msg);
     }
 
     /**
-     * 设置有效时间
-     *
-     * @param key Redis键
-     * @param timeout 超时时间
-     * @return true=设置成功;false=设置失败
+     * 发布消息,并执行回调
      */
-    public boolean expire(final String key, final long timeout)
-    {
-        return expire(key, timeout, TimeUnit.SECONDS);
+    public <T> void publish(String channelKey, T msg, Consumer<T> consumer) {
+        redisTemplate.convertAndSend(channelKey, msg);
+        if (consumer != null) {
+            consumer.accept(msg);
+        }
     }
 
     /**
-     * 设置有效时间
-     *
-     * @param key Redis键
-     * @param timeout 超时时间
-     * @param unit 时间单位
-     * @return true=设置成功;false=设置失败
+     * 订阅频道
      */
-    public boolean expire(final String key, final long timeout, final TimeUnit unit)
-    {
-        return redisTemplate.expire(key, timeout, unit);
+    public <T> void subscribe(String channelKey, Class<T> clazz, Consumer<T> consumer) {
+        // 1. 先初始化线程池
+        initSubscriptionExecutor();
+        // 2. 初始化监听器容器(含线程池配置)
+        if (listenerContainer == null) {
+            initListenerContainer();
+        }
+
+        MessageListener listener = new MessageListener() {
+            @Override
+            public void onMessage(Message message, byte[] pattern) {
+                try {
+                    Object value = redisTemplate.getValueSerializer().deserialize(message.getBody());
+                    if (clazz.isInstance(value)) {
+                        T typedValue = clazz.cast(value);
+                        consumer.accept(typedValue);
+                    } else if (value instanceof String && clazz.equals(String.class)) {
+                        @SuppressWarnings("unchecked")
+                        T typedValue = (T) value;
+                        consumer.accept(typedValue);
+                    }
+                } catch (Exception e) {
+                    System.err.println("处理消息异常: " + e.getMessage());
+                    e.printStackTrace();
+                }
+            }
+        };
+        // 现在添加监听器不会报subscriptionExecutor null了
+        listenerContainer.addMessageListener(listener, new ChannelTopic(channelKey));
+        String listenerKey = channelKey + "_" + System.currentTimeMillis();
+        listeners.put(listenerKey, listener);
     }
 
     /**
-     * 获取有效时间
-     *
-     * @param key Redis键
-     * @return 有效时间
+     * 取消订阅
      */
-    public long getExpire(final String key)
-    {
-        return redisTemplate.getExpire(key);
+    public void unsubscribe(String channelKey) {
+        if (listenerContainer == null) {
+            return;
+        }
+
+        listeners.entrySet().removeIf(entry -> {
+            if (entry.getKey().startsWith(channelKey + "_")) {
+                listenerContainer.removeMessageListener(entry.getValue());
+                return true;
+            }
+            return false;
+        });
     }
 
     /**
-     * 判断 key是否存在
-     *
-     * @param key 键
-     * @return true 存在 false不存在
+     * 初始化监听器容器(核心:配置线程池 + 连接工厂)
      */
-    public Boolean hasKey(String key)
-    {
-        return redisTemplate.hasKey(key);
+    private synchronized void initListenerContainer() {
+        if (listenerContainer == null) {
+            listenerContainer = new RedisMessageListenerContainer();
+            // 1. 设置连接工厂(原有)
+            listenerContainer.setConnectionFactory(redisTemplate.getConnectionFactory());
+            // 2. 设置订阅线程池(解决null问题的核心)
+            listenerContainer.setSubscriptionExecutor(subscriptionExecutor);
+            // 3. 可选:设置任务执行器(处理消息的线程池)
+            listenerContainer.setTaskExecutor(subscriptionExecutor);
+            // 4. 启动容器(必须在所有属性设置完成后调用)
+            listenerContainer.start();
+            System.out.println("RedisMessageListenerContainer 初始化成功(含线程池配置)");
+        }
     }
 
     /**
-     * 获得缓存的基本对象。
-     *
-     * @param key 缓存键值
-     * @return 缓存键值对应的数据
+     * 清理资源(优雅关闭线程池 + 容器)
      */
-    public <T> T getCacheObject(final String key)
-    {
-        ValueOperations<String, T> operation = redisTemplate.opsForValue();
-        return operation.get(key);
+    @PreDestroy
+    public void destroy() throws Exception {
+        // 1. 关闭监听器容器
+        if (listenerContainer != null) {
+            listenerContainer.stop();
+            listenerContainer.destroy();
+        }
+        // 2. 关闭订阅线程池
+        if (subscriptionExecutor != null) {
+            subscriptionExecutor.shutdown();
+        }
+        System.out.println("RedisCache 资源已清理");
     }
 
-    /**
-     * 删除单个对象
-     *
-     * @param key
-     */
-    public boolean deleteObject(final String key)
-    {
+    // ========== 以下为原有Redis操作方法(无需修改) ==========
+    public <T> void setCacheObject(final String key, final T value) {
+        redisTemplate.opsForValue().set(key, value);
+    }
+
+    public <T> void setCacheObject(final String key, final T value, final Integer timeout, final TimeUnit timeUnit) {
+        redisTemplate.opsForValue().set(key, value, timeout, timeUnit);
+    }
+
+    public boolean expire(final String key, final long timeout) {
+        return expire(key, timeout, TimeUnit.SECONDS);
+    }
+
+    public boolean expire(final String key, final long timeout, final TimeUnit unit) {
+        return redisTemplate.expire(key, timeout, unit);
+    }
+
+    public long getExpire(final String key) {
+        return redisTemplate.getExpire(key);
+    }
+
+    public Boolean hasKey(String key) {
+        return redisTemplate.hasKey(key);
+    }
+
+    public <T> T getCacheObject(final String key) {
+        return (T) redisTemplate.opsForValue().get(key);
+    }
+
+    public boolean deleteObject(final String key) {
         return redisTemplate.delete(key);
     }
 
-    /**
-     * 删除集合对象
-     *
-     * @param collection 多个对象
-     * @return
-     */
-    public boolean deleteObject(final Collection collection)
-    {
+    public boolean deleteObject(final Collection collection) {
         return redisTemplate.delete(collection) > 0;
     }
 
-    /**
-     * 缓存List数据
-     *
-     * @param key 缓存的键值
-     * @param dataList 待缓存的List数据
-     * @return 缓存的对象
-     */
-    public <T> long setCacheList(final String key, final List<T> dataList)
-    {
+    public <T> long setCacheList(final String key, final List<T> dataList) {
         Long count = redisTemplate.opsForList().rightPushAll(key, dataList);
         return count == null ? 0 : count;
     }
 
-    /**
-     * 获得缓存的list对象
-     *
-     * @param key 缓存的键值
-     * @return 缓存键值对应的数据
-     */
-    public <T> List<T> getCacheList(final String key)
-    {
+    public <T> List<T> getCacheList(final String key) {
         return redisTemplate.opsForList().range(key, 0, -1);
     }
 
-    /**
-     * 缓存Set
-     *
-     * @param key 缓存键值
-     * @param dataSet 缓存的数据
-     * @return 缓存数据的对象
-     */
-    public <T> BoundSetOperations<String, T> setCacheSet(final String key, final Set<T> dataSet)
-    {
+    public <T> BoundSetOperations<String, T> setCacheSet(final String key, final Set<T> dataSet) {
         BoundSetOperations<String, T> setOperation = redisTemplate.boundSetOps(key);
         Iterator<T> it = dataSet.iterator();
-        while (it.hasNext())
-        {
+        while (it.hasNext()) {
             setOperation.add(it.next());
         }
         return setOperation;
     }
 
-    /**
-     * 获得缓存的set
-     *
-     * @param key
-     * @return
-     */
-    public <T> Set<T> getCacheSet(final String key)
-    {
+    public <T> Set<T> getCacheSet(final String key) {
         return redisTemplate.opsForSet().members(key);
     }
 
-    /**
-     * 缓存Map
-     *
-     * @param key
-     * @param dataMap
-     */
-    public <T> void setCacheMap(final String key, final Map<String, T> dataMap)
-    {
+    public <T> void setCacheMap(final String key, final Map<String, T> dataMap) {
         if (dataMap != null) {
             redisTemplate.opsForHash().putAll(key, dataMap);
         }
     }
 
-    /**
-     * 获得缓存的Map
-     *
-     * @param key
-     * @return
-     */
-    public <T> Map<String, T> getCacheMap(final String key)
-    {
+    public <T> Map<String, T> getCacheMap(final String key) {
         return redisTemplate.opsForHash().entries(key);
     }
 
-    /**
-     * 往Hash中存入数据
-     *
-     * @param key Redis键
-     * @param hKey Hash键
-     * @param value 值
-     */
-    public <T> void setCacheMapValue(final String key, final String hKey, final T value)
-    {
+    public <T> void setCacheMapValue(final String key, final String hKey, final T value) {
         redisTemplate.opsForHash().put(key, hKey, value);
     }
 
-    /**
-     * 获取Hash中的数据
-     *
-     * @param key Redis键
-     * @param hKey Hash键
-     * @return Hash中的对象
-     */
-    public <T> T getCacheMapValue(final String key, final String hKey)
-    {
-        HashOperations<String, String, T> opsForHash = redisTemplate.opsForHash();
-        return opsForHash.get(key, hKey);
+    public <T> T getCacheMapValue(final String key, final String hKey) {
+        return (T) redisTemplate.opsForHash().get(key, hKey);
     }
 
-    /**
-     * 获取多个Hash中的数据
-     *
-     * @param key Redis键
-     * @param hKeys Hash键集合
-     * @return Hash对象集合
-     */
-    public <T> List<T> getMultiCacheMapValue(final String key, final Collection<Object> hKeys)
-    {
+    public <T> List<T> getMultiCacheMapValue(final String key, final Collection<Object> hKeys) {
         return redisTemplate.opsForHash().multiGet(key, hKeys);
     }
 
-    /**
-     * 删除Hash中的某条数据
-     *
-     * @param key Redis键
-     * @param hKey Hash键
-     * @return 是否成功
-     */
-    public boolean deleteCacheMapValue(final String key, final String hKey)
-    {
+    public boolean deleteCacheMapValue(final String key, final String hKey) {
         return redisTemplate.opsForHash().delete(key, hKey) > 0;
     }
 
-    /**
-     * 获得缓存的基本对象列表
-     *
-     * @param pattern 字符串前缀
-     * @return 对象列表
-     */
-    public Collection<String> keys(final String pattern)
-    {
+    public Collection<String> keys(final String pattern) {
         return redisTemplate.keys(pattern);
     }
 }

+ 50 - 0
takai-common/src/main/java/com/takai/common/listener/SseTopicListener.java

@@ -0,0 +1,50 @@
+package com.takai.common.listener;
+
+import cn.hutool.core.collection.CollUtil;
+import com.takai.common.utils.SseEmitterManager;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.Ordered;
+import org.springframework.stereotype.Component;
+
+/**
+ * SSE 主题订阅监听器
+ *
+ * @author jk
+ */
+@Slf4j
+@Component
+public class SseTopicListener implements ApplicationRunner, Ordered {
+
+    @Autowired
+    private SseEmitterManager sseEmitterManager;
+
+    /**
+     * 在Spring Boot应用程序启动时初始化SSE主题订阅监听器
+     *
+     * @param args 应用程序参数
+     * @throws Exception 初始化过程中可能抛出的异常
+     */
+    @Override
+    public void run(ApplicationArguments args) throws Exception {
+        sseEmitterManager.subscribeMessage((message) -> {
+            log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage());
+            // 如果key不为空就按照key发消息 如果为空就群发
+            if (CollUtil.isNotEmpty(message.getUserIds())) {
+                message.getUserIds().forEach(key -> {
+                    sseEmitterManager.sendMessage(key, message.getMessage());
+                });
+            } else {
+                sseEmitterManager.sendMessage(message.getMessage());
+            }
+        });
+        log.info("初始化SSE主题订阅监听器成功");
+    }
+
+    @Override
+    public int getOrder() {
+        return -1;
+    }
+}

+ 585 - 0
takai-common/src/main/java/com/takai/common/utils/RedisUtils.java

@@ -0,0 +1,585 @@
+//package com.takai.common.utils;
+//
+//import com.takai.common.utils.spring.SpringUtils;
+//import lombok.AccessLevel;
+//import lombok.NoArgsConstructor;
+//import org.redisson.api.*;
+//import org.redisson.api.options.KeysScanOptions;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.stereotype.Component;
+//
+//import java.time.Duration;
+//import java.util.Collection;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.Set;
+//import java.util.function.Consumer;
+//import java.util.stream.Collectors;
+//import java.util.stream.Stream;
+//
+///**
+// * redis 工具类
+// *
+// * @author jk
+// * @version 3.1.0 新增
+// */
+//@NoArgsConstructor(access = AccessLevel.PRIVATE)
+//@Component
+//@SuppressWarnings(value = {"unchecked", "rawtypes"})
+//public class RedisUtils {
+//
+//    @Autowired
+//    private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
+//
+//    /**
+//     * 限流
+//     *
+//     * @param key          限流key
+//     * @param rateType     限流类型
+//     * @param rate         速率
+//     * @param rateInterval 速率间隔
+//     * @return -1 表示失败
+//     */
+//    public static long rateLimiter(String key, RateType rateType, int rate, int rateInterval) {
+//        return rateLimiter(key, rateType, rate, rateInterval, 0);
+//    }
+//
+//    /**
+//     * 限流
+//     *
+//     * @param key          限流key
+//     * @param rateType     限流类型
+//     * @param rate         速率
+//     * @param rateInterval 速率间隔
+//     * @param timeout      超时时间
+//     * @return -1 表示失败
+//     */
+//    public static long rateLimiter(String key, RateType rateType, int rate, int rateInterval, int timeout) {
+//        RRateLimiter rateLimiter = CLIENT.getRateLimiter(key);
+//        rateLimiter.trySetRate(rateType, rate, Duration.ofSeconds(rateInterval), Duration.ofSeconds(timeout));
+//        if (rateLimiter.tryAcquire()) {
+//            return rateLimiter.availablePermits();
+//        } else {
+//            return -1L;
+//        }
+//    }
+//
+//    /**
+//     * 获取客户端实例
+//     */
+//    public static RedissonClient getClient() {
+//        return CLIENT;
+//    }
+//
+//    /**
+//     * 发布通道消息
+//     *
+//     * @param channelKey 通道key
+//     * @param msg        发送数据
+//     * @param consumer   自定义处理
+//     */
+//    public static <T> void publish(String channelKey, T msg, Consumer<T> consumer) {
+//        RTopic topic = CLIENT.getTopic(channelKey);
+//        topic.publish(msg);
+//        consumer.accept(msg);
+//    }
+//
+//    /**
+//     * 发布消息到指定的频道
+//     *
+//     * @param channelKey 通道key
+//     * @param msg        发送数据
+//     */
+//    public static <T> void publish(String channelKey, T msg) {
+//        RTopic topic = CLIENT.getTopic(channelKey);
+//        topic.publish(msg);
+//    }
+//
+//    /**
+//     * 订阅通道接收消息
+//     *
+//     * @param channelKey 通道key
+//     * @param clazz      消息类型
+//     * @param consumer   自定义处理
+//     */
+//    public static <T> void subscribe(String channelKey, Class<T> clazz, Consumer<T> consumer) {
+//        RTopic topic = CLIENT.getTopic(channelKey);
+//        topic.addListener(clazz, (channel, msg) -> consumer.accept(msg));
+//    }
+//
+//    /**
+//     * 缓存基本的对象,Integer、String、实体类等
+//     *
+//     * @param key   缓存的键值
+//     * @param value 缓存的值
+//     */
+//    public static <T> void setCacheObject(final String key, final T value) {
+//        setCacheObject(key, value, false);
+//    }
+//
+//    /**
+//     * 缓存基本的对象,保留当前对象 TTL 有效期
+//     *
+//     * @param key       缓存的键值
+//     * @param value     缓存的值
+//     * @param isSaveTtl 是否保留TTL有效期(例如: set之前ttl剩余90 set之后还是为90)
+//     * @since Redis 6.X 以上使用 setAndKeepTTL 兼容 5.X 方案
+//     */
+//    public static <T> void setCacheObject(final String key, final T value, final boolean isSaveTtl) {
+//        RBucket<T> bucket = CLIENT.getBucket(key);
+//        if (isSaveTtl) {
+//            try {
+//                bucket.setAndKeepTTL(value);
+//            } catch (Exception e) {
+//                long timeToLive = bucket.remainTimeToLive();
+//                if (timeToLive == -1) {
+//                    bucket.set(value);
+//                } else {
+//                    bucket.set(value, Duration.ofMillis(timeToLive));
+//                }
+//            }
+//        } else {
+//            bucket.set(value);
+//        }
+//    }
+//
+//    /**
+//     * 缓存基本的对象,Integer、String、实体类等
+//     *
+//     * @param key      缓存的键值
+//     * @param value    缓存的值
+//     * @param duration 时间
+//     */
+//    public static <T> void setCacheObject(final String key, final T value, final Duration duration) {
+//        RBucket<T> bucket = CLIENT.getBucket(key);
+//        bucket.set(value, duration);
+//    }
+//
+//    /**
+//     * 如果不存在则设置 并返回 true 如果存在则返回 false
+//     *
+//     * @param key   缓存的键值
+//     * @param value 缓存的值
+//     * @return set成功或失败
+//     */
+//    public static <T> boolean setObjectIfAbsent(final String key, final T value, final Duration duration) {
+//        RBucket<T> bucket = CLIENT.getBucket(key);
+//        return bucket.setIfAbsent(value, duration);
+//    }
+//
+//    /**
+//     * 如果存在则设置 并返回 true 如果存在则返回 false
+//     *
+//     * @param key   缓存的键值
+//     * @param value 缓存的值
+//     * @return set成功或失败
+//     */
+//    public static <T> boolean setObjectIfExists(final String key, final T value, final Duration duration) {
+//        RBucket<T> bucket = CLIENT.getBucket(key);
+//        return bucket.setIfExists(value, duration);
+//    }
+//
+//    /**
+//     * 注册对象监听器
+//     * <p>
+//     * key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置
+//     *
+//     * @param key      缓存的键值
+//     * @param listener 监听器配置
+//     */
+//    public static <T> void addObjectListener(final String key, final ObjectListener listener) {
+//        RBucket<T> result = CLIENT.getBucket(key);
+//        result.addListener(listener);
+//    }
+//
+//    /**
+//     * 设置有效时间
+//     *
+//     * @param key     Redis键
+//     * @param timeout 超时时间
+//     * @return true=设置成功;false=设置失败
+//     */
+//    public static boolean expire(final String key, final long timeout) {
+//        return expire(key, Duration.ofSeconds(timeout));
+//    }
+//
+//    /**
+//     * 设置有效时间
+//     *
+//     * @param key      Redis键
+//     * @param duration 超时时间
+//     * @return true=设置成功;false=设置失败
+//     */
+//    public static boolean expire(final String key, final Duration duration) {
+//        RBucket rBucket = CLIENT.getBucket(key);
+//        return rBucket.expire(duration);
+//    }
+//
+//    /**
+//     * 获得缓存的基本对象。
+//     *
+//     * @param key 缓存键值
+//     * @return 缓存键值对应的数据
+//     */
+//    public static <T> T getCacheObject(final String key) {
+//        RBucket<T> rBucket = CLIENT.getBucket(key);
+//        return rBucket.get();
+//    }
+//
+//    /**
+//     * 获得key剩余存活时间
+//     *
+//     * @param key 缓存键值
+//     * @return 剩余存活时间
+//     */
+//    public static <T> long getTimeToLive(final String key) {
+//        RBucket<T> rBucket = CLIENT.getBucket(key);
+//        return rBucket.remainTimeToLive();
+//    }
+//
+//    /**
+//     * 删除单个对象
+//     *
+//     * @param key 缓存的键值
+//     */
+//    public static boolean deleteObject(final String key) {
+//        return CLIENT.getBucket(key).delete();
+//    }
+//
+//    /**
+//     * 删除集合对象
+//     *
+//     * @param collection 多个对象
+//     */
+//    public static void deleteObject(final Collection collection) {
+//        RBatch batch = CLIENT.createBatch();
+//        collection.forEach(t -> {
+//            batch.getBucket(t.toString()).deleteAsync();
+//        });
+//        batch.execute();
+//    }
+//
+//    /**
+//     * 检查缓存对象是否存在
+//     *
+//     * @param key 缓存的键值
+//     */
+//    public static boolean isExistsObject(final String key) {
+//        return CLIENT.getBucket(key).isExists();
+//    }
+//
+//    /**
+//     * 缓存List数据
+//     *
+//     * @param key      缓存的键值
+//     * @param dataList 待缓存的List数据
+//     * @return 缓存的对象
+//     */
+//    public static <T> boolean setCacheList(final String key, final List<T> dataList) {
+//        RList<T> rList = CLIENT.getList(key);
+//        return rList.addAll(dataList);
+//    }
+//
+//    /**
+//     * 追加缓存List数据
+//     *
+//     * @param key  缓存的键值
+//     * @param data 待缓存的数据
+//     * @return 缓存的对象
+//     */
+//    public static <T> boolean addCacheList(final String key, final T data) {
+//        RList<T> rList = CLIENT.getList(key);
+//        return rList.add(data);
+//    }
+//
+//    /**
+//     * 注册List监听器
+//     * <p>
+//     * key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置
+//     *
+//     * @param key      缓存的键值
+//     * @param listener 监听器配置
+//     */
+//    public static <T> void addListListener(final String key, final ObjectListener listener) {
+//        RList<T> rList = CLIENT.getList(key);
+//        rList.addListener(listener);
+//    }
+//
+//    /**
+//     * 获得缓存的list对象
+//     *
+//     * @param key 缓存的键值
+//     * @return 缓存键值对应的数据
+//     */
+//    public static <T> List<T> getCacheList(final String key) {
+//        RList<T> rList = CLIENT.getList(key);
+//        return rList.readAll();
+//    }
+//
+//    /**
+//     * 获得缓存的list对象(范围)
+//     *
+//     * @param key  缓存的键值
+//     * @param form 起始下标
+//     * @param to   截止下标
+//     * @return 缓存键值对应的数据
+//     */
+//    public static <T> List<T> getCacheListRange(final String key, int form, int to) {
+//        RList<T> rList = CLIENT.getList(key);
+//        return rList.range(form, to);
+//    }
+//
+//    /**
+//     * 缓存Set
+//     *
+//     * @param key     缓存键值
+//     * @param dataSet 缓存的数据
+//     * @return 缓存数据的对象
+//     */
+//    public static <T> boolean setCacheSet(final String key, final Set<T> dataSet) {
+//        RSet<T> rSet = CLIENT.getSet(key);
+//        return rSet.addAll(dataSet);
+//    }
+//
+//    /**
+//     * 追加缓存Set数据
+//     *
+//     * @param key  缓存的键值
+//     * @param data 待缓存的数据
+//     * @return 缓存的对象
+//     */
+//    public static <T> boolean addCacheSet(final String key, final T data) {
+//        RSet<T> rSet = CLIENT.getSet(key);
+//        return rSet.add(data);
+//    }
+//
+//    /**
+//     * 注册Set监听器
+//     * <p>
+//     * key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置
+//     *
+//     * @param key      缓存的键值
+//     * @param listener 监听器配置
+//     */
+//    public static <T> void addSetListener(final String key, final ObjectListener listener) {
+//        RSet<T> rSet = CLIENT.getSet(key);
+//        rSet.addListener(listener);
+//    }
+//
+//    /**
+//     * 获得缓存的set
+//     *
+//     * @param key 缓存的key
+//     * @return set对象
+//     */
+//    public static <T> Set<T> getCacheSet(final String key) {
+//        RSet<T> rSet = CLIENT.getSet(key);
+//        return rSet.readAll();
+//    }
+//
+//    /**
+//     * 缓存Map
+//     *
+//     * @param key     缓存的键值
+//     * @param dataMap 缓存的数据
+//     */
+//    public static <T> void setCacheMap(final String key, final Map<String, T> dataMap) {
+//        if (dataMap != null) {
+//            RMap<String, T> rMap = CLIENT.getMap(key);
+//            rMap.putAll(dataMap);
+//        }
+//    }
+//
+//    /**
+//     * 注册Map监听器
+//     * <p>
+//     * key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置
+//     *
+//     * @param key      缓存的键值
+//     * @param listener 监听器配置
+//     */
+//    public static <T> void addMapListener(final String key, final ObjectListener listener) {
+//        RMap<String, T> rMap = CLIENT.getMap(key);
+//        rMap.addListener(listener);
+//    }
+//
+//    /**
+//     * 获得缓存的Map
+//     *
+//     * @param key 缓存的键值
+//     * @return map对象
+//     */
+//    public static <T> Map<String, T> getCacheMap(final String key) {
+//        RMap<String, T> rMap = CLIENT.getMap(key);
+//        return rMap.getAll(rMap.keySet());
+//    }
+//
+//    /**
+//     * 获得缓存Map的key列表
+//     *
+//     * @param key 缓存的键值
+//     * @return key列表
+//     */
+//    public static <T> Set<String> getCacheMapKeySet(final String key) {
+//        RMap<String, T> rMap = CLIENT.getMap(key);
+//        return rMap.keySet();
+//    }
+//
+//    /**
+//     * 往Hash中存入数据
+//     *
+//     * @param key   Redis键
+//     * @param hKey  Hash键
+//     * @param value 值
+//     */
+//    public static <T> void setCacheMapValue(final String key, final String hKey, final T value) {
+//        RMap<String, T> rMap = CLIENT.getMap(key);
+//        rMap.put(hKey, value);
+//    }
+//
+//    /**
+//     * 获取Hash中的数据
+//     *
+//     * @param key  Redis键
+//     * @param hKey Hash键
+//     * @return Hash中的对象
+//     */
+//    public static <T> T getCacheMapValue(final String key, final String hKey) {
+//        RMap<String, T> rMap = CLIENT.getMap(key);
+//        return rMap.get(hKey);
+//    }
+//
+//    /**
+//     * 删除Hash中的数据
+//     *
+//     * @param key  Redis键
+//     * @param hKey Hash键
+//     * @return Hash中的对象
+//     */
+//    public static <T> T delCacheMapValue(final String key, final String hKey) {
+//        RMap<String, T> rMap = CLIENT.getMap(key);
+//        return rMap.remove(hKey);
+//    }
+//
+//    /**
+//     * 删除Hash中的数据
+//     *
+//     * @param key   Redis键
+//     * @param hKeys Hash键
+//     */
+//    public static <T> void delMultiCacheMapValue(final String key, final Set<String> hKeys) {
+//        RBatch batch = CLIENT.createBatch();
+//        RMapAsync<String, T> rMap = batch.getMap(key);
+//        for (String hKey : hKeys) {
+//            rMap.removeAsync(hKey);
+//        }
+//        batch.execute();
+//    }
+//
+//    /**
+//     * 获取多个Hash中的数据
+//     *
+//     * @param key   Redis键
+//     * @param hKeys Hash键集合
+//     * @return Hash对象集合
+//     */
+//    public static <K, V> Map<K, V> getMultiCacheMapValue(final String key, final Set<K> hKeys) {
+//        RMap<K, V> rMap = CLIENT.getMap(key);
+//        return rMap.getAll(hKeys);
+//    }
+//
+//    /**
+//     * 设置原子值
+//     *
+//     * @param key   Redis键
+//     * @param value 值
+//     */
+//    public static void setAtomicValue(String key, long value) {
+//        RAtomicLong atomic = CLIENT.getAtomicLong(key);
+//        atomic.set(value);
+//    }
+//
+//    /**
+//     * 获取原子值
+//     *
+//     * @param key Redis键
+//     * @return 当前值
+//     */
+//    public static long getAtomicValue(String key) {
+//        RAtomicLong atomic = CLIENT.getAtomicLong(key);
+//        return atomic.get();
+//    }
+//
+//    /**
+//     * 递增原子值
+//     *
+//     * @param key Redis键
+//     * @return 当前值
+//     */
+//    public static long incrAtomicValue(String key) {
+//        RAtomicLong atomic = CLIENT.getAtomicLong(key);
+//        return atomic.incrementAndGet();
+//    }
+//
+//    /**
+//     * 递减原子值
+//     *
+//     * @param key Redis键
+//     * @return 当前值
+//     */
+//    public static long decrAtomicValue(String key) {
+//        RAtomicLong atomic = CLIENT.getAtomicLong(key);
+//        return atomic.decrementAndGet();
+//    }
+//
+//    /**
+//     * 获得缓存的基本对象列表(全局匹配忽略租户 自行拼接租户id)
+//     * <P>
+//     * limit-设置扫描的限制数量(默认为0,查询全部)
+//     * pattern-设置键的匹配模式(默认为null)
+//     * chunkSize-设置每次扫描的块大小(默认为0,本方法设置为1000)
+//     * type-设置键的类型(默认为null,查询全部类型)
+//     * </P>
+//     * @see KeysScanOptions
+//     * @param pattern 字符串前缀
+//     * @return 对象列表
+//     */
+//    public static Collection<String> keys(final String pattern) {
+//        return  keys(KeysScanOptions.defaults().pattern(pattern).chunkSize(1000));
+//    }
+//
+//    /**
+//     * 通过扫描参数获取缓存的基本对象列表
+//     * @param keysScanOptions 扫描参数
+//     * <P>
+//     * limit-设置扫描的限制数量(默认为0,查询全部)
+//     * pattern-设置键的匹配模式(默认为null)
+//     * chunkSize-设置每次扫描的块大小(默认为0)
+//     * type-设置键的类型(默认为null,查询全部类型)
+//     * </P>
+//     * @see KeysScanOptions
+//     */
+//    public static Collection<String> keys(final KeysScanOptions keysScanOptions) {
+//        Stream<String> keysStream = CLIENT.getKeys().getKeysStream(keysScanOptions);
+//        return keysStream.collect(Collectors.toList());
+//    }
+//
+//    /**
+//     * 删除缓存的基本对象列表(全局匹配忽略租户 自行拼接租户id)
+//     *
+//     * @param pattern 字符串前缀
+//     */
+//    public static void deleteKeys(final String pattern) {
+//        CLIENT.getKeys().deleteByPattern(pattern);
+//    }
+//
+//    /**
+//     * 检查redis中是否存在key
+//     *
+//     * @param key 键
+//     */
+//    public static Boolean hasKey(String key) {
+//        RKeys rKeys = CLIENT.getKeys();
+//        return rKeys.countExists(key) > 0;
+//    }
+//}

+ 234 - 0
takai-common/src/main/java/com/takai/common/utils/SseEmitterManager.java

@@ -0,0 +1,234 @@
+package com.takai.common.utils;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.map.MapUtil;
+import com.takai.common.core.domain.SseMessageDto;
+import com.takai.common.core.redis.RedisCache;
+import com.takai.common.utils.spring.SpringUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * 管理 Server-Sent Events (SSE) 连接
+ *
+ * @author jk
+ */
+@Slf4j
+@Component
+public class SseEmitterManager {
+
+    /**
+     * 订阅的频道
+     */
+    private final static String SSE_TOPIC = "global:sse";
+
+    private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
+
+    @Autowired
+    private RedisCache redisCache;
+    public SseEmitterManager() {
+        // 定时执行 SSE 心跳检测
+        SpringUtils.getBean(ScheduledExecutorService.class)
+            .scheduleWithFixedDelay(this::sseMonitor, 60L, 60L, TimeUnit.SECONDS);
+    }
+
+    /**
+     * 建立与指定用户的 SSE 连接
+     *
+     * @param userId 用户的唯一标识符,用于区分不同用户的连接
+     * @param token  用户的唯一令牌,用于识别具体的连接
+     * @return 返回一个 SseEmitter 实例,客户端可以通过该实例接收 SSE 事件
+     */
+    public SseEmitter connect(Long userId, String token) {
+        // 从 USER_TOKEN_EMITTERS 中获取或创建当前用户的 SseEmitter 映射表(ConcurrentHashMap)
+        // 每个用户可以有多个 SSE 连接,通过 token 进行区分
+        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
+
+        // 关闭已存在的SseEmitter,防止超过最大连接数
+        SseEmitter oldEmitter = emitters.remove(token);
+        if (oldEmitter != null) {
+            oldEmitter.complete();
+        }
+
+        // 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞
+        SseEmitter emitter = new SseEmitter(86400000L);
+
+        emitters.put(token, emitter);
+
+        // 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token
+        emitter.onCompletion(() -> {
+            SseEmitter remove = emitters.remove(token);
+            if (remove != null) {
+                remove.complete();
+            }
+        });
+        emitter.onTimeout(() -> {
+            SseEmitter remove = emitters.remove(token);
+            if (remove != null) {
+                remove.complete();
+            }
+        });
+        emitter.onError((e) -> {
+            SseEmitter remove = emitters.remove(token);
+            if (remove != null) {
+                remove.complete();
+            }
+        });
+
+        try {
+            // 向客户端发送一条连接成功的事件
+            emitter.send(SseEmitter.event().comment("connected"));
+        } catch (IOException e) {
+            // 如果发送消息失败,则从映射表中移除 emitter
+            emitters.remove(token);
+        }
+        return emitter;
+    }
+
+    /**
+     * 断开指定用户的 SSE 连接
+     *
+     * @param userId 用户的唯一标识符,用于区分不同用户的连接
+     * @param token  用户的唯一令牌,用于识别具体的连接
+     */
+    public void disconnect(Long userId, String token) {
+        if (userId == null || token == null) {
+            return;
+        }
+        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
+        if (MapUtil.isNotEmpty(emitters)) {
+            try {
+                SseEmitter sseEmitter = emitters.get(token);
+                sseEmitter.send(SseEmitter.event().comment("disconnected"));
+                sseEmitter.complete();
+            } catch (Exception ignore) {
+            }
+            emitters.remove(token);
+        } else {
+            USER_TOKEN_EMITTERS.remove(userId);
+        }
+    }
+
+    /**
+     * SSE 心跳检测,关闭无效连接
+     */
+    public void sseMonitor() {
+        final SseEmitter.SseEventBuilder heartbeat = SseEmitter.event().comment("heartbeat");
+        // 记录需要移除的用户ID
+        List<Long> toRemoveUsers = new ArrayList<>();
+
+        USER_TOKEN_EMITTERS.forEach((userId, emitterMap) -> {
+            if (CollUtil.isEmpty(emitterMap)) {
+                toRemoveUsers.add(userId);
+                return;
+            }
+
+            emitterMap.entrySet().removeIf(entry -> {
+                try {
+                    entry.getValue().send(heartbeat);
+                    return false;
+                } catch (Exception ex) {
+                    try {
+                        entry.getValue().complete();
+                    } catch (Exception ignore) {
+                        // 忽略重复关闭异常
+                    }
+                    return true; // 发送失败 → 移除该连接
+                }
+            });
+
+            // 移除空连接用户
+            if (emitterMap.isEmpty()) {
+                toRemoveUsers.add(userId);
+            }
+        });
+
+        // 循环结束后统一清理空用户,避免并发修改异常
+        toRemoveUsers.forEach(USER_TOKEN_EMITTERS::remove);
+    }
+
+    /**
+     * 订阅SSE消息主题,并提供一个消费者函数来处理接收到的消息
+     *
+     * @param consumer 处理SSE消息的消费者函数
+     */
+    public void subscribeMessage(Consumer<SseMessageDto> consumer) {
+        redisCache.subscribe(SSE_TOPIC, SseMessageDto.class, consumer);
+    }
+
+    /**
+     * 向指定的用户会话发送消息
+     *
+     * @param userId  要发送消息的用户id
+     * @param message 要发送的消息内容
+     */
+    public void sendMessage(Long userId, String message) {
+        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
+        if (MapUtil.isNotEmpty(emitters)) {
+            for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
+                try {
+                    entry.getValue().send(SseEmitter.event()
+                        .name("message")
+                        .data(message));
+                } catch (Exception e) {
+                    SseEmitter remove = emitters.remove(entry.getKey());
+                    if (remove != null) {
+                        remove.complete();
+                    }
+                }
+            }
+        } else {
+            USER_TOKEN_EMITTERS.remove(userId);
+        }
+    }
+
+    /**
+     * 本机全用户会话发送消息
+     *
+     * @param message 要发送的消息内容
+     */
+    public void sendMessage(String message) {
+        for (Long userId : USER_TOKEN_EMITTERS.keySet()) {
+            sendMessage(userId, message);
+        }
+    }
+
+    /**
+     * 发布SSE订阅消息
+     *
+     * @param sseMessageDto 要发布的SSE消息对象
+     */
+    public void publishMessage(SseMessageDto sseMessageDto) {
+        SseMessageDto broadcastMessage = new SseMessageDto();
+        broadcastMessage.setMessage(sseMessageDto.getMessage());
+        broadcastMessage.setUserIds(sseMessageDto.getUserIds());
+        redisCache.publish(SSE_TOPIC, broadcastMessage, consumer -> {
+            log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",
+                SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage());
+        });
+    }
+
+    /**
+     * 向所有的用户发布订阅的消息(群发)
+     *
+     * @param message 要发布的消息内容
+     */
+    public void publishAll(String message) {
+        SseMessageDto broadcastMessage = new SseMessageDto();
+        broadcastMessage.setMessage(message);
+        redisCache.publish(SSE_TOPIC, broadcastMessage, consumer -> {
+            log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message);
+        });
+    }
+}

+ 83 - 0
takai-common/src/main/java/com/takai/common/utils/SseMessageUtils.java

@@ -0,0 +1,83 @@
+package com.takai.common.utils;
+
+import com.takai.common.core.domain.SseMessageDto;
+import com.takai.common.utils.spring.SpringUtils;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * SSE工具类
+ *
+ * @author jk
+ */
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class SseMessageUtils {
+
+    private final static Boolean SSE_ENABLE = true;
+    private static SseEmitterManager MANAGER;
+
+    static {
+        if (isEnable() && MANAGER == null) {
+            MANAGER = SpringUtils.getBean(SseEmitterManager.class);
+        }
+    }
+
+    /**
+     * 向指定的SSE会话发送消息
+     *
+     * @param userId  要发送消息的用户id
+     * @param message 要发送的消息内容
+     */
+    public static void sendMessage(Long userId, String message) {
+        if (!isEnable()) {
+            return;
+        }
+        MANAGER.sendMessage(userId, message);
+    }
+
+    /**
+     * 本机全用户会话发送消息
+     *
+     * @param message 要发送的消息内容
+     */
+    public static void sendMessage(String message) {
+        if (!isEnable()) {
+            return;
+        }
+        MANAGER.sendMessage(message);
+    }
+
+    /**
+     * 发布SSE订阅消息
+     *
+     * @param sseMessageDto 要发布的SSE消息对象
+     */
+    public static void publishMessage(SseMessageDto sseMessageDto) {
+        if (!isEnable()) {
+            return;
+        }
+        MANAGER.publishMessage(sseMessageDto);
+    }
+
+    /**
+     * 向所有的用户发布订阅的消息(群发)
+     *
+     * @param message 要发布的消息内容
+     */
+    public static void publishAll(String message) {
+        if (!isEnable()) {
+            return;
+        }
+        MANAGER.publishAll(message);
+    }
+
+    /**
+     * 是否开启
+     */
+    public static Boolean isEnable() {
+        return SSE_ENABLE;
+    }
+
+}

+ 1 - 1
takai-framework/src/main/java/com/takai/framework/config/SecurityConfig.java

@@ -116,7 +116,7 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter
                         "/checkToken/**", "/**/presets/**", "/**/index/**", "/**/createApplaction/**", "/**/createKnowledge/**",
                         "/**/updateKnowledge/**", "/**/detailKnowledge/**", "/**/delKnowledge/**", "/**/knowledgeList/**",
                         "/**//bigmodel/api/**", "/**/tool/gen/**","/**/deepseek/api/chat","/**/deepseek/api/free/**",
-                        "/admin/login", "/**/deepseek/getInfo/**", "/**/system/info/**", "/**/system/audit/config/**",
+                        "/admin/login", "/**/deepseek/getInfo/**", "/**/system/info/**", "/**/system/audit/config/**","/resource/**",
                         "/**/system/project/**").permitAll()
                 // 静态资源,可匿名访问
                 .antMatchers(HttpMethod.GET, "/", "/*.html", "/**/*.html", "/**/*.css", "/**/*.js", "/profile/**").permitAll()