我正在尝试将数据从sqlserver数据库移动到gcp上的bigquery。为此,我们创建了一个dataproc集群,我可以在其中运行spark作业,该作业连接到sqlserver上的源数据库,读取某些表并将它们摄取到bigquery。
gcp dataproc上的版本:
Spark: 2.4.7
Scala: 2.12.12
我的Spark代码:
val dataframe = spark.read.format("jdbc").option("url", s"jdbc:sqlserver://servername:port;DatabaseName=dbname").
option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").
option("user", "username").
option("password", "password").
option("dbtable", s"(select * from tablename where flagCol >= '${YearMonth.of(2019, 1).atDay(1).toString}' and flagCol <= '${YearMonth.of(2019, 12).atEndOfMonth().toString}') as dateDF").
option("partitionColumn", flagCol).
option("lowerBound", s"${YearMonth.of(2019, 1).atDay(1).toString}").
option("numPartitions", s"${YearMonth.of(2019, 12).atEndOfMonth().toString}").
option("numPartitions", 3).
option("fetchsize", 1000).
load()
dataframe.write.format("bigquery").option("table", s"$tablename").mode("append").save()
我用它创建了一个jar,并使用下面的spark submit命令提交它:
gcloud dataproc jobs submit spark --cluster <CLUSTERNAME> --master yarn --deploy-mode cluster --num-executors 3 --executor-memory 4G --executor-cores 3 --driver-class-path <path_of_driver_jar> --driver-memory 1G --jars <path_of_required_jars> --class com.packageName.className mysparkcode.jar
我面临的问题是,从spark到源数据库的连接随机失败,例外情况如下:
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:1981)
起初我以为是网络/执行器超时,并添加了以下配置:
spark.network.timeout 10000000, spark.executor.heartbeatInterval 10000000
但问题依然存在。
所以我在我的本地服务器上尝试了同样的代码,并且在最小资源上没有任何问题。我还尝试了在我们的on prem hadoop集群上使用相同的代码,其中spark可用,并将Dataframe保存为一个虚拟Parquet文件。令我感到好笑的是,这项工作甚至没有投入大量的资源,却很顺利。作业在90秒内将2.5gb的数据加载到Parquet文件中。
prem版本:
Spark: 2.3.3
Scala: 2.11.12
但是同样的代码在我的dataproc集群上失败了。
这就是我创建集群的方式。
gcloud dataproc clusters create <CLUSTERNAME> --enable-component-gateway --bucket <GCP_BUCKET_NAME> --region <REGION_NAME> --subnet <SUBNET_ADDRESS> --no-address --zone <ZONE> --master-machine-type n1-standard-8 --master-boot-disk-size 500 --num-workers 2 --worker-machine-type n1-standard-8 --worker-boot-disk-size 500 --metadata 'PIP_PACKAGES=pyspark==2.4.0' --initialization-actions SOME_STARTUP_SCRIPT.sh,//SOME_PATH/pip-install.sh --image-version 1.5-debian10 --project <PROJECT_NAME> --service-account=<SERVICE_ACCOUNT_NAME> --properties <SOME_JARS>,dataproc:dataproc.conscrypt.provider.enable=false --optional-components ANACONDA,JUPYTER
在我们的on prem hadoop集群上成功执行需要90秒才能将Dataframe保存为Parquet文件。但在dataproc上,即使是像count(*)、min/max of columns这样的最小查询也会导致 Caused by: java.net.SocketException: Connection reset
我甚至尝试将内存参数加倍,结果却遇到了同样的异常。
这是因为我编写代码的方式和执行代码的方式,还是因为版本不匹配的问题?如果不是,我创建集群的方式有问题吗?在集群创建命令中或代码中的任何地方,是否应该使用任何网络设置?有人能告诉我该怎么解决这个问题吗?非常感谢您的帮助。
暂无答案!
目前还没有任何答案,快来回答吧!