Spark Python: sliding windows over data

w51jfk4q posted on 2021-05-16 in Spark

I have the following data:

    id | capacity | timestamp
    ---|----------|---------------------
    1  | 35       | 2020-12-01 13:28:..
    2  | 47       | 2020-12-01 13:28:..
    3  | 101      | 2020-12-01 13:28:..

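I need to compute the average capacity of each id with a sliding window: every 2 minutes, I need the average capacity of each id over the last 5 minutes. My code is below, but it doesn't work: I defined the window, but the resulting data are wrong. Can anyone help?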

    from pyspark.sql import SparkSession, functions
    from pyspark.sql.functions import current_timestamp, window
    from pyspark.sql.types import (IntegerType, StringType, StructField,
                                   StructType, TimestampType)


    def main():
        directory = '/Users/jose/sizes'
        spark_session = SparkSession \
            .builder \
            .master("local[2]") \
            .appName("StreamingAveragePlazasLibresV2") \
            .getOrCreate()
        logger = spark_session._jvm.org.apache.log4j
        # logger.Lo...  (this line is truncated in the original post)
        fields = [StructField("nombre", StringType(), True),
                  StructField("capacidad", IntegerType(), True),
                  StructField("libres", IntegerType(), True),
                  StructField("plazas_ocupadas", IntegerType(), True),
                  StructField("timestamp", TimestampType(), True)]
        lines = spark_session \
            .readStream \
            .format("csv") \
            .option('includeTimestamp', 'true') \
            .schema(StructType(fields)) \
            .load(directory) \
            .withColumn("timestamp", current_timestamp())  # replaces the CSV timestamp with processing time
        lines.printSchema()
        # First aggregation: one mean per (nombre, capacidad, timestamp), before any window is applied
        values = lines \
            .groupBy(lines.nombre, lines.capacidad, lines.timestamp) \
            .agg(functions.mean("plazas_ocupadas").alias("mean"))
        # values.printSchema()
        windowSize = 300   # window length in seconds (5 minutes)
        slideSize = 120    # slide interval in seconds (2 minutes)
        windowDuration = '{} seconds'.format(windowSize)
        slideDuration = '{} seconds'.format(slideSize)
        # Second aggregation: groups by window and by the mean value, not by nombre
        windowedCounts = values.groupBy(
            window(values.timestamp, windowDuration, slideDuration),
            values.mean
        ).count().orderBy('window')
        windowedCounts.printSchema()
        # The query below starts `values`, so windowedCounts is never written out
        query = values \
            .writeStream \
            .outputMode("complete") \
            .format("console") \
            .start()
        query.awaitTermination()


    if __name__ == '__main__':
        main()
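To make the requirement concrete (every 2 minutes, the average over the last 5 minutes): a 300-second window sliding every 120 seconds produces overlapping windows, so a single row is counted in more than one of them. The sketch below is plain Python, not PySpark, and models only which windows a timestamp falls into. It assumes windows aligned to the Unix epoch, which is how Spark's `window()` lays them out when no `startTime` is given; `window_starts` is a hypothetical helper name.

```python
from typing import List


def window_starts(ts: int, size: int = 300, slide: int = 120) -> List[int]:
    """Start times (epoch seconds) of every window [start, start + size) containing ts.

    Windows are assumed to be aligned to the epoch, i.e. every start is a
    multiple of `slide`.
    """
    # The latest window that can contain ts starts at the last slide boundary <= ts.
    start = (ts // slide) * slide
    starts = []
    # Step back one slide at a time while the window still reaches ts.
    while start > ts - size:
        starts.append(start)
        start -= slide
    return sorted(starts)


if __name__ == "__main__":
    # Hypothetical event 130 s after the epoch: it lands in three overlapping windows.
    for s in window_starts(130):
        print(f"[{s}, {s + 300})")
```

With these parameters each row falls into two or three windows (300 / 120 = 2.5), which is why consecutive per-window averages share some of the same rows.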
smdnsysy (answer #1)

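I don't understand why you use groupBy twice.
Without test data I can't verify it, but I'd suggest merging the two groupBy calls (the code between the two printSchema() statements) into one: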

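    windowSize = 300   # window length in seconds (5 minutes)
    slideSize = 120    # slide interval in seconds (2 minutes)
    windowDuration = '{} seconds'.format(windowSize)
    slideDuration = '{} seconds'.format(slideSize)
    # A single aggregation: one mean per (window, nombre, capacidad).
    # `values` no longer exists, so the window is built on lines.timestamp.
    windowedCounts = lines \
        .groupBy(
            window(lines.timestamp, windowDuration, slideDuration),
            lines.nombre,
            lines.capacidad,
        ) \
        .agg(functions.mean("plazas_ocupadas").alias("mean"))
    # Then start the streaming query on windowedCounts instead of values.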
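What the merged groupBy computes can be modelled in plain Python: each row is assigned to every window that contains its timestamp, and the mean is taken per (window, id) key in one aggregation, with no second grouping step. This is a sketch of the semantics, not PySpark code. It assumes epoch-aligned windows, groups by `nombre` only (the answer also groups by `capacidad`, which would not change the result if each id has a fixed capacity), and uses hypothetical helper names and made-up sample rows.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Hypothetical row shape: (nombre, plazas_ocupadas, event time in epoch seconds)
Row = Tuple[str, int, int]


def window_starts(ts: int, size: int, slide: int) -> List[int]:
    """Epoch-aligned window starts s with s <= ts < s + size."""
    last = (ts // slide) * slide
    return list(range(last, ts - size, -slide))


def windowed_means(rows: Iterable[Row], size: int = 300,
                   slide: int = 120) -> Dict[Tuple[int, str], float]:
    """Mean value per (window start, nombre): one pass, one grouping key."""
    sums: Dict[Tuple[int, str], float] = defaultdict(float)
    counts: Dict[Tuple[int, str], int] = defaultdict(int)
    for nombre, value, ts in rows:
        # A row contributes to every window that contains its timestamp.
        for start in window_starts(ts, size, slide):
            key = (start, nombre)
            sums[key] += value
            counts[key] += 1
    return {key: sums[key] / counts[key] for key in sorted(sums)}


if __name__ == "__main__":
    rows = [("a", 10, 130), ("a", 20, 250), ("b", 5, 130)]  # made-up sample data
    for (start, nombre), mean in windowed_means(rows).items():
        print(start, nombre, mean)
```

The two rows for `"a"` share the windows starting at 0 and 120, so those windows average them together, while the window starting at 240 sees only the later row.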
