我有一个 DoFn
,其中我返回常规输出和带标记的输出:
class Foo(DoFn):
UNPARSABLE_TAG = "unparseable_events"
def process(
self, element: Tuple[str, date]
) -> Iterable[Union[Event, str]]:
ev, d = element
if ev == "1":
yield Event(ev)
else:
yield TaggedOutput(self.UNPARSABLE_TAG, ev)
运行时类型检查似乎期望返回包含常规和标记输出类型的提示。
虽然这是可行的,但当同时使用这两种方法时 PCollections
然后,您需要添加一个“decoratorhack”,以便以后再次获得正确的类型:
valid, invalid = ... | ParDo(Foo())
valid | Map(_beamDecoratorHack)
...
def _beamDecoratorHack(
ev: Union[Event, str]
) -> Event:
assert isinstance(ev, Event)
return ev
这似乎很缓慢。有没有办法不需要黑客?
暂无答案!
目前还没有任何答案,快来回答吧!