在python中使用apache beam从pubsub输出中提取列值

xhv8bpkk  于 2023-03-28  发布在  Python
关注(0)|答案(1)|浏览(98)

我试图从PubSub订阅中提取数据,最后,一旦数据被提取,我想做一些转换。目前,它是字节格式。我已经尝试了多种方法来提取JSON格式的数据,使用自定义模式,它失败了一个错误

raise TypeError(TypeError: Parameter to an Iterable hint must be a non-sequence, a type, or a TypeConstraint. <class 'dict'> is an instance of type.

readPubSub.py

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
import typing

class MySchema(typing.NamedTuple):
    user_id:str
    event_ts:str    
    create_ts:str   
    event_id:str    
    ifa:str
    ifv:str
    country:str
    chip_balance:str    
    game:str    
    user_group:str
    user_condition:str  
    device_type:str 
    device_model:str    
    user_name:str   
    fb_connect:bool 
    is_active_event:bool    
    event_payload:str

TOPIC_PATH = "projects/nectar-259905/topics/events"

def pubsub_to_dict(pubsub_message: str) -> dict:
    dicts_as_string = pubsub_message.replace(' ','').split('\n\n')
    for dict_as_string in dicts_as_string:
        yield eval(dict_as_string)

def run(pubsub_topic):
    options = PipelineOptions(
        streaming=True
    )
    runner = 'DirectRunner'

    print("I reached before pipeline")

    with beam.Pipeline(runner, options=options) as pipeline:
        message=(
                        pipeline
        | "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
        | 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
        | 'Split Into Dicts' >> beam.Map(pubsub_to_dict)
        | 'Map to MySchema' >> beam.Map(lambda msg: MySchema(**msg)).with_output_types(MySchema)
        | "Writing to console" >> beam.Map(print))
        
    print("I reached after pipeline")
    result = message.run()
    result.wait_until_finish()

run(TOPIC_PATH)

如果我在正下方使用

message=(
    pipeline
    | "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
    | 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
    | "Writing to console" >> beam.Map(print))

输出为

{
        'user_id': '102105290400258488',
        'event_ts': '2021-05-29 20:42:52.283 UTC',
        'event_id': 'Game_Request_Declined',
        'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
        'ifv': '3fc6eb8b4d0cf096c47e2252f41',
        'country': 'US',
        'chip_balance': '9140',
        'game': 'gru',
        'user_group': '[1, 36, 529702]',
        'user_condition': '[1, 36]',
        'device_type': 'phone',
        'device_model': 'TCL 5007Z',
        'user_name': 'Minnie',
        'fb_connect': True,
        'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
        'is_active_event': True
    }

{
    'user_id': '102105290400258488',
    'event_ts': '2021-05-29 20:54:38.297 UTC',
    'event_id': 'Decline_Game_Request',
    'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
    'ifv': '3fc6eb8b4d0cf096c47e2252f41',
    'country': 'US',
    'chip_balance': '9905',
    'game': 'gru',
    'user_group': '[1, 36, 529702]',
    'user_condition': '[1, 36]',
    'device_type': 'phone',
    'device_model': 'TCL 5007Z',
    'user_name': 'Minnie',
    'fb_connect': True,
    'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
    'is_active_event': True
}

请让我知道,如果我做错了什么,而解析数据到JSON.此外,我正在寻找的例子做数据屏蔽和运行一些SQL在Apache梁

qnakjoqk

qnakjoqk1#

您的问题是由ast.literal_eval导致的,它无法评估字段event_payload的值(该字段不是正确的JSON格式)。
您需要将pubsub消息拆分为单独的dicts,使用

def pubsub_to_dict(pubsub_message: str) -> dict:
   # it could be that you need to adjust the code a little bit, in case the actual pubsub return differs from the way I ingested your example data
   # the idea: first remove all whitespaces, then split at the line break between the dicts
   dicts_as_string = pubsub_message.replace(' ','').split('\n\n')
   for dict_as_string in dicts_as_string:
       return eval(dict_as_string)

然后,您可以像这样用简单的beam.Map填充NamedTuple

with beam.Pipeline(runner, options=options) as pipeline:
        message=(
            pipeline
            | "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
            | 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
            | 'Split Into Dicts' >> beam.Map(pubsub_to_dict)
            | 'Map to MySchema' >> beam.Map(lambda msg: MySchema(**msg)).with_output_types(MySchema)

MySchema(**msg)只有在NamedTuple与msg的结构相同时才有效。如果它们不同,您需要手动填充模式,例如:

MySchema(
 field_1 = msg['key_1'],
 field_2 = msg['key_2']
)

相关问题