java

wmvff8tz  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(515)

我使用datastax驱动程序将cassandra用作apache flink的某些数据流的接收器:我在执行应用程序时遇到问题,在运行时引发了一个关于队列的错误,该错误在几秒钟后变满。我发现默认值是256,这对于我的负载来说可能太低了,所以我使用pooligoptions设置maxrequestsperconnection来提高它,如下所示:http://docs.datastax.com/en/developer/java-driver/3.1/manual/pooling/.
不幸的是,对于以下代码,我在启动时出现以下错误:

  1. The implementation of the ClusterBuilder is not serializable.
  2. The object probably contains or references non serializable fields.

我的代码:

  1. PoolingOptions poolingOptions = new PoolingOptions();
  2. poolingOptions
  3. .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
  4. .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);
  5. ClusterBuilder cassandraBuilder = new ClusterBuilder() {
  6. @Override
  7. public Cluster buildCluster(Cluster.Builder builder) {
  8. return builder.addContactPoint(CASSANDRA_ADDRESS)
  9. .withPort(CASSANDRA_PORT)
  10. .withPoolingOptions(poolingOptions)
  11. .build();
  12. }
  13. };
  14. sinkBuilderNormalStream
  15. .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
  16. + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
  17. + " VALUES (?, ?, ?, ?, ?, ?);")
  18. .setClusterBuilder(cassandraBuilder)
  19. .build();

我该怎么处理?

hk8txs48

hk8txs481#

您必须在clusterbuilder#buildcluster中定义pooligoptions。

相关问题