我试图理解spark-kafka集成,并编写了一个示例工作,其内容如下:
从给定的时间戳中读取postgresql db中的值。
把数据写进Kafka。
将最新的时间戳写入postgresql,以便在下一次执行期间考虑该时间戳。
代码:
package com.r2d2.spark.db
import java.sql.{DriverManager, ResultSet}
import java.time.LocalDateTime
import org.apache.spark.sql.SparkSession;
object TestJob extends App {
val spark = SparkSession.builder.master("local[3]").appName("TestData").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val startTime = System.currentTimeMillis();
import org.apache.spark.sql.functions.{to_json, struct, col, lit, max}
val createDF =
spark.read.format("jdbc").option("url","jdbc:postgresql://test:5432/analytics")
.option("user", "test")
.option("password", "test")
.option("query", "select key, value from test.sample_table_2 where updated_at >= (select updated_at from test.sample_table order by updated_at desc 1) order by id desc limit 2")
.load()
createDF.selectExpr("CAST(uid AS STRING) AS key", "CAST(value AS STRING) AS value")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test2")
.save()
val date = createDF.agg(max("updated_at")).alias("updated_at").collect()
.map(_.getTimestamp(0)).mkString
val connection = DriverManager.getConnection("jdbc:postgresql://test:5432/analytics",
"test", "test")
connection.setAutoCommit(true)
val statement = connection.createStatement()
statement.execute(s"update test.sample_table set updated_at = '${date}' where id = 1")
}
我的问题是,如果在写给Kafka的过程中,Kafka集群崩溃了,那么会有一半的消息被写给Kafka,或者是全部或者没有。
基本上,我想确保所有的消息都被写入kafka,然后在db中只更新时间戳。
暂无答案!
目前还没有任何答案,快来回答吧!