pyspark-在结构化流媒体上应用自定义函数

t1rydlwq  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(193)

我有4列['clienttimestamp'、'sensor\u id'、'actvivity'、“incidents”]。从Kafka流,我消费数据,预处理和聚集在窗口中。
如果我使用groupby with“.count()”,那么流可以很好地在控制台中编写每个窗口的计数。
这很管用,

df = df.withWatermark("clientTimestamp", "1 minutes")\
                        .groupby(window(df.clientTimestamp, "1 minutes", "1 minutes"), col('sensor_type')).count()
query = df.writeStream.outputMode("append").format('console').start() 
query.awaitTermination()

但真正的动机是找出关键活动存在的总时间。i、 e.对于每种传感器类型,我按窗口对数据进行分组,得到关键活动列表,并找出所有关键活动持续的总时间”(代码如下)。但我不确定我是否用正确的方式使用自定义项!因为下面的方法不起作用!任何人都可以提供一个为每组窗口应用自定义函数并将输出写入控制台的示例。
这行不通

@f.pandas_udf(schemahh, f.PandasUDFType.GROUPED_MAP)
def calculate_time(pdf):
    pdf = pdf.reset_index(drop=True)
    total_time = 0
    index_list = pdf.index[pdf['activity'] == 'critical'].to_list()
    for ind in index_list:
        start = pdf.loc[ind]['clientTimestamp']
        end = pdf.loc[ind + 1]['clientTimestamp']
        diff = start - end
        time_n_mins = round(diff.seconds / 60, 2)
        total_time = total_time + time_n_mins
    largest_session_time = total_time
    new_pdf = pd.DataFrame(columns=['sensor_type', 'largest_session_time'])
    new_pdf.loc[0] = [pdf.loc[0]['sensor_type'], largest_session_time]
    return new_pdf

df = df.withWatermark("clientTimestamp", "1 minutes")\
                        .groupby(window(df.clientTimestamp, "1 minutes", "1 minutes"), col('sensor_type'), col('activity')).apply(calculate_time)
query = df.writeStream.outputMode("append").format('console').start() 
query.awaitTermination()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题