我会尽量保持简单。我定期从Kafka制作者那里读取一些数据,并使用spark结构化流输出以下内容
我有这样的数据输出:
+------------------------------------------+-------------------+--------------+-----------------+
|window |timestamp |Online_Emp |Available_Emp |
+------------------------------------------+-------------------+--------------+-----------------+
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:27|1 |0 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:41|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:29|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:20|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:23|2 |0 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:52|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:08|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:12|1 |0 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:02|1 |1 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:11|1 |0 |
+------------------------------------------+-------------------+--------------+-----------------+
我希望它输出如下:
Time Online_Emp Available_Emp
2017-01-01 00:00:00 52 23
2017-01-01 00:01:00 58 19
2017-01-01 00:02:00 65 28
因此,基本上它计算每分钟在线的员工数(通过他们唯一的驱动程序id),并显示有多少可用。
请注意,一个特定的雇员id可能会在 available
以及 on_duty
在一分钟内,我们需要最后的记录直到最后一分钟
Kafka产品
_producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:
json.dumps(x).encode('utf-8'))
# schedule.every(1).minutes.do(_producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) ) )
with open(filepath, 'r', encoding="utf16") as f:
for item in json_lines.reader(f):
dataDict.update({'timeStamp':item['timestamp'],
'emp_id':item['emp_id'],
'on_duty':item['on_duty']})
_producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) )
sleep(1)
spark结构化流媒体
schema = StructType([ \
StructField("timeStamp", LongType()), \
StructField("emp_id", LongType()), \
StructField("on_duty", LongType())])
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe","emp_dstream")\
.option("startingOffsets", "earliest")\
.load()\
.selectExpr("CAST(value AS STRING)")\
.select(F.from_json(F.col("value"), schema).alias("value"))\
.select(F.col("value.*"))\
.withColumn("timestamp",F.from_unixtime(F.col("timestamp") / 1000))\
.groupBy(F.window(F.col("timestamp"), "1 minutes"), F.col("timestamp"))\
.agg(F.count(F.col("timeStamp")).alias("total_employees"),F.collect_list(F.col("on_duty")).alias("on_duty"),F.sum(F.when(F.col("on_duty") == 0, F.lit(1)).otherwise(F.lit(0))).alias("not_on_duty"))\
.writeStream\
.format("console")\
.outputMode("complete")\
.option("truncate", "false")\
.start()\
.awaitTermination()
如何获得所需的输出?
如有任何提示或帮助,我们将不胜感激!
1条答案
按热度按时间bpzcxfmw1#
你的代码运行得很好。请检查以下Kafka数据和Spark流输出。
Batch 5
是最终结果,忽略其他批处理,如批处理0到4。始终认为最新的一批已更新记录的数据可用的Kafka。批处理:0
批次:1
批次:2
批次:3
批次:4
批次:5