scala—从cassandra读取数据,以便在flink中进行处理

iszxjhcz  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(317)

我必须使用flink作为流引擎来处理来自kafka的数据流。为了对数据进行分析,我需要在cassandra中查询一些表。最好的方法是什么?我一直在寻找scala中此类案例的例子。但是我什么也找不到。用scala作为编程语言,如何在flink中读取来自cassandra的数据?使用apacheflinkjavaapi将数据读写到cassandra还有一个问题。答案中提到了多种方法。我想知道对我来说什么是最好的方法。而且,大多数可用的示例都是java的。我正在寻找scala的例子。

6xfqseft

6xfqseft1#

我目前在flink1.3中使用asyncio阅读了cassandra的文章。以下是相关文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html (如果它有databaseclient,您将改用com.datastax.drive.core.cluster)
如果您需要一个更深入的示例来使用它来阅读cassandra的文章,请告诉我,但不幸的是,我只能提供一个java示例。
编辑1
下面是我使用flink的异步i/o从cassandra读取的代码示例。我仍在确定和修复一个问题,由于某种原因(不深入探讨),对于单个查询返回的大量数据,异步数据流的超时会被触发,即使cassandra看起来返回得很好,而且远远早于超时时间。但假设这只是我正在做的其他事情的一个bug,而不是因为这段代码,那么这对您来说应该很好(对我来说也已经好几个月了):

public class GenericCassandraReader extends RichAsyncFunction<CustomInputObject, ResultSet> {

    private final Properties props;
    private Session client;

    public GenericCassandraReader(Properties props) {
        super();
        this.props = props;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        client = Cluster.builder()
                .addContactPoint(props.cassandraUrl)
                .withPort(props.cassandraPort)
                .build()
                .connect(props.cassandraKeyspace);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(final CustomInputObject customInputObject, final AsyncCollector<ResultSet> asyncCollector) throws Exception {

        String queryString = "select * from table where fieldToFilterBy='" + customInputObject.id() + "';";

        ListenableFuture<ResultSet> resultSetFuture = client.executeAsync(queryString);

        Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {

            public void onSuccess(ResultSet resultSet) {
                asyncCollector.collect(Collections.singleton(resultSet));
            }

            public void onFailure(Throwable t) {
                asyncCollector.collect(t);
            }
        });
    }
}

再次抱歉耽搁了。我希望能解决这个错误,所以我可以肯定,但在这一点上只是有一些参考将比没有更好。
编辑2
所以我们最终确定问题不在于代码,而在于网络吞吐量。很多字节试图通过一个不够大的管道来处理它,一些字节开始备份,一些字节开始慢慢进入,但是(多亏了datastax cassandra驱动程序的querylogger,我们可以看到这个)接收每个查询结果所用的时间开始攀升到4秒,然后是6秒,然后是8秒,以此类推。
热释光;dr,代码很好,只要注意如果您遇到来自flink的asyncwaitoperator的timeoutexceptions,可能是网络问题。
编辑2.5
我们还意识到,由于网络延迟问题,我们最终使用richmapfunction来保存从cassandra读取的数据,这可能是有益的。所以作业只是跟踪所有通过它的记录,而不是每次通过一个新记录时都要从表中读取以获取其中的所有记录。

相关问题