使用三叉戟拓扑从Kafka读取数据时清空数据

im9ewurl  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(310)

我是三叉戟新手。我正在写一个三叉戟拓扑,从Kafka那里读取数据。主题名为“test”。我有当地的Kafka装置。我在当地创办了ZookeeperKafka。并在kafka中创建了一个主题“test”,打开producer并键入消息“hello kafka!”。
我想用三叉戟阅读“测试”主题中的信息“你好,Kafka”。
下面是我的代码。我得到空元组。

TridentTopology topology = new TridentTopology();
    BrokerHosts brokerHosts = new ZkHosts("localhost:2181");

    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "test");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
    kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
    kafkaConfig.forceFromStart = false;
    OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

    topology.newStream("TestSpout", opaqueTridentKafkaSpout).parallelismHint(1)
      .each(new Fields(), new TestFilter()).parallelismHint(1)
      .each(new Fields(), new Utils.PrintFilter());

这是我的testfilter类代码

public TestFilter()
{
    //
}

@Override
public boolean isKeep(TridentTuple tuple) {
    boolean isKeep=true;
    System.out.println("TestFilter is called...");
    if (tuple != null && tuple.getValues().size()>0) {
        System.out.println("data from kafka ::: "+tuple.getValues());
    } 
    return isKeep;
}

每当我在kafka producer中输入消息到“test”主题时,首先会打印sysout,但它不会通过if循环。我只是得到消息'testfilter被称为…'不超过这一点。
我想得到我为“测试”主题提供的实际数据。怎样?

neekobn8

neekobn81#

问题在于stream.each的参数。该方法的javadoc的相关部分是:

each(Fields inputFields, Filter filter)

文档对此不太清楚,但语义是应该使用inputfields参数指定过滤器使用的所有字段。
storm将在输入元组上应用投影并将其转发到过滤器。
假设您没有指定任何输入字段,那么投影将导致空元组,从而导致 tuple.getValues().size()>0 过滤器内部状况。
值得一提的是,每种方法的其他变体:

each(Fields inputFields, Function function, Fields functionFields)
each(Function function, Fields functionFields)

这些将在输入元组的投影上应用提供的函数,将结果元组附加到原始输入元组,将新字段重命名为functionfields(即投影仅用于应用函数)。
特别是第二个版本相当于调用inputfields设置为null(或 new Fields() )并将导致传递给函数的空元组。

相关问题