Search
 
SCRIPT & CODE EXAMPLE
 

TYPESCRIPT

amqplib

import 'dotenv/config'
import amqplib, { Options, Connection, Channel, ConsumeMessage } from 'amqplib'
import os from 'os'
import EventEmitter from 'events'

import { HydeLivingError } from '@helpers/helper.error'

/**
 * Minimal RabbitMQ helper built on amqplib.
 *
 * `publisher` sends one JSON-encoded message to the queue `${prefix}_${key}`
 * and `subscriber` resolves with the next message body received from that
 * queue. Both public methods follow the same convention: they never reject,
 * they resolve with a `HydeLivingError` instance when something goes wrong.
 */
export class Rabbitmq {
	/** Per-queue message TTL: a duration in milliseconds (3 minutes). */
	private static readonly MESSAGE_TTL_MS = 3 * 60 * 1000
	/** Idle time after which the broker may delete the queue: a duration in milliseconds (1 hour). */
	private static readonly QUEUE_EXPIRES_MS = 60 * 60 * 1000

	private connectionOptions: Options.Connect = {}
	private optionsPublish: Options.Publish = {}
	private assertQueueOptions: Options.AssertQueue = {}
	private emitter: InstanceType<typeof EventEmitter> = new EventEmitter({ captureRejections: true })

	/** Converts any thrown value into the error type this class returns to callers. */
	private static toError(e: unknown): HydeLivingError {
		return new HydeLivingError(e instanceof Error ? e.message : String(e))
	}

	/** Closes a channel or connection, ignoring failures caused by it being closed already. */
	private static async closeQuietly(resource?: { close(): Promise<void> }): Promise<void> {
		if (resource === undefined) return
		try {
			await resource.close()
		} catch {
			// Already closed (for example by the broker after a channel error); nothing left to release.
		}
	}

	/**
	 * Opens a new broker connection and makes sure the topic exchange exists.
	 *
	 * @returns the open connection, or a `HydeLivingError` when connecting or
	 * declaring the exchange fails (the connection is closed in that case).
	 */
	private async connection(): Promise<any> {
		let connect: Connection | undefined
		try {
			this.connectionOptions = {
				protocol: process.env.RABBITMQ_PROTOCOL,
				// NOTE(review): RABBITMQ_HOST is passed as the virtual host, not as `hostname`,
				// so the client connects to amqplib's default host. Confirm this is intended.
				vhost: process.env.RABBITMQ_HOST,
				username: process.env.RABBITMQ_USERNAME,
				password: process.env.RABBITMQ_PASSWORD
			}

			connect = await amqplib.connect(this.connectionOptions)

			// The channel is only needed for the declaration, so release it right after.
			const channel: Channel = await connect.createChannel()
			try {
				// Awaited so that a failed declaration is reported here instead of as an unhandled rejection.
				await channel.assertExchange('exchage', 'topic', { autoDelete: false, durable: true })
			} finally {
				await Rabbitmq.closeQuietly(channel)
			}

			return connect
		} catch (e: unknown) {
			// Do not leak a half-initialised connection.
			await Rabbitmq.closeQuietly(connect)
			return Rabbitmq.toError(e)
		}
	}

	/**
	 * Publishes `data` as a JSON document to the queue `${prefix}_${key}`,
	 * declaring the queue first.
	 *
	 * @param prefix first part of the queue name
	 * @param key second part of the queue name
	 * @param data payload, serialised with `JSON.stringify`
	 * @returns `true` on success, otherwise a `HydeLivingError` (never rejects)
	 */
	async publisher(prefix: string, key: string, data: Record<string, any> | Record<string, any>[]): Promise<any> {
		let broker: Connection | undefined
		try {
			const connected = await this.connection()
			if (connected instanceof Error) throw new HydeLivingError('rabbitmq is not connected')
			broker = connected

			this.assertQueueOptions = {
				maxPriority: 10,
				autoDelete: true,
				durable: true,
				// Both values are durations in ms and must stay below 2^32. Absolute timestamps
				// are rejected by the broker and would also differ on every declaration.
				messageTtl: Rabbitmq.MESSAGE_TTL_MS,
				expires: Rabbitmq.QUEUE_EXPIRES_MS
			}

			this.optionsPublish = {
				persistent: true,
				// NOTE(review): priority is derived from the CPU count and may exceed maxPriority.
				priority: os.cpus().length,
				timestamp: Date.now()
			}

			const queue = `${prefix}_${key}`
			const channel: Channel = await broker.createChannel()

			await channel.assertQueue(queue, this.assertQueueOptions)
			const publish: boolean = channel.sendToQueue(queue, Buffer.from(JSON.stringify(data)), this.optionsPublish)

			// NOTE(review): amqplib returns false when the channel write buffer is full,
			// which is back-pressure rather than a definite failure. Kept as an error
			// because callers already depend on that result.
			if (!publish) throw new HydeLivingError('Send message to queue failed')

			// Graceful close so the buffered publish is written before the connection goes away.
			await channel.close()
			return true
		} catch (e: unknown) {
			return Rabbitmq.toError(e)
		} finally {
			// Every call opens its own connection, so it must be released on every path.
			await Rabbitmq.closeQuietly(broker)
		}
	}

	/**
	 * Starts consuming `${prefix}_${key}` on a new connection. Every message body
	 * is emitted as a string on the internal `data` event and then acknowledged.
	 *
	 * @returns the consuming channel, or a `HydeLivingError` on failure
	 */
	private async consumer(prefix: string, key: string): Promise<any> {
		let broker: Connection | undefined
		try {
			const connected = await this.connection()
			if (connected instanceof Error) throw new HydeLivingError('rabbitmq is not connected')
			broker = connected

			const channel: Channel = await broker.createChannel()

			// Awaited so that a missing queue or closed channel reaches the catch below.
			await channel.consume(`${prefix}_${key}`, (msg: ConsumeMessage | null): void => {
				// amqplib delivers null when the broker cancels the consumer.
				if (msg === null) return
				this.emitter.emit('data', msg.content.toString())
				channel.ack(msg)
			})

			// The connection is left open on purpose: closing it would stop the consumer.
			return channel
		} catch (e: unknown) {
			await Rabbitmq.closeQuietly(broker)
			return Rabbitmq.toError(e)
		}
	}

	/**
	 * Waits for the next message on the queue `${prefix}_${key}`.
	 *
	 * NOTE(review): the consumer started here keeps running after the first
	 * message; later messages are acknowledged even if nobody is waiting.
	 *
	 * @param prefix first part of the queue name
	 * @param key second part of the queue name
	 * @returns the raw message body as a string, or a `HydeLivingError` (never rejects)
	 */
	async subscriber(prefix: string, key: string): Promise<any> {
		// Register the listener before the consumer starts so an early message is not lost.
		// `once` prevents one stale listener from piling up per call.
		let deliver: (data: string) => void = () => undefined
		const received = new Promise<string>((resolve) => {
			deliver = resolve
		})
		const onData = (data: string): void => deliver(data)
		this.emitter.once('data', onData)

		try {
			const consumer = await this.consumer(prefix, key)
			if (consumer instanceof Error) throw new HydeLivingError('rabbitmq is not connected')

			// The body is already a string; it is handed back unchanged.
			return await received
		} catch (e: unknown) {
			this.emitter.off('data', onData)
			return Rabbitmq.toError(e)
		}
	}
}
Comment

PREVIOUS NEXT
Code Example
Typescript :: useSortBy 
Typescript :: stats splunk many fields 
Typescript :: Angular/RxJs When should I unsubscribe from `Subscription` 
Typescript :: how to install tsu 
Typescript :: css animation for beginners 
Typescript :: dynamic index in typescript 
Typescript :: The Apple I had a built-in video terminal, sockets for __________ kilobytes of onboard random access memory (RAM), a keyboard, and a cassette board meant to work with regular cassette recorders. 
Typescript :: ring Composition and Returning Objects and Lists by Reference 
Typescript :: Minus points of exploration 
Typescript :: how to make a tool detect a click and add points roblox studio 
Typescript :: quizlet In converting an entrepreneurial business script into an enterprise value chain, the financing process of the value chain is usually made up of two different scenes 
Typescript :: java a program that converts letters to their corresponding telephone digits 
Typescript :: ts date toisostring incorrect conversion 
Typescript :: bullmq 
Typescript :: how to make the score add on while its in a loop in python 
Typescript :: how to execute the same test case for multiple time using testng? 
Typescript :: how to invert sortField primeng 
Typescript :: typescript Empty Types 
Typescript :: add optional parameters javascript or typescript function 
Typescript :: move between points in godot 
Typescript :: typescript object annotation 
Typescript :: calculate north south east west using magnetic sensor 
Typescript :: typescript add class to element 
Typescript :: typescript enum includes value 
Typescript :: filtering objects in django templates 
Typescript :: fusion builder elegant elements for free 
Typescript :: create react project with typescript 
Cpp :: 3d dynamic array c++ 
Cpp :: c++ how to loop through a vector but not the last element 
Cpp :: sleep in cpp 
ADD CONTENT
Topic
Content
Source link
Name
9+8 =