我从kafka主题得到3mb的消息流,但是默认值是1mb。现在,通过在kafa consumer.properties和server.properties文件中添加以下行,我将kafka属性从1mb更改为3mb。
fetch.message.max.bytes=2048576 ( consumer.properties )
filemessage.max.bytes=2048576 ( server.properties )
replica.fetch.max.bytes=2048576 ( server.properties )
现在,在kafka中添加上述行之后,3mb消息数据将进入kafka数据日志。但storm无法处理3mb数据,只能读取默认大小的数据,即1mb数据。
因此,如何更改这些配置以处理/读取3mb数据。这是我的拓扑类。
String argument = args[0];
Config conf = new Config();
conf.put(JDBC_CONF, map);
conf.setDebug(true);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
//set the number of workers
conf.setNumWorkers(3);
TopologyBuilder builder = new TopologyBuilder();
//Setup Kafka spout
BrokerHosts hosts = new ZkHosts("localhost:2181");
String topic = "year1234";
String zkRoot = "";
String consumerGroupId = "group1";
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("KafkaSpout", kafkaSpout,1);
builder.setBolt("user_details", new Parserspout(),1).shuffleGrouping("KafkaSpout");
builder.setBolt("bolts_user", new bolts_user(cp),1).shuffleGrouping("user_details");
1条答案
按热度按时间yfjy0ee71#
在下面添加以下行