Reading and writing data to Cassandra using the Apache Flink Java API

mwg9r5ms · Posted 2021-06-25 in Flink
Follow (0) | Answers (4) | Views (381)

I plan to use Apache Flink to read data from and write data to Cassandra. I wanted to use flink-connector-cassandra, but I haven't found good documentation or examples for the connector.
Can you tell me the right way to read and write data from Cassandra using Apache Flink? I have only seen an example that is purely for writing. Is Apache Flink also suitable for reading data from Cassandra, similar to Apache Spark?

1cosmwyk #1

ClusterBuilder cb = new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("localhost").withPort(9042).build();
        }
    };

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // Read from Cassandra with a typed CassandraInputFormat (avoid the raw type)
    CassandraInputFormat<Tuple3<Integer, Integer, Integer>> inputFormat =
            new CassandraInputFormat<>("SELECT * FROM test.example;", cb);

    DataStream<Tuple3<Integer, Integer, Integer>> t = env.createInput(
            inputFormat,
            TupleTypeInfo.of(new TypeHint<Tuple3<Integer, Integer, Integer>>() {}));

    // Register the stream as a table and query it with SQL
    tableEnv.registerDataStream("t1", t);
    Table t2 = tableEnv.sql("select * from t1");

    t2.printSchema();
xdyibdwo #2

I had the same question, and this is what I was looking for. I don't know if it is oversimplified for your needs, but I figured I'd show it to you anyway.

ClusterBuilder cb = new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("urlToUse.com").withPort(9042).build();
        }
    };

    CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
            new CassandraInputFormat<>("SELECT * FROM example.cassandraconnectorexample", cb);

    // Drive the input format by hand (outside a Flink job) and fetch one record
    cassandraInputFormat.configure(null);
    cassandraInputFormat.open(null);

    Tuple2<String, String> testOutputTuple = new Tuple2<>();
    cassandraInputFormat.nextRecord(testOutputTuple);

    System.out.println("column1: " + testOutputTuple.f0);
    System.out.println("column2: " + testOutputTuple.f1);

    cassandraInputFormat.close();

The reason I figured this out was finding the code for the CassandraInputFormat class and seeing how it works (http://www.javatips.net/api/flink-master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/cassandrainputformat.java). I honestly expected it to be just a format, not the full reading class for Cassandra, based on the name, and I have a feeling others might be thinking the same thing.
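For the write direction the question also asks about, the streaming connector ships a CassandraSink. Below is a minimal sketch, not a definitive implementation; the keyspace/table example.wordcount, the column names, and the localhost contact point are assumptions, and it needs flink-connector-cassandra on the classpath plus a reachable cluster:

```java
import com.datastax.driver.core.Cluster;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Long>> result = env.fromElements(
        Tuple2.of("foo", 1L), Tuple2.of("bar", 2L));

// Each tuple field is bound, in order, to one ? placeholder in the query
CassandraSink.addSink(result)
        .setQuery("INSERT INTO example.wordcount (word, count) VALUES (?, ?);")
        .setClusterBuilder(new ClusterBuilder() {
            @Override
            public Cluster buildCluster(Cluster.Builder builder) {
                return builder.addContactPoint("localhost").withPort(9042).build();
            }
        })
        .build();

env.execute("Write to Cassandra");
```

The same ClusterBuilder shown in the read examples above is reused here to configure the connection.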

qlzsbp2j #3

You can extend the RichFlatMapFunction class:

class MongoMapper extends RichFlatMapFunction[JsonNode, JsonNode] {
  var userCollection: MongoCollection[Document] = _

  override def open(parameters: Configuration): Unit = {
    // Do setup here, e.g. open the connection
    val client: MongoClient = MongoClient("mongodb://localhost:10000")
    userCollection = client.getDatabase("gp_stage")
      .getCollection("users")
      .withReadPreference(ReadPreference.secondaryPreferred())
    super.open(parameters)
  }

  override def flatMap(event: JsonNode, out: Collector[JsonNode]): Unit = {
    // Called once per record; can use the objects initialized in open
    userCollection.find(Filters.eq("_id", somevalue)).limit(1).first().subscribe(
      (result: Document) => {
        // println(result)
      },
      (t: Throwable) => {
        println(t)
      },
      () => {
        out.collect(event)
      }
    )
  }
}

Basically, open is executed once per worker and flatMap once per record. The example is for Mongo, but the same pattern can be applied to Cassandra.
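Translating that pattern to Cassandra with the Java API might look like the following sketch. It is only an illustration under assumptions: the table test.example, its columns, the lookup query, and the localhost contact point are all hypothetical, and it requires the DataStax Java driver:

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CassandraLookupMapper extends RichFlatMapFunction<String, String> {
    private transient Cluster cluster;
    private transient Session session;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Opened once per parallel task instance
        cluster = Cluster.builder().addContactPoint("localhost").withPort(9042).build();
        session = cluster.connect();
    }

    @Override
    public void flatMap(String key, Collector<String> out) {
        // Executed once per record; "test.example" is a hypothetical table
        Row row = session.execute("SELECT value FROM test.example WHERE id = ?", key).one();
        if (row != null) {
            out.collect(row.getString("value"));
        }
    }

    @Override
    public void close() throws Exception {
        if (session != null) session.close();
        if (cluster != null) cluster.close();
        super.close();
    }
}
```

Note that a per-record round trip to Cassandra can become the bottleneck; for high-throughput lookups an async variant would be preferable.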

s4chpxco #4

In your case, as I understand it, the first step of your pipeline is reading data from Cassandra rather than writing to it. Instead of a RichFlatMapFunction, you should write your own RichSourceFunction. As a reference, you can look at the simple implementation of WikipediaEditsSource.
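A minimal sketch of such a source might look like this. It is an assumption-laden illustration, not a production source: the table test.example and the localhost contact point are hypothetical, it emits raw driver Row objects (which Flink would serialize generically), and it does no checkpointing or splitting:

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class CassandraRowSource extends RichSourceFunction<Row> {
    private transient Cluster cluster;
    private transient Session session;
    private volatile boolean isRunning = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        cluster = Cluster.builder().addContactPoint("localhost").withPort(9042).build();
        session = cluster.connect();
    }

    @Override
    public void run(SourceContext<Row> ctx) {
        // "test.example" is a hypothetical table; emit each row downstream
        ResultSet rs = session.execute("SELECT * FROM test.example");
        for (Row row : rs) {
            if (!isRunning) break;
            ctx.collect(row);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        if (session != null) session.close();
        if (cluster != null) cluster.close();
        super.close();
    }
}
```

It would then be wired in with env.addSource(new CassandraRowSource()), after which the rest of the pipeline proceeds as usual.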
