我已经使用 docker-compose 的 YAML 文件配置了 cp-kafka 和 cp-zookeeper:
# Single-node Confluent Platform stack: one ZooKeeper instance and one Kafka broker.
version: '3'
services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka
    # Start ZooKeeper first; the broker registers itself there on boot.
    depends_on:
      - zookeeper
    ports:
      # Only the OUTSIDE listener is published to the host.
      - "9094:9094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      # INTERNAL serves traffic inside the compose network, OUTSIDE serves clients on the host.
      KAFKA_LISTENERS: INTERNAL://:9092,OUTSIDE://:9094
      # Advertise an explicit host for every listener: the compose service name
      # for in-network clients, localhost for clients running on the Docker host.
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      # Required for a single-broker cluster (the default replication factor is 3).
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # Broker JVM heap. ES_JAVA_OPTS is an Elasticsearch variable and is ignored
      # by the Kafka image; KAFKA_HEAP_OPTS is the one Kafka's start scripts read.
      KAFKA_HEAP_OPTS: "-Xms512m -Xmx3000m"
我使用 Node.js,并通过 kafka-node 库尝试生产和消费消息。
我的生产者代码(工作正常):
/**
 * Publishes JSON-encoded messages to Kafka topics through kafka-node.
 */
class kafkaProducer {
    constructor() {
        /* kafka-node module, kept on the instance so methods can reach its classes */
        this.kafkaBorkerObj = require('kafka-node');
        /* Prefix prepended to every log line written by this class */
        this.strDebug = "----------KAFKA PRODUCER --> ";
    }

    /**
     * Purpose  : Send one message to a topic.
     * Input    : pStrTopicName   :: topic name
     *          : pStrMessageBody :: message body; serialized with JSON.stringify
     *          : callback        :: invoked exactly once with the outcome
     * Response : callback(data[pStrTopicName][0]) on success, where data is the
     *            result reported by kafka-node (presumably the offset written
     *            on partition 0, so it may legitimately be 0 - compare with
     *            `=== false`, not truthiness); callback(false) on any failure.
     *
     * A dedicated client is opened for every call and closed as soon as the
     * outcome is known, so repeated calls do not accumulate broker connections.
     */
    setMessageInTopic(pStrTopicName, pStrMessageBody, callback) {
        const strDebug = this.strDebug;
        let kafkaClient = null;
        let isFinished = false;

        /* Report the outcome once, releasing the connection first. Later
           events (e.g. an 'error' emitted after a successful send) are ignored. */
        const finish = (result) => {
            if (isFinished) {
                return;
            }
            isFinished = true;
            if (kafkaClient !== null) {
                try {
                    kafkaClient.close();
                } catch (closeErr) {
                    console.log(closeErr);
                }
            }
            callback(result);
        };

        try {
            const Producer = this.kafkaBorkerObj.Producer;
            kafkaClient = new this.kafkaBorkerObj.KafkaClient({kafkaHost: process.env.KAFAK_BROKER_ENDPOINT});
            const producerObj = new Producer(kafkaClient);
            console.log(strDebug+"Received Topic : "+pStrTopicName);

            /* Topic and message definition. Retention is a topic-level setting
               and is not part of a produce payload, so it is not sent here. */
            const arrPayloads = [
                {
                    topic: pStrTopicName,
                    messages: JSON.stringify(pStrMessageBody)
                }
            ];
            /* Serialize for logging; concatenating the array prints "[object Object]" */
            console.log(strDebug+"Paylod : "+JSON.stringify(arrPayloads));

            /* Send only once the producer reports it is connected */
            producerObj.on('ready', function () {
                console.log(strDebug+"Producer is ready");
                producerObj.send(arrPayloads, (err, data) => {
                    console.log(strDebug+"Producer is send");
                    if (err) {
                        console.log(strDebug+pStrTopicName+']: broker update failed');
                        console.log(err);
                        finish(false);
                        return;
                    }
                    console.log(strDebug+pStrTopicName+']: broker update success');
                    finish(data[pStrTopicName][0]);
                });
            });

            /* Connection-level failures */
            producerObj.on('error', function (err) {
                console.log(err);
                console.log(strDebug+pStrTopicName+']: connection errored');
                finish(false);
            });
        } catch (e) {
            /* Synchronous setup failure (bad configuration, missing classes, ...) */
            console.log(e);
            finish(false);
        }
    }
}
/* Export the kafkaProducer class so that other modules can require it */
module.exports = kafkaProducer;
消费者代码是
/**
 * Reads messages from Kafka topics through kafka-node and hands each one to
 * a callback after committing its offset.
 */
class kafkaConsumer {
    constructor() {
        /* kafka-node module, kept on the instance so methods can reach its classes */
        this.kafkaBokerObj = require('kafka-node');
        /* Prefix prepended to every log line written by this class */
        this.strDebug = "----------KAFKA CONSUMER --> ";
    }

    /**
     * Purpose  : Start consuming the given topics.
     * Input    : pStrTopicNameArr :: array of topic request objects; each is
     *                                read for its `topic` property here and the
     *                                array is passed to kafka-node's Consumer
     *          : callback         :: called with every received message after
     *                                its offset commit has completed, or with
     *                                false when the consumer reports an error
     *                                or setup fails
     * Response : none on success; the callback's return value when setup throws.
     *
     * Every call opens a new client and consumer which stay open for the
     * lifetime of the process, so call this once per set of topics.
     */
    getMessageFromKafak(pStrTopicNameArr, callback) {
        try {
            const kafkaLib = this.kafkaBokerObj;
            const strDebug = this.strDebug;
            const kafkaClient = new kafkaLib.KafkaClient({kafkaHost: process.env.KAFAK_BROKER_ENDPOINT});
            const options = {
                // Offsets are committed manually in the 'message' handler below
                autoCommit: false,
                autoCommitMsgCount: 100,
                autoCommitIntervalMs: 1000,
                // Fetch message config
                fetchMaxWaitMs: 5000,
                fetchMinBytes: 1,
                fetchMaxBytes: 1024 * 10,
                fromOffset: false,
                fromBeginning: false
            };
            /* The topics are registered here; calling addTopics() again with the
               same list would register them a second time, so it is not done. */
            const consumerObj = new kafkaLib.Consumer(kafkaClient, pStrTopicNameArr, options);
            /* Used by the offsetOutOfRange handler to look up valid offsets */
            const offsetObj = new kafkaLib.Offset(kafkaClient);

            console.log(strDebug+"RESPONE MESSAGE PROCESS "+pStrTopicNameArr);
            console.log(pStrTopicNameArr);
            for (const topicRequest of pStrTopicNameArr) {
                console.log("Topic Added "+topicRequest.topic);
            }

            consumerObj.on('message', function (pStrMessage) {
                console.log(strDebug+"RESPONE COMMIT MESSAGE ----------------------");
                /* NOTE(review): kafka-node appears to advance its tracked offset only
                   after the whole fetch batch has been emitted, so a commit issued
                   from inside this handler would store the previous position and the
                   message would be delivered again later. Point the consumer at the
                   next offset first so the commit covers this message - confirm
                   against the installed kafka-node version. */
                consumerObj.setOffset(pStrMessage.topic, pStrMessage.partition, pStrMessage.offset + 1);
                consumerObj.commit(true, function (err, data) {
                    console.log(strDebug+"RESPONE COMMIT MESSAGE DONE ----------------------");
                    console.log(err);
                    console.log(data);
                    /* Hand the message over once the commit attempt has finished */
                    return callback(pStrMessage);
                });
                console.log(strDebug+"RESPONE MESSAGE ----------------------");
            }).on('error', function (pStrError) {
                console.log(strDebug+"RESPONE MESSAGE ERROR ----------------------");
                console.log(pStrError);
                /* Return negative response */
                return callback(false);
            }).on('offsetOutOfRange', function (topic) {
                /* The requested offset no longer exists on the broker: look up
                   the offsets that are still available and resume from the smallest. */
                topic.maxNum = 2;
                offsetObj.fetch([topic], function (err, offsets) {
                    if (err) {
                        console.log(err);
                        return;
                    }
                    const min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
                    consumerObj.setOffset(topic.topic, topic.partition, min);
                });
            });
        } catch (e) {
            console.log(e);
            /* Return negative response */
            return callback(false);
        }
    }
}
/* Export the kafkaConsumer class so that other modules can require it */
module.exports = kafkaConsumer;
调用消费者的代码如下,此函数在成功获取到消息后对其进行处理:
/**
 * Purpose  : Start reading from the message queue and forward every received
 *            message to sendMessage().
 * Input    : intOffset :: not used by this function
 * Response : none
 *
 * The topic check is made on strTopicArr while the consumer is given
 * strTopicCollection; both are defined outside this function.
 * Any exception raised while starting the consumer is logged through echo().
 */
function resetTheConsumer(intOffset) {
    try {
        /* Nothing to subscribe to */
        if (!(strTopicArr.length > 0)) {
            echo("No Topic Found");
            return;
        }

        /* Log each message from the queue, then pass it on */
        const onQueueMessage = function (pResponseMessage) {
            echo("Received message from Message Queue");
            echo(pResponseMessage);
            sendMessage(pResponseMessage);
        };

        kafkaConsumerObj.getMessageFromKafak(strTopicCollection, onQueueMessage);
    } catch (readError) {
        echo("Error occured while reading the mesasge from Message Queue sending the exception");
        echo(readError);
    }
}
我们的尝试(消费者部分)
1. 设置 autoCommit: false,
并在成功读取消息后手动提交偏移量。
结果:消费者仍然从旧的偏移量开始读取,返回已经读过的消息。
手动提交的输出:{ getCustomerInfossds: { partition: 0, errorCode: 0 } }
2. 在步骤 (1) 的基础上增加了 consumerObj.addTopics。
结果:addTopics 调用成功,但仍然从旧的偏移量读到已读过的消息。
addTopics 的输出:[ 'getCustomerInfossds' ]
(主题名称)
接收到的消息(每次执行消费者时,这条消息都会被重复返回):
{ topic: 'getCustomerInfossds',
value:
'{"headers":{"Authorization":"Bearer clpVU3kwSFNKZlA5TUtrOl9jUGtLb3VwLXMtZGlSQlZHN1ZKZ040Vmh1QQ==","token":"dd8f4eee15c338ebf2484e772a15a05a","wsSessionID":"Dc55aclJlBEYJOj1N7GFql7lzsJNbL","keepAlive":18000,"operation":"getCustomerInfo/","messageSendTimeStamp":"25/6/2020 2:53:15:655","corRelationId":"HqwS0R4aHY4JxpWapDaIYv75EQOtM6udUhajQaVE"},"requestBody":{"topic":"getCustomerInfossds","data":{"getCustomerInfo":{"_id":1}}}}',
offset: 66,
partition: 0,
highWaterOffset: 67,
key: null }
浏览器输出
暂无答案!
目前还没有任何答案,快来回答吧!