DataHandler.js 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const Command_1 = require("./Command");
  4. const utils_1 = require("./utils");
  5. const RedisParser = require("redis-parser");
  6. const SubscriptionSet_1 = require("./SubscriptionSet");
  7. const debug = (0, utils_1.Debug)("dataHandler");
  8. class DataHandler {
  9. constructor(redis, parserOptions) {
  10. this.redis = redis;
  11. const parser = new RedisParser({
  12. stringNumbers: parserOptions.stringNumbers,
  13. returnBuffers: true,
  14. returnError: (err) => {
  15. this.returnError(err);
  16. },
  17. returnFatalError: (err) => {
  18. this.returnFatalError(err);
  19. },
  20. returnReply: (reply) => {
  21. this.returnReply(reply);
  22. },
  23. });
  24. // prependListener ensures the parser receives and processes data before socket timeout checks are performed
  25. redis.stream.prependListener("data", (data) => {
  26. parser.execute(data);
  27. });
  28. // prependListener() doesn't enable flowing mode automatically - we need to resume the stream manually
  29. redis.stream.resume();
  30. }
  31. returnFatalError(err) {
  32. err.message += ". Please report this.";
  33. this.redis.recoverFromFatalError(err, err, { offlineQueue: false });
  34. }
  35. returnError(err) {
  36. const item = this.shiftCommand(err);
  37. if (!item) {
  38. return;
  39. }
  40. err.command = {
  41. name: item.command.name,
  42. args: item.command.args,
  43. };
  44. if (item.command.name == "ssubscribe" && err.message.includes("MOVED")) {
  45. this.redis.emit("moved");
  46. return;
  47. }
  48. this.redis.handleReconnection(err, item);
  49. }
  50. returnReply(reply) {
  51. if (this.handleMonitorReply(reply)) {
  52. return;
  53. }
  54. if (this.handleSubscriberReply(reply)) {
  55. return;
  56. }
  57. const item = this.shiftCommand(reply);
  58. if (!item) {
  59. return;
  60. }
  61. if (Command_1.default.checkFlag("ENTER_SUBSCRIBER_MODE", item.command.name)) {
  62. this.redis.condition.subscriber = new SubscriptionSet_1.default();
  63. this.redis.condition.subscriber.add(item.command.name, reply[1].toString());
  64. if (!fillSubCommand(item.command, reply[2])) {
  65. this.redis.commandQueue.unshift(item);
  66. }
  67. }
  68. else if (Command_1.default.checkFlag("EXIT_SUBSCRIBER_MODE", item.command.name)) {
  69. if (!fillUnsubCommand(item.command, reply[2])) {
  70. this.redis.commandQueue.unshift(item);
  71. }
  72. }
  73. else {
  74. item.command.resolve(reply);
  75. }
  76. }
  77. handleSubscriberReply(reply) {
  78. if (!this.redis.condition.subscriber) {
  79. return false;
  80. }
  81. const replyType = Array.isArray(reply) ? reply[0].toString() : null;
  82. debug('receive reply "%s" in subscriber mode', replyType);
  83. switch (replyType) {
  84. case "message":
  85. if (this.redis.listeners("message").length > 0) {
  86. // Check if there're listeners to avoid unnecessary `toString()`.
  87. this.redis.emit("message", reply[1].toString(), reply[2] ? reply[2].toString() : "");
  88. }
  89. this.redis.emit("messageBuffer", reply[1], reply[2]);
  90. break;
  91. case "pmessage": {
  92. const pattern = reply[1].toString();
  93. if (this.redis.listeners("pmessage").length > 0) {
  94. this.redis.emit("pmessage", pattern, reply[2].toString(), reply[3].toString());
  95. }
  96. this.redis.emit("pmessageBuffer", pattern, reply[2], reply[3]);
  97. break;
  98. }
  99. case "smessage": {
  100. if (this.redis.listeners("smessage").length > 0) {
  101. this.redis.emit("smessage", reply[1].toString(), reply[2] ? reply[2].toString() : "");
  102. }
  103. this.redis.emit("smessageBuffer", reply[1], reply[2]);
  104. break;
  105. }
  106. case "ssubscribe":
  107. case "subscribe":
  108. case "psubscribe": {
  109. const channel = reply[1].toString();
  110. this.redis.condition.subscriber.add(replyType, channel);
  111. const item = this.shiftCommand(reply);
  112. if (!item) {
  113. return;
  114. }
  115. if (!fillSubCommand(item.command, reply[2])) {
  116. this.redis.commandQueue.unshift(item);
  117. }
  118. break;
  119. }
  120. case "sunsubscribe":
  121. case "unsubscribe":
  122. case "punsubscribe": {
  123. const channel = reply[1] ? reply[1].toString() : null;
  124. if (channel) {
  125. this.redis.condition.subscriber.del(replyType, channel);
  126. }
  127. const count = reply[2];
  128. if (Number(count) === 0) {
  129. this.redis.condition.subscriber = false;
  130. }
  131. const item = this.shiftCommand(reply);
  132. if (!item) {
  133. return;
  134. }
  135. if (!fillUnsubCommand(item.command, count)) {
  136. this.redis.commandQueue.unshift(item);
  137. }
  138. break;
  139. }
  140. default: {
  141. const item = this.shiftCommand(reply);
  142. if (!item) {
  143. return;
  144. }
  145. item.command.resolve(reply);
  146. }
  147. }
  148. return true;
  149. }
  150. handleMonitorReply(reply) {
  151. if (this.redis.status !== "monitoring") {
  152. return false;
  153. }
  154. const replyStr = reply.toString();
  155. if (replyStr === "OK") {
  156. // Valid commands in the monitoring mode are AUTH and MONITOR,
  157. // both of which always reply with 'OK'.
  158. // So if we got an 'OK', we can make certain that
  159. // the reply is made to AUTH & MONITOR.
  160. return false;
  161. }
  162. // Since commands sent in the monitoring mode will trigger an exception,
  163. // any replies we received in the monitoring mode should consider to be
  164. // realtime monitor data instead of result of commands.
  165. const len = replyStr.indexOf(" ");
  166. const timestamp = replyStr.slice(0, len);
  167. const argIndex = replyStr.indexOf('"');
  168. const args = replyStr
  169. .slice(argIndex + 1, -1)
  170. .split('" "')
  171. .map((elem) => elem.replace(/\\"/g, '"'));
  172. const dbAndSource = replyStr.slice(len + 2, argIndex - 2).split(" ");
  173. this.redis.emit("monitor", timestamp, args, dbAndSource[1], dbAndSource[0]);
  174. return true;
  175. }
  176. shiftCommand(reply) {
  177. const item = this.redis.commandQueue.shift();
  178. if (!item) {
  179. const message = "Command queue state error. If you can reproduce this, please report it.";
  180. const error = new Error(message +
  181. (reply instanceof Error
  182. ? ` Last error: ${reply.message}`
  183. : ` Last reply: ${reply.toString()}`));
  184. this.redis.emit("error", error);
  185. return null;
  186. }
  187. return item;
  188. }
  189. }
  190. exports.default = DataHandler;
  191. const remainingRepliesMap = new WeakMap();
  192. function fillSubCommand(command, count) {
  193. let remainingReplies = remainingRepliesMap.has(command)
  194. ? remainingRepliesMap.get(command)
  195. : command.args.length;
  196. remainingReplies -= 1;
  197. if (remainingReplies <= 0) {
  198. command.resolve(count);
  199. remainingRepliesMap.delete(command);
  200. return true;
  201. }
  202. remainingRepliesMap.set(command, remainingReplies);
  203. return false;
  204. }
  205. function fillUnsubCommand(command, count) {
  206. let remainingReplies = remainingRepliesMap.has(command)
  207. ? remainingRepliesMap.get(command)
  208. : command.args.length;
  209. if (remainingReplies === 0) {
  210. if (Number(count) === 0) {
  211. remainingRepliesMap.delete(command);
  212. command.resolve(count);
  213. return true;
  214. }
  215. return false;
  216. }
  217. remainingReplies -= 1;
  218. if (remainingReplies <= 0) {
  219. command.resolve(count);
  220. return true;
  221. }
  222. remainingRepliesMap.set(command, remainingReplies);
  223. return false;
  224. }