import 'dotenv/config'
import amqplib, { Options, Connection, Channel, ConsumeMessage } from 'amqplib'
import os from 'os'
import EventEmitter from 'events'
import { HydeLivingError } from '@helpers/helper.error'
/**
 * Small RabbitMQ helper built on amqplib.
 *
 * `publisher` sends a JSON-serialised payload to the queue `${prefix}_${key}` and
 * `subscriber` resolves with the next message body read from that queue.
 *
 * Public methods do not throw on broker failures: the failure is returned as a
 * `HydeLivingError` instance, so callers must check the result with `instanceof`.
 */
export class Rabbitmq {
  private connectionOptions: Options.Connect
  private optionsPublish: Options.Publish
  private assertQueueOptions: Options.AssertQueue
  private emitter: InstanceType<typeof EventEmitter> = new EventEmitter({ captureRejections: true })
  /** Lazily opened broker connection, shared by every publisher/subscriber call. */
  private sharedConnection: Promise<Connection> | undefined = undefined

  /** Extracts a printable message from whatever value was thrown. */
  private reason(e: unknown): string {
    return e instanceof Error ? e.message : String(e)
  }

  /** Closes a channel or connection, ignoring the failure raised when it is already closed. */
  private async closeQuietly(resource: { close(): Promise<void> }): Promise<void> {
    try {
      await resource.close()
    } catch {
      // Already closed (or closing): nothing left to release.
    }
  }

  /**
   * Opens a new broker connection and declares the topic exchange on it.
   *
   * @param onLost invoked when the connection reports an error or closes, so the
   *               cached connection can be dropped and reopened on the next call.
   * @throws whatever amqplib raises while connecting or declaring the exchange.
   */
  private async openConnection(onLost: () => void): Promise<Connection> {
    this.connectionOptions = {
      protocol: process.env.RABBITMQ_PROTOCOL,
      // NOTE(review): RABBITMQ_HOST is passed as the virtual host, not as `hostname`,
      // so the broker address falls back to amqplib's default — confirm this is intended.
      vhost: process.env.RABBITMQ_HOST,
      username: process.env.RABBITMQ_USERNAME,
      password: process.env.RABBITMQ_PASSWORD
    }
    const connect: Connection = await amqplib.connect(this.connectionOptions)
    // An 'error' event emitted without a listener is thrown by the EventEmitter.
    connect.on('error', onLost)
    connect.on('close', onLost)
    try {
      const channel: Channel = await connect.createChannel()
      channel.on('error', () => undefined)
      // The exchange name is kept byte-for-byte: renaming it would declare a different exchange.
      await channel.assertExchange('exchage', 'topic', { autoDelete: false, durable: true })
      await channel.close()
      return connect
    } catch (e: unknown) {
      await this.closeQuietly(connect)
      throw e
    }
  }

  /**
   * Returns the shared broker connection, opening it on first use and reopening
   * it after it was lost. Concurrent callers share the same pending attempt.
   *
   * @returns the connection, or a `HydeLivingError` describing why it could not be opened.
   */
  private async connection(): Promise<any> {
    let opening: Promise<Connection> | undefined = this.sharedConnection
    if (opening === undefined) {
      const forget = (): void => {
        // Only drop the cache entry if it still refers to this very attempt.
        if (this.sharedConnection === opening) this.sharedConnection = undefined
      }
      opening = this.openConnection(forget)
      this.sharedConnection = opening
    }
    try {
      return await opening
    } catch (e: unknown) {
      if (this.sharedConnection === opening) this.sharedConnection = undefined
      return new HydeLivingError(this.reason(e))
    }
  }

  /**
   * Publishes `data` as JSON to the queue `${prefix}_${key}`, declaring the queue first.
   *
   * @param prefix first part of the queue name.
   * @param key second part of the queue name.
   * @param data payload, serialised with `JSON.stringify`.
   * @returns `true` once the message has been handed to the broker, otherwise a `HydeLivingError`.
   */
  async publisher(prefix: string, key: string, data: Record<string, any> | Record<string, any>[]): Promise<any> {
    let channel: Channel | undefined
    try {
      const broker: Connection = await this.connection()
      if (broker instanceof Error) throw new HydeLivingError('rabbitmq is not connected')
      this.assertQueueOptions = {
        maxPriority: 10,
        autoDelete: true,
        durable: true,
        // Both values are durations in milliseconds, not absolute timestamps; they must
        // also be identical on every call or re-declaring the queue is rejected.
        messageTtl: 3 * 60 * 1000, // 3 minutes
        expires: 60 * 60 * 1000 // 1 hour
      }
      this.optionsPublish = {
        persistent: true,
        // NOTE(review): priority is derived from the CPU count and may exceed maxPriority — confirm intent.
        priority: os.cpus().length,
        timestamp: Date.now()
      }
      const queue = `${prefix}_${key}`
      const opened: Channel = await broker.createChannel()
      channel = opened
      opened.on('error', () => undefined)
      await opened.assertQueue(queue, this.assertQueueOptions)
      // A `false` return only signals a full write buffer; the message is still queued
      // for sending, so it is not treated as a failure.
      opened.sendToQueue(queue, Buffer.from(JSON.stringify(data)), this.optionsPublish)
      // Closing is sent on the same channel after the message, so awaiting the close
      // handshake also waits for the message to be written out.
      await opened.close()
      channel = undefined
      return true
    } catch (e: unknown) {
      return new HydeLivingError(this.reason(e))
    } finally {
      if (channel !== undefined) await this.closeQuietly(channel)
    }
  }

  /**
   * Starts a one-shot consumer on the queue `${prefix}_${key}`.
   *
   * The first delivered message is emitted on `event` as text and acknowledged, then
   * the channel is closed so later messages stay on the queue. If the channel closes
   * before any delivery, a `HydeLivingError` is emitted on `event` instead.
   *
   * @param prefix first part of the queue name.
   * @param key second part of the queue name.
   * @param event emitter event used to hand the message over; defaults to `'data'`.
   * @returns `undefined` once consuming has started, otherwise a `HydeLivingError`.
   */
  private async consumer(prefix: string, key: string, event: string | symbol = 'data'): Promise<any> {
    let channel: Channel | undefined
    try {
      const broker: Connection = await this.connection()
      if (broker instanceof Error) throw new HydeLivingError('rabbitmq is not connected')
      const opened: Channel = await broker.createChannel()
      channel = opened
      let delivered = false
      opened.on('error', () => undefined)
      opened.once('close', () => {
        // Without this a waiting subscriber would never settle.
        if (!delivered) this.emitter.emit(event, new HydeLivingError('rabbitmq is not connected'))
      })
      // Limit unacknowledged deliveries so extra messages are not pulled off the queue.
      await opened.prefetch(1)
      await opened.consume(`${prefix}_${key}`, (msg: ConsumeMessage | null): void => {
        if (msg === null) {
          // The broker cancelled the consumer; release the channel.
          void this.closeQuietly(opened)
          return
        }
        // Anything after the first message is left unacknowledged and is returned
        // to the queue when the channel closes.
        if (delivered) return
        delivered = true
        this.emitter.emit(event, msg.content.toString())
        opened.ack(msg)
        void this.closeQuietly(opened)
      })
    } catch (e: unknown) {
      if (channel !== undefined) await this.closeQuietly(channel)
      return new HydeLivingError(this.reason(e))
    }
  }

  /**
   * Waits for the next message on the queue `${prefix}_${key}`.
   *
   * @param prefix first part of the queue name.
   * @param key second part of the queue name.
   * @returns the message body as text (it is not JSON-parsed here), or a
   *          `HydeLivingError` when the broker is unavailable or the channel closes first.
   */
  async subscriber(prefix: string, key: string): Promise<any> {
    // A unique event per call keeps concurrent subscriptions from receiving each other's messages.
    const event = Symbol(`${prefix}_${key}`)
    try {
      // Listen before consuming starts so an immediate delivery cannot be missed.
      const received = new Promise<any>((resolve) => {
        this.emitter.once(event, (data: any) => resolve(data))
      })
      const consumer = await this.consumer(prefix, key, event)
      if (consumer instanceof Error) throw new HydeLivingError('rabbitmq is not connected')
      return await received
    } catch (e: unknown) {
      this.emitter.removeAllListeners(event)
      return new HydeLivingError(this.reason(e))
    }
  }
}