如何在kafka中设置消息的大小?

qv7cva1a  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(550)

我现在用的是Kafka0.9.0.1。根据我发现的一些消息源,设置消息大小的方法是在 server.properties .
message.max.字节
replica.fetch.max.字节
fetch.message.max.字节
我的 server.properties 文件实际上有这些设置。

message.max.bytes=10485760
replica.fetch.max.bytes=20971520
fetch.message.max.bytes=10485760

其他可能相关的设置如下。

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

但是,当我尝试发送有效负载为4到6MB的消息时,消费者永远不会收到任何消息。生产者似乎在发送消息时没有抛出任何异常。如果我确实发送较小的有效负载(比如<1 mb),那么消费者确实会收到消息。
你知道我在配置方面做错了什么吗?
下面是发送消息的示例代码。

Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
File dir = new File("/path/to/dir");
for(String s : dir.list()) {
  File f = new File(dir, s);
  byte[] data = Files.readAllBytes(f.toPath());
  Payload payload = new Payload(data); //a simple pojo to store payload
  String key = String.valueOf(System.currentTimeMillis());
  byte[] val = KryoUtil.toBytes(payload); //custom util to use kryo to get bytes[]
  producer.send(new ProducerRecord<>("test", key, val));
}
producer.close();

下面是接收消息的示例代码。

KafkaConsumer consumer = new KafkaConsumer<>(getConsumerProps());
consumer.subscribe(Arrays.asList("test"));
while(true) {
  ConsumerRecord<String, byte[]> records = consumer.poll(100);
  for(ConsumerRecord<String, byte[]> record : records) {
    long offset = record.offset();
    String key = record.key();
    byte[] val = record.value();
    Payload payload = (Payload)KryoUtil.toObject(val, Payload.class); //custom util to use kryo to deserialize back to object
    System.out.println(
      System.format("offset=%d, key=%s", offset, key));
  }
}

下面是填充生产者和使用者的属性文件的方法。

public static Properties getProducerProps() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("compression.type", "snappy");
  props.put("max.request.size", 10485760); //need this
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
  return props;
}

public static Properties getConsumerProps() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("group.id", "test");
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("max.partition.fetch.bytes", 10485760); //need this too
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  return props;
}
ifsvaxew

ifsvaxew1#

max.fetch.bytes选项也是可能的。

n3ipq98p

n3ipq98p2#

简,不要用 fetch.message.max.bytes 首先,因为这是一个来自使用者的属性,不在server.properties文件中;其次,因为是旧版本的使用者,所以使用 max.partition.fetch.bytes 当您创建使用者作为用于示例化它的属性的一部分时。

lp0sw83n

lp0sw83n3#

您需要增加服务器端(如上所述)和客户端。
使用kafka python producer的python示例:

producer = KafkaProducer(bootstrap_servers=brokers, max_request_size=1048576)

将“最大请求大小”增加到所需值,默认值为1048576。

相关问题