无法在Python中从pubsub写入BigQuery

jdzmm42g  于 2023-03-31  发布在  Python
关注(0)|答案(1)|浏览(101)

我将JSON数据发送到pub-sub主题。下面是代码片段。我的目的是解析pub-sub中的数据并进行一些小的转换,然后将其发送到BigQuery。

publishEvents.py

import json
from google.cloud import pubsub_v1
from concurrent import futures
from typing import Callable

# Names passed to the client below to build the topic path.
project = "nectar-259905"
topic = "incomingData"

# A single client instance is used for every publish call in this script.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic)

# Futures returned by publisher.publish() are collected here.
publish_futures = list()

def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    """Build a done-callback that reports the outcome of one publish call.

    Args:
        publish_future: Not read here; the returned callback works on the
            future it is invoked with.
        data: Message text named in the timeout report.

    Returns:
        A function taking a future. It prints the future's result, or a
        timeout notice when the result is not available within 60 seconds.
    """

    def _on_done(done_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Allow up to 60 seconds for the publish result.
            outcome = done_future.result(timeout=60)
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")
        else:
            print(outcome)

    return _on_done

# Publish every non-blank line of the input file as one Pub/Sub message.
# The context manager closes the file once all lines have been read.
with open('/home/test/Resources/Input/Input_Data.json', 'r') as input_file:
    for line in input_file:
        if not line.strip():
            # A blank line is not a JSON document; json.loads would raise.
            continue
        traffic = json.loads(line)
        # Serialize back to JSON text. str() of a dict yields a Python repr
        # (single quotes, True/False) that a json.loads call on the
        # receiving side cannot parse.
        data = json.dumps(traffic)
        publish_future = publisher.publish(topic_path, data.encode("utf-8"))
        publish_future.add_done_callback(get_callback(publish_future, data))
        publish_futures.append(publish_future)

# Block until every publish future has completed.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")

Input_Data.json

{
    "user_id": "102105290400257936",
    "event_ts": "2021-05-29 00:03:12.262 UTC",
    "event_id": "Change_User_Name",
    "country": "US",
    "game": "gru",
    "user_group": "[1, 529701]",
    "user_condition": "[1]",
    "device_type": "tablet",
    "device_model": "iPhone13,2",
    "user_name": "Gin47709426",
    "fb_connect": false,
    "event_payload": "{\"trigger\":\"install\"}",
    "is_active_event": true
}

readFromPubSub.py

import argparse
import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

from datetime import datetime
import json

def parse_json_message(message):
    """Parse one JSON message into a row dict for the BigQuery table.

    Args:
        message: JSON text (``str`` or UTF-8 ``bytes``) encoding an object.

    Returns:
        A dict with exactly the 17 column names listed below. Columns that
        the table schema in ``run`` marks NULLABLE are ``None`` when the
        message does not contain them; keys not listed are dropped.

    Raises:
        KeyError: If a column the schema marks REQUIRED is missing.
        ValueError: If ``message`` is not valid JSON.
    """
    # (column name, required?) in output order. The flags mirror the
    # REQUIRED/NULLABLE modes of the table schema declared in ``run``.
    columns = (
        ('user_id', True),
        ('event_ts', True),
        ('create_ts', True),
        ('event_id', True),
        ('ifa', False),
        ('ifv', False),
        ('country', False),
        ('chip_balance', False),
        ('game', False),
        ('user_group', False),
        ('user_condition', False),
        ('device_type', False),
        ('device_model', True),
        ('user_name', True),
        ('fb_connect', False),
        ('is_active_event', False),
        ('event_payload', False),
    )
    row = json.loads(message)
    return {
        # Required columns fail loudly; optional ones default to None.
        name: row[name] if required else row.get(name)
        for name, required in columns
    }

def run(argv=None, save_main_session=True):
    """Stream JSON messages from a Pub/Sub subscription into BigQuery.

    Args:
        argv: Command-line arguments. ``None`` lets argparse read
            ``sys.argv``. Arguments not declared by this function are
            forwarded to the Beam pipeline options.
        save_main_session: Value stored on the pipeline's ``SetupOptions``.
    """
    # Local import: the file-level import only brings in PipelineOptions
    # and StandardOptions.
    from apache_beam.options.pipeline_options import SetupOptions

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--subscription',
        dest='subscription',
        default='projects/nectar-259905/subscriptions/bq_subscribe',
        help='Input Pub/Sub subscription')
    parser.add_argument(
        # The option string must not carry trailing whitespace, otherwise
        # "--table_spec" given on the command line is never matched.
        '--table_spec',
        dest='table_spec',
        default='nectar-259905:JSONData.bq_table',
        help='Destination BigQuery table.')

    known_args, pipeline_args = parser.parse_known_args(argv)

    # Hand the arguments argparse did not recognise to Beam, so options
    # supplied through ``argv`` actually reach the pipeline.
    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = save_main_session
    options.view_as(StandardOptions).streaming = True

    # (name, mode, type) for every column of the destination table.
    columns = (
        ('user_id', 'REQUIRED', 'STRING'),
        ('event_ts', 'REQUIRED', 'STRING'),
        ('create_ts', 'REQUIRED', 'STRING'),
        ('event_id', 'REQUIRED', 'STRING'),
        ('ifa', 'NULLABLE', 'STRING'),
        ('ifv', 'NULLABLE', 'STRING'),
        ('country', 'NULLABLE', 'STRING'),
        ('chip_balance', 'NULLABLE', 'STRING'),
        ('game', 'NULLABLE', 'STRING'),
        ('user_group', 'NULLABLE', 'STRING'),
        ('user_condition', 'NULLABLE', 'STRING'),
        ('device_type', 'NULLABLE', 'STRING'),
        ('device_model', 'REQUIRED', 'STRING'),
        ('user_name', 'REQUIRED', 'STRING'),
        ('fb_connect', 'NULLABLE', 'BOOLEAN'),
        ('is_active_event', 'NULLABLE', 'BOOLEAN'),
        ('event_payload', 'NULLABLE', 'STRING'),
    )
    table_schema = {
        'fields': [
            {'name': name, 'mode': mode, 'type': field_type}
            for name, mode, field_type in columns
        ]
    }

    with beam.Pipeline(options=options) as p:
        (p
         | 'Read from pubsub' >> beam.io.ReadFromPubSub(
             subscription=known_args.subscription)
         | 'Parse JSON messages' >> beam.Map(parse_json_message)
         | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             known_args.table_spec,
             schema=table_schema,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

    
# Script entry point: start the pipeline only when executed directly.
if __name__ == "__main__":
    run()

当我执行 readFromPubSub.py 时,程序一直卡住不动(原帖此处的截图已丢失)。
是我的 schema 定义有问题,还是函数 parse_json_message 解析消息时出了错?能否给一些提示?

anhgbhbe

anhgbhbe1#

您的问题出在下面这一行:

options.view_as(StandardOptions).streaming = True

最好不要在管道代码中设置此选项。
您可以在启动Dataflow作业的命令行中将其直接设置为pipeline option

# Launch the pipeline module and request streaming mode on the command line.
python -m folder.your_main \
    --project=my-project \
    --streaming

这个选项并不是必需的,也可以不设置:当你读取的是无界数据源时,Dataflow 会自动推断出这是一个流式作业。

相关问题