client.js 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743
  1. const EventEmitter = require('events').EventEmitter
  2. const utils = require('./utils')
  3. const nodeUtils = require('util')
  4. const sasl = require('./crypto/sasl')
  5. const TypeOverrides = require('./type-overrides')
  6. const ConnectionParameters = require('./connection-parameters')
  7. const Query = require('./query')
  8. const defaults = require('./defaults')
  9. const Connection = require('./connection')
  10. const crypto = require('./crypto/utils')
  11. const activeQueryDeprecationNotice = nodeUtils.deprecate(
  12. () => {},
  13. 'Client.activeQuery is deprecated and will be removed in pg@9.0'
  14. )
  15. const queryQueueDeprecationNotice = nodeUtils.deprecate(
  16. () => {},
  17. 'Client.queryQueue is deprecated and will be removed in pg@9.0.'
  18. )
  19. const pgPassDeprecationNotice = nodeUtils.deprecate(
  20. () => {},
  21. 'pgpass support is deprecated and will be removed in pg@9.0. ' +
  22. 'You can provide an async function as the password property to the Client/Pool constructor that returns a password instead. Within this function you can call the pgpass module in your own code.'
  23. )
  24. const byoPromiseDeprecationNotice = nodeUtils.deprecate(
  25. () => {},
  26. 'Passing a custom Promise implementation to the Client/Pool constructor is deprecated and will be removed in pg@9.0.'
  27. )
  28. const queryQueueLengthDeprecationNotice = nodeUtils.deprecate(
  29. () => {},
  30. 'Calling client.query() when the client is already executing a query is deprecated and will be removed in pg@9.0. Use async/await or an external async flow control mechanism instead.'
  31. )
  32. class Client extends EventEmitter {
  33. constructor(config) {
  34. super()
  35. this.connectionParameters = new ConnectionParameters(config)
  36. this.user = this.connectionParameters.user
  37. this.database = this.connectionParameters.database
  38. this.port = this.connectionParameters.port
  39. this.host = this.connectionParameters.host
  40. // "hiding" the password so it doesn't show up in stack traces
  41. // or if the client is console.logged
  42. Object.defineProperty(this, 'password', {
  43. configurable: true,
  44. enumerable: false,
  45. writable: true,
  46. value: this.connectionParameters.password,
  47. })
  48. this.replication = this.connectionParameters.replication
  49. const c = config || {}
  50. if (c.Promise) {
  51. byoPromiseDeprecationNotice()
  52. }
  53. this._Promise = c.Promise || global.Promise
  54. this._types = new TypeOverrides(c.types)
  55. this._ending = false
  56. this._ended = false
  57. this._connecting = false
  58. this._connected = false
  59. this._connectionError = false
  60. this._queryable = true
  61. this._activeQuery = null
  62. this.enableChannelBinding = Boolean(c.enableChannelBinding) // set true to use SCRAM-SHA-256-PLUS when offered
  63. this.connection =
  64. c.connection ||
  65. new Connection({
  66. stream: c.stream,
  67. ssl: this.connectionParameters.ssl,
  68. keepAlive: c.keepAlive || false,
  69. keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0,
  70. encoding: this.connectionParameters.client_encoding || 'utf8',
  71. })
  72. this._queryQueue = []
  73. this.binary = c.binary || defaults.binary
  74. this.processID = null
  75. this.secretKey = null
  76. this.ssl = this.connectionParameters.ssl || false
  77. // As with Password, make SSL->Key (the private key) non-enumerable.
  78. // It won't show up in stack traces
  79. // or if the client is console.logged
  80. if (this.ssl && this.ssl.key) {
  81. Object.defineProperty(this.ssl, 'key', {
  82. enumerable: false,
  83. })
  84. }
  85. this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0
  86. }
  87. get activeQuery() {
  88. activeQueryDeprecationNotice()
  89. return this._activeQuery
  90. }
  91. set activeQuery(val) {
  92. activeQueryDeprecationNotice()
  93. this._activeQuery = val
  94. }
  95. _getActiveQuery() {
  96. return this._activeQuery
  97. }
  98. _errorAllQueries(err) {
  99. const enqueueError = (query) => {
  100. process.nextTick(() => {
  101. query.handleError(err, this.connection)
  102. })
  103. }
  104. const activeQuery = this._getActiveQuery()
  105. if (activeQuery) {
  106. enqueueError(activeQuery)
  107. this._activeQuery = null
  108. }
  109. this._queryQueue.forEach(enqueueError)
  110. this._queryQueue.length = 0
  111. }
  112. _connect(callback) {
  113. const self = this
  114. const con = this.connection
  115. this._connectionCallback = callback
  116. if (this._connecting || this._connected) {
  117. const err = new Error('Client has already been connected. You cannot reuse a client.')
  118. process.nextTick(() => {
  119. callback(err)
  120. })
  121. return
  122. }
  123. this._connecting = true
  124. if (this._connectionTimeoutMillis > 0) {
  125. this.connectionTimeoutHandle = setTimeout(() => {
  126. con._ending = true
  127. con.stream.destroy(new Error('timeout expired'))
  128. }, this._connectionTimeoutMillis)
  129. if (this.connectionTimeoutHandle.unref) {
  130. this.connectionTimeoutHandle.unref()
  131. }
  132. }
  133. if (this.host && this.host.indexOf('/') === 0) {
  134. con.connect(this.host + '/.s.PGSQL.' + this.port)
  135. } else {
  136. con.connect(this.port, this.host)
  137. }
  138. // once connection is established send startup message
  139. con.on('connect', function () {
  140. if (self.ssl) {
  141. con.requestSsl()
  142. } else {
  143. con.startup(self.getStartupConf())
  144. }
  145. })
  146. con.on('sslconnect', function () {
  147. con.startup(self.getStartupConf())
  148. })
  149. this._attachListeners(con)
  150. con.once('end', () => {
  151. const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly')
  152. clearTimeout(this.connectionTimeoutHandle)
  153. this._errorAllQueries(error)
  154. this._ended = true
  155. if (!this._ending) {
  156. // if the connection is ended without us calling .end()
  157. // on this client then we have an unexpected disconnection
  158. // treat this as an error unless we've already emitted an error
  159. // during connection.
  160. if (this._connecting && !this._connectionError) {
  161. if (this._connectionCallback) {
  162. this._connectionCallback(error)
  163. } else {
  164. this._handleErrorEvent(error)
  165. }
  166. } else if (!this._connectionError) {
  167. this._handleErrorEvent(error)
  168. }
  169. }
  170. process.nextTick(() => {
  171. this.emit('end')
  172. })
  173. })
  174. }
  175. connect(callback) {
  176. if (callback) {
  177. this._connect(callback)
  178. return
  179. }
  180. return new this._Promise((resolve, reject) => {
  181. this._connect((error) => {
  182. if (error) {
  183. reject(error)
  184. } else {
  185. resolve(this)
  186. }
  187. })
  188. })
  189. }
  190. _attachListeners(con) {
  191. // password request handling
  192. con.on('authenticationCleartextPassword', this._handleAuthCleartextPassword.bind(this))
  193. // password request handling
  194. con.on('authenticationMD5Password', this._handleAuthMD5Password.bind(this))
  195. // password request handling (SASL)
  196. con.on('authenticationSASL', this._handleAuthSASL.bind(this))
  197. con.on('authenticationSASLContinue', this._handleAuthSASLContinue.bind(this))
  198. con.on('authenticationSASLFinal', this._handleAuthSASLFinal.bind(this))
  199. con.on('backendKeyData', this._handleBackendKeyData.bind(this))
  200. con.on('error', this._handleErrorEvent.bind(this))
  201. con.on('errorMessage', this._handleErrorMessage.bind(this))
  202. con.on('readyForQuery', this._handleReadyForQuery.bind(this))
  203. con.on('notice', this._handleNotice.bind(this))
  204. con.on('rowDescription', this._handleRowDescription.bind(this))
  205. con.on('dataRow', this._handleDataRow.bind(this))
  206. con.on('portalSuspended', this._handlePortalSuspended.bind(this))
  207. con.on('emptyQuery', this._handleEmptyQuery.bind(this))
  208. con.on('commandComplete', this._handleCommandComplete.bind(this))
  209. con.on('parseComplete', this._handleParseComplete.bind(this))
  210. con.on('copyInResponse', this._handleCopyInResponse.bind(this))
  211. con.on('copyData', this._handleCopyData.bind(this))
  212. con.on('notification', this._handleNotification.bind(this))
  213. }
  214. _getPassword(cb) {
  215. const con = this.connection
  216. if (typeof this.password === 'function') {
  217. this._Promise
  218. .resolve()
  219. .then(() => this.password(this.connectionParameters))
  220. .then((pass) => {
  221. if (pass !== undefined) {
  222. if (typeof pass !== 'string') {
  223. con.emit('error', new TypeError('Password must be a string'))
  224. return
  225. }
  226. this.connectionParameters.password = this.password = pass
  227. } else {
  228. this.connectionParameters.password = this.password = null
  229. }
  230. cb()
  231. })
  232. .catch((err) => {
  233. con.emit('error', err)
  234. })
  235. } else if (this.password !== null) {
  236. cb()
  237. } else {
  238. try {
  239. const pgPass = require('pgpass')
  240. pgPass(this.connectionParameters, (pass) => {
  241. if (undefined !== pass) {
  242. pgPassDeprecationNotice()
  243. this.connectionParameters.password = this.password = pass
  244. }
  245. cb()
  246. })
  247. } catch (e) {
  248. this.emit('error', e)
  249. }
  250. }
  251. }
  252. _handleAuthCleartextPassword(msg) {
  253. this._getPassword(() => {
  254. this.connection.password(this.password)
  255. })
  256. }
  257. _handleAuthMD5Password(msg) {
  258. this._getPassword(async () => {
  259. try {
  260. const hashedPassword = await crypto.postgresMd5PasswordHash(this.user, this.password, msg.salt)
  261. this.connection.password(hashedPassword)
  262. } catch (e) {
  263. this.emit('error', e)
  264. }
  265. })
  266. }
  267. _handleAuthSASL(msg) {
  268. this._getPassword(() => {
  269. try {
  270. this.saslSession = sasl.startSession(msg.mechanisms, this.enableChannelBinding && this.connection.stream)
  271. this.connection.sendSASLInitialResponseMessage(this.saslSession.mechanism, this.saslSession.response)
  272. } catch (err) {
  273. this.connection.emit('error', err)
  274. }
  275. })
  276. }
  277. async _handleAuthSASLContinue(msg) {
  278. try {
  279. await sasl.continueSession(
  280. this.saslSession,
  281. this.password,
  282. msg.data,
  283. this.enableChannelBinding && this.connection.stream
  284. )
  285. this.connection.sendSCRAMClientFinalMessage(this.saslSession.response)
  286. } catch (err) {
  287. this.connection.emit('error', err)
  288. }
  289. }
  290. _handleAuthSASLFinal(msg) {
  291. try {
  292. sasl.finalizeSession(this.saslSession, msg.data)
  293. this.saslSession = null
  294. } catch (err) {
  295. this.connection.emit('error', err)
  296. }
  297. }
  298. _handleBackendKeyData(msg) {
  299. this.processID = msg.processID
  300. this.secretKey = msg.secretKey
  301. }
  302. _handleReadyForQuery(msg) {
  303. if (this._connecting) {
  304. this._connecting = false
  305. this._connected = true
  306. clearTimeout(this.connectionTimeoutHandle)
  307. // process possible callback argument to Client#connect
  308. if (this._connectionCallback) {
  309. this._connectionCallback(null, this)
  310. // remove callback for proper error handling
  311. // after the connect event
  312. this._connectionCallback = null
  313. }
  314. this.emit('connect')
  315. }
  316. const activeQuery = this._getActiveQuery()
  317. this._activeQuery = null
  318. this.readyForQuery = true
  319. if (activeQuery) {
  320. activeQuery.handleReadyForQuery(this.connection)
  321. }
  322. this._pulseQueryQueue()
  323. }
  324. // if we receive an error event or error message
  325. // during the connection process we handle it here
  326. _handleErrorWhileConnecting(err) {
  327. if (this._connectionError) {
  328. // TODO(bmc): this is swallowing errors - we shouldn't do this
  329. return
  330. }
  331. this._connectionError = true
  332. clearTimeout(this.connectionTimeoutHandle)
  333. if (this._connectionCallback) {
  334. return this._connectionCallback(err)
  335. }
  336. this.emit('error', err)
  337. }
  338. // if we're connected and we receive an error event from the connection
  339. // this means the socket is dead - do a hard abort of all queries and emit
  340. // the socket error on the client as well
  341. _handleErrorEvent(err) {
  342. if (this._connecting) {
  343. return this._handleErrorWhileConnecting(err)
  344. }
  345. this._queryable = false
  346. this._errorAllQueries(err)
  347. this.emit('error', err)
  348. }
  349. // handle error messages from the postgres backend
  350. _handleErrorMessage(msg) {
  351. if (this._connecting) {
  352. return this._handleErrorWhileConnecting(msg)
  353. }
  354. const activeQuery = this._getActiveQuery()
  355. if (!activeQuery) {
  356. this._handleErrorEvent(msg)
  357. return
  358. }
  359. this._activeQuery = null
  360. activeQuery.handleError(msg, this.connection)
  361. }
  362. _handleRowDescription(msg) {
  363. const activeQuery = this._getActiveQuery()
  364. if (activeQuery == null) {
  365. const error = new Error('Received unexpected rowDescription message from backend.')
  366. this._handleErrorEvent(error)
  367. return
  368. }
  369. // delegate rowDescription to active query
  370. activeQuery.handleRowDescription(msg)
  371. }
  372. _handleDataRow(msg) {
  373. const activeQuery = this._getActiveQuery()
  374. if (activeQuery == null) {
  375. const error = new Error('Received unexpected dataRow message from backend.')
  376. this._handleErrorEvent(error)
  377. return
  378. }
  379. // delegate dataRow to active query
  380. activeQuery.handleDataRow(msg)
  381. }
  382. _handlePortalSuspended(msg) {
  383. const activeQuery = this._getActiveQuery()
  384. if (activeQuery == null) {
  385. const error = new Error('Received unexpected portalSuspended message from backend.')
  386. this._handleErrorEvent(error)
  387. return
  388. }
  389. // delegate portalSuspended to active query
  390. activeQuery.handlePortalSuspended(this.connection)
  391. }
  392. _handleEmptyQuery(msg) {
  393. const activeQuery = this._getActiveQuery()
  394. if (activeQuery == null) {
  395. const error = new Error('Received unexpected emptyQuery message from backend.')
  396. this._handleErrorEvent(error)
  397. return
  398. }
  399. // delegate emptyQuery to active query
  400. activeQuery.handleEmptyQuery(this.connection)
  401. }
  402. _handleCommandComplete(msg) {
  403. const activeQuery = this._getActiveQuery()
  404. if (activeQuery == null) {
  405. const error = new Error('Received unexpected commandComplete message from backend.')
  406. this._handleErrorEvent(error)
  407. return
  408. }
  409. // delegate commandComplete to active query
  410. activeQuery.handleCommandComplete(msg, this.connection)
  411. }
  412. _handleParseComplete() {
  413. const activeQuery = this._getActiveQuery()
  414. if (activeQuery == null) {
  415. const error = new Error('Received unexpected parseComplete message from backend.')
  416. this._handleErrorEvent(error)
  417. return
  418. }
  419. // if a prepared statement has a name and properly parses
  420. // we track that its already been executed so we don't parse
  421. // it again on the same client
  422. if (activeQuery.name) {
  423. this.connection.parsedStatements[activeQuery.name] = activeQuery.text
  424. }
  425. }
  426. _handleCopyInResponse(msg) {
  427. const activeQuery = this._getActiveQuery()
  428. if (activeQuery == null) {
  429. const error = new Error('Received unexpected copyInResponse message from backend.')
  430. this._handleErrorEvent(error)
  431. return
  432. }
  433. activeQuery.handleCopyInResponse(this.connection)
  434. }
  435. _handleCopyData(msg) {
  436. const activeQuery = this._getActiveQuery()
  437. if (activeQuery == null) {
  438. const error = new Error('Received unexpected copyData message from backend.')
  439. this._handleErrorEvent(error)
  440. return
  441. }
  442. activeQuery.handleCopyData(msg, this.connection)
  443. }
  444. _handleNotification(msg) {
  445. this.emit('notification', msg)
  446. }
  447. _handleNotice(msg) {
  448. this.emit('notice', msg)
  449. }
  450. getStartupConf() {
  451. const params = this.connectionParameters
  452. const data = {
  453. user: params.user,
  454. database: params.database,
  455. }
  456. const appName = params.application_name || params.fallback_application_name
  457. if (appName) {
  458. data.application_name = appName
  459. }
  460. if (params.replication) {
  461. data.replication = '' + params.replication
  462. }
  463. if (params.statement_timeout) {
  464. data.statement_timeout = String(parseInt(params.statement_timeout, 10))
  465. }
  466. if (params.lock_timeout) {
  467. data.lock_timeout = String(parseInt(params.lock_timeout, 10))
  468. }
  469. if (params.idle_in_transaction_session_timeout) {
  470. data.idle_in_transaction_session_timeout = String(parseInt(params.idle_in_transaction_session_timeout, 10))
  471. }
  472. if (params.options) {
  473. data.options = params.options
  474. }
  475. return data
  476. }
  477. cancel(client, query) {
  478. if (client.activeQuery === query) {
  479. const con = this.connection
  480. if (this.host && this.host.indexOf('/') === 0) {
  481. con.connect(this.host + '/.s.PGSQL.' + this.port)
  482. } else {
  483. con.connect(this.port, this.host)
  484. }
  485. // once connection is established send cancel message
  486. con.on('connect', function () {
  487. con.cancel(client.processID, client.secretKey)
  488. })
  489. } else if (client._queryQueue.indexOf(query) !== -1) {
  490. client._queryQueue.splice(client._queryQueue.indexOf(query), 1)
  491. }
  492. }
  493. setTypeParser(oid, format, parseFn) {
  494. return this._types.setTypeParser(oid, format, parseFn)
  495. }
  496. getTypeParser(oid, format) {
  497. return this._types.getTypeParser(oid, format)
  498. }
  499. // escapeIdentifier and escapeLiteral moved to utility functions & exported
  500. // on PG
  501. // re-exported here for backwards compatibility
  502. escapeIdentifier(str) {
  503. return utils.escapeIdentifier(str)
  504. }
  505. escapeLiteral(str) {
  506. return utils.escapeLiteral(str)
  507. }
  508. _pulseQueryQueue() {
  509. if (this.readyForQuery === true) {
  510. this._activeQuery = this._queryQueue.shift()
  511. const activeQuery = this._getActiveQuery()
  512. if (activeQuery) {
  513. this.readyForQuery = false
  514. this.hasExecuted = true
  515. const queryError = activeQuery.submit(this.connection)
  516. if (queryError) {
  517. process.nextTick(() => {
  518. activeQuery.handleError(queryError, this.connection)
  519. this.readyForQuery = true
  520. this._pulseQueryQueue()
  521. })
  522. }
  523. } else if (this.hasExecuted) {
  524. this._activeQuery = null
  525. this.emit('drain')
  526. }
  527. }
  528. }
  529. query(config, values, callback) {
  530. // can take in strings, config object or query object
  531. let query
  532. let result
  533. let readTimeout
  534. let readTimeoutTimer
  535. let queryCallback
  536. if (config === null || config === undefined) {
  537. throw new TypeError('Client was passed a null or undefined query')
  538. } else if (typeof config.submit === 'function') {
  539. readTimeout = config.query_timeout || this.connectionParameters.query_timeout
  540. result = query = config
  541. if (!query.callback) {
  542. if (typeof values === 'function') {
  543. query.callback = values
  544. } else if (callback) {
  545. query.callback = callback
  546. }
  547. }
  548. } else {
  549. readTimeout = config.query_timeout || this.connectionParameters.query_timeout
  550. query = new Query(config, values, callback)
  551. if (!query.callback) {
  552. result = new this._Promise((resolve, reject) => {
  553. query.callback = (err, res) => (err ? reject(err) : resolve(res))
  554. }).catch((err) => {
  555. // replace the stack trace that leads to `TCP.onStreamRead` with one that leads back to the
  556. // application that created the query
  557. Error.captureStackTrace(err)
  558. throw err
  559. })
  560. }
  561. }
  562. if (readTimeout) {
  563. queryCallback = query.callback || (() => {})
  564. readTimeoutTimer = setTimeout(() => {
  565. const error = new Error('Query read timeout')
  566. process.nextTick(() => {
  567. query.handleError(error, this.connection)
  568. })
  569. queryCallback(error)
  570. // we already returned an error,
  571. // just do nothing if query completes
  572. query.callback = () => {}
  573. // Remove from queue
  574. const index = this._queryQueue.indexOf(query)
  575. if (index > -1) {
  576. this._queryQueue.splice(index, 1)
  577. }
  578. this._pulseQueryQueue()
  579. }, readTimeout)
  580. query.callback = (err, res) => {
  581. clearTimeout(readTimeoutTimer)
  582. queryCallback(err, res)
  583. }
  584. }
  585. if (this.binary && !query.binary) {
  586. query.binary = true
  587. }
  588. if (query._result && !query._result._types) {
  589. query._result._types = this._types
  590. }
  591. if (!this._queryable) {
  592. process.nextTick(() => {
  593. query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection)
  594. })
  595. return result
  596. }
  597. if (this._ending) {
  598. process.nextTick(() => {
  599. query.handleError(new Error('Client was closed and is not queryable'), this.connection)
  600. })
  601. return result
  602. }
  603. if (this._queryQueue.length > 0) {
  604. queryQueueLengthDeprecationNotice()
  605. }
  606. this._queryQueue.push(query)
  607. this._pulseQueryQueue()
  608. return result
  609. }
  610. ref() {
  611. this.connection.ref()
  612. }
  613. unref() {
  614. this.connection.unref()
  615. }
  616. end(cb) {
  617. this._ending = true
  618. // if we have never connected, then end is a noop, callback immediately
  619. if (!this.connection._connecting || this._ended) {
  620. if (cb) {
  621. cb()
  622. } else {
  623. return this._Promise.resolve()
  624. }
  625. }
  626. if (this._getActiveQuery() || !this._queryable) {
  627. // if we have an active query we need to force a disconnect
  628. // on the socket - otherwise a hung query could block end forever
  629. this.connection.stream.destroy()
  630. } else {
  631. this.connection.end()
  632. }
  633. if (cb) {
  634. this.connection.once('end', cb)
  635. } else {
  636. return new this._Promise((resolve) => {
  637. this.connection.once('end', resolve)
  638. })
  639. }
  640. }
  641. get queryQueue() {
  642. queryQueueDeprecationNotice()
  643. return this._queryQueue
  644. }
  645. }
  646. // expose a Query constructor
  647. Client.Query = Query
  648. module.exports = Client