在Foreach Spark函数中查询并保存到Cassandra

hjzp0vay  于 2022-11-05  发布在  Cassandra
关注(0)|答案(1)|浏览(178)

我需要在Spark/Cassandra环境中执行以下操作:

  • 读取table 1中的最新行,其中包含筛选另一个表2的条件(我使用数据框进行了此操作),到目前为止,此操作运行良好。
  • 对于table 2中的每个筛选行,我需要查询table 3以检查该行的某些状态。
  • 从上一步获得状态后,我需要将一些数据保存到另外三个表中。

问题是我需要查询cassandra表并保存到foreach函数中的其他cassandra表中,这意味着我必须传递Java Spark Context,但不幸的是,它是不可序列化的(请参阅相关问题here),因此我得到了著名的异常:
异常错误:org.apache.spark.api.java.JavaSparkContext
序列化堆栈:
对象不可序列化(类:.......................................................................................................................................................................................................
我已经实现了一个新的类implements ForeachFunction<Row>,并使Java Spark上下文成为局部变量,但我仍然得到相同的异常。
现在,有些人可能会说,我必须使foreach函数成为静态的,但这是不可能的,因为我必须向它传递一个对象,以帮助保存/查询cassandra表的逻辑,必须有一个解决方案,这种情况?
但我不确定我在这里错过了什么。

a11xaf1n

a11xaf1n1#

在执行器上使用Spark上下文是不可能的。但至少有两个解决方案可以解决您的问题:

  • 收集 Dataframe 并在驱动程序上运行本地foreach(但这将使对cassandra的调用逐个运行,可能会非常慢)
  • 使用连接将表1、表2和表3上的所有操作合并到一个 Dataframe 中。然后将cassandra DF与这些 Dataframe 连接,执行过滤并保存到执行器上的cassandra(这将是最快的并行解决方案,但需要一些额外的编码)

相关问题