postgresql—数据从Kafka流到postgres,几秒钟后丢失

p5fdfcr1  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(284)

我试图保存数据从本地Kafka示例到本地postgres与Spark流。我已经配置了所有的连接和参数,数据实际上已经到达了数据库。然而,它只存在几秒钟。之后,table就空了。如果我在postgres中有数据后立即停止应用程序,数据就会持续存在,所以我想我在spark或kafka配置文件中遗漏了一些流参数。代码是用java编写的,不是scala,所以用dataset代替dataframe。
我试着设置 spark.driver.allowMultipleContexts 但这与上下文无关。当我在后台运行带有完整数据集流的count-on数据库时,总有大约1700条记录,这意味着可能有一些关于批大小的参数。

censusRecordJavaDStream.map(e -> {
    Row row = RowFactory.create(e.getAllValues());
    return row;
}).foreachRDD(rdd -> {
    Dataset<Row> censusDataSet = spark.createDataFrame(rdd, CensusRecord.getStructType());

    censusDataSet
            .write()
            .mode(SaveMode.Overwrite)
            .jdbc("jdbc:postgresql:postgres", "census.census", connectionProperties);
});

我的目标是将Kafka的数据流保存到postgre。每个记录都有唯一的id,这个id在kafka中用作键,因此主键或双项不应该有冲突。对于当前的测试目的,我使用大约100条记录的一小部分;完整的数据集超过300mb。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题