Python: Can a pipeline be triggered once a condition is met? If so, how?

dw1jzc5e posted on 2022-12-21 in Python

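Problem: I have streaming data that comes from a single Pub/Sub source. I split the data into two groups, group A and group B. Group A contains individual event codes; group B contains transactions. I need to aggregate the transaction data, but only after certain codes have appeared in group A. For example, my program should start aggregating transactions after code U appears in group A and stop aggregating after code B appears in group A.
Question: How can the pipeline know when to start and stop based on the codes in group A?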

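I created a sample pipeline, but I still don't know how to get the expected result.
Suppose I have two PCollections, one for event codes and one for transactions: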

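import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from datetime import datetime

beam_options = PipelineOptions()

with beam.Pipeline(options=beam_options) as p: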
    event = (p | "Create1" >> beam.Create([
        {"eventCode": "A", "dates": datetime(2022, 12, 1, 7, 0, 0)},
        {"eventCode": "C", "dates": datetime(2022, 12, 1, 7, 1, 0)},
        {"eventCode": "U", "dates": datetime(2022, 12, 1, 7, 2, 0)},
        {"eventCode": "D", "dates": datetime(2022, 12, 1, 7, 3, 0)},
        {"eventCode": "E", "dates": datetime(2022, 12, 1, 7, 4, 0)},
        {"eventCode": "F", "dates": datetime(2022, 12, 1, 7, 5, 0)},
        {"eventCode": "G", "dates": datetime(2022, 12, 1, 7, 6, 0)},
        {"eventCode": "B", "dates": datetime(2022, 12, 1, 7, 7, 0)},
        {"eventCode": "T", "dates": datetime(2022, 12, 1, 7, 8, 0)},
        {"eventCode": "H", "dates": datetime(2022, 12, 1, 7, 9, 0)},
        {"eventCode": "I", "dates": datetime(2022, 12, 1, 7, 10, 0)},
        {"eventCode": "J", "dates": datetime(2022, 12, 1, 7, 11, 0)},
        {"eventCode": "M", "dates": datetime(2022, 12, 1, 7, 12, 0)},
        {"eventCode": "B", "dates": datetime(2022, 12, 1, 7, 14, 0)},
        {"eventCode": "Y", "dates": datetime(2022, 12, 1, 7, 15, 0)},
        {"eventCode": "X", "dates": datetime(2022, 12, 1, 7, 16, 0)},
    ]))

    trx_data = (p | "Create2" >> beam.Create([
        {"trxCode": "TRX001", "price": 156, "dates": datetime(2022, 12, 1, 7, 1, 0)},
        {"trxCode": "TRX002", "price": 157, "dates": datetime(2022, 12, 1, 7, 2, 0)},
        {"trxCode": "TRX003", "price": 158, "dates": datetime(2022, 12, 1, 7, 3, 0)},
        {"trxCode": "TRX004", "price": 159, "dates": datetime(2022, 12, 1, 7, 4, 0)},
        {"trxCode": "TRX005", "price": 160, "dates": datetime(2022, 12, 1, 7, 5, 0)},
        {"trxCode": "TRX001", "price": 161, "dates": datetime(2022, 12, 1, 7, 6, 0)},
        {"trxCode": "TRX002", "price": 162, "dates": datetime(2022, 12, 1, 7, 7, 0)},
        {"trxCode": "TRX006", "price": 163, "dates": datetime(2022, 12, 1, 7, 8, 0)},
        {"trxCode": "TRX001", "price": 164, "dates": datetime(2022, 12, 1, 7, 0, 9)},
        {"trxCode": "TRX007", "price": 165, "dates": datetime(2022, 12, 1, 7, 10, 0)},
        {"trxCode": "TRX008", "price": 166, "dates": datetime(2022, 12, 1, 7, 11, 0)},
        {"trxCode": "TRX003", "price": 167, "dates": datetime(2022, 12, 1, 7, 12, 0)},
        {"trxCode": "TRX005", "price": 168, "dates": datetime(2022, 12, 1, 7, 13, 0)},
        {"trxCode": "TRX009", "price": 169, "dates": datetime(2022, 12, 1, 7, 14, 0)},
        {"trxCode": "TRX010", "price": 170, "dates": datetime(2022, 12, 1, 7, 15, 0)},
    ]))
    
    # I made a window to simulate the streaming process
    event_window = (event
                      | beam.Map(lambda d: beam.window.TimestampedValue(d, d["dates"].timestamp()))
                      | beam.WindowInto(beam.window.FixedWindows(1))
                      | beam.Map(lambda d: (d["eventCode"], d))
                      | beam.GroupByKey()
                      | beam.MapTuple(lambda e, d: d)
                      )
    trx_window = (trx_data
                   | "TrxData1" >> beam.Map(lambda d: beam.window.TimestampedValue(d, d["dates"].timestamp()))
                   | "TrxData2" >> beam.WindowInto(beam.window.FixedWindows(10*60))
                   | "TrxData3" >> beam.Map(lambda d: (d["trxCode"], d))
                   | "TrxData4" >> beam.GroupByKey()
                   )

    process = (trx_window
               | beam.ParDo(TransformWithSideInput(), event=beam.pvalue.AsList(event_window))
               | beam.Map(print)
               )

For TransformWithSideInput(), I wrote the following code:

class TransformWithSideInput(beam.DoFn):
    def start_bundle(self):
        self.bucket = []

    def process(self, element, event):
        for e in event:
            event_code = e.get("eventCode")
            event_code_dates = e.get("dates")
            queue = []
            if event_code == "U":
                queue = ["U"]
            elif event_code == "B":
                queue = ["B"]

            if queue == ["U"]:
                # If the code U exists then start appending the data into the bucket.
                # Logically, the transactions that need to be processed are all transactions
                # that come after the time the event code arrives.
                if element.get("dates") >= event_code_dates:
                    self.bucket.append(element)
            elif queue == ["B"]:
                yield self.bucket

    def finish_bundle(self):
        if self.bucket:
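            # Note: flush() is a generator function, so calling it without
            # iterating over the result does not emit anything.
            self.flush()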

    def flush(self):
        yield self.bucket

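I created TransformWithSideInput with the event pipeline passed in as a side input, because I need to inspect the event pipeline and, if the condition is met, have the DoFn return all the data covered by that event. However, the transaction pipeline returns nothing.
In other words: when event_code == "U" appears, the transaction pipeline should start, keep running until event_code == "B" appears, and then stop and return the result.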
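The intended start/stop semantics can be pinned down with a small plain-Python reference function before wiring anything into Beam. This is a sketch under assumptions the question does not state: each U opens a half-open interval [U, B) that the next B closes, a B with no open U is ignored, and the aggregate is the sum of `price`. The function name `aggregate_between_markers` is hypothetical.

```python
from datetime import datetime
from typing import Any, Dict, List, Tuple


def aggregate_between_markers(
    events: List[Dict[str, Any]],
    transactions: List[Dict[str, Any]],
    start_code: str = "U",
    stop_code: str = "B",
) -> List[Tuple[datetime, datetime, int, List[str]]]:
    """Sum transaction prices that fall between a start event and the next stop event.

    Events are ordered by their "dates" field (event time), not by arrival order.
    Returns one (start_time, stop_time, total_price, trx_codes) tuple per interval.
    """
    intervals = []
    open_start = None
    for ev in sorted(events, key=lambda e: e["dates"]):
        if ev["eventCode"] == start_code and open_start is None:
            open_start = ev["dates"]  # open a new interval at the U event
        elif ev["eventCode"] == stop_code and open_start is not None:
            intervals.append((open_start, ev["dates"]))  # close it at the B event
            open_start = None
        # A B with no open interval, or a repeated U, is ignored (assumption).

    results = []
    for begin, end in intervals:
        # Half-open [begin, end): a transaction exactly at the stop time is excluded.
        matched = [t for t in transactions if begin <= t["dates"] < end]
        results.append((begin, end, sum(t["price"] for t in matched),
                        [t["trxCode"] for t in matched]))
    return results
```

With the sample data above, the only interval is 07:02 to 07:07, covering TRX002 (157) through TRX001 (161) for a total of 795; the B at 07:14 has no preceding U and produces nothing.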

pjngdqdw

pjngdqdw1#

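Data in a distributed system does not necessarily arrive in any particular order, so you need to be careful about triggering processing when something else happens. Instead, deciding when you have seen "all the relevant data" is governed by the watermark. There are many references on how to think about this, e.g. https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/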
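To make the watermark point concrete: instead of reacting to elements in arrival order, a consumer can buffer them and act only on those whose event time the watermark has already passed, in timestamp order. The plain-Python `WatermarkBuffer` below is a hypothetical illustration of that idea, not a Beam API; a real runner computes the watermark itself and may still deliver late data after it, which this sketch does not handle.

```python
import heapq
from datetime import datetime
from typing import Any, List, Tuple


class WatermarkBuffer:
    """Hypothetical helper: hold out-of-order elements until the watermark passes them."""

    def __init__(self) -> None:
        # Min-heap ordered by event time; the sequence number breaks ties
        # so heapq never has to compare the payloads themselves.
        self._heap: List[Tuple[datetime, int, Any]] = []
        self._seq = 0

    def add(self, timestamp: datetime, element: Any) -> None:
        """Buffer an element under its event time, regardless of arrival order."""
        heapq.heappush(self._heap, (timestamp, self._seq, element))
        self._seq += 1

    def advance_watermark(self, watermark: datetime) -> List[Any]:
        """Release, in event-time order, every element with timestamp <= watermark."""
        released = []
        while self._heap and self._heap[0][0] <= watermark:
            released.append(heapq.heappop(self._heap)[2])
        return released
```

For example, if a B stamped 07:07 arrives before a U stamped 07:02, advancing the watermark to 07:05 releases only the U; the B is held back until the watermark reaches 07:07.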
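If your data can be declared complete at some predetermined time, then a windowed join of the two groups may work well. Otherwise, you may want to consider using state and timers. (Note in particular that for a single event stream, start_bundle and finish_bundle may be called many times, whereas state persists beyond a single bundle.)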
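The point about state outliving bundles can be sketched with a plain-Python class that holds what a stateful DoFn would keep per key: when the current U interval started, and the transactions buffered since then. In Beam this would typically live in state cells declared with `ReadModifyWriteStateSpec` and `BagStateSpec` from `apache_beam.transforms.userstate`, with an event-time `TimerSpec` used to process buffered elements in timestamp order once the watermark passes them; that wiring is not shown here. The class name `GatedAggregator` is hypothetical, and the sketch assumes all events and transactions share one key and are delivered in event-time order.

```python
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional, Tuple


class GatedAggregator:
    """Plain-Python stand-in for the per-key state of a stateful DoFn (hypothetical).

    Unlike attributes reset in start_bundle, this state is kept across calls,
    so a U seen in one "bundle" still applies to transactions in later ones.
    """

    def __init__(self, start_code: str = "U", stop_code: str = "B") -> None:
        self.start_code = start_code
        self.stop_code = stop_code
        self.active_since: Optional[datetime] = None  # start of the open interval
        self.bucket: List[Dict[str, Any]] = []        # transactions since the U

    def process(self, kind: str, element: Dict[str, Any]
                ) -> Iterator[Tuple[datetime, datetime, int]]:
        """Consume one element; yield (start, stop, total_price) when a B closes an interval."""
        if kind == "event":
            code = element["eventCode"]
            if code == self.start_code and self.active_since is None:
                self.active_since = element["dates"]
                self.bucket = []
            elif code == self.stop_code and self.active_since is not None:
                total = sum(t["price"] for t in self.bucket)
                yield (self.active_since, element["dates"], total)
                self.active_since = None
                self.bucket = []
        elif kind == "trx" and self.active_since is not None:
            # Only buffer transactions while an interval is open.
            self.bucket.append(element)
```

Because `process` is a generator, its result must be iterated (for example with `list(...)`) for the emitted tuples to appear.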
