Apache Flink typed KafkaSource

k2fxgqgv posted on 2022-12-16 in Apache

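I implemented a connection to a Kafka stream as described here, and now I am trying to write the data to a Postgres database with a JDBC sink.
The Kafka source does not seem to be typed, so when I write the SQL statement, every value appears to have type Nothing.
How can I use fromSource so that I actually get a typed Kafka source?
This is what I have tried so far: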

object Main {
  def main(args: Array[String]) {
    val builder = KafkaSource.builder
    builder.setBootstrapServers("localhost:29092")
    builder.setProperty("partition.discovery.interval.ms", "10000")
    builder.setTopics("created")
    builder.setBounded(OffsetsInitializer.latest)
    builder.setStartingOffsets(OffsetsInitializer.earliest)
    builder.setDeserializer(KafkaRecordDeserializationSchema.of(new CreatedEventSchema))
    val source = builder.build()
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamSource = env
      .fromSource(source, WatermarkStrategy.noWatermarks, "Kafka Source")
    streamSource.addSink(JdbcSink.sink(
        "INSERT INTO conversations (timestamp, active_conversations, total_conversations) VALUES (?,?,?)",
        (statement, event) => {
          statement.setTime(1, event.date)
          statement.setInt(2, event.a)
          statement.setInt(3, event.b)
        },JdbcExecutionOptions.builder()
          .withBatchSize(1000)
          .withBatchIntervalMs(200)
          .withMaxRetries(5)
          .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
          .withUrl("jdbc:postgresql://localhost:5432/reporting")
          .withDriverName("org.postgresql.Driver")
          .withUsername("postgres")
          .withPassword("veryverysecret:-)")
          .build()

      ))
    env.execute()
  }
}

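It does not compile because event has type Nothing. I don't think it should, because with CreatedEventSchema Flink ought to be able to deserialize the records. It may be worth noting that I only want to process the value of each Kafka message.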

Answer #1 (by 7y4bm7vi)

In Java you might do something like this:

KafkaSource<Event> source =
    KafkaSource.<Event>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics(TOPIC)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new EventDeserializationSchema())
            .build();

with a value deserializer along these lines:

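import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class EventDeserializationSchema extends AbstractDeserializationSchema<Event> {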
    private static final long serialVersionUID = 1L;

    // transient: not shipped with the serialized schema; built once per task in open()
    private transient ObjectMapper objectMapper;

    @Override
    public void open(InitializationContext context) {
        objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
    }

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, Event.class);
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}

Sorry I don't have a Scala example handy, but hopefully this will point you in the right direction.
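To see why the explicit type argument in `KafkaSource.<Event>builder()` matters, here is a small, self-contained Java sketch that models the same builder shape. The classes `SourceBuilder`, `Source`, `ValueDeserializer`, `CsvEventDeserializer` and the `Event` record are hypothetical and made up for illustration; they are not Flink APIs. The sketch assumes the source's element type is fixed by the type argument of `builder()`. When a generic method has nothing to infer that argument from, Java picks `Object` and Scala picks `Nothing`, which would explain the `Nothing` in the question. To need only the standard library, the sketch swaps JSON for an assumed comma-separated `date,a,b` value format. In Scala, the corresponding change would presumably be to write `KafkaSource.builder[CreatedEvent]()` instead of `KafkaSource.builder`, though this has not been checked against a particular Flink version.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, stdlib-only mini-model of a typed source builder.
// None of these names are Flink APIs; they only mimic the shape of
// KafkaSource.<Event>builder().setValueOnlyDeserializer(...).build().
public class TypedBuilderDemo {

    // Hypothetical event type standing in for Event / CreatedEvent.
    record Event(String date, int a, int b) {}

    // Value-only deserializer: raw message value bytes -> T, plus the produced type.
    interface ValueDeserializer<T> {
        T deserialize(byte[] value);

        Class<T> producedType();
    }

    // Assumed "date,a,b" UTF-8 text format, used instead of JSON to avoid Jackson.
    static final class CsvEventDeserializer implements ValueDeserializer<Event> {
        @Override
        public Event deserialize(byte[] value) {
            String[] f = new String(value, StandardCharsets.UTF_8).split(",");
            return new Event(f[0].trim(), Integer.parseInt(f[1].trim()), Integer.parseInt(f[2].trim()));
        }

        @Override
        public Class<Event> producedType() {
            return Event.class;
        }
    }

    // A built source whose element type OUT was fixed when builder() was called.
    static final class Source<OUT> {
        private final ValueDeserializer<OUT> deserializer;

        Source(ValueDeserializer<OUT> deserializer) {
            this.deserializer = deserializer;
        }

        OUT read(byte[] value) {
            return deserializer.deserialize(value);
        }
    }

    static final class SourceBuilder<OUT> {
        private ValueDeserializer<OUT> deserializer;

        // The element type is chosen here, either explicitly or by inference.
        static <T> SourceBuilder<T> builder() {
            return new SourceBuilder<>();
        }

        // Only a deserializer producing exactly OUT is accepted (generics are invariant).
        SourceBuilder<OUT> setValueOnlyDeserializer(ValueDeserializer<OUT> d) {
            this.deserializer = d;
            return this;
        }

        Source<OUT> build() {
            return new Source<>(deserializer);
        }
    }

    public static void main(String[] args) {
        // The explicit witness <Event> fixes OUT = Event for the whole chain.
        // Without it, builder() would infer OUT = Object here (Scala would infer
        // Nothing), and passing the Event deserializer would not compile.
        Source<Event> source = SourceBuilder.<Event>builder()
                .setValueOnlyDeserializer(new CsvEventDeserializer())
                .build();

        Event e = source.read("2022-12-16,3,7".getBytes(StandardCharsets.UTF_8));
        // prints: 2022-12-16 3 7
        System.out.println(e.date() + " " + e.a() + " " + e.b());
    }
}
```

The sketch mirrors the answer's chained `builder()...build()` call. Here `producedType()` plays roughly the role that `getProducedType()` plays in `EventDeserializationSchema`.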
