如何加入流和数据集?

jogvjijk  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(345)

如何连接流和数据集?我有一个流,文件中有一个静态数据。我想用文件中的数据来丰富流的数据。
示例:在流中我得到机场代码,在文件中我有机场的名称和文件中的代码。现在我想将流数据加入到文件中,形成一个具有机场名称的新流。请提供如何实现这一目标的步骤。

myss37ts

myss37ts1#

根据具体要求,有许多方法可以使用flink进行河流富集。https://www.youtube.com/watch?v=cjs18ikluiy 这是康斯坦丁·可耐福的一篇很好的演讲,涵盖了许多不同的方法,以及它们之间的权衡。
在一个简单的例子中,富集数据是不可变的,并且相当小,我只使用 RichFlatMap 并将整个文件加载到 open() 方法。看起来像这样:

public class EnrichmentWithPreloading extends RichFlatMapFunction<Event, EnrichedEvent> {

    private Map<Long, SensorReferenceData> referenceData;

    @Override
    public void open(final Configuration parameters) throws Exception {
      super.open(parameters);
      referenceData = loadReferenceData();
    }

    @Override
    public void flatMap(
        final Event event,
        final Collector<EnrichedEvent> collector) throws Exception {

      SensorReferenceData sensorReferenceData = 
        referenceData.get(sensorMeasurement.getSensorId());
      collector.collect(new EnrichedEvent(event, sensorReferenceData));
    }

}

您将在中找到更多其他方法的代码示例https://github.com/knaufk/enrichments-with-flink.
更新:
如果您更愿意预加载一些更大的分区引用数据以与流连接,那么有几种方法可以实现这一点,我在上面共享的视频和repo中介绍了其中一些方法。对于那些特定的需求,我建议使用自定义分区器;在同一个github repo中有一个例子。其思想是对丰富数据进行切分,并使用相关的参考数据将每个流事件导向示例。
在我看来,这比试图让表api作为一个连接来进行这种特殊的扩展要简单。

相关问题