spark如何确保所有消息都写入kafka,然后只设置偏移量

9rygscc1  于 2021-06-05  发布在  Kafka
关注(0)|答案(0)|浏览(506)

我试图理解spark-kafka集成,并编写了一个示例工作,其内容如下:
从给定的时间戳中读取postgresql db中的值。
把数据写进Kafka。
将最新的时间戳写入postgresql,以便在下一次执行期间考虑该时间戳。
代码:

package com.r2d2.spark.db

import java.sql.{DriverManager, ResultSet}
import java.time.LocalDateTime
import org.apache.spark.sql.SparkSession;

object TestJob extends App {

val spark = SparkSession.builder.master("local[3]").appName("TestData").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val startTime = System.currentTimeMillis();

import org.apache.spark.sql.functions.{to_json, struct, col, lit, max}

val createDF = 
spark.read.format("jdbc").option("url","jdbc:postgresql://test:5432/analytics")
.option("user", "test")
.option("password", "test")
.option("query", "select key, value from test.sample_table_2 where updated_at >= (select updated_at from test.sample_table order by updated_at desc 1) order by id desc limit 2")
.load()

createDF.selectExpr("CAST(uid AS STRING) AS key", "CAST(value AS STRING) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test2")
  .save()

val date = createDF.agg(max("updated_at")).alias("updated_at").collect()
.map(_.getTimestamp(0)).mkString

val connection = DriverManager.getConnection("jdbc:postgresql://test:5432/analytics", 
"test", "test")
connection.setAutoCommit(true)
val statement = connection.createStatement()
statement.execute(s"update test.sample_table set updated_at = '${date}' where id = 1")
}

我的问题是,如果在写给Kafka的过程中,Kafka集群崩溃了,那么会有一半的消息被写给Kafka,或者是全部或者没有。
基本上,我想确保所有的消息都被写入kafka,然后在db中只更新时间戳。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题