Watermark not producing the correct output in Spark

rbl8hiat posted on 2023-06-24 in Apache

I'm using a netcat server to send streaming data to Spark:

nc -lk 9999

I'm sending the data in the following format:

Time,number
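Each line typed into the nc session becomes one record. For example (the first line is the value used in the question below; the second is an arbitrary illustrative value):

10:00:00,5
09:48:00,7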

In Spark I split each line and run a groupBy on it. Here is my code:

package org.example;

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

import java.util.concurrent.TimeoutException;

import static org.apache.spark.sql.functions.*;

public class SampleProgram {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("Spark-Kafka-Integration")
                .config("spark.master", "local")
                .getOrCreate();

        spark.sparkContext().setLogLevel("ERROR");

        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", "localhost")
                .option("port", 9999)
                .load();

        lines.printSchema();

        Dataset<Row> temp_data = lines.selectExpr("split(value,',')[0] as timestamp", "split(value,',')[1] as value");
        Dataset<Row> data = temp_data.selectExpr("CAST(timestamp AS TIMESTAMP)", "CAST(value AS INT)");

        // 10-minute watermark on the event-time column; 5-minute tumbling windows
        Dataset<Row> windowedCounts = data
                .withWatermark("timestamp", "10 minutes")
                .groupBy(
                        functions.window(data.col("timestamp"), "5 minutes"),
                        col("value")
                ).count();

        StreamingQuery query = null;
        try {
            query = windowedCounts.writeStream()
                    .outputMode("update")
                    .option("truncate", "false")
                    .format("console")
                    .trigger(Trigger.ProcessingTime("45 seconds"))
                    .start();
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }

        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            throw new RuntimeException(e);
        }

    }
}

The problem I'm facing is this:
when I send, say, 10:00:00,5, it produces this output.

Now, at this point the maximum event time is 10:00:00, and I've specified a watermark of 10 minutes, so any event earlier than (10:00:00 - 00:10:00), i.e. 09:50:00, should be rejected. However, when I send 09:48:00, it produces this output:

This doesn't seem right: the data is too late, so Spark should have rejected it, yet Spark is still counting it. What am I missing?
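To make the arithmetic above concrete, here is a minimal standalone sketch (the class name WatermarkArithmetic and the eviction check are my own illustration, not part of the job above) of the threshold and window bounds involved. One relevant detail from the Spark programming guide: the watermark guarantee is one-directional, so data within the delay is never dropped, but data older than the watermark is not guaranteed to be dropped, and a window's state is only cleared once the watermark exceeds the window's end.

import java.time.Duration;
import java.time.LocalTime;

// Hypothetical standalone illustration; not part of the Spark job above.
public class WatermarkArithmetic {
    public static void main(String[] args) {
        LocalTime maxEventTime = LocalTime.parse("10:00:00");              // latest event seen so far
        LocalTime watermark = maxEventTime.minus(Duration.ofMinutes(10));  // 09:50:00

        LocalTime lateEvent = LocalTime.parse("09:48:00");
        // The 5-minute tumbling window containing the late event: [09:45:00, 09:50:00)
        LocalTime windowStart = lateEvent.withMinute((lateEvent.getMinute() / 5) * 5).withSecond(0);
        LocalTime windowEnd = windowStart.plusMinutes(5);

        // Per the programming guide, a window's state is cleared only once the
        // watermark exceeds the window's end; here the two are exactly equal
        // (both 09:50:00), so the window is still live and the row can be counted.
        boolean stateCleared = watermark.isAfter(windowEnd);
        System.out.println("watermark=" + watermark
                + " window=[" + windowStart + ", " + windowEnd + ")"
                + " stateCleared=" + stateCleared);   // stateCleared=false
    }
}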


okxuctiv #1

Write the groupBy this way:

.groupBy(
        window(col("timestamp"), "5 minutes"),
        col("value")
).count();
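(window and col here are the static imports from org.apache.spark.sql.functions, which the question's code already pulls in, so this snippet drops into the existing job as-is.)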
