我是scala/spark新手。我继承了一个旧代码,我已经对它进行了重构,并试图使用它从“锡拉”检索数据。代码如下所示:
val TEST_QUERY = s"SELECT user_id FROM test_table WHERE name = ? AND id_type = 'test_type';"
var selectData = List[Row]()
dataRdd.foreachPartition {
iter => {
// Build up a cluster that we can connect to
// Start a session with the cluster by connecting to it.
val cluster = ScyllaConnector.getCluster(clusterIpString, scyllaPreferredDc, scyllaUsername, scyllaPassword)
var batchCounter = 0
val session = cluster.connect(tableConfig.keySpace)
val preparedStatement: PreparedStatement = session.prepare(TEST_QUERY)
iter.foreach {
case (test_name: String) => {
// Get results
val testResults = session.execute(preparedStatement.bind(test_name))
if (testResults != null){
val testResult = testResults.one()
if(testResult != null){
val user_id = testResult.getString("user_id")
selectData ::= Row(user_id, test_name)
}
}
}
}
session.close()
cluster.close()
}
}
println("Head is =======> ")
println(selectData.head)
由于 selectedData
列表为空,尽管其中有与select语句匹配的数据。我觉得我这样做是不正确的,但不能找出什么需要改变,以得到这个固定,所以任何帮助是非常感谢。
ps:我使用一个列表来保存结果的整个想法是这样我就可以使用这个列表来创建一个Dataframe。如果你能给我指出正确的方向,我将不胜感激。
1条答案
按热度按时间von4xj4u1#
如果您查看foreachpartition函数的定义,您将看到,根据定义,它不能返回任何内容,因为它的返回类型是
void
.不管怎样,从spark查询cassandra/scylla的数据是一种非常糟糕的方法。由于协议的兼容性,spark cassandra连接器也应该能够与scylla一起工作。
要从cassandra读取Dataframe,只需执行以下操作:
文档非常详细,所以请阅读。