dataframe每行迭代保存到cassandra

ocebsuys  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(214)

我有以下代码：

// Builds a ForeachWriter that handles each streaming Row on the executors.
// NOTE(review): the `cassandraConnector` parameter is accepted but never used in
// this body — `process` only prints the row, no Cassandra write happens here.
def writeToCassandra(cassandraConnector: CassandraConnector) = new ForeachWriter[Row] {
  // Invoked once per partition/epoch before any rows; returning true accepts the partition.
  override def open(partitionId: Long, version: Long): Boolean = true

  // Invoked for every row of the accepted partition.
  override def process(row: Row): Unit = {
    println("row is " + row.toString())
  }

  // Invoked after the partition finishes (errorOrNull is non-null on failure); nothing to release.
  override def close(errorOrNull: Throwable): Unit = {}
}

// Spark configuration for the streaming job (cluster endpoints + resource sizing).
// NOTE(review): "spark.submit.deployMode" -> "cluster" set here contradicts the
// hard-coded .master("local[2]") used when the SparkSession is built later in this
// file — confirm which deployment is actually intended.
val conf = new SparkConf()
  .setAppName("Data")
  .setAll(Seq(
    "spark.cassandra.connection.host"          -> "192.168.0.40,192.168.0.106,192.168.0.113",
    "spark.cassandra.connection.keep_alive_ms" -> "20000",
    "spark.executor.memory"                    -> "1g",
    "spark.driver.memory"                      -> "2g",
    "spark.submit.deployMode"                  -> "cluster",
    "spark.executor.instances"                 -> "9",
    "spark.executor.cores"                     -> "1",
    "spark.cores.max"                          -> "9",
    "spark.driver.cores"                       -> "3",
    "spark.ui.port"                            -> "4040",
    "spark.streaming.backpressure.enabled"     -> "true",
    "spark.speculation"                        -> "true"
  ))

println("Spark Configuration Done")
// Build (or reuse) the SparkSession from the SparkConf defined above.
// NOTE(review): .master("local[2]") overrides any master supplied via spark-submit
// and conflicts with the "cluster" deploy mode in the conf — confirm intent.
val spark =
  SparkSession.builder
    .appName("Data")
    .config(conf)
    .master("local[2]")
    .getOrCreate()
println("Spark Session Config Done")

// Connector is created on the driver and is serializable, so it can be shipped
// to executors inside the ForeachWriter.
val cassandraConnector = CassandraConnector(conf)
import spark.implicits._
import org.apache.spark.sql.streaming.OutputMode

// Kafka source: subscribe to the "historyfleet" topic.
// NOTE(review): the reported NoClassDefFoundError
// (org/apache/spark/sql/execution/streaming/Source$class) is a binary-compatibility
// symptom, not a code bug: the spark-sql-kafka-0-10 artifact on the classpath must be
// built for the SAME Scala and Spark versions as the runtime (e.g. _2.11 vs _2.12).
// Fix the build dependency versions to resolve it.
val dataStream =
  spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.0.78:9092,192.168.0.78:9093,192.168.0.78:9094")
    .option("subscribe", "historyfleet")
    .load()

// FIX: the original chained BOTH .foreach(...) and .format("console") on the same
// DataStreamWriter. Those configure mutually exclusive sinks and the last call wins,
// so the console sink silently replaced the ForeachWriter and writeToCassandra's
// process() never ran. Keep only the foreach sink.
val query =
  dataStream
    .writeStream
    .outputMode(OutputMode.Append())
    .foreach(writeToCassandra(cassandraConnector))
    .start()

// Blocks until the query is stopped externally or fails with an exception.
query.awaitTermination()
// Only reached after termination; stopping an already-terminated query is harmless.
query.stop()

运行时它抛出如下错误：

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/streaming/Source$class
at org.apache.spark.sql.kafka010.KafkaSource.<init>(KafkaSource.scala:80)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:94)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:240)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$1.applyOrElse(StreamingQueryManager.scala:245)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$1.applyOrElse(StreamingQueryManager.scala:241)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:287)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.streaming.Source$class
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 14 more

我的应用程序把 Dataframe 插入 Cassandra 需要一些时间，所以我想验证逐行迭代写入是否能提升性能，但运行时报了上面的错误。集群为 3 节点，共 12 个执行器，每个执行器 1 个核心，目前 Cassandra 的写入速度约为每秒 6000 次插入，希望进一步优化。有什么建议吗？谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题