client.js 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. const nodeUtils = require('util')
  2. // eslint-disable-next-line
  3. var Native
  4. // eslint-disable-next-line no-useless-catch
  5. try {
  6. // Wrap this `require()` in a try-catch to avoid upstream bundlers from complaining that this might not be available since it is an optional import
  7. Native = require('pg-native')
  8. } catch (e) {
  9. throw e
  10. }
  11. const TypeOverrides = require('../type-overrides')
  12. const EventEmitter = require('events').EventEmitter
  13. const util = require('util')
  14. const ConnectionParameters = require('../connection-parameters')
  15. const NativeQuery = require('./query')
  16. const queryQueueLengthDeprecationNotice = nodeUtils.deprecate(
  17. () => {},
  18. 'Calling client.query() when the client is already executing a query is deprecated and will be removed in pg@9.0. Use async/await or an external async flow control mechanism instead.'
  19. )
  20. const Client = (module.exports = function (config) {
  21. EventEmitter.call(this)
  22. config = config || {}
  23. this._Promise = config.Promise || global.Promise
  24. this._types = new TypeOverrides(config.types)
  25. this.native = new Native({
  26. types: this._types,
  27. })
  28. this._queryQueue = []
  29. this._ending = false
  30. this._connecting = false
  31. this._connected = false
  32. this._queryable = true
  33. // keep these on the object for legacy reasons
  34. // for the time being. TODO: deprecate all this jazz
  35. const cp = (this.connectionParameters = new ConnectionParameters(config))
  36. if (config.nativeConnectionString) cp.nativeConnectionString = config.nativeConnectionString
  37. this.user = cp.user
  38. // "hiding" the password so it doesn't show up in stack traces
  39. // or if the client is console.logged
  40. Object.defineProperty(this, 'password', {
  41. configurable: true,
  42. enumerable: false,
  43. writable: true,
  44. value: cp.password,
  45. })
  46. this.database = cp.database
  47. this.host = cp.host
  48. this.port = cp.port
  49. // a hash to hold named queries
  50. this.namedQueries = {}
  51. })
  52. Client.Query = NativeQuery
  53. util.inherits(Client, EventEmitter)
  54. Client.prototype._errorAllQueries = function (err) {
  55. const enqueueError = (query) => {
  56. process.nextTick(() => {
  57. query.native = this.native
  58. query.handleError(err)
  59. })
  60. }
  61. if (this._hasActiveQuery()) {
  62. enqueueError(this._activeQuery)
  63. this._activeQuery = null
  64. }
  65. this._queryQueue.forEach(enqueueError)
  66. this._queryQueue.length = 0
  67. }
  68. // connect to the backend
  69. // pass an optional callback to be called once connected
  70. // or with an error if there was a connection error
  71. Client.prototype._connect = function (cb) {
  72. const self = this
  73. if (this._connecting) {
  74. process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.')))
  75. return
  76. }
  77. this._connecting = true
  78. this.connectionParameters.getLibpqConnectionString(function (err, conString) {
  79. if (self.connectionParameters.nativeConnectionString) conString = self.connectionParameters.nativeConnectionString
  80. if (err) return cb(err)
  81. self.native.connect(conString, function (err) {
  82. if (err) {
  83. self.native.end()
  84. return cb(err)
  85. }
  86. // set internal states to connected
  87. self._connected = true
  88. // handle connection errors from the native layer
  89. self.native.on('error', function (err) {
  90. self._queryable = false
  91. self._errorAllQueries(err)
  92. self.emit('error', err)
  93. })
  94. self.native.on('notification', function (msg) {
  95. self.emit('notification', {
  96. channel: msg.relname,
  97. payload: msg.extra,
  98. })
  99. })
  100. // signal we are connected now
  101. self.emit('connect')
  102. self._pulseQueryQueue(true)
  103. cb(null, this)
  104. })
  105. })
  106. }
  107. Client.prototype.connect = function (callback) {
  108. if (callback) {
  109. this._connect(callback)
  110. return
  111. }
  112. return new this._Promise((resolve, reject) => {
  113. this._connect((error) => {
  114. if (error) {
  115. reject(error)
  116. } else {
  117. resolve(this)
  118. }
  119. })
  120. })
  121. }
  122. // send a query to the server
  123. // this method is highly overloaded to take
  124. // 1) string query, optional array of parameters, optional function callback
  125. // 2) object query with {
  126. // string query
  127. // optional array values,
  128. // optional function callback instead of as a separate parameter
  129. // optional string name to name & cache the query plan
  130. // optional string rowMode = 'array' for an array of results
  131. // }
  132. Client.prototype.query = function (config, values, callback) {
  133. let query
  134. let result
  135. let readTimeout
  136. let readTimeoutTimer
  137. let queryCallback
  138. if (config === null || config === undefined) {
  139. throw new TypeError('Client was passed a null or undefined query')
  140. } else if (typeof config.submit === 'function') {
  141. readTimeout = config.query_timeout || this.connectionParameters.query_timeout
  142. result = query = config
  143. // accept query(new Query(...), (err, res) => { }) style
  144. if (typeof values === 'function') {
  145. config.callback = values
  146. }
  147. } else {
  148. readTimeout = config.query_timeout || this.connectionParameters.query_timeout
  149. query = new NativeQuery(config, values, callback)
  150. if (!query.callback) {
  151. let resolveOut, rejectOut
  152. result = new this._Promise((resolve, reject) => {
  153. resolveOut = resolve
  154. rejectOut = reject
  155. }).catch((err) => {
  156. Error.captureStackTrace(err)
  157. throw err
  158. })
  159. query.callback = (err, res) => (err ? rejectOut(err) : resolveOut(res))
  160. }
  161. }
  162. if (readTimeout) {
  163. queryCallback = query.callback || (() => {})
  164. readTimeoutTimer = setTimeout(() => {
  165. const error = new Error('Query read timeout')
  166. process.nextTick(() => {
  167. query.handleError(error, this.connection)
  168. })
  169. queryCallback(error)
  170. // we already returned an error,
  171. // just do nothing if query completes
  172. query.callback = () => {}
  173. // Remove from queue
  174. const index = this._queryQueue.indexOf(query)
  175. if (index > -1) {
  176. this._queryQueue.splice(index, 1)
  177. }
  178. this._pulseQueryQueue()
  179. }, readTimeout)
  180. query.callback = (err, res) => {
  181. clearTimeout(readTimeoutTimer)
  182. queryCallback(err, res)
  183. }
  184. }
  185. if (!this._queryable) {
  186. query.native = this.native
  187. process.nextTick(() => {
  188. query.handleError(new Error('Client has encountered a connection error and is not queryable'))
  189. })
  190. return result
  191. }
  192. if (this._ending) {
  193. query.native = this.native
  194. process.nextTick(() => {
  195. query.handleError(new Error('Client was closed and is not queryable'))
  196. })
  197. return result
  198. }
  199. if (this._queryQueue.length > 0) {
  200. queryQueueLengthDeprecationNotice()
  201. }
  202. this._queryQueue.push(query)
  203. this._pulseQueryQueue()
  204. return result
  205. }
  206. // disconnect from the backend server
  207. Client.prototype.end = function (cb) {
  208. const self = this
  209. this._ending = true
  210. if (!this._connected) {
  211. this.once('connect', this.end.bind(this, cb))
  212. }
  213. let result
  214. if (!cb) {
  215. result = new this._Promise(function (resolve, reject) {
  216. cb = (err) => (err ? reject(err) : resolve())
  217. })
  218. }
  219. this.native.end(function () {
  220. self._connected = false
  221. self._errorAllQueries(new Error('Connection terminated'))
  222. process.nextTick(() => {
  223. self.emit('end')
  224. if (cb) cb()
  225. })
  226. })
  227. return result
  228. }
  229. Client.prototype._hasActiveQuery = function () {
  230. return this._activeQuery && this._activeQuery.state !== 'error' && this._activeQuery.state !== 'end'
  231. }
  232. Client.prototype._pulseQueryQueue = function (initialConnection) {
  233. if (!this._connected) {
  234. return
  235. }
  236. if (this._hasActiveQuery()) {
  237. return
  238. }
  239. const query = this._queryQueue.shift()
  240. if (!query) {
  241. if (!initialConnection) {
  242. this.emit('drain')
  243. }
  244. return
  245. }
  246. this._activeQuery = query
  247. query.submit(this)
  248. const self = this
  249. query.once('_done', function () {
  250. self._pulseQueryQueue()
  251. })
  252. }
  253. // attempt to cancel an in-progress query
  254. Client.prototype.cancel = function (query) {
  255. if (this._activeQuery === query) {
  256. this.native.cancel(function () {})
  257. } else if (this._queryQueue.indexOf(query) !== -1) {
  258. this._queryQueue.splice(this._queryQueue.indexOf(query), 1)
  259. }
  260. }
  261. Client.prototype.ref = function () {}
  262. Client.prototype.unref = function () {}
  263. Client.prototype.setTypeParser = function (oid, format, parseFn) {
  264. return this._types.setTypeParser(oid, format, parseFn)
  265. }
  266. Client.prototype.getTypeParser = function (oid, format) {
  267. return this._types.getTypeParser(oid, format)
  268. }
  269. Client.prototype.isConnected = function () {
  270. return this._connected
  271. }