目前,我有一些代码接收输入并创建一个数据流。我的目标是将数据上传到snowflake。目前我正在尝试这一点,有没有一个更简单的方法去做这件事。或者有没有可能把这个写到Pandasdf上,然后把Pandasdf上传到snowflake上?它以前只使用结构化流媒体,与snowflake没有任何联系。
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
sfconn = {
"sfURL": f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com",
"sfUser": os.getenv('SNOWFLAKE_USER'),
"sfPassword": os.getenv('SNOWFLAKE_PASSWORD'),
"sfDatabase": "x",
"sfSchema": "x",
"sfWarehouse": "x"
}
spark = SparkSession.builder\
.appName("snowflake-connector")\
.getOrCreate()
df = spark \
.readStream\
.format('json') \
.schema(spark_schemas['x']) \
.load(f"s3a://{x_path}")
out = df \
.writeStream\
.outputMode("append")\
.option("dbtable", "scratch_table")\
.options(sfconn)\
.trigger(processingTime='1 minutes')\
.format("snowflake")\
.start()
现在它正在显现
options() takes 1 positional argument but 2 were given
和
: java.lang.ClassNotFoundException: Failed to find data source: snowflake.
1条答案
按热度按时间mlmc2os51#
第一个错误如下-
options
取指定选项的可变对数。如果您有选项as map,则需要使用**map
语法,如下所示:对于第二个错误-只需指定正确的连接器名称-
net.snowflake.spark.snowflake
而不是snowflake
,并确保在提交作业时指定了雪花Spark连接器。有关更多详细信息,请参见雪花文档。