JSON 输入意外结束(Unexpected end of JSON input),且消息的 value 为空

jmp7cifd  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(475)

我在尝试消费 Kafka 主题中的数据时遇到了无限循环。我注意到 highWaterOffset 显示的是最新的数字,但收到的消息 value 为空,offset=0,partition=0。
有人能告诉我出了什么问题吗?
函数
启动时调用的主要函数

  /**
   * Registers a consumer for the 'UVI' topic and forwards every message to
   * updateUVIToNav as a parsed JSON object.
   *
   * Messages whose value is missing or blank are skipped with a warning:
   * JSON.parse('') throws "Unexpected end of JSON input", so parsing them can
   * never succeed and only produces a misleading update error.
   *
   * When the consumer emits 'error', the error is logged, the consumer is
   * closed, and a fresh consumer is registered after 30 seconds. The restart
   * is scheduled at most once per consumer instance, so repeated 'error'
   * events cannot stack up several replacement consumers.
   */
  function registerUpsertUVIConsumer() {
    const upsertConsumer = kafkaHelper.getConsumer('UVI');
    // Guards the close/retry sequence so it runs once for this instance.
    let isRestartScheduled = false;

    upsertConsumer.on('message', async (message) => {
      console.log('KAFKA MESSAGE: upsertUVI');

      const rawValue = message.value;
      // String() also covers Buffer values; a blank payload is not valid JSON.
      if (rawValue == null || String(rawValue).trim() === '') {
        console.warn(
          `SKIPPING EMPTY UVI MESSAGE: partition=${message.partition} offset=${message.offset}`
        );
        return;
      }

      try {
        await updateUVIToNav(JSON.parse(rawValue));
      } catch (error) {
        console.error('ERROR UPDATING UVI TO NAV:', error.message);
      }
    });

    upsertConsumer.on('error', (err) => {
      // Surface the cause; previously the error object was dropped silently.
      console.error('UVI CONSUMER ERROR:', err && err.message ? err.message : err);

      if (isRestartScheduled) {
        return;
      }
      isRestartScheduled = true;

      upsertConsumer.close(() => {
        console.log('Consumer closed inside error event handler');
      });
      console.log('RETRY in 30s...');
      setTimeout(() => {
        registerUpsertUVIConsumer();
      }, 30000);
    });
  }

获取消费者
主函数中调用的函数:kafkaHelper.getConsumer('UVI')

  1. getConsumer(topicName) {
  2. var kafka = require('kafka-node');
  3. var client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
  4. const consumer = new Consumer(
  5. client,
  6. [
  7. { topic: topicName, partition: 0}
  8. ],
  9. {
  10. groupId: 'consumer',
  11. fromOffset: false,
  12. autoCommit: true
  13. }
  14. );
  15. return consumer;
  16. }

示例json数据:偏移量16
示例json数据之一,选择偏移量16(主题中的最后一个偏移量)

  1. {
  2. "seqNo": 21,
  3. "id": "ff78ab90-d890-11ea-a9e8-fbc4285e2503",
  4. "companyCode": "EC",
  5. "dataType": "H",
  6. "documentType": "Credit Memo",
  7. "documentRef": "ITN",
  8. "documentNo": "B200000005",
  9. "externalDocumentNo": "B200000005",
  10. "documentDate": "07/08/2020",
  11. "customerCode": "8921",
  12. "description": "REN",
  13. "amount": -40000.00,
  14. "bizstreamDimension": "UV",
  15. "costCenter": "",
  16. "genProdPostingGroup": "",
  17. "modelCode": "NIS_1",
  18. "accessoryCode": "",
  19. "chassisNo": "SN201",
  20. "registrationNo": "V928",
  21. "userFieldText1": "1.5E (A)",
  22. "userFieldText2": "V200000005",
  23. "customerName": "REN",
  24. "applyToDocumentType": "Invoice",
  25. "applyToDocumentNo": "B200000005",
  26. "chequeNo": "",
  27. "payeeName": "",
  28. "paymentMethod": "",
  29. "userFieldText3": "",
  30. "userFieldDate1": "",
  31. "userFieldText4": "",
  32. "makeCode": "NIS",
  33. "gstBusPostingGroup": "",
  34. "gstProdPostingGroup": "",
  35. "taxCode": "",
  36. "gstRate": 0.00,
  37. "gstAmount": 0.00,
  38. "gstBaseAmount": 0.00,
  39. "gstCalcType": "0",
  40. "supplierName": "",
  41. "invoiceNo": "",
  42. "supplierBRNNo": "",
  43. "reservedField2": "",
  44. "reservedField3": "",
  45. "reservedField4": "",
  46. "reservedField5": "",
  47. "reservedField6": "",
  48. "reservedField7": "",
  49. "reservedField8": "",
  50. "reservedField9": "",
  51. "reservedField10": "",
  52. "syncStatus": "failed",
  53. "deleted": 0,
  54. "createdBy": "fc1506d0-bdb8-11ea-b025-07e81e3e1b13",
  55. "updatedBy": "fc1506d0-bdb8-11ea-b025-07e81e3e1b13",
  56. "createdAt": "2020-08-07 09:33:06",
  57. "updatedAt": "2020-08-07 09:33:06",
  58. "fileId": "fda7e331-d890-11ea-a9e8-fbc4285e2503"
  59. }

样本输出
console.error('ERROR UPDATING UVI TO NAV:', message); 输出的错误如下:

  1. ERROR UPDATING UVI TO NAV: {
  2. topic: 'UVI',
  3. value: '',
  4. offset: 0,
  5. partition: 0,
  6. highWaterOffset: 17,
  7. key: ''
  8. }

无限循环

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题