使用https://github.com/sutugin/spark-streaming-jdbc-source中的示例,我尝试连接到Postgres数据库作为AWS数据库中的流源。
我的群集正在运行:11.3 LTS(包括Apache Spark 3.3.0和Scala 2.12)
此库安装在我的群集上:apache. spark网站:Spark流_2.12:3.3.2
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.appName("StructuredJDBC")
.getOrCreate()
import spark.implicits._
val jdbcOptions = Map(
"user" -> "myusername",
"password" -> "mypassword",
"database" -> "testDB",
"driver" -> "org.postgresql.Driver",
"url" -> "jdbc:postgresql://dbhostname:5432:mem:myDb;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false"
)
// Create DataFrame representing the stream of input lines from jdbc
val stream = spark.readStream
.format("jdbc-streaming")
.options(jdbcOptions + ("dbtable" -> "dimensions_test_table") + ("offsetColumn" -> "loaded_timestamp"))
.load
// Start running the query that prints 'select result' to the console
val query = stream.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
但我被一个错误所困扰:未找到类定义错误:org/apache/spark/sql/源代码/v2/流写入支持原因:未找到类异常:org.apache.spark.sql.sources.v2.StreamWriteSupport
我能找到的有关此错误的唯一信息似乎不适用于我的情况。我错过了什么?
我也找过其他库,但这似乎是Scala 2.12上唯一支持jdbc作为源代码的库。
1条答案
按热度按时间72qzrwbm1#
这里有几个问题:
org.apache.spark:spark-streaming_2.12:3.3.2
库。Databricks运行时包括所有必需的Spark库,并且通过安装开源版本,您很可能会破坏特定于Databricks的修改。如果要从数据库获取更改,可以查看更改数据捕获功能,例如CDC for RDS MySQL。然后,可以将数据放置到S3,并使用Delta Live Tables implementing CDC pattern进行拾取。