I implemented a connection to a Kafka stream as described here, and now I am trying to write the data to a Postgres database with a JDBC sink. The Kafka source appears to be untyped, so when I write the prepared-statement lambda for the SQL sink, everything shows up as type Nothing.

How do I use fromSource so that I actually get a typed Kafka source?

This is what I have tried so far:
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object Main {
  def main(args: Array[String]): Unit = {
    val builder = KafkaSource.builder
    builder.setBootstrapServers("localhost:29092")
    builder.setProperty("partition.discovery.interval.ms", "10000")
    builder.setTopics("created")
    builder.setBounded(OffsetsInitializer.latest())
    builder.setStartingOffsets(OffsetsInitializer.earliest())
    builder.setDeserializer(KafkaRecordDeserializationSchema.of(new CreatedEventSchema))
    val source = builder.build()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamSource = env
      .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")

    streamSource.addSink(JdbcSink.sink(
      "INSERT INTO conversations (timestamp, active_conversations, total_conversations) VALUES (?,?,?)",
      (statement, event) => {
        statement.setTime(1, event.date)
        statement.setInt(2, event.a)
        statement.setInt(3, event.b)
      },
      JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(5)
        .build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://localhost:5432/reporting")
        .withDriverName("org.postgresql.Driver")
        .withUsername("postgres")
        .withPassword("veryverysecret:-)")
        .build()
    ))

    env.execute()
  }
}
It does not compile because event has type Nothing, but I don't think it should: with CreatedEventSchema, Flink ought to be able to deserialize the records. It may also be worth noting that I really only want to process the values of the Kafka messages.
1 Answer
In Java you might do something like this:
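A minimal sketch (Event is a placeholder POJO standing in for your CreatedEvent; the key point is the explicit type parameter on the builder plus a value-only deserializer, so the resulting stream is typed):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;

// The explicit <Event> type parameter on builder() is what keeps the
// element type from being inferred as Nothing.
KafkaSource<Event> source = KafkaSource.<Event>builder()
        .setBootstrapServers("localhost:29092")
        .setTopics("created")
        .setStartingOffsets(OffsetsInitializer.earliest())
        // Deserialize only the record value, ignoring key and metadata
        .setValueOnlyDeserializer(new EventDeserializationSchema())
        .build();

// fromSource now yields a DataStream<Event> rather than DataStream<Nothing>
DataStream<Event> stream = env.fromSource(
        source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```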
with a value deserializer along these lines:
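One plausible shape for it, extending Flink's AbstractDeserializationSchema and assuming JSON-encoded values parsed with Jackson into the hypothetical Event POJO:

```java
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

public class EventDeserializationSchema extends AbstractDeserializationSchema<Event> {

    // ObjectMapper is not serializable, so create it in open() rather
    // than in the constructor; the schema itself is shipped to the workers.
    private transient ObjectMapper objectMapper;

    @Override
    public void open(InitializationContext context) {
        objectMapper = new ObjectMapper();
    }

    @Override
    public Event deserialize(byte[] message) throws IOException {
        // Map the raw Kafka record value (bytes) onto the Event POJO
        return objectMapper.readValue(message, Event.class);
    }
}
```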
Sorry I don't have a Scala example handy, but hopefully this will point you in the right direction.