如何获得原始字段以及在trident中修改的新字段的完整列表?

hsgswve4  于 2021-06-24  发布在  Storm
关注(0)|答案(2)|浏览(302)

假设我有一个字段列表,即,{field1,field2,field3,field4}我对field2执行了一些操作,比如我想将每个元组的值加上一些值,比如5,

performed this operation in a function which gave me modified field with "M_field2" as out field name now i want to write complete tuple in a file but in place of field2 i want "M_field2". How i will achieve this.
pexxcrt2

pexxcrt21#

我解决了这个问题。。使用三叉戟只需在输入字段列表中使用修改后的字段名。例如:-

topology.newStream("dummySpout",new DummySpout()).stateQuery(tridentState, new QueryFunctionClass(), new Fields("outLpi","outFileId"))
.each(new Fields("outLpi"),new DBReaderFunction((ArrayList<String>)conf.get("listOfFields")), new Fields((ArrayList<String>)conf.get("listOfFields")))
.each(new Fields((ArrayList<String>)conf.get("listOfFields")), new LoggerFilter())
.aggregate(new Fields("SAL"), new ApplyAggregator(),new Fields("sum"))
.each(new Fields("sum","SAL"),new LoggerFilter());

在最后一行中,“sum”是修改后的字段,sal是原始字段。

lvmkulzt

lvmkulzt2#

从trident api页面上可以看到
一个函数接收一组输入字段并输出零个或多个元组。输出元组的字段附加到流中的原始输入元组。如果函数不发出元组,则过滤掉原始的输入元组。否则,将为每个输出元组复制输入元组
现在从三叉戟教程页面挖掘更多信息,发现了这个
对于分组流,输出将包含分组字段,后跟聚合器发出的字段。例如:

stream.groupBy(new Fields("val1"))
     .aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

在本例中,输出将包含以下字段 "val1" and "sum" .
我不确定,但我能想到的最接近的是

stream.groupBy(new Fields("field1","field3","field4"))
     .aggregate(new Fields("field2"), new Sum(), new Fields("M_field2"))

可能会达到你想要的。如果我错了,请纠正我。

相关问题