我在尝试使用kafka主题中的数据时遇到了unlimited循环,注意到highwateroffset显示了最新的数字,但值为空,offset=0,partition=0
有人能告诉我出了什么问题吗?
功能
启动时调用的主要函数
function registerUpsertUVIConsumer() {
const upsertConsumer = kafkaHelper.getConsumer('UVI');
upsertConsumer.on('message', async (message) => {
console.log('KAFKA MESSAGE: upsertUVI');
try {
await updateUVIToNav(JSON.parse(message.value));
} catch (error) {
console.error('ERROR UPDATING UVI TO NAV:', error.message);
}
});
upsertConsumer.on('error', err => {
upsertConsumer.close(() => {
console.log('Consumer closed inside error event handler');
});
console.log('RETRY in 30s...');
setTimeout(() => {
registerUpsertUVIConsumer();
}, 30000);
});
}
获取消费者
main函数中的函数:kafkahelper.getconsumer('uvi')
getConsumer(topicName) {
var kafka = require('kafka-node');
var client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
const consumer = new Consumer(
client,
[
{ topic: topicName, partition: 0}
],
{
groupId: 'consumer',
fromOffset: false,
autoCommit: true
}
);
return consumer;
}
示例json数据:偏移量16
示例json数据之一,选择偏移量16(主题中的最后一个偏移量)
{
"seqNo": 21,
"id": "ff78ab90-d890-11ea-a9e8-fbc4285e2503",
"companyCode": "EC",
"dataType": "H",
"documentType": "Credit Memo",
"documentRef": "ITN",
"documentNo": "B200000005",
"externalDocumentNo": "B200000005",
"documentDate": "07/08/2020",
"customerCode": "8921",
"description": "REN",
"amount": -40000.00,
"bizstreamDimension": "UV",
"costCenter": "",
"genProdPostingGroup": "",
"modelCode": "NIS_1",
"accessoryCode": "",
"chassisNo": "SN201",
"registrationNo": "V928",
"userFieldText1": "1.5E (A)",
"userFieldText2": "V200000005",
"customerName": "REN",
"applyToDocumentType": "Invoice",
"applyToDocumentNo": "B200000005",
"chequeNo": "",
"payeeName": "",
"paymentMethod": "",
"userFieldText3": "",
"userFieldDate1": "",
"userFieldText4": "",
"makeCode": "NIS",
"gstBusPostingGroup": "",
"gstProdPostingGroup": "",
"taxCode": "",
"gstRate": 0.00,
"gstAmount": 0.00,
"gstBaseAmount": 0.00,
"gstCalcType": "0",
"supplierName": "",
"invoiceNo": "",
"supplierBRNNo": "",
"reservedField2": "",
"reservedField3": "",
"reservedField4": "",
"reservedField5": "",
"reservedField6": "",
"reservedField7": "",
"reservedField8": "",
"reservedField9": "",
"reservedField10": "",
"syncStatus": "failed",
"deleted": 0,
"createdBy": "fc1506d0-bdb8-11ea-b025-07e81e3e1b13",
"updatedBy": "fc1506d0-bdb8-11ea-b025-07e81e3e1b13",
"createdAt": "2020-08-07 09:33:06",
"updatedAt": "2020-08-07 09:33:06",
"fileId": "fda7e331-d890-11ea-a9e8-fbc4285e2503"
}
样本输出
get error from console.error('将uvi更新到导航时出错:',消息);
ERROR UPDATING UVI TO NAV: {
topic: 'UVI',
value: '',
offset: 0,
partition: 0,
highWaterOffset: 17,
key: ''
}
无限循环
暂无答案!
目前还没有任何答案,快来回答吧!