我正在设置一个storm集群来计算实时趋势和其他统计数据,但是我在将“恢复”功能引入到这个项目中时遇到了一些问题,因为它允许 kafka-spout
(的源代码) kafka-spout
来自https://github.com/apache/incubator-storm/tree/master/external/storm-kafka)被记住。我开始我的工作 kafka-spout
这样:
BrokerHosts zkHost = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test");
kafkaConfig.forceFromStart = false;
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism);
默认设置应该是这样做的,但我认为在我的情况下不是这样做的,每次我开始我的项目时 PartitionManager
尝试查找具有偏移量的文件,但未找到任何内容:
2014-06-25 11:57:08 INFO PartitionManager:73 - Read partition information from: /storm/partition_1 --> null
2014-06-25 11:57:08 INFO PartitionManager:86 - No partition information found, using configuration to determine offset
然后从最新的偏移量开始读取。如果我的项目从来没有失败过也没关系,但不是我想要的。
我也看了一下 PartitionManager
使用的类 Zkstate
类来从以下代码段写入偏移量:
分区管理器
public void commit() {
long lastCompletedOffset = lastCompletedOffset();
if (_committedTo != lastCompletedOffset) {
LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
.put("topology", ImmutableMap.of("id", _topologyInstanceId,
"name", _stormConf.get(Config.TOPOLOGY_NAME)))
.put("offset", lastCompletedOffset)
.put("partition", _partition.partition)
.put("broker", ImmutableMap.of("host", _partition.host.host,
"port", _partition.host.port))
.put("topic", _spoutConfig.topic).build();
_state.writeJSON(committedPath(), data);
_committedTo = lastCompletedOffset;
LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
} else {
LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
}
}
ZK状态
public void writeBytes(String path, byte[] bytes) {
try {
if (_curator.checkExists().forPath(path) == null) {
_curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(path, bytes);
} else {
_curator.setData().forPath(path, bytes);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
我可以看到第一条信息 writeBytes
方法进入 if
块并尝试创建路径,然后对于第二条消息,它进入 else
方块,看起来没问题。但是当我再次启动这个项目时,上面提到的信息就会出现。不 partition information
可以找到。
2条答案
按热度按时间5jdjgkvh1#
我也有同样的问题。原来我是在本地模式下运行的,它使用内存中的zookeeper,而不是kafka使用的zookeeper。
以确保Kafka斯波特不会使用风暴的Zookeeper
ZkState
如果要存储偏移量,则需要设置SpoutConfig.zkServers
,SpoutConfig.zkPort
,和SpoutConfig.zkRoot
除了ZkHosts
. 例如eagi6jfj2#
我想你是遇到了这个问题:
https://community.hortonworks.com/questions/66524/closedchannelexception-kafka-spout-cannot-read-kaf.html
上面同事的评论解决了我的问题。我添加了一些新的库。