notification.ts 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. /*
  2. * MinIO Javascript Library for Amazon S3 Compatible Cloud Storage, (C) 2016 MinIO, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import { EventEmitter } from 'eventemitter3'
  17. import jsonLineParser from 'stream-json/jsonl/Parser.js'
  18. import { DEFAULT_REGION } from './helpers.ts'
  19. import type { TypedClient } from './internal/client.ts'
  20. import { pipesetup, uriEscape } from './internal/helper.ts'
  21. // TODO: type this
  22. type Event = unknown
  23. // Base class for three supported configs.
  24. export class TargetConfig {
  25. private Filter?: { S3Key: { FilterRule: { Name: string; Value: string }[] } }
  26. private Event?: Event[]
  27. private Id: unknown
  28. setId(id: unknown) {
  29. this.Id = id
  30. }
  31. addEvent(newevent: Event) {
  32. if (!this.Event) {
  33. this.Event = []
  34. }
  35. this.Event.push(newevent)
  36. }
  37. addFilterSuffix(suffix: string) {
  38. if (!this.Filter) {
  39. this.Filter = { S3Key: { FilterRule: [] } }
  40. }
  41. this.Filter.S3Key.FilterRule.push({ Name: 'suffix', Value: suffix })
  42. }
  43. addFilterPrefix(prefix: string) {
  44. if (!this.Filter) {
  45. this.Filter = { S3Key: { FilterRule: [] } }
  46. }
  47. this.Filter.S3Key.FilterRule.push({ Name: 'prefix', Value: prefix })
  48. }
  49. }
  50. // 1. Topic (simple notification service)
  51. export class TopicConfig extends TargetConfig {
  52. private Topic: string
  53. constructor(arn: string) {
  54. super()
  55. this.Topic = arn
  56. }
  57. }
  58. // 2. Queue (simple queue service)
  59. export class QueueConfig extends TargetConfig {
  60. private Queue: string
  61. constructor(arn: string) {
  62. super()
  63. this.Queue = arn
  64. }
  65. }
  66. // 3. CloudFront (lambda function)
  67. export class CloudFunctionConfig extends TargetConfig {
  68. private CloudFunction: string
  69. constructor(arn: string) {
  70. super()
  71. this.CloudFunction = arn
  72. }
  73. }
  74. // Notification config - array of target configs.
  75. // Target configs can be
  76. // 1. Topic (simple notification service)
  77. // 2. Queue (simple queue service)
  78. // 3. CloudFront (lambda function)
  79. export class NotificationConfig {
  80. private TopicConfiguration?: TargetConfig[]
  81. private CloudFunctionConfiguration?: TargetConfig[]
  82. private QueueConfiguration?: TargetConfig[]
  83. add(target: TargetConfig) {
  84. let instance: TargetConfig[] | undefined
  85. if (target instanceof TopicConfig) {
  86. instance = this.TopicConfiguration ??= []
  87. }
  88. if (target instanceof QueueConfig) {
  89. instance = this.QueueConfiguration ??= []
  90. }
  91. if (target instanceof CloudFunctionConfig) {
  92. instance = this.CloudFunctionConfiguration ??= []
  93. }
  94. if (instance) {
  95. instance.push(target)
  96. }
  97. }
  98. }
  99. export const buildARN = (partition: string, service: string, region: string, accountId: string, resource: string) => {
  100. return 'arn:' + partition + ':' + service + ':' + region + ':' + accountId + ':' + resource
  101. }
  102. export const ObjectCreatedAll = 's3:ObjectCreated:*'
  103. export const ObjectCreatedPut = 's3:ObjectCreated:Put'
  104. export const ObjectCreatedPost = 's3:ObjectCreated:Post'
  105. export const ObjectCreatedCopy = 's3:ObjectCreated:Copy'
  106. export const ObjectCreatedCompleteMultipartUpload = 's3:ObjectCreated:CompleteMultipartUpload'
  107. export const ObjectRemovedAll = 's3:ObjectRemoved:*'
  108. export const ObjectRemovedDelete = 's3:ObjectRemoved:Delete'
  109. export const ObjectRemovedDeleteMarkerCreated = 's3:ObjectRemoved:DeleteMarkerCreated'
  110. export const ObjectReducedRedundancyLostObject = 's3:ReducedRedundancyLostObject'
  111. export type NotificationEvent =
  112. | 's3:ObjectCreated:*'
  113. | 's3:ObjectCreated:Put'
  114. | 's3:ObjectCreated:Post'
  115. | 's3:ObjectCreated:Copy'
  116. | 's3:ObjectCreated:CompleteMultipartUpload'
  117. | 's3:ObjectRemoved:*'
  118. | 's3:ObjectRemoved:Delete'
  119. | 's3:ObjectRemoved:DeleteMarkerCreated'
  120. | 's3:ReducedRedundancyLostObject'
  121. | 's3:TestEvent'
  122. | 's3:ObjectRestore:Post'
  123. | 's3:ObjectRestore:Completed'
  124. | 's3:Replication:OperationFailedReplication'
  125. | 's3:Replication:OperationMissedThreshold'
  126. | 's3:Replication:OperationReplicatedAfterThreshold'
  127. | 's3:Replication:OperationNotTracked'
  128. | string // put string at least so auto-complete could work
  129. // TODO: type this
  130. export type NotificationRecord = unknown
  131. // Poll for notifications, used in #listenBucketNotification.
  132. // Listening constitutes repeatedly requesting s3 whether or not any
  133. // changes have occurred.
  134. export class NotificationPoller extends EventEmitter<{
  135. notification: (event: NotificationRecord) => void
  136. error: (error: unknown) => void
  137. }> {
  138. private client: TypedClient
  139. private bucketName: string
  140. private prefix: string
  141. private suffix: string
  142. private events: NotificationEvent[]
  143. private ending: boolean
  144. constructor(client: TypedClient, bucketName: string, prefix: string, suffix: string, events: NotificationEvent[]) {
  145. super()
  146. this.client = client
  147. this.bucketName = bucketName
  148. this.prefix = prefix
  149. this.suffix = suffix
  150. this.events = events
  151. this.ending = false
  152. }
  153. // Starts the polling.
  154. start() {
  155. this.ending = false
  156. process.nextTick(() => {
  157. this.checkForChanges()
  158. })
  159. }
  160. // Stops the polling.
  161. stop() {
  162. this.ending = true
  163. }
  164. checkForChanges() {
  165. // Don't continue if we're looping again but are cancelled.
  166. if (this.ending) {
  167. return
  168. }
  169. const method = 'GET'
  170. const queries = []
  171. if (this.prefix) {
  172. const prefix = uriEscape(this.prefix)
  173. queries.push(`prefix=${prefix}`)
  174. }
  175. if (this.suffix) {
  176. const suffix = uriEscape(this.suffix)
  177. queries.push(`suffix=${suffix}`)
  178. }
  179. if (this.events) {
  180. this.events.forEach((s3event) => queries.push('events=' + uriEscape(s3event)))
  181. }
  182. queries.sort()
  183. let query = ''
  184. if (queries.length > 0) {
  185. query = `${queries.join('&')}`
  186. }
  187. const region = this.client.region || DEFAULT_REGION
  188. this.client.makeRequestAsync({ method, bucketName: this.bucketName, query }, '', [200], region).then(
  189. (response) => {
  190. const asm = jsonLineParser.make()
  191. pipesetup(response, asm)
  192. .on('data', (data) => {
  193. // Data is flushed periodically (every 5 seconds), so we should
  194. // handle it after flushing from the JSON parser.
  195. let records = data.value.Records
  196. // If null (= no records), change to an empty array.
  197. if (!records) {
  198. records = []
  199. }
  200. // Iterate over the notifications and emit them individually.
  201. records.forEach((record: NotificationRecord) => {
  202. this.emit('notification', record)
  203. })
  204. // If we're done, stop.
  205. if (this.ending) {
  206. response?.destroy()
  207. }
  208. })
  209. .on('error', (e) => this.emit('error', e))
  210. .on('end', () => {
  211. // Do it again, if we haven't cancelled yet.
  212. process.nextTick(() => {
  213. this.checkForChanges()
  214. })
  215. })
  216. },
  217. (e) => {
  218. return this.emit('error', e)
  219. },
  220. )
  221. }
  222. }