Cannot create Cassandra POJO sink

Posted by np8igboo on 2021-06-25 in Flink

I'm using Flink 1.1.3 and Cassandra 3.8, and I want to create a CassandraSink for a streaming job, so I have to use a POJO. Here is what I have:

    public class StreamingJob {
        public static void main(String[] args) throws Exception {
            // set up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties properties = new Properties();
            //properties to connect with kafka
            DataStream<String> stream = env.addSource(/*kafka connection*/);
            CassandraSink.addSink(stream)
                .setClusterBuilder(new ClusterBuilder() {
                    @Override
                    public Cluster buildCluster(Cluster.Builder builder) {
                        return builder.addContactPoint("127.0.0.1").build();
                    }
                })
                .build();
            //print messages
            stream.rebalance()
                .flatMap(new DeserializeJson())
                .filter(new EventFilter())
                .<Tuple2<String, String>>project(2, 5)
                .keyBy(0)
                .print();
            // execute program
            env.execute("Streaming Job");
        }
    }

And the table class:

    @Table(keyspace = "flinktest", name = "eventos")
    public class Eventos implements Serializable {
        @Column(name = "ad_id")
        private byte[] adId;
        @Column(name = "event_time")
        private String eventTime;

        public Eventos(byte[] adId, String eventTime){
            this.adId = adId;
            this.eventTime = eventTime;
        }

        public byte[] getAdId() {
            return adId;
        }

        public void setId(byte[] adId) {
            this.adId = adId;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }
    }

When I run it without the Cassandra sink, I have no problems and the job consumes from Kafka normally. When I add the sink, I get the following error:

    java.lang.RuntimeException: Cannot create CassandraPojoSink with input: String
        at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:53)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.IllegalArgumentException: @Table annotation was not found on type java.lang.String
        at com.datastax.driver.mapping.AnnotationChecks.getTypeAnnotation(AnnotationChecks.java:39)
        at com.datastax.driver.mapping.AnnotationParser.parseEntity(AnnotationParser.java:50)
        at com.datastax.driver.mapping.MappingManager.getMapper(MappingManager.java:154)
        at com.datastax.driver.mapping.MappingManager.mapper(MappingManager.java:110)
        at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:51)

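I don't understand why the @Table annotation is causing this problem, since it is imported from another library (DataStax). Do you know how to fix this? It's getting annoying, because I can't get the job to work with the Cassandra sink, and the examples I've found don't help much.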
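One way to read the stack trace: the complaint is about `java.lang.String`, not about `Eventos`. The sink is built with `CassandraSink.addSink(stream)`, where `stream` is a `DataStream<String>`, so the mapper appears to be looking for `@Table` on `String`. The sketch below reproduces that kind of check with plain reflection. Note that `Table` here is a hypothetical stand-in annotation and `requireTable` is a made-up helper, not the DataStax driver's actual code; the `Eventos` class is a trimmed copy of the one in the question.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for the DataStax @Table annotation,
// declared locally so the sketch runs without the driver on the classpath.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Table {
    String keyspace();
    String name();
}

// Trimmed-down copy of the Eventos POJO from the question.
@Table(keyspace = "flinktest", name = "eventos")
class Eventos {
    private String eventTime;

    public String getEventTime() { return eventTime; }
    public void setEventTime(String eventTime) { this.eventTime = eventTime; }
}

class TableCheckSketch {
    // Made-up helper that mimics the failing check: the stream's element type must carry @Table.
    static Table requireTable(Class<?> elementType) {
        Table table = elementType.getAnnotation(Table.class);
        if (table == null) {
            throw new IllegalArgumentException(
                "@Table annotation was not found on type " + elementType.getName());
        }
        return table;
    }

    public static void main(String[] args) {
        // Element type of a DataStream<Eventos>: the annotation is found.
        System.out.println(requireTable(Eventos.class).name());

        // Element type of the DataStream<String> from the question: the check fails.
        try {
            requireTable(String.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this reading is right, the sink would need to be attached to a stream whose element type is `Eventos` (for example, after a map step that turns each `String` into an `Eventos`), not to the raw `DataStream<String>` coming from Kafka.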
