I am sending JSON data to a Pub/Sub topic; the code snippet is below. My goal is to parse the data from Pub/Sub, apply some small transformations, and write it to BigQuery.
publishEvents.py
import json
from google.cloud import pubsub_v1
from concurrent import futures
from typing import Callable

project = 'nectar-259905'
topic = 'incomingData'

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic)
publish_futures = []

def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")
    return callback

for line in open('/home/test/Resources/Input/Input_Data.json', 'r'):
    traffic = json.loads(line)
    data = str(traffic)
    #publisher.publish(topic_path, str.encode(str(key),"utf-8"))
    publish_future = publisher.publish(topic_path, data.encode("utf-8"))
    publish_future.add_done_callback(get_callback(publish_future, data))
    publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
print(f"Published messages with error handler to {topic_path}.")
Input_Data.json
{
    "user_id": "102105290400257936",
    "event_ts": "2021-05-29 00:03:12.262 UTC",
    "event_id": "Change_User_Name",
    "country": "US",
    "game": "gru",
    "user_group": "[1, 529701]",
    "user_condition": "[1]",
    "device_type": "tablet",
    "device_model": "iPhone13,2",
    "user_name": "Gin47709426",
    "fb_connect": false,
    "event_payload": "{\"trigger\":\"install\"}",
    "is_active_event": true
}
readFromPubSub.py
import argparse
import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from datetime import datetime
import json

def parse_json_message(message):
    """Parse an input JSON message into a flat dict matching the BigQuery schema."""
    row = json.loads(message)
    return {
        'user_id': row['user_id'],
        'event_ts': row['event_ts'],
        'create_ts': row['create_ts'],
        'event_id': row['event_id'],
        'ifa': row['ifa'],
        'ifv': row['ifv'],
        'country': row['country'],
        'chip_balance': row['chip_balance'],
        'game': row['game'],
        'user_group': row['user_group'],
        'user_condition': row['user_condition'],
        'device_type': row['device_type'],
        'device_model': row['device_model'],
        'user_name': row['user_name'],
        'fb_connect': row['fb_connect'],
        'is_active_event': row['is_active_event'],
        'event_payload': row['event_payload'],
    }
def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--subscription',
        dest='subscription',
        default='projects/nectar-259905/subscriptions/bq_subscribe',
        help='Input Pub/Sub subscription')
    parser.add_argument(
        '--table_spec',
        dest='table_spec',
        default='nectar-259905:JSONData.bq_table',
        help='Destination BigQuery table.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:
        table_schema = {
            'fields': [
                {"name": "user_id", "mode": "REQUIRED", "type": "STRING"},
                {"name": "event_ts", "mode": "REQUIRED", "type": "STRING"},
                {"name": "create_ts", "mode": "REQUIRED", "type": "STRING"},
                {"name": "event_id", "mode": "REQUIRED", "type": "STRING"},
                {"name": "ifa", "mode": "NULLABLE", "type": "STRING"},
                {"name": "ifv", "mode": "NULLABLE", "type": "STRING"},
                {"name": "country", "mode": "NULLABLE", "type": "STRING"},
                {"name": "chip_balance", "mode": "NULLABLE", "type": "STRING"},
                {"name": "game", "mode": "NULLABLE", "type": "STRING"},
                {"name": "user_group", "mode": "NULLABLE", "type": "STRING"},
                {"name": "user_condition", "mode": "NULLABLE", "type": "STRING"},
                {"name": "device_type", "mode": "NULLABLE", "type": "STRING"},
                {"name": "device_model", "mode": "REQUIRED", "type": "STRING"},
                {"name": "user_name", "mode": "REQUIRED", "type": "STRING"},
                {"name": "fb_connect", "mode": "NULLABLE", "type": "BOOLEAN"},
                {"name": "is_active_event", "mode": "NULLABLE", "type": "BOOLEAN"},
                {"name": "event_payload", "mode": "NULLABLE", "type": "STRING"},
            ]
        }

        (p
         | 'Read from pubsub' >> beam.io.ReadFromPubSub(subscription=known_args.subscription)
         | 'Parse JSON messages' >> beam.Map(parse_json_message)
         | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
               known_args.table_spec,
               schema=table_schema,
               #method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
               #triggering_frequency=1,
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

if __name__ == '__main__':
    run()
When I run readFromPubSub.py, the pipeline gets stuck at:
[screenshot of the stuck pipeline]
Any pointers on whether I did something wrong in the schema, or in how the messages are parsed in parse_json_message?
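Also worth noting (an observation about the posted snippets, not a confirmed diagnosis): the sample Input_Data.json has no create_ts, ifa, ifv, or chip_balance keys, so row['create_ts'] and friends in parse_json_message would raise KeyError. A defensive sketch using dict.get; note that columns the schema marks REQUIRED (such as create_ts) would still need a real value, since BigQuery rejects None for them:

def parse_json_message(message):
    """Parse an input JSON message, defaulting absent keys to None."""
    row = json.loads(message)
    keys = ['user_id', 'event_ts', 'create_ts', 'event_id', 'ifa', 'ifv',
            'country', 'chip_balance', 'game', 'user_group', 'user_condition',
            'device_type', 'device_model', 'user_name', 'fb_connect',
            'is_active_event', 'event_payload']
    # REQUIRED columns would be rejected by BigQuery if left as None.
    return {key: row.get(key) for key in keys}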
1 Answer

anhgbhbe:
Your problem is this line:

options.view_as(StandardOptions).streaming = True

It is better not to set this option in the pipeline code. You can set it directly as a pipeline option on the command line that launches the Dataflow job. The option is not mandatory in any case: when you read from an unbounded source, Dataflow deduces that this is a streaming job.
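A minimal sketch of that change (the runner and region values below are placeholders): build the options from the arguments argparse did not consume, and drop the hard-coded streaming line.

known_args, pipeline_args = parser.parse_known_args(argv)
# Forward the unconsumed command-line flags to Beam instead of
# hard-coding streaming=True in the pipeline code.
options = PipelineOptions(pipeline_args)

# Launch command (runner/region values are placeholders):
# python readFromPubSub.py \
#     --runner=DataflowRunner \
#     --project=nectar-259905 \
#     --region=us-central1 \
#     --streaming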