我试图在cassandra2.0.5,storm版本0.9.0.1中插入一个简单的行到表中。
我的测试如下:
我有一个由id(int)和句子(text)列组成的表。id是主键。
我的喷口生成句子,我添加一个id(代码中的静态增量)。
这是我的拓扑结构:
TridentTopology topology = new TridentTopology();
StateFactory cassandraStateFactory = CassandraMapState.nonTransactional(options);
Fields fields = new Fields("id", "sentence");
MyTridentTupleMapper tupleMapper = new MyTridentTupleMapper(keyspace, fields);
CassandraUpdater updater = new CassandraUpdater(tupleMapper);
TridentState wordCounts = topology.newStream("spout1", spout)
.each(new Fields("sentence"), new AddId(), new Fields("id"))
.partitionPersist(cassandraStateFactory, fields, updater);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, topology.build());
MyTridentUpleMapper的代码:
https://github.com/guywald/trident-cassandra-read-write-examples/blob/master/src/test/java/com/guywald/storm/trident/cassandra/mytridenttuplemapper.java
我得到以下例外:
2014-02-08 22:20:14 ERROR executor:0 -
java.lang.RuntimeException: java.lang.ClassCastException: storm.trident.state.map.SnapshottableMap cannot be cast to com.hmsonline.storm.cassandra.trident.CassandraState
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
at backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:744)
我不知道为什么它会退回这个,我会很感激你的帮助。
1条答案
按热度按时间tkqqtvp11#
看起来cassandraupdater需要cassandrastate,而cassandramapstate.nontransactional会创建不兼容的snapshottablemap。我相信常见的(或异国风情的)mapstate更新程序将与cassandramapstate一起使用。关于何时使用state vs mapstate,这里有一个很好的解释:https://groups.google.com/forum/#!topic/storm用户/tasr2zwyzks
我认为应该以Cassandra斯塔特工厂为例来说明你的做法。