json输入意外结束并显示空值

jmp7cifd  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(422)

我在尝试使用kafka主题中的数据时遇到了unlimited循环,注意到highwateroffset显示了最新的数字,但值为空,offset=0,partition=0
有人能告诉我出了什么问题吗?
功能
启动时调用的主要函数

function registerUpsertUVIConsumer() {
    const upsertConsumer = kafkaHelper.getConsumer('UVI');

    upsertConsumer.on('message', async (message) => {
      console.log('KAFKA MESSAGE: upsertUVI');
      try {
        await updateUVIToNav(JSON.parse(message.value));
      } catch (error) {
        console.error('ERROR UPDATING UVI TO NAV:', error.message);
      }
    });

    upsertConsumer.on('error', err => {
      upsertConsumer.close(() => {
        console.log('Consumer closed inside error event handler');
      });
      console.log('RETRY in 30s...');
      setTimeout(() => {
        registerUpsertUVIConsumer();
      }, 30000);
    });
  }

获取消费者
main函数中的函数:kafkahelper.getconsumer('uvi')

getConsumer(topicName) {
        var kafka = require('kafka-node');
        var client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
        const consumer = new Consumer(

            client,
            [
                { topic: topicName, partition: 0}
            ],
            {
                groupId: 'consumer',
                fromOffset: false,
                autoCommit: true
            }

        );

        return consumer;
    }

示例json数据:偏移量16
示例json数据之一,选择偏移量16(主题中的最后一个偏移量)

{
  "seqNo": 21,
  "id": "ff78ab90-d890-11ea-a9e8-fbc4285e2503",
  "companyCode": "EC",
  "dataType": "H",
  "documentType": "Credit Memo",
  "documentRef": "ITN",
  "documentNo": "B200000005",
  "externalDocumentNo": "B200000005",
  "documentDate": "07/08/2020",
  "customerCode": "8921",
  "description": "REN",
  "amount": -40000.00,
  "bizstreamDimension": "UV",
  "costCenter": "",
  "genProdPostingGroup": "",
  "modelCode": "NIS_1",
  "accessoryCode": "",
  "chassisNo": "SN201",
  "registrationNo": "V928",
  "userFieldText1": "1.5E (A)",
  "userFieldText2": "V200000005",
  "customerName": "REN",
  "applyToDocumentType": "Invoice",
  "applyToDocumentNo": "B200000005",
  "chequeNo": "",
  "payeeName": "",
  "paymentMethod": "",
  "userFieldText3": "",
  "userFieldDate1": "",
  "userFieldText4": "",
  "makeCode": "NIS",
  "gstBusPostingGroup": "",
  "gstProdPostingGroup": "",
  "taxCode": "",
  "gstRate": 0.00,
  "gstAmount": 0.00,
  "gstBaseAmount": 0.00,
  "gstCalcType": "0",
  "supplierName": "",
  "invoiceNo": "",
  "supplierBRNNo": "",
  "reservedField2": "",
  "reservedField3": "",
  "reservedField4": "",
  "reservedField5": "",
  "reservedField6": "",
  "reservedField7": "",
  "reservedField8": "",
  "reservedField9": "",
  "reservedField10": "",
  "syncStatus": "failed",
  "deleted": 0,
  "createdBy": "fc1506d0-bdb8-11ea-b025-07e81e3e1b13",
  "updatedBy": "fc1506d0-bdb8-11ea-b025-07e81e3e1b13",
  "createdAt": "2020-08-07 09:33:06",
  "updatedAt": "2020-08-07 09:33:06",
  "fileId": "fda7e331-d890-11ea-a9e8-fbc4285e2503"
}

样本输出
get error from console.error('将uvi更新到导航时出错:',消息);

ERROR UPDATING UVI TO NAV: {
  topic: 'UVI',
  value: '',
  offset: 0,
  partition: 0,
  highWaterOffset: 17,
  key: ''
}

无限循环

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题