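Question: I have streaming data that arrives from a single Pub/Sub source. I split the data into two groups, A and B. Group A contains single event codes; group B contains transactions. I need to aggregate the transaction data, but only after certain codes have appeared in group A. For example, my program should start aggregating transactions once code U appears in group A and stop aggregating once code B appears in group A.
The question: how can the pipeline know when to start and stop based on the codes in group A?
I created a sample pipeline, but I still can't work out how to get the expected result.
Suppose I have two PCollections, one for event codes and one for transactions: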
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Assumption: default options; the original post does not show how beam_options was built.
beam_options = PipelineOptions()

with beam.Pipeline(options=beam_options) as p:
    # Group A: single event codes
    event = (p | "Create1" >> beam.Create([
        {"eventCode": "A", "dates": datetime(2022, 12, 1, 7, 0, 0)},
        {"eventCode": "C", "dates": datetime(2022, 12, 1, 7, 1, 0)},
        {"eventCode": "U", "dates": datetime(2022, 12, 1, 7, 2, 0)},
        {"eventCode": "D", "dates": datetime(2022, 12, 1, 7, 3, 0)},
        {"eventCode": "E", "dates": datetime(2022, 12, 1, 7, 4, 0)},
        {"eventCode": "F", "dates": datetime(2022, 12, 1, 7, 5, 0)},
        {"eventCode": "G", "dates": datetime(2022, 12, 1, 7, 6, 0)},
        {"eventCode": "B", "dates": datetime(2022, 12, 1, 7, 7, 0)},
        {"eventCode": "T", "dates": datetime(2022, 12, 1, 7, 8, 0)},
        {"eventCode": "H", "dates": datetime(2022, 12, 1, 7, 9, 0)},
        {"eventCode": "I", "dates": datetime(2022, 12, 1, 7, 10, 0)},
        {"eventCode": "J", "dates": datetime(2022, 12, 1, 7, 11, 0)},
        {"eventCode": "M", "dates": datetime(2022, 12, 1, 7, 12, 0)},
        {"eventCode": "B", "dates": datetime(2022, 12, 1, 7, 14, 0)},
        {"eventCode": "Y", "dates": datetime(2022, 12, 1, 7, 15, 0)},
        {"eventCode": "X", "dates": datetime(2022, 12, 1, 7, 16, 0)},
    ]))
    # Group B: transactions
    trx_data = (p | "Create2" >> beam.Create([
        {"trxCode": "TRX001", "price": 156, "dates": datetime(2022, 12, 1, 7, 1, 0)},
        {"trxCode": "TRX002", "price": 157, "dates": datetime(2022, 12, 1, 7, 2, 0)},
        {"trxCode": "TRX003", "price": 158, "dates": datetime(2022, 12, 1, 7, 3, 0)},
        {"trxCode": "TRX004", "price": 159, "dates": datetime(2022, 12, 1, 7, 4, 0)},
        {"trxCode": "TRX005", "price": 160, "dates": datetime(2022, 12, 1, 7, 5, 0)},
        {"trxCode": "TRX001", "price": 161, "dates": datetime(2022, 12, 1, 7, 6, 0)},
        {"trxCode": "TRX002", "price": 162, "dates": datetime(2022, 12, 1, 7, 7, 0)},
        {"trxCode": "TRX006", "price": 163, "dates": datetime(2022, 12, 1, 7, 8, 0)},
        {"trxCode": "TRX001", "price": 164, "dates": datetime(2022, 12, 1, 7, 0, 9)},
        {"trxCode": "TRX007", "price": 165, "dates": datetime(2022, 12, 1, 7, 10, 0)},
        {"trxCode": "TRX008", "price": 166, "dates": datetime(2022, 12, 1, 7, 11, 0)},
        {"trxCode": "TRX003", "price": 167, "dates": datetime(2022, 12, 1, 7, 12, 0)},
        {"trxCode": "TRX005", "price": 168, "dates": datetime(2022, 12, 1, 7, 13, 0)},
        {"trxCode": "TRX009", "price": 169, "dates": datetime(2022, 12, 1, 7, 14, 0)},
        {"trxCode": "TRX010", "price": 170, "dates": datetime(2022, 12, 1, 7, 15, 0)},
    ]))
    # I made a window to simulate the streaming process
    event_window = (event
        | beam.Map(lambda d: beam.window.TimestampedValue(d, d["dates"].timestamp()))
        | beam.WindowInto(beam.window.FixedWindows(1))
        | beam.Map(lambda d: (d["eventCode"], d))
        | beam.GroupByKey()
        | beam.MapTuple(lambda e, d: d)
    )
    trx_window = (trx_data
        | "TrxData1" >> beam.Map(lambda d: beam.window.TimestampedValue(d, d["dates"].timestamp()))
        | "TrxData2" >> beam.WindowInto(beam.window.FixedWindows(10*60))
        # Key by the "trxCode" field that the transaction records actually carry.
        | "TrxData3" >> beam.Map(lambda d: (d["trxCode"], d))
        | "TrxData4" >> beam.GroupByKey()
    )
    process = (trx_window
        # The keyword must match the `event` parameter of TransformWithSideInput.process.
        | beam.ParDo(TransformWithSideInput(), event=beam.pvalue.AsList(event_window))
        | beam.Map(print)
    )
For TransformWithSideInput(), I wrote the following code:
class TransformWithSideInput(beam.DoFn):
    def start_bundle(self):
        self.bucket = []

    def process(self, element, event):
        for e in event:
            event_code = e.get("eventCode")
            event_code_dates = e.get("dates")
            queue = []
            if event_code == "U":
                queue = ["U"]
            elif event_code == "B":
                queue = ["B"]
            if queue == ["U"]:
                # If the code U exists then start appending the data into the bucket.
                # Logically, the transactions that need to be processed are all transactions
                # that come after the time the event code arrives.
                if element.get("dates") >= event_code_dates:
                    self.bucket.append(element)
            elif queue == ["B"]:
                yield self.bucket

    def finish_bundle(self):
        if self.bucket:
            self.flush()

    def flush(self):
        yield self.bucket
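I built TransformWithSideInput with the event pipeline as a side input because I need to inspect the events and, once the condition is met, have the DoFn return all the data that belongs to that event. However, the transaction pipeline returns nothing.
In this case, the transaction pipeline should start when event_code == "U", keep running until event_code == "B" appears, and then stop and return the result.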
1 Answer
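Data in a distributed system does not necessarily arrive in any particular order, so you need to be careful about triggering processing when something else happens. Instead, deciding when you have seen "all the relevant data" is governed by watermarks. There are many references on how to think about this, for example https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/
If your data can be declared complete at some predetermined time, windowing both groups and joining them may work well. Otherwise, you may need to look at state and timers. (Note in particular that start_bundle and finish_bundle may be called many times over a single stream of events, whereas state can persist beyond that.)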
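To make the intended semantics concrete, here is a minimal plain-Python sketch of the gating logic itself. It is not a Beam pipeline: it assumes the watermark has already passed, so both inputs are complete and can safely be sorted by timestamp. In a real pipeline this kind of logic would run over buffered data, for example after a windowed join or inside a timer callback of a stateful DoFn. The function name `aggregate_between`, the output fields, and the tie-breaking rule (a transaction stamped exactly at the start event is included, one stamped exactly at the stop event is excluded) are illustrative assumptions, not something specified in the question.

```python
from datetime import datetime


def aggregate_between(events, transactions, start_code="U", stop_code="B"):
    """Sum transaction prices between a start event and the next stop event.

    Hypothetical helper: assumes both lists are complete, so ordering them
    by timestamp is safe. Returns one dict per closed start/stop interval.
    """
    # Merge both inputs into one timeline. Events are tagged 0 and
    # transactions 1, so on equal timestamps the event is handled first.
    timeline = sorted(
        [(e["dates"], 0, e) for e in events]
        + [(t["dates"], 1, t) for t in transactions],
        key=lambda item: item[:2],
    )
    results = []
    bucket = None  # None means the gate is closed
    start = None
    for ts, kind, record in timeline:
        if kind == 0:
            code = record["eventCode"]
            if code == start_code and bucket is None:
                bucket, start = [], ts  # open the gate
            elif code == stop_code and bucket is not None:
                results.append({
                    "start": start,
                    "stop": ts,
                    "count": len(bucket),
                    "total": sum(t["price"] for t in bucket),
                })
                bucket = None  # close the gate
        elif bucket is not None:
            bucket.append(record)  # transaction seen while the gate is open
    return results


# A small subset of the sample data from the question.
events = [
    {"eventCode": "U", "dates": datetime(2022, 12, 1, 7, 2)},
    {"eventCode": "D", "dates": datetime(2022, 12, 1, 7, 3)},
    {"eventCode": "B", "dates": datetime(2022, 12, 1, 7, 7)},
]
transactions = [
    {"trxCode": "TRX001", "price": 156, "dates": datetime(2022, 12, 1, 7, 1)},
    {"trxCode": "TRX003", "price": 158, "dates": datetime(2022, 12, 1, 7, 3)},
    {"trxCode": "TRX005", "price": 160, "dates": datetime(2022, 12, 1, 7, 5)},
    {"trxCode": "TRX006", "price": 163, "dates": datetime(2022, 12, 1, 7, 8)},
]
result = aggregate_between(events, transactions)
print(result)
```

With this subset, only TRX003 and TRX005 fall between U (07:02) and B (07:07), so the single interval has a count of 2 and a total of 318. Because the function sorts its input, it only gives the right answer once no earlier record can still arrive, which is the guarantee the watermark provides.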