如何从默认设置设置spoutconfig?

jum4pzuy  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(539)

我正在尝试使用graphapi获取fb页面数据。每个帖子的大小超过1mb,其中kafka default fetch.message为1mb。通过在kafa consumer.properties和server.properties文件中添加以下行,我将kafka属性从1mb更改为3mb。

fetch.message.max.bytes=3048576 (consumer.properties)
file message.max.bytes=3048576 (server.properties)
replica.fetch.max.bytes=3048576 (server.properties )

现在,在kafka中添加上述行之后,3mb消息数据将进入kafka数据日志。但是storm无法处理这些数据,它只能读取默认大小,即1mb数据。我应该向storm拓扑中添加哪些参数来读取来自kafka主题的3mb数据。我需要在storm中增加buffer.size吗?我不清楚。
这是我的拓扑代码。

String argument = args[0];
    Config conf = new Config();
    conf.put(JDBC_CONF, map);
    conf.setDebug(true);
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    //set the number of workers
    conf.setNumWorkers(3);

    TopologyBuilder builder = new TopologyBuilder();       
   //Setup Kafka spout
    BrokerHosts hosts = new ZkHosts("localhost:2181");
    String topic = "year1234"; 
    String zkRoot = "";
    String consumerGroupId = "group1";
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);

        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());           KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    builder.setSpout("KafkaSpout", kafkaSpout,1);        builder.setBolt("user_details", new Parserspout(),1).shuffleGrouping("KafkaSpout");        builder.setBolt("bolts_user", new bolts_user(cp),1).shuffleGrouping("user_details");

提前谢谢

643ylb08

643ylb081#

类spoutconfig扩展了kafkanconfig,它具有以下所有设置:

public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;

请注意,它们是公共的,因此您可以更改它们

spoutConfig.fetchSizeBytes = 3048576;
spoutConfig.bufferSizeBytes = 3048576;

请看这里:http://grepcode.com/file/repo1.maven.org/maven2/org.apache.storm/storm-kafka/0.9.2-incubating/storm/kafka/kafkaconfig.java#kafkaconfig

相关问题