kafka生成消息键作为字符串,即使rest程序有int?

lsmd5eda  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(417)

我正在使用以下程序制作Kafka的唱片:

import java.io.IOException;
import java.security.SecureRandom;

public class SensorStatusProducer {
  private final static String TOPIC = "SENSOR_STATUS_DETAILS";
  private final static String PRODUCER_URI = "http://xxx.xxx.xxx.xxx:8082/topics/" + TOPIC;
  private final static SecureRandom randomNumber = new SecureRandom();
  private final static SensorDetails sensorDetails = new SensorDetails();

  public static void main(String[] args) {
    int[] sensorid = sensorDetails.getSensorid(); //this will return [1001,1002,1003,1004,1005]
    try {
      HttpRestProxyUtil rest = new HttpRestProxyUtil(); //this is defined in another class
      for (int sid : sensorid) {
        rest.produceRecords(PRODUCER_URI, String.format("{\"records\":[{\"key\": %d," +
          "\"value\":{" +
          "\"sensorid\":%d," +
          "\"status\":%s," +
          "\"lastconnectedtime\":%s}}]}", sid, sid, "\"CONNECTED\"", String.format("\"%s\"", sensorDetails.currentTimestamp()))); //currentTimestamp() function in defined in another class
      }
    } catch (InterruptedException | IOException me) {
      me.printStackTrace();
    }
  }
}

键的格式说明符为%d,但生成的记录具有字符串类型的键。
这一点从以下几点可以看出:
尝试制作表格时:

CREATE TABLE STATUS_IB_TABLE (ROWKEY INT KEY,
  sensorid INTEGER,
  status VARCHAR,
  lastconnectedtime STRING)
WITH (TIMESTAMP='lastconnectedtime', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss', KAFKA_TOPIC='SENSOR_STATUS_DETAILS', VALUE_FORMAT='JSON', KEY='sensorid');


正如@andrew coates指出的那样,键被序列化为字符串
我不知道怎么可能。
有人能帮我澄清一下吗,我做错了什么?
附言:
=>这是我之前问题ksqldb未正确获取rowkey的后续问题
=>汇合平台版本:5.5
=>这是程序的主类。

v1uwarro

v1uwarro1#

rest代理支持各种内容类型,但不包括用于写入序列化32位整数的基元类型。
因此,您的代码使用字符串键为主题生成数据。有关如何生成int的示例,请参见此处使用 kafkacat .
因为您使用的是java,所以可以使用本机java producer api精确地控制如何将数据生成到kafka(这也比rest api更具性能和灵活性)。

相关问题