有没有办法找出cassandra中select语句使用了哪个节点?

14ifxucb  于 2021-06-10  发布在  Cassandra
关注(0)|答案(2)|浏览(276)

我已经为编写了一个自定义LoadBalancer策略 spark-cassandra-connector 现在我要确保它真的有效!
我有一个cassandra集群,有3个节点和一个复制因子为2的键空间,所以当我们要检索一个记录时,cassandra上只有两个节点保存数据。
问题是我想确保 spark-cassandra-connector (使用我的负载平衡器策略)仍然是令牌感知的,并且将为每个“select”语句选择正确的节点作为协调器。
现在,我在想,如果我们可以为每个节点在select语句上写一个触发器,如果节点没有保存数据,触发器将创建一个日志,我意识到负载均衡器策略不能正常工作。我们如何在cassandra的select上写触发器?有没有更好的方法来实现这一点?
我已经查看了创建触发器的文档,这些文档太有限了:
正式文件
税务文件
官方回购实施示例

mbyulnm0

mbyulnm01#

您可以从程序端执行此操作,如果您获得绑定语句的路由密钥(您必须使用准备好的语句),请通过元数据类找到它的副本,然后比较此主机是否在您可以从中获取的executioninfo中 ResultSet .

o75abkj4

o75abkj42#

根据亚历克斯所说,我们可以做如下:
创建sparksession后,我们应该创建一个连接器:

import com.datastax.spark.connector.cql.CassandraConnector
val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)

现在我们可以定义一个preparedstatement并执行其余操作:

connector.withSessionDo(session => {

    val selectQuery = "select * from test where id=?"
    val prepareStatement = session.prepare(selectQuery)
    val protocolVersion = session.getCluster.getConfiguration.getProtocolOptions.getProtocolVersion
    // We have to explicitly bind the all of parameters that partition key is based on them, otherwise the routingKey will be null.
    val boundStatement = prepareStatement.bind(s"$id")
    val routingKey = boundStatement.getRoutingKey(protocolVersion, null)
    // We can get tha all of nodes that contains the row
    val replicas = session.getCluster.getMetadata.getReplicas("test", routingKey)
    val resultSet = session.execute(boundStatement)

    // We can get the node which gave us the row
    val host = resultSet.getExecutionInfo.getQueriedHost

    // Final step is to check whether the replicas contains the host or not!!!
    if (replicas.contains(host)) println("It works!")
  })

重要的是,我们必须显式地绑定分区键基于它们的所有参数(即,我们不能在select语句中设置它们的编码),否则routingkey将为null。

相关问题