我建立了一个风暴三叉戟拓扑如下:
tridentTopology.newStream(spoutId, spout).parallelismHint(spoutParallel)
.each(new Fields("tId", "message"), new VerifyFilter())
.each(new Fields("tId", "message"), new ParseFunction(), new Fields("rowKey", "column", "value"))
.groupBy(new Fields("rowKey", "column"))
.aggregate(new Fields("value"),new Count(),new Fields("count"))
.each(new Fields("rowKey", "column", "count"), new SaveFunction(), new Fields(columns))
.shuffle()
.partitionPersist(factory, new Fields(rowKeyANdColumns), new HBaseUpdater())
.parallelismHint(boltParallel);
util执行 each(new Fields("rowKey", "column", "count"), new SaveFunction(), new Fields(columns)
,我在savefunction中得到了print log,但是partitionpersist没有将日期写入hbase,并且在所有拓扑中都没有错误;
谁能给些建议?
暂无答案!
目前还没有任何答案,快来回答吧!