java—如何提取kafka流中嵌入在消息中的时间戳

llmtgqce  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(533)

我想提取嵌入每个消息的时间戳,并将它们作为json负载发送到我的数据库中。
我想得到以下三个时间戳。
活动时间: The point in time when an event or data record occurred, i.e. was originally created “by the source”. 处理时间: The point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed. 摄入时间: The point in time when an event or data record is stored in a topic partition by a Kafka broker. 这是我的streams应用程序代码:

Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_URL + ":9092"); // pass from env localhost:9092 ,BROKER_URL + ":9092"
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");

source_o365_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        System.out.println("========> o365_user_activity_by_date Log:     " + value);
        ArrayList<String> keywords = new ArrayList<String>();
        try {
            JSONObject send = new JSONObject();
            JSONObject received = new JSONObject(value);

            send.put("current_date", getCurrentDate().toString()); // UTC TIME
            send.put("activity_time", received.get("CreationTime")); // CONSTANTS FINAL STATIC(Topic Names, Cassandra keys)
            send.put("user_id", received.get("UserId"));
            send.put("operation", received.get("Operation"));
            send.put("workload", received.get("Workload"));
            keywords.add(send.toString());

        } catch (Exception e) {
            // TODO: handle exception
            System.err.println("Unable to convert to json");
            e.printStackTrace();
        }

        return keywords;
    }
}).to("o365_user_activity_by_date");

在代码中,我只是获取每条记录,对其进行流处理并将其发送到不同的主题。
现在我要发送的每一张唱片 Event-time , Processing-time 以及 Ingestion-time 嵌入有效载荷中。
我已经看过了 FailOnInvalidTimestamp 以及 WallclockTimestampExtractor 但我对如何使用它们感到困惑。
请指导我如何才能做到这一点。

ltqd579y

ltqd579y1#

这个 Timestamp 提取器只能给您一个时间戳,这个时间戳用于基于时间的操作,如窗口聚合或联接。似乎你没有做任何基于时间的计算思想,因此,从计算的Angular 来看,这并不重要。
注意,一个记录只有一个元数据时间戳字段。此时间戳字段可用于存储可由生产者设置的事件时间戳。或者,您可以让代理用代理摄取时间覆盖生产者提供的时间戳。这是一个主题配置。
要访问记录元数据时间戳(独立于事件时间或摄取时间),默认的时间戳提取器将为您提供此时间戳。如果您想在应用程序中访问它,您需要使用处理器api,即在您的案例a中 .transform() 而不是 .flatMap 接线员。你的 Transformer 将用 context 对象,允许您访问提取的时间戳。
因为一个记录只能存储一个元数据时间戳,而且您希望将其用于代理摄取时间,所以上游生产者必须将事件时间戳直接放入有效负载中。
对于处理时间,只需按照代码片段中的指示执行系统调用。

相关问题