我是三叉戟新手。我正在写一个三叉戟拓扑,从Kafka那里读取数据。主题名为“test”。我有当地的Kafka装置。我在当地创办了ZookeeperKafka。并在kafka中创建了一个主题“test”,打开producer并键入消息“hello kafka!”。
我想用三叉戟阅读“测试”主题中的信息“你好,Kafka”。
下面是我的代码。我得到空元组。
TridentTopology topology = new TridentTopology();
BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "test");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaConfig.forceFromStart = false;
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
topology.newStream("TestSpout", opaqueTridentKafkaSpout).parallelismHint(1)
.each(new Fields(), new TestFilter()).parallelismHint(1)
.each(new Fields(), new Utils.PrintFilter());
这是我的testfilter类代码
public TestFilter()
{
//
}
@Override
public boolean isKeep(TridentTuple tuple) {
boolean isKeep=true;
System.out.println("TestFilter is called...");
if (tuple != null && tuple.getValues().size()>0) {
System.out.println("data from kafka ::: "+tuple.getValues());
}
return isKeep;
}
每当我在kafka producer中输入消息到“test”主题时,首先会打印sysout,但它不会通过if循环。我只是得到消息'testfilter被称为…'不超过这一点。
我想得到我为“测试”主题提供的实际数据。怎样?
1条答案
按热度按时间neekobn81#
问题在于stream.each的参数。该方法的javadoc的相关部分是:
文档对此不太清楚,但语义是应该使用inputfields参数指定过滤器使用的所有字段。
storm将在输入元组上应用投影并将其转发到过滤器。
假设您没有指定任何输入字段,那么投影将导致空元组,从而导致
tuple.getValues().size()>0
过滤器内部状况。值得一提的是,每种方法的其他变体:
这些将在输入元组的投影上应用提供的函数,将结果元组附加到原始输入元组,将新字段重命名为functionfields(即投影仅用于应用函数)。
特别是第二个版本相当于调用inputfields设置为null(或
new Fields()
)并将导致传递给函数的空元组。