如何在python中实现带有结构化流的snowflake连接器?

nom7f22z  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(504)

目前,我有一些代码接收输入并创建一个数据流。我的目标是将数据上传到snowflake。目前我正在尝试这一点,有没有一个更简单的方法去做这件事。或者有没有可能把这个写到Pandasdf上,然后把Pandasdf上传到snowflake上?它以前只使用结构化流媒体,与snowflake没有任何联系。

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

sfconn = {
    "sfURL": f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com",
    "sfUser": os.getenv('SNOWFLAKE_USER'),
    "sfPassword": os.getenv('SNOWFLAKE_PASSWORD'),
    "sfDatabase": "x",
    "sfSchema": "x",
    "sfWarehouse": "x"
}

spark = SparkSession.builder\
    .appName("snowflake-connector")\
    .getOrCreate()

df = spark \
              .readStream\
              .format('json') \
              .schema(spark_schemas['x']) \
              .load(f"s3a://{x_path}")

out = df \
    .writeStream\
    .outputMode("append")\
    .option("dbtable", "scratch_table")\
    .options(sfconn)\
    .trigger(processingTime='1 minutes')\
    .format("snowflake")\
    .start()

现在它正在显现

options() takes 1 positional argument but 2 were given

: java.lang.ClassNotFoundException: Failed to find data source: snowflake.
mlmc2os5

mlmc2os51#

第一个错误如下- options 取指定选项的可变对数。如果您有选项as map,则需要使用 **map 语法,如下所示:

opts = {'inferSchema': "true", "header": "false"}
df = spark.read.options(**opts)
   .format("csv")
   .schema("ticker String,date Date, price Float")
   .load(".../datasets/dow-quotes.csv")

对于第二个错误-只需指定正确的连接器名称- net.snowflake.spark.snowflake 而不是 snowflake ,并确保在提交作业时指定了雪花Spark连接器。有关更多详细信息,请参见雪花文档。

相关问题