如何以正确的方式使用GCPDataproc集群中的spark连接到sqlserver?

jhdbpxl9  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(247)

我正在尝试将数据从sqlserver数据库移动到gcp上的bigquery。为此,我们创建了一个dataproc集群,我可以在其中运行spark作业,该作业连接到sqlserver上的源数据库,读取某些表并将它们摄取到bigquery。
gcp dataproc上的版本:

Spark: 2.4.7
Scala: 2.12.12

我的Spark代码:

val dataframe = spark.read.format("jdbc").option("url", s"jdbc:sqlserver://servername:port;DatabaseName=dbname").
option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").
option("user", "username").
option("password", "password").
option("dbtable", s"(select * from tablename where flagCol >= '${YearMonth.of(2019, 1).atDay(1).toString}' and flagCol <= '${YearMonth.of(2019, 12).atEndOfMonth().toString}') as dateDF").
option("partitionColumn", flagCol).
option("lowerBound", s"${YearMonth.of(2019, 1).atDay(1).toString}").
option("numPartitions", s"${YearMonth.of(2019, 12).atEndOfMonth().toString}").
option("numPartitions", 3).
option("fetchsize", 1000).
load()

dataframe.write.format("bigquery").option("table", s"$tablename").mode("append").save()

我用它创建了一个jar,并使用下面的spark submit命令提交它:

gcloud dataproc jobs submit spark --cluster <CLUSTERNAME> --master yarn --deploy-mode cluster --num-executors 3 --executor-memory 4G --executor-cores 3 --driver-class-path <path_of_driver_jar> --driver-memory 1G --jars <path_of_required_jars> --class com.packageName.className mysparkcode.jar

我面临的问题是,从spark到源数据库的连接随机失败,例外情况如下:

Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:1981)

起初我以为是网络/执行器超时,并添加了以下配置:

spark.network.timeout 10000000, spark.executor.heartbeatInterval 10000000

但问题依然存在。
所以我在我的本地服务器上尝试了同样的代码,并且在最小资源上没有任何问题。我还尝试了在我们的on prem hadoop集群上使用相同的代码,其中spark可用,并将Dataframe保存为一个虚拟Parquet文件。令我感到好笑的是,这项工作甚至没有投入大量的资源,却很顺利。作业在90秒内将2.5gb的数据加载到Parquet文件中。
prem版本:

Spark: 2.3.3
Scala: 2.11.12

但是同样的代码在我的dataproc集群上失败了。
这就是我创建集群的方式。

gcloud dataproc clusters create <CLUSTERNAME> --enable-component-gateway --bucket <GCP_BUCKET_NAME> --region <REGION_NAME> --subnet <SUBNET_ADDRESS> --no-address --zone <ZONE> --master-machine-type n1-standard-8 --master-boot-disk-size 500 --num-workers 2 --worker-machine-type n1-standard-8 --worker-boot-disk-size 500 --metadata 'PIP_PACKAGES=pyspark==2.4.0' --initialization-actions SOME_STARTUP_SCRIPT.sh,//SOME_PATH/pip-install.sh --image-version 1.5-debian10 --project <PROJECT_NAME> --service-account=<SERVICE_ACCOUNT_NAME> --properties <SOME_JARS>,dataproc:dataproc.conscrypt.provider.enable=false --optional-components ANACONDA,JUPYTER

在我们的on prem hadoop集群上成功执行需要90秒才能将Dataframe保存为Parquet文件。但在dataproc上,即使是像count(*)、min/max of columns这样的最小查询也会导致 Caused by: java.net.SocketException: Connection reset 我甚至尝试将内存参数加倍,结果却遇到了同样的异常。
这是因为我编写代码的方式和执行代码的方式,还是因为版本不匹配的问题?如果不是,我创建集群的方式有问题吗?在集群创建命令中或代码中的任何地方,是否应该使用任何网络设置?有人能告诉我该怎么解决这个问题吗?非常感谢您的帮助。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题