// connection.js (5.0 KB)
  1. 'use strict'
  2. const EventEmitter = require('events').EventEmitter
  3. const { parse, serialize } = require('pg-protocol')
  4. const { getStream, getSecureStream } = require('./stream')
  5. const flushBuffer = serialize.flush()
  6. const syncBuffer = serialize.sync()
  7. const endBuffer = serialize.end()
  8. // TODO(bmc) support binary mode at some point
  9. class Connection extends EventEmitter {
  10. constructor(config) {
  11. super()
  12. config = config || {}
  13. this.stream = config.stream || getStream(config.ssl)
  14. if (typeof this.stream === 'function') {
  15. this.stream = this.stream(config)
  16. }
  17. this._keepAlive = config.keepAlive
  18. this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
  19. this.parsedStatements = {}
  20. this.ssl = config.ssl || false
  21. this._ending = false
  22. this._emitMessage = false
  23. const self = this
  24. this.on('newListener', function (eventName) {
  25. if (eventName === 'message') {
  26. self._emitMessage = true
  27. }
  28. })
  29. }
  30. connect(port, host) {
  31. const self = this
  32. this._connecting = true
  33. this.stream.setNoDelay(true)
  34. this.stream.connect(port, host)
  35. this.stream.once('connect', function () {
  36. if (self._keepAlive) {
  37. self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
  38. }
  39. self.emit('connect')
  40. })
  41. const reportStreamError = function (error) {
  42. // errors about disconnections should be ignored during disconnect
  43. if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
  44. return
  45. }
  46. self.emit('error', error)
  47. }
  48. this.stream.on('error', reportStreamError)
  49. this.stream.on('close', function () {
  50. self.emit('end')
  51. })
  52. if (!this.ssl) {
  53. return this.attachListeners(this.stream)
  54. }
  55. this.stream.once('data', function (buffer) {
  56. const responseCode = buffer.toString('utf8')
  57. switch (responseCode) {
  58. case 'S': // Server supports SSL connections, continue with a secure connection
  59. break
  60. case 'N': // Server does not support SSL connections
  61. self.stream.end()
  62. return self.emit('error', new Error('The server does not support SSL connections'))
  63. default:
  64. // Any other response byte, including 'E' (ErrorResponse) indicating a server error
  65. self.stream.end()
  66. return self.emit('error', new Error('There was an error establishing an SSL connection'))
  67. }
  68. const options = {
  69. socket: self.stream,
  70. }
  71. if (self.ssl !== true) {
  72. Object.assign(options, self.ssl)
  73. if ('key' in self.ssl) {
  74. options.key = self.ssl.key
  75. }
  76. }
  77. const net = require('net')
  78. if (net.isIP && net.isIP(host) === 0) {
  79. options.servername = host
  80. }
  81. try {
  82. self.stream = getSecureStream(options)
  83. } catch (err) {
  84. return self.emit('error', err)
  85. }
  86. self.attachListeners(self.stream)
  87. self.stream.on('error', reportStreamError)
  88. self.emit('sslconnect')
  89. })
  90. }
  91. attachListeners(stream) {
  92. parse(stream, (msg) => {
  93. const eventName = msg.name === 'error' ? 'errorMessage' : msg.name
  94. if (this._emitMessage) {
  95. this.emit('message', msg)
  96. }
  97. this.emit(eventName, msg)
  98. })
  99. }
  100. requestSsl() {
  101. this.stream.write(serialize.requestSsl())
  102. }
  103. startup(config) {
  104. this.stream.write(serialize.startup(config))
  105. }
  106. cancel(processID, secretKey) {
  107. this._send(serialize.cancel(processID, secretKey))
  108. }
  109. password(password) {
  110. this._send(serialize.password(password))
  111. }
  112. sendSASLInitialResponseMessage(mechanism, initialResponse) {
  113. this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))
  114. }
  115. sendSCRAMClientFinalMessage(additionalData) {
  116. this._send(serialize.sendSCRAMClientFinalMessage(additionalData))
  117. }
  118. _send(buffer) {
  119. if (!this.stream.writable) {
  120. return false
  121. }
  122. return this.stream.write(buffer)
  123. }
  124. query(text) {
  125. this._send(serialize.query(text))
  126. }
  127. // send parse message
  128. parse(query) {
  129. this._send(serialize.parse(query))
  130. }
  131. // send bind message
  132. bind(config) {
  133. this._send(serialize.bind(config))
  134. }
  135. // send execute message
  136. execute(config) {
  137. this._send(serialize.execute(config))
  138. }
  139. flush() {
  140. if (this.stream.writable) {
  141. this.stream.write(flushBuffer)
  142. }
  143. }
  144. sync() {
  145. this._ending = true
  146. this._send(syncBuffer)
  147. }
  148. ref() {
  149. this.stream.ref()
  150. }
  151. unref() {
  152. this.stream.unref()
  153. }
  154. end() {
  155. // 0x58 = 'X'
  156. this._ending = true
  157. if (!this._connecting || !this.stream.writable) {
  158. this.stream.end()
  159. return
  160. }
  161. return this.stream.write(endBuffer, () => {
  162. this.stream.end()
  163. })
  164. }
  165. close(msg) {
  166. this._send(serialize.close(msg))
  167. }
  168. describe(msg) {
  169. this._send(serialize.describe(msg))
  170. }
  171. sendCopyFromChunk(chunk) {
  172. this._send(serialize.copyData(chunk))
  173. }
  174. endCopyFrom() {
  175. this._send(serialize.copyDone())
  176. }
  177. sendCopyFail(msg) {
  178. this._send(serialize.copyFail(msg))
  179. }
  180. }
  181. module.exports = Connection