@evlmaistrenko/tools-mongomq

    Class Queue<TPayload>

    Queue of messages

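    // Assumes `Queue` is a named export of this package
    import { Queue } from "@evlmaistrenko/tools-mongomq"
    import { collection } from "<yourDatabase>"

    const queue = new Queue(collection)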

    Type Parameters

    • TPayload = any

    Constructors

    Properties

    messagesCollection: Collection<MessageDocument<TPayload>>
    messagesStream?: ChangeStream<
        MessageDocument<TPayload>,
        ChangeStreamDocument<MessageDocument<TPayload>>,
    >

    ChangeStream of messagesCollection

    1.1.1

    messagesStreamConsumersCount: number = 0

    Number of messages generators currently iterating

    1.1.1

    messagesStreamTriggers: PromiseWithResolvers<void>[] = []

    Promises with resolvers for messagesStream events

    1.1.1
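As a rough illustration of how a list of promises with resolvers can wake waiting consumers, here is a minimal sketch. It is not the library's implementation: `Trigger`, `createTrigger`, `nextEvent` and `fireEvent` are hypothetical names, and `Trigger` is a hand-rolled stand-in for `PromiseWithResolvers<void>`.

```typescript
// Hypothetical stand-in for the built-in PromiseWithResolvers<void> type.
interface Trigger {
  promise: Promise<void>
  resolve: () => void
}

// Creates a promise together with the function that resolves it.
function createTrigger(): Trigger {
  let resolve!: () => void
  const promise = new Promise<void>((res) => {
    resolve = res
  })
  return { promise, resolve }
}

// Pending triggers, one per waiting consumer (assumed layout).
const triggers: Trigger[] = []

// A consumer asks for a promise that settles on the next event.
function nextEvent(): Promise<void> {
  const trigger = createTrigger()
  triggers.push(trigger)
  return trigger.promise
}

// An event handler resolves every pending trigger and clears the list.
// Returns how many waiting consumers were woken up.
function fireEvent(): number {
  const pending = triggers.splice(0, triggers.length)
  for (const trigger of pending) trigger.resolve()
  return pending.length
}
```

In this sketch each waiting consumer gets its own promise, so a single event can wake all of them at once.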

    Accessors

    • get delayedMessagesTrigger(): Promise<void>

      Promise that resolves when the nearest delayed message becomes ready to consume

      Returns Promise<void>

      1.1.1
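One way to picture a promise that resolves when a delayed message becomes ready is a timer set to the message's ready date. The sketch below only illustrates that idea and is not taken from the library: `delayUntil` and `resolveAt` are hypothetical helpers.

```typescript
// Hypothetical helper: milliseconds from `now` until `readyAt`, never negative.
function delayUntil(readyAt: Date, now: Date = new Date()): number {
  return Math.max(0, readyAt.getTime() - now.getTime())
}

// Hypothetical helper: promise that resolves once `readyAt` has been reached.
// A date in the past resolves on the next timer tick.
function resolveAt(readyAt: Date, now: Date = new Date()): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, delayUntil(readyAt, now))
  })
}
```

For example, `await resolveAt(new Date(Date.now() + 5_000))` would continue after roughly five seconds. Timer delays longer than about 24.8 days exceed what `setTimeout` supports, so a real implementation would need to handle far-future dates separately.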

    • get messagesTrigger(): Promise<void>

      Promise that resolves when a change event fires

      Returns Promise<void>

      1.1.1

    Methods

    • Closes messagesStream if no messages generators are iterating

      Returns Promise<void>

      1.1.1

    • Creates indexes for messagesCollection

      Returns Promise<void>

      1.1.1

    • Receives messages from the queue

      Parameters

      • concurrency: number = Infinity

        Maximum number of messages being processed concurrently

      Returns AsyncGenerator<(callback: ConsumeCallback<TPayload>) => Promise<void>>

      for await (let consume of queue.messages())
        consume(async (message) => {
          // do something with `message.payload`
        }).catch((error) => {
          // process `error`
        })
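The loop above calls `consume` without awaiting it, so callbacks can run concurrently and each failure is handled by its own `.catch`. The sketch below mimics that shape with an in-memory generator so it runs without a database. It is an assumption-laden stand-in: the shapes of `Message` and `ConsumeCallback`, and the helpers `messages` and `consumeAll`, are hypothetical and only mirror the documented return type.

```typescript
// Assumed shapes; only `message.payload` is taken from the documentation.
type Message<T> = { payload: T }
type ConsumeCallback<T> = (message: Message<T>) => Promise<void>

// In-memory stand-in for the queue's generator: yields one `consume`
// function per payload instead of reading from a collection.
async function* messages<T>(
  payloads: T[],
): AsyncGenerator<(callback: ConsumeCallback<T>) => Promise<void>> {
  for (const payload of payloads) {
    yield (callback: ConsumeCallback<T>) => callback({ payload })
  }
}

// Starts every callback without awaiting it inside the loop, collects
// failures instead of letting them reject, then waits for all to settle.
async function consumeAll<T>(
  payloads: T[],
  handler: ConsumeCallback<T>,
): Promise<unknown[]> {
  const errors: unknown[] = []
  const pending: Promise<void>[] = []
  for await (const consume of messages(payloads)) {
    pending.push(
      consume(handler).catch((error) => {
        errors.push(error)
      }),
    )
  }
  await Promise.all(pending)
  return errors
}
```

Collecting the errors in one place is a design choice of this sketch. With the real queue, the generator presumably keeps yielding as new messages arrive, so per-message `.catch` handling as in the documented example is the more natural fit.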
    • Publishes a message to the queue

      Parameters

      • payload: TPayload

        Payload of message

      • Optional delayedTo: Date

        Date until which receiving of the message is deferred

      Returns Promise<void>

      await queue.publish({ some: "value" })
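To show the optional `delayedTo` parameter in use, the following sketch computes a future date and passes it as the second argument. The `Publisher` interface only restates the documented `publish(payload, delayedTo)` signature so the snippet is self-contained, and `publishIn` is a hypothetical helper, not part of the package.

```typescript
// Minimal interface restating the documented publish signature.
interface Publisher<TPayload> {
  publish(payload: TPayload, delayedTo?: Date): Promise<void>
}

// Hypothetical helper: publishes `payload` so that it becomes
// receivable `delayMs` milliseconds after `now`.
function publishIn<TPayload>(
  queue: Publisher<TPayload>,
  payload: TPayload,
  delayMs: number,
  now: Date = new Date(),
): Promise<void> {
  const delayedTo = new Date(now.getTime() + delayMs)
  return queue.publish(payload, delayedTo)
}
```

Assuming a `queue` created as in the class example, `await publishIn(queue, { some: "value" }, 60_000)` would defer the message by about one minute.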