如何更改默认kafka spoutconfig类

svujldwt  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(373)

我从kafka主题得到3mb的消息流,但是默认值是1mb。现在,通过在kafa consumer.properties和server.properties文件中添加以下行,我将kafka属性从1mb更改为3mb。

fetch.message.max.bytes=2048576 ( consumer.properties )
filemessage.max.bytes=2048576 ( server.properties )
replica.fetch.max.bytes=2048576 ( server.properties )

现在,在kafka中添加上述行之后,3mb消息数据将进入kafka数据日志。但storm无法处理3mb数据,只能读取默认大小的数据,即1mb数据。
因此,如何更改这些配置以处理/读取3mb数据。这是我的拓扑类。

String argument = args[0];
    Config conf = new Config();
    conf.put(JDBC_CONF, map);
    conf.setDebug(true);
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    //set the number of workers
    conf.setNumWorkers(3);

    TopologyBuilder builder = new TopologyBuilder();

    //Setup Kafka spout
    BrokerHosts hosts = new ZkHosts("localhost:2181");
    String topic = "year1234"; 
    String zkRoot = "";
    String consumerGroupId = "group1";
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);

        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    builder.setSpout("KafkaSpout", kafkaSpout,1);

    builder.setBolt("user_details", new Parserspout(),1).shuffleGrouping("KafkaSpout");

    builder.setBolt("bolts_user", new bolts_user(cp),1).shuffleGrouping("user_details");
yfjy0ee7

yfjy0ee71#

在下面添加以下行

SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);

spoutConfig.fetchSizeBytes = 3048576;
spoutConfig.bufferSizeBytes = 3048576;

相关问题