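I'm trying to pull data from a Pub/Sub subscription, and once the data has been pulled I want to apply some transformations. At the moment it arrives as bytes. I've tried several ways of extracting the data as JSON using a custom schema, and it fails with this error: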
    raise TypeError(
TypeError: Parameter to an Iterable hint must be a non-sequence, a type, or a TypeConstraint. <class 'dict'> is an instance of type.
readPubSub.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
import typing


class MySchema(typing.NamedTuple):
    user_id: str
    event_ts: str
    create_ts: str
    event_id: str
    ifa: str
    ifv: str
    country: str
    chip_balance: str
    game: str
    user_group: str
    user_condition: str
    device_type: str
    device_model: str
    user_name: str
    fb_connect: bool
    is_active_event: bool
    event_payload: str


TOPIC_PATH = "projects/nectar-259905/topics/events"


def pubsub_to_dict(pubsub_message: str) -> dict:
    dicts_as_string = pubsub_message.replace(' ', '').split('\n\n')
    for dict_as_string in dicts_as_string:
        yield eval(dict_as_string)


def run(pubsub_topic):
    options = PipelineOptions(
        streaming=True
    )
    runner = 'DirectRunner'
    print("I reached before pipeline")
    with beam.Pipeline(runner, options=options) as pipeline:
        message = (
            pipeline
            | "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')  #.with_output_types(bytes)
            | 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
            | 'Split Into Dicts' >> beam.Map(pubsub_to_dict)
            | 'Map to MySchema' >> beam.Map(lambda msg: MySchema(**msg)).with_output_types(MySchema)
            | "Writing to console" >> beam.Map(print))
        print("I reached after pipeline")
        result = message.run()
        result.wait_until_finish()


run(TOPIC_PATH)
If I use only the following instead:
        message = (
            pipeline
            | "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')  #.with_output_types(bytes)
            | 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
            | "Writing to console" >> beam.Map(print))
the output is:
{
'user_id': '102105290400258488',
'event_ts': '2021-05-29 20:42:52.283 UTC',
'event_id': 'Game_Request_Declined',
'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
'ifv': '3fc6eb8b4d0cf096c47e2252f41',
'country': 'US',
'chip_balance': '9140',
'game': 'gru',
'user_group': '[1, 36, 529702]',
'user_condition': '[1, 36]',
'device_type': 'phone',
'device_model': 'TCL 5007Z',
'user_name': 'Minnie',
'fb_connect': True,
'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
'is_active_event': True
}
{
'user_id': '102105290400258488',
'event_ts': '2021-05-29 20:54:38.297 UTC',
'event_id': 'Decline_Game_Request',
'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
'ifv': '3fc6eb8b4d0cf096c47e2252f41',
'country': 'US',
'chip_balance': '9905',
'game': 'gru',
'user_group': '[1, 36, 529702]',
'user_condition': '[1, 36]',
'device_type': 'phone',
'device_model': 'TCL 5007Z',
'user_name': 'Minnie',
'fb_connect': True,
'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
'is_active_event': True
}
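The printed messages are Python dict literals (single-quoted keys and strings, a bare True), not JSON, so json.loads rejects them while ast.literal_eval accepts them; the event_payload value, by contrast, is itself a JSON string. A small check, using a trimmed copy of one of the messages printed above (only some of its fields; the helper name try_json is illustrative):

```python
import ast
import json
from typing import Any, Dict, Optional

# Trimmed copy of one printed message (a subset of its fields).
SAMPLE = """{
'user_id': '102105290400258488',
'device_model': 'TCL 5007Z',
'fb_connect': True,
'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
'is_active_event': True
}"""


def try_json(text: str) -> Optional[Dict[str, Any]]:
    """Return the parsed object, or None if text is not valid JSON."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None


as_json = try_json(SAMPLE)             # None: single quotes and True are not JSON
as_literal = ast.literal_eval(SAMPLE)  # a regular Python dict
payload = json.loads(as_literal["event_payload"])  # the nested field is JSON

print(as_json)
print(as_literal["fb_connect"], payload["variant"])
```

In other words, the outer message needs a Python-literal parser, and only event_payload needs json.loads.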
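Please let me know if I'm doing something wrong while parsing the data to JSON. I'm also looking for examples of masking data and running some SQL in Apache Beam.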
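For the masking part, a minimal sketch of a per-element masking function that could run in a beam.Map step. The masking policy (hash user_id with a salted SHA-256, redact all but the last four characters of ifa, ifv and user_name) and the names mask_event, HASHED_FIELDS, REDACTED_FIELDS and the salt value are hypothetical choices for illustration, not a Beam API:

```python
import hashlib
from typing import Any, Dict

# Hypothetical masking policy: adjust to your own requirements.
HASHED_FIELDS = ("user_id",)
REDACTED_FIELDS = ("ifa", "ifv", "user_name")


def mask_event(event: Dict[str, Any], salt: str = "example-salt") -> Dict[str, Any]:
    """Return a masked copy of one event dict; the input is left unchanged."""
    masked = dict(event)  # Beam elements should not be mutated in place
    for field in HASHED_FIELDS:
        if masked.get(field) is not None:
            # Salted SHA-256: stable for joins, but not directly reversible.
            data = (salt + str(masked[field])).encode("utf-8")
            masked[field] = hashlib.sha256(data).hexdigest()
    for field in REDACTED_FIELDS:
        value = masked.get(field)
        if isinstance(value, str) and value:
            # Keep the last 4 characters only when the value is longer than that.
            keep = value[-4:] if len(value) > 4 else ""
            masked[field] = "*" * (len(value) - len(keep)) + keep
    return masked
```

In a pipeline this would sit after the parsing step, e.g. `| 'Mask PII' >> beam.Map(mask_event)`. For the SQL half of the question, the Python SDK provides apache_beam.transforms.sql.SqlTransform; it expects a schema-aware PCollection (for example NamedTuple elements with beam.coders.RowCoder registered for that type) and starts a Java expansion service, so it is not sketched here.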
1 answer
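Your problem is caused by ast.literal_eval, which cannot evaluate the value of the field event_payload (that field is not properly formatted JSON). You need to split the Pub/Sub message into separate dicts, using: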
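A minimal sketch of such a splitter, assuming each decoded payload holds one or more Python dict literals separated by blank lines (the separator the question's pubsub_to_dict already splits on). The name pubsub_to_dicts is illustrative. Unlike the question's version it does not strip spaces, since values such as 'TCL 5007Z' and the timestamps contain them, and it uses ast.literal_eval instead of eval. Because it yields several dicts per message, it belongs in beam.FlatMap; beam.Map would emit the generator object itself as a single element. It is annotated with typing hints rather than the builtin `-> dict`; the TypeError in the question names <class 'dict'> inside an Iterable hint, so that annotation may also be involved, though this is a guess that depends on the Beam version.

```python
import ast
import re
from typing import Any, Dict, Iterator


def pubsub_to_dicts(pubsub_message: str) -> Iterator[Dict[str, Any]]:
    """Yield one dict per Python dict literal in a decoded Pub/Sub payload."""
    # Split on blank lines (possibly containing whitespace); keep inner spaces.
    for chunk in re.split(r"\n\s*\n", pubsub_message.strip()):
        if chunk.strip():
            # literal_eval only accepts Python literals, so it is safer than eval().
            yield ast.literal_eval(chunk)
```

Usage in the pipeline would then look like `| 'Split Into Dicts' >> beam.FlatMap(pubsub_to_dicts)`.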
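Then you can populate the NamedTuple MySchema with a simple beam.Map, like this:

MySchema(**msg) only works if the NamedTuple has the same structure as msg. If they differ, you need to populate the schema manually, for example: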
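A hedged sketch of that manual population. The printed messages in the question contain no create_ts key even though MySchema declares one, so MySchema(**msg) would raise a TypeError about a missing argument for them. The helper name to_my_schema and its defaults ("" for a missing string field, False for a missing boolean) are illustrative assumptions; extra keys in msg are ignored.

```python
import typing
from typing import Any, Dict


class MySchema(typing.NamedTuple):
    user_id: str
    event_ts: str
    create_ts: str
    event_id: str
    ifa: str
    ifv: str
    country: str
    chip_balance: str
    game: str
    user_group: str
    user_condition: str
    device_type: str
    device_model: str
    user_name: str
    fb_connect: bool
    is_active_event: bool
    event_payload: str


def to_my_schema(msg: Dict[str, Any]) -> MySchema:
    """Build a MySchema row field by field, tolerating missing or extra keys."""
    values = {}
    for name, hint in MySchema.__annotations__.items():
        raw = msg.get(name)
        if hint is bool:
            values[name] = bool(raw) if raw is not None else False
        else:
            # Every non-bool field in MySchema is declared as str.
            values[name] = "" if raw is None else str(raw)
    return MySchema(**values)
```

In the pipeline it would replace the lambda, e.g. `beam.Map(to_my_schema).with_output_types(MySchema)`, after a FlatMap step that yields the individual dicts.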