Search
 
SCRIPT & CODE EXAMPLE
 
CODE EXAMPLE FOR JAVASCRIPT

kafkajs

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
	clientId: 'node-app',
	brokers: ['localhost:9092'],
	requestTimeout: 3000,
	connectionTimeout: 6000,
	ssl: false
})

exports.producer = async (eventName, data) => {
	const producer = kafka.producer()

	// event kafka producer notification
	await producer.on('producer.connect', () => console.info('producer kafka connected'))
	await producer.on('producer.disconnect', () => console.error('producer kafka disconnect'))
	await producer.on('producer.network.request_timeout', () => console.error('producer kafka network timeout'))

	await producer.connect()
	await producer.send({
		topic: eventName,
		messages: data,
		acks: true,
		compression: 1
	})

	await producer.disconnect()
}

exports.consumer = async (eventName, callback) => {
	const consumer = kafka.consumer({
		groupId: 'test-group',
		maxBytes: 1048576000, // 1GB
		maxBytesPerPartition: 1048576000, // 1GB
		sessionTimeout: 60000,
		heartbeatInterval: 6000,
		rebalanceTimeout: 30000
	})

	// event kafka consumer notification
	await consumer.on('consumer.connect', () => console.info('consumer kafka connected'))
	await consumer.on('consumer.disconnect', () => console.error('consumer kafka disconnect'))
	await consumer.on('consumer.crash', () => console.error('consumer kafka crash'))
	await consumer.on('consumer.stop', () => console.error('consumer kafka stop'))
	await consumer.on('consumer.network.request_timeout', () => console.error('consumer kafka network timeout'))

	await consumer.connect()
	await consumer.subscribe({ topic: eventName, fromBeginning: true })
	await consumer.run({ autoCommit: true, eachMessage: callback })
}
 
PREVIOUS NEXT
Tagged: #kafkajs
ADD COMMENT
Topic
Name
5+1 =