使用flink将pojo保存到cassandra

mpgws1up  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(266)

我是Flink的新手,我想把Kafka的流式数据存储到Cassandra。我已经把字符串转换成pojo了。我的工作如下,

  1. @Table(keyspace = "sample", name = "contact")
  2. public class Person implements Serializable {
  3. private static final long serialVersionUID = 1L;
  4. @Column(name = "name")
  5. private String name;
  6. @Column(name = "timeStamp")
  7. private LocalDateTime timeStamp;

我的转变过程如下,

  1. stream.flatMap(new FlatMapFunction<String, Person>() {
  2. public void flatMap(String value, Collector<Person> out) {
  3. try {
  4. out.collect(objectMapper.readValue(value, Person.class));
  5. } catch (JsonProcessingException e) {
  6. e.printStackTrace();
  7. }
  8. }
  9. }).print(); // I need to use proper method to convert to Datastream.
  10. env.execute();

我阅读了以下链接上的文档以供参考,
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/cassandra.html
cassandra接收器接受datastream示例。我需要转换我的转换和存储到Kafka他们。
不能创造CassandrapojoFlume也给了我一些想法。
有方法 .forward() 它回来了 DataStream<Reading> forward ,将示例传递给时,

  1. CassandraSink.addSink(forward)
  2. .setHost("localhost")
  3. .build();
  1. cannot access org.apache.flink.streaming.api.scala.DataStream

如何将我的pojo转换为cassandra中的store?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题