session.js 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. import { once } from "node:events";
  2. import { NoopCache } from "../cache/core/index.js";
  3. import { Column } from "../column.js";
  4. import { entityKind, is } from "../entity.js";
  5. import { NoopLogger } from "../logger.js";
  6. import {
  7. SingleStorePreparedQuery,
  8. SingleStoreSession,
  9. SingleStoreTransaction
  10. } from "../singlestore-core/session.js";
  11. import { fillPlaceholders, sql } from "../sql/sql.js";
  12. import { mapResultRow } from "../utils.js";
  13. class SingleStoreDriverPreparedQuery extends SingleStorePreparedQuery {
  14. constructor(client, queryString, params, logger, cache, queryMetadata, cacheConfig, fields, customResultMapper, generatedIds, returningIds) {
  15. super(cache, queryMetadata, cacheConfig);
  16. this.client = client;
  17. this.params = params;
  18. this.logger = logger;
  19. this.fields = fields;
  20. this.customResultMapper = customResultMapper;
  21. this.generatedIds = generatedIds;
  22. this.returningIds = returningIds;
  23. this.rawQuery = {
  24. sql: queryString,
  25. // rowsAsArray: true,
  26. typeCast: function(field, next) {
  27. if (field.type === "TIMESTAMP" || field.type === "DATETIME" || field.type === "DATE") {
  28. return field.string();
  29. }
  30. return next();
  31. }
  32. };
  33. this.query = {
  34. sql: queryString,
  35. rowsAsArray: true,
  36. typeCast: function(field, next) {
  37. if (field.type === "TIMESTAMP" || field.type === "DATETIME" || field.type === "DATE") {
  38. return field.string();
  39. }
  40. return next();
  41. }
  42. };
  43. }
  44. static [entityKind] = "SingleStoreDriverPreparedQuery";
  45. rawQuery;
  46. query;
  47. async execute(placeholderValues = {}) {
  48. const params = fillPlaceholders(this.params, placeholderValues);
  49. this.logger.logQuery(this.rawQuery.sql, params);
  50. const { fields, client, rawQuery, query, joinsNotNullableMap, customResultMapper, returningIds, generatedIds } = this;
  51. if (!fields && !customResultMapper) {
  52. const res = await this.queryWithCache(rawQuery.sql, params, async () => {
  53. return await client.query(rawQuery, params);
  54. });
  55. const insertId = res[0].insertId;
  56. const affectedRows = res[0].affectedRows;
  57. if (returningIds) {
  58. const returningResponse = [];
  59. let j = 0;
  60. for (let i = insertId; i < insertId + affectedRows; i++) {
  61. for (const column of returningIds) {
  62. const key = returningIds[0].path[0];
  63. if (is(column.field, Column)) {
  64. if (column.field.primary && column.field.autoIncrement) {
  65. returningResponse.push({ [key]: i });
  66. }
  67. if (column.field.defaultFn && generatedIds) {
  68. returningResponse.push({ [key]: generatedIds[j][key] });
  69. }
  70. }
  71. }
  72. j++;
  73. }
  74. return returningResponse;
  75. }
  76. return res;
  77. }
  78. const result = await this.queryWithCache(query.sql, params, async () => {
  79. return await client.query(query, params);
  80. });
  81. const rows = result[0];
  82. if (customResultMapper) {
  83. return customResultMapper(rows);
  84. }
  85. return rows.map((row) => mapResultRow(fields, row, joinsNotNullableMap));
  86. }
  87. async *iterator(placeholderValues = {}) {
  88. const params = fillPlaceholders(this.params, placeholderValues);
  89. const conn = (isPool(this.client) ? await this.client.getConnection() : this.client).connection;
  90. const { fields, query, rawQuery, joinsNotNullableMap, client, customResultMapper } = this;
  91. const hasRowsMapper = Boolean(fields || customResultMapper);
  92. const driverQuery = hasRowsMapper ? conn.query(query, params) : conn.query(rawQuery, params);
  93. const stream = driverQuery.stream();
  94. function dataListener() {
  95. stream.pause();
  96. }
  97. stream.on("data", dataListener);
  98. try {
  99. const onEnd = once(stream, "end");
  100. const onError = once(stream, "error");
  101. while (true) {
  102. stream.resume();
  103. const row = await Promise.race([onEnd, onError, new Promise((resolve) => stream.once("data", resolve))]);
  104. if (row === void 0 || Array.isArray(row) && row.length === 0) {
  105. break;
  106. } else if (row instanceof Error) {
  107. throw row;
  108. } else {
  109. if (hasRowsMapper) {
  110. if (customResultMapper) {
  111. const mappedRow = customResultMapper([row]);
  112. yield Array.isArray(mappedRow) ? mappedRow[0] : mappedRow;
  113. } else {
  114. yield mapResultRow(fields, row, joinsNotNullableMap);
  115. }
  116. } else {
  117. yield row;
  118. }
  119. }
  120. }
  121. } finally {
  122. stream.off("data", dataListener);
  123. if (isPool(client)) {
  124. conn.end();
  125. }
  126. }
  127. }
  128. }
  129. class SingleStoreDriverSession extends SingleStoreSession {
  130. constructor(client, dialect, schema, options) {
  131. super(dialect);
  132. this.client = client;
  133. this.schema = schema;
  134. this.options = options;
  135. this.logger = options.logger ?? new NoopLogger();
  136. this.cache = options.cache ?? new NoopCache();
  137. }
  138. static [entityKind] = "SingleStoreDriverSession";
  139. logger;
  140. cache;
  141. prepareQuery(query, fields, customResultMapper, generatedIds, returningIds, queryMetadata, cacheConfig) {
  142. return new SingleStoreDriverPreparedQuery(
  143. this.client,
  144. query.sql,
  145. query.params,
  146. this.logger,
  147. this.cache,
  148. queryMetadata,
  149. cacheConfig,
  150. fields,
  151. customResultMapper,
  152. generatedIds,
  153. returningIds
  154. );
  155. }
  156. /**
  157. * @internal
  158. * What is its purpose?
  159. */
  160. async query(query, params) {
  161. this.logger.logQuery(query, params);
  162. const result = await this.client.query({
  163. sql: query,
  164. values: params,
  165. rowsAsArray: true,
  166. typeCast: function(field, next) {
  167. if (field.type === "TIMESTAMP" || field.type === "DATETIME" || field.type === "DATE") {
  168. return field.string();
  169. }
  170. return next();
  171. }
  172. });
  173. return result;
  174. }
  175. all(query) {
  176. const querySql = this.dialect.sqlToQuery(query);
  177. this.logger.logQuery(querySql.sql, querySql.params);
  178. return this.client.execute(querySql.sql, querySql.params).then((result) => result[0]);
  179. }
  180. async transaction(transaction, config) {
  181. const session = isPool(this.client) ? new SingleStoreDriverSession(
  182. await this.client.getConnection(),
  183. this.dialect,
  184. this.schema,
  185. this.options
  186. ) : this;
  187. const tx = new SingleStoreDriverTransaction(
  188. this.dialect,
  189. session,
  190. this.schema,
  191. 0
  192. );
  193. if (config) {
  194. const setTransactionConfigSql = this.getSetTransactionSQL(config);
  195. if (setTransactionConfigSql) {
  196. await tx.execute(setTransactionConfigSql);
  197. }
  198. const startTransactionSql = this.getStartTransactionSQL(config);
  199. await (startTransactionSql ? tx.execute(startTransactionSql) : tx.execute(sql`begin`));
  200. } else {
  201. await tx.execute(sql`begin`);
  202. }
  203. try {
  204. const result = await transaction(tx);
  205. await tx.execute(sql`commit`);
  206. return result;
  207. } catch (err) {
  208. await tx.execute(sql`rollback`);
  209. throw err;
  210. } finally {
  211. if (isPool(this.client)) {
  212. session.client.release();
  213. }
  214. }
  215. }
  216. }
  217. class SingleStoreDriverTransaction extends SingleStoreTransaction {
  218. static [entityKind] = "SingleStoreDriverTransaction";
  219. async transaction(transaction) {
  220. const savepointName = `sp${this.nestedIndex + 1}`;
  221. const tx = new SingleStoreDriverTransaction(
  222. this.dialect,
  223. this.session,
  224. this.schema,
  225. this.nestedIndex + 1
  226. );
  227. await tx.execute(sql.raw(`savepoint ${savepointName}`));
  228. try {
  229. const result = await transaction(tx);
  230. await tx.execute(sql.raw(`release savepoint ${savepointName}`));
  231. return result;
  232. } catch (err) {
  233. await tx.execute(sql.raw(`rollback to savepoint ${savepointName}`));
  234. throw err;
  235. }
  236. }
  237. }
  238. function isPool(client) {
  239. return "getConnection" in client;
  240. }
  241. export {
  242. SingleStoreDriverPreparedQuery,
  243. SingleStoreDriverSession,
  244. SingleStoreDriverTransaction
  245. };
  246. //# sourceMappingURL=session.js.map