| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- /*
- * MinIO Javascript Library for Amazon S3 Compatible Cloud Storage, (C) 2016 MinIO, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import { EventEmitter } from 'eventemitter3'
- import jsonLineParser from 'stream-json/jsonl/Parser.js'
- import { DEFAULT_REGION } from './helpers.ts'
- import type { TypedClient } from './internal/client.ts'
- import { pipesetup, uriEscape } from './internal/helper.ts'
- // TODO: type this
- type Event = unknown
- // Base class for three supported configs.
- export class TargetConfig {
- private Filter?: { S3Key: { FilterRule: { Name: string; Value: string }[] } }
- private Event?: Event[]
- private Id: unknown
- setId(id: unknown) {
- this.Id = id
- }
- addEvent(newevent: Event) {
- if (!this.Event) {
- this.Event = []
- }
- this.Event.push(newevent)
- }
- addFilterSuffix(suffix: string) {
- if (!this.Filter) {
- this.Filter = { S3Key: { FilterRule: [] } }
- }
- this.Filter.S3Key.FilterRule.push({ Name: 'suffix', Value: suffix })
- }
- addFilterPrefix(prefix: string) {
- if (!this.Filter) {
- this.Filter = { S3Key: { FilterRule: [] } }
- }
- this.Filter.S3Key.FilterRule.push({ Name: 'prefix', Value: prefix })
- }
- }
- // 1. Topic (simple notification service)
- export class TopicConfig extends TargetConfig {
- private Topic: string
- constructor(arn: string) {
- super()
- this.Topic = arn
- }
- }
- // 2. Queue (simple queue service)
- export class QueueConfig extends TargetConfig {
- private Queue: string
- constructor(arn: string) {
- super()
- this.Queue = arn
- }
- }
- // 3. CloudFront (lambda function)
- export class CloudFunctionConfig extends TargetConfig {
- private CloudFunction: string
- constructor(arn: string) {
- super()
- this.CloudFunction = arn
- }
- }
- // Notification config - array of target configs.
- // Target configs can be
- // 1. Topic (simple notification service)
- // 2. Queue (simple queue service)
- // 3. CloudFront (lambda function)
- export class NotificationConfig {
- private TopicConfiguration?: TargetConfig[]
- private CloudFunctionConfiguration?: TargetConfig[]
- private QueueConfiguration?: TargetConfig[]
- add(target: TargetConfig) {
- let instance: TargetConfig[] | undefined
- if (target instanceof TopicConfig) {
- instance = this.TopicConfiguration ??= []
- }
- if (target instanceof QueueConfig) {
- instance = this.QueueConfiguration ??= []
- }
- if (target instanceof CloudFunctionConfig) {
- instance = this.CloudFunctionConfiguration ??= []
- }
- if (instance) {
- instance.push(target)
- }
- }
- }
- export const buildARN = (partition: string, service: string, region: string, accountId: string, resource: string) => {
- return 'arn:' + partition + ':' + service + ':' + region + ':' + accountId + ':' + resource
- }
- export const ObjectCreatedAll = 's3:ObjectCreated:*'
- export const ObjectCreatedPut = 's3:ObjectCreated:Put'
- export const ObjectCreatedPost = 's3:ObjectCreated:Post'
- export const ObjectCreatedCopy = 's3:ObjectCreated:Copy'
- export const ObjectCreatedCompleteMultipartUpload = 's3:ObjectCreated:CompleteMultipartUpload'
- export const ObjectRemovedAll = 's3:ObjectRemoved:*'
- export const ObjectRemovedDelete = 's3:ObjectRemoved:Delete'
- export const ObjectRemovedDeleteMarkerCreated = 's3:ObjectRemoved:DeleteMarkerCreated'
- export const ObjectReducedRedundancyLostObject = 's3:ReducedRedundancyLostObject'
- export type NotificationEvent =
- | 's3:ObjectCreated:*'
- | 's3:ObjectCreated:Put'
- | 's3:ObjectCreated:Post'
- | 's3:ObjectCreated:Copy'
- | 's3:ObjectCreated:CompleteMultipartUpload'
- | 's3:ObjectRemoved:*'
- | 's3:ObjectRemoved:Delete'
- | 's3:ObjectRemoved:DeleteMarkerCreated'
- | 's3:ReducedRedundancyLostObject'
- | 's3:TestEvent'
- | 's3:ObjectRestore:Post'
- | 's3:ObjectRestore:Completed'
- | 's3:Replication:OperationFailedReplication'
- | 's3:Replication:OperationMissedThreshold'
- | 's3:Replication:OperationReplicatedAfterThreshold'
- | 's3:Replication:OperationNotTracked'
- | string // put string at least so auto-complete could work
- // TODO: type this
- export type NotificationRecord = unknown
- // Poll for notifications, used in #listenBucketNotification.
- // Listening constitutes repeatedly requesting s3 whether or not any
- // changes have occurred.
- export class NotificationPoller extends EventEmitter<{
- notification: (event: NotificationRecord) => void
- error: (error: unknown) => void
- }> {
- private client: TypedClient
- private bucketName: string
- private prefix: string
- private suffix: string
- private events: NotificationEvent[]
- private ending: boolean
- constructor(client: TypedClient, bucketName: string, prefix: string, suffix: string, events: NotificationEvent[]) {
- super()
- this.client = client
- this.bucketName = bucketName
- this.prefix = prefix
- this.suffix = suffix
- this.events = events
- this.ending = false
- }
- // Starts the polling.
- start() {
- this.ending = false
- process.nextTick(() => {
- this.checkForChanges()
- })
- }
- // Stops the polling.
- stop() {
- this.ending = true
- }
- checkForChanges() {
- // Don't continue if we're looping again but are cancelled.
- if (this.ending) {
- return
- }
- const method = 'GET'
- const queries = []
- if (this.prefix) {
- const prefix = uriEscape(this.prefix)
- queries.push(`prefix=${prefix}`)
- }
- if (this.suffix) {
- const suffix = uriEscape(this.suffix)
- queries.push(`suffix=${suffix}`)
- }
- if (this.events) {
- this.events.forEach((s3event) => queries.push('events=' + uriEscape(s3event)))
- }
- queries.sort()
- let query = ''
- if (queries.length > 0) {
- query = `${queries.join('&')}`
- }
- const region = this.client.region || DEFAULT_REGION
- this.client.makeRequestAsync({ method, bucketName: this.bucketName, query }, '', [200], region).then(
- (response) => {
- const asm = jsonLineParser.make()
- pipesetup(response, asm)
- .on('data', (data) => {
- // Data is flushed periodically (every 5 seconds), so we should
- // handle it after flushing from the JSON parser.
- let records = data.value.Records
- // If null (= no records), change to an empty array.
- if (!records) {
- records = []
- }
- // Iterate over the notifications and emit them individually.
- records.forEach((record: NotificationRecord) => {
- this.emit('notification', record)
- })
- // If we're done, stop.
- if (this.ending) {
- response?.destroy()
- }
- })
- .on('error', (e) => this.emit('error', e))
- .on('end', () => {
- // Do it again, if we haven't cancelled yet.
- process.nextTick(() => {
- this.checkForChanges()
- })
- })
- },
- (e) => {
- return this.emit('error', e)
- },
- )
- }
- }
|