我正在运行一个风暴三叉戟拓扑结构,在两条不同的溪流中有两个不同的喷口。我的喷口是jms喷口,使用hdfs状态来持久化元组。
如果我只运行一个spout,我就可以在hdfs中将所有记录发布到jms队列。
当运行拓扑时,两个喷口连接到两个不同的队列,我得到的记录比我在队列中发布的要少。我做错什么了吗。请让我知道,如果有任何问题,在我这样做。
TridentTopology topology = new TridentTopology();
topology.newStream("QueueSpout", TridentSpout).partitionPersist(tradeStateFactory,hdfsFields, new HdfsUpdater());
Stream TopicStream = topology.newStream("TopicSpout", TridentTopicSpout);
TopicStream.each(hdfsFields, new CashFilter()).partitionPersist(cashStateFactory, hdfsFields, new HdfsUpdater());
TopicStream.each(hdfsFields, new JournalFilter()).partitionPersist(journalStateFactory, hdfsFields, new HdfsUpdater());
TopicStream.each(hdfsFields, new RcvdlvrFilter()).partitionPersist(rcvdlvrStateFactory, hdfsFields, new HdfsUpdater());
1条答案
按热度按时间xn1cxnb41#
下面的配置可以很好地处理拓扑。
这是因为我没有使用分区分组。
使用global和batch global后工作正常。
洗牌:使用随机循环算法在所有目标分区上均匀地重新分配元组
广播:每个元组被复制到所有目标分区。这在drpc期间非常有用——例如,如果您需要对每个数据分区执行statequery。
partitionby:partitionby接受一组字段并基于该组字段进行语义分区。根据目标分区的数量对字段进行散列和修改,以选择目标分区。partitionby保证同一组字段总是指向同一个目标分区。
全局:所有元组都发送到同一分区。为流中的所有批处理选择相同的分区。
batchglobal:批处理中的所有元组都发送到同一分区。流中的不同批处理可能会转到不同的分区。
分区:这个方法接受一个实现backtype.storm.grouping.customstreamgrouping的自定义分区函数
下面的拓扑配置工作正常。