How do I delete a row in Cassandra using Apache Flink?

Posted by 6vl6ewon on 2021-06-24 in Flink

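In Apache Flink, I insert rows into Cassandra with CassandraSink, but I can't find a way to delete a row.
I also tried writing a custom sink, but I got a NotSerializableException. How should I structure the code for a delete operation?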

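    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Session;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    public class MyCassandraSink implements SinkFunction<String> {
        // These driver objects are created eagerly, on the client. SinkFunction is
        // Serializable, so Flink serializes this instance in addSink() and fails on
        // the driver's non-serializable internals (the trace names SessionManager).
        private Cluster cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")
                .build();
        private Session cassandra = cluster.connect("mykeyspace");

        @Override
        public void invoke(String value, Context context) throws Exception {
            cassandra.execute("SOME DELETE QUERY");
        }
    }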

    Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [com.datastax.driver.core.SessionManager@3b0fe47a] is not serializable. The object probably contains or references non serializable fields.
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
        at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
        at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1227)
        at com.meshkan.streaming.entry.EventListener.main(EventListener.java:42)
    Caused by: java.io.NotSerializableException: com.datastax.driver.core.SessionManager
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at java.util.concurrent.CopyOnWriteArrayList.writeObject(CopyOnWriteArrayList.java:973)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
        ... 9 more
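
Why this happens: every Flink function, including SinkFunction, is Serializable, and the trace shows addSink() running the closure cleaner, which calls InstantiationUtil.serializeObject (plain Java serialization). The sketch below reproduces the same failure without Flink or Cassandra. It is only an illustration: FakeSession is a hypothetical stand-in for the driver's Session, and BrokenSink mirrors the eager fields of MyCassandraSink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationFailureDemo {

    // Hypothetical stand-in for com.datastax.driver.core.Session: NOT Serializable.
    static class FakeSession {
        void execute(String cql) { }
    }

    // Mirrors MyCassandraSink: the class is Serializable (like any Flink function),
    // but it eagerly holds a non-serializable client object.
    static class BrokenSink implements Serializable {
        private final FakeSession session = new FakeSession();
    }

    // Plain Java serialization, which is what Flink's closure cleaner uses.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    // Returns "none" if serialization succeeds, otherwise a short description of the failure.
    static String trySerialize(Object o) {
        try {
            serialize(o);
            return "none";
        } catch (NotSerializableException e) {
            return "NotSerializableException: " + e.getMessage(); // message is the offending class name
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // prints: NotSerializableException: SerializationFailureDemo$FakeSession
        System.out.println(trySerialize(new BrokenSink()));
    }
}
```

The exception names the innermost non-serializable class, just as the Flink trace names SessionManager rather than MyCassandraSink.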
Answer #1, by xggvc2p6

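I found a workaround, but I don't like it. CassandraPojoInputFormat can be used to delete and update rows (I also use it for SELECT, which, as the name suggests, seems to be what it is really meant for).
Frankly, that is its only redeeming virtue. I'm using it until I find a more elegant solution, and I'm still looking...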

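    // Workaround: run a DELETE through an input format meant for reading.
    // <table_name> and <where clause> are placeholders; dbKeyspace, clusterBuilder
    // (a ClusterBuilder) and cassandraInputSplit (an InputSplit) are defined elsewhere.
    CassandraPojoInputFormat<MyThingyConnector> myThingyCassandraPojoInputFormat =
            new CassandraPojoInputFormat<MyThingyConnector>(
                    "DELETE FROM " + dbKeyspace + ".<table_name> <where clause>",
                    clusterBuilder,
                    MyThingyConnector.class);
    // Sets up the Cluster from clusterBuilder.
    myThingyCassandraPojoInputFormat.configure(null);
    // Connects and executes the query, which here performs the delete.
    myThingyCassandraPojoInputFormat.open(cassandraInputSplit);
    myThingyCassandraPojoInputFormat.close();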
Answer #2, by nxagd54h

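To implement your own insert-versus-delete logic, create a class that extends CassandraSinkBase and implement its send() method. See AbstractCassandraTupleSink for an example. Note how CassandraSinkBase avoids the serialization problem: it declares the Cassandra client fields transient and initializes them in open().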
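
The transient-client pattern can be illustrated without Flink or a running Cassandra node. The sketch below is a simulation, not Flink's API: FakeSession is a hypothetical stand-in for the driver's Session, DeleteSink imitates a sink's open()/invoke()/close() lifecycle, and my_table and id are made-up names. Only the keyspace string is serialized; the session is transient, so it is skipped when the sink is shipped and is created in open() on the receiving side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TransientClientDemo {

    // Hypothetical stand-in for the driver's Session; deliberately NOT Serializable.
    static class FakeSession {
        final List<String> executed = new ArrayList<>();
        void execute(String cql) { executed.add(cql); }
        void close() { }
    }

    // Only plain configuration is serialized; the client is transient and
    // created in open(), after the sink has been deserialized on the worker.
    static class DeleteSink implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String keyspace;   // serializable configuration
        transient FakeSession session;   // skipped by Java serialization

        DeleteSink(String keyspace) { this.keyspace = keyspace; }

        void open() { session = new FakeSession(); }

        void invoke(String id) {
            // Hypothetical table/column names. Real code should prefer a
            // PreparedStatement with bind markers over string concatenation.
            session.execute("DELETE FROM " + keyspace + ".my_table WHERE id = '" + id + "'");
        }

        void close() {
            if (session != null) session.close();
        }
    }

    // Serialize and deserialize, roughly what happens when a job ships a sink to a task.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DeleteSink shipped = roundTrip(new DeleteSink("mykeyspace")); // no NotSerializableException
        shipped.open();
        shipped.invoke("42");
        // prints: [DELETE FROM mykeyspace.my_table WHERE id = '42']
        System.out.println(shipped.session.executed);
        shipped.close();
    }
}
```

In an actual job the same shape would be a RichSinkFunction<String> (or a CassandraSinkBase subclass, as suggested above) with private transient Cluster and Session fields built in open(Configuration) and closed in close().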
