Problem using the Spark BigQuery connector from Java

Asked by r7xajy2e on 2021-05-18, in Spark

I can read data from a BigQuery table through the spark-bigquery connector when running locally, but when I deploy the job to Google Cloud and run it on Dataproc, I get the exception below. As the log shows, the connector resolves the table's schema, then hangs for 8-10 minutes before throwing the exception. Can anyone help?

20/10/30 13:15:40 INFO org.spark_project.jetty.util.log: Logging initialized @2859ms
20/10/30 13:15:40 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
20/10/30 13:15:40 INFO org.spark_project.jetty.server.Server: Started @2959ms
20/10/30 13:15:40 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector
20/10/30 13:15:40 WARN org.apache.spark.scheduler.FairSchedulableBuilder: Fair Scheduler configuration file not found so jobs will be scheduled in FIFO order. To use fair scheduling, configure pools in fairscheduler.xml or set spark.scheduler.allocation.file to a file that contains the configuration.
20/10/30 13:15:41 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at <REMOVED_RM_INFO_FOR_SECURITY_PURPOSE>
20/10/30 13:15:41 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting to Application History server at <REMOVED_HISTORY_SERVER_INFO_FOR_SECURITY_PURPOSE>
20/10/30 13:15:44 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1603913904708_0011
20/10/30 13:15:50 INFO com.google.cloud.spark.bigquery.BigQueryUtilScala: BigQuery client project id is [<REMOVED_PROJECT_ID_FOR_SECURITY_PURPOSE>], derived from the parentProject option
20/10/30 13:15:52 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Querying table <REMOVED_TABLE_NAME_FOR_SECURITY_PURPOSE>, parameters sent from Spark: requiredColumns=[country,ssn,fname,postal_code,lname,city,tenant_id,mob,PARTY_ID,src_id], filters=[]
20/10/30 13:15:52 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Going to read from <REMOVED_TABLE_NAME_FOR_SECURITY_PURPOSE> columns=[country, ssn, fname, postal_code, lname, city, tenant_id, mob, PARTY_ID, src_id], filter=''
Exception in thread "main" com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
    at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
    at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
    at shaded.com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1074)
    at shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
    at shaded.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1213)
    at shaded.com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:983)
    at shaded.com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:771)
    at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
    at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
        at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
        at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
        at com.google.cloud.bigquery.storage.v1.BigQueryReadClient.createReadSession(BigQueryReadClient.java:230)
        at com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation.buildScan(DirectBigQueryRelation.scala:134)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:338)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:337)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProjectRaw(DataSourceStrategy.scala:415)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProject(DataSourceStrategy.scala:333)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:289)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3260)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2495)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2709)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:731)
        at com.gcp.poc.SparkBigQueryConnector.main(SparkBigQueryConnector.java:33)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:890)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:217)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    at io.grpc.Status.asRuntimeException(Status.java:533)
    ... 16 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: null: bigquerystorage.googleapis.com
    at io.netty.channel.unix.Errors.throwConnectException(Errors.java:102)
    at io.netty.channel.unix.Socket.connect(Socket.java:255)
    at io.netty.channel.epoll.AbstractEpollChannel.doConnect0(AbstractEpollChannel.java:758)
    at io.netty.channel.epoll.AbstractEpollChannel.doConnect(AbstractEpollChannel.java:743)
    at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.connect(AbstractEpollChannel.java:585)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1291)
    at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:545)
    at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:530)
    at io.netty.channel.ChannelDuplexHandler.connect(ChannelDuplexHandler.java:50)
    at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:545)
    at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:530)
    at io.netty.channel.ChannelDuplexHandler.connect(ChannelDuplexHandler.java:50)
    at io.grpc.netty.WriteBufferingAndExceptionHandler.connect(WriteBufferingAndExceptionHandler.java:150)
    at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:545)
    at io.netty.channel.AbstractChannelHandlerContext.access$1000(AbstractChannelHandlerContext.java:38)
    at io.netty.channel.AbstractChannelHandlerContext$11.run(AbstractChannelHandlerContext.java:535)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    ... 1 more
Caused by: java.net.NoRouteToHostException
    ... 22 more
20/10/30 13:25:35 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark
Job output is complete
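
Editorial note: the decisive line is the final cause, java.net.NoRouteToHostException while connecting to bigquerystorage.googleapis.com, meaning the Dataproc node could not open any TCP route to the BigQuery Storage API endpoint. Below is a minimal, hypothetical probe (not part of the original post; the class name is illustrative) that could be run on a cluster node to confirm this:

import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical connectivity check, not from the original post. If this also
// throws NoRouteToHostException, the problem is network configuration (for
// example, internal-IP-only cluster nodes without Private Google Access)
// rather than anything in the Spark job itself.
public class EndpointProbe {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket()) {
            // Attempt a plain TCP connect to the BigQuery Storage API, 10s timeout.
            socket.connect(new InetSocketAddress("bigquerystorage.googleapis.com", 443), 10_000);
            System.out.println("bigquerystorage.googleapis.com:443 is reachable");
        }
    }
}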

pom.xml:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>2.3.4</spark.version>
    <scala.version>2.11.8</scala.version>
</properties>
<distributionManagement>
    <snapshotRepository>
        <id>sonatype-nexus-snapshots</id>
        <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    </snapshotRepository>
    <repository>
        <id>sonatype-nexus-staging</id>
        <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
    </repository>
</distributionManagement>
<dependencies>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-shared-dependencies</artifactId>
        <version>0.13.0</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-bigquery</artifactId>
        <version>1.116.10</version>
        <exclusions>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava-jdk5</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>failureaccess</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>listenablefuture</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.google.cloud.spark</groupId>
        <artifactId>spark-bigquery_2.11</artifactId>
        <version>0.17.3</version>
        <exclusions>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava-jdk5</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>failureaccess</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>listenablefuture</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>30.0-jre</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>${scala.version}</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.3</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>
        <!-- Maven Shade Plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.1</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <finalName>${project.artifactId}-${project.version}</finalName>
                        <relocations>
                            <relocation>
                                <pattern>com.google.common</pattern>
                                <shadedPattern>shaded.com.google.common</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>com.google.protobuf</pattern>
                                <shadedPattern>shaded.com.google.protobuf</shadedPattern>
                            </relocation>
                        </relocations>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.jacoco</groupId>
            <artifactId>jacoco-maven-plugin</artifactId>
            <version>0.8.5</version>
            <executions>
                <execution>
                    <id>prepare-agent</id>
                    <goals>
                        <goal>prepare-agent</goal>
                    </goals>
                    <configuration>
                        <destFile>target/ut-coverage.exec</destFile>
                    </configuration>
                </execution>
                <execution>
                    <id>report</id>
                    <phase>verify</phase>
                    <goals>
                        <goal>report</goal>
                    </goals>
                    <configuration>
                        <dataFile>target/ut-coverage.exec</dataFile>
                        <outputDirectory>target/jacoco-ut</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
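
One thing worth flagging in this pom (an editorial note, not from the original post): Maven only honors <scope>import</scope> for BOM imports declared under <dependencyManagement>; declared directly under <dependencies> as above, the google-cloud-shared-dependencies import is effectively ignored. The conventional placement would be:

<dependencyManagement>
    <dependencies>
        <!-- BOM import: pins consistent versions of Google client libraries. -->
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-shared-dependencies</artifactId>
            <version>0.13.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>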

Sample code:

public static SparkSession getSparkSession() {
    return SparkSession.builder()
            //.master("local[*]")
            .config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-latest.jar")
            .getOrCreate();
}

public static void main(String[] args) {
    SparkSession session = getSparkSession();
    Dataset<Row> readDS = session.read().format("bigquery")
            .option("table", "<PROJECT_ID.DATASET.TABLENAME>")
            .option("project", projectId)
            .option("parentProject", projectId)
            .load();
    readDS.show(1, false);
}

izj3ouym, answer #1:

For anyone else who runs into this: below is the BigQuery dependency I switched to, and the job now works correctly.

<dependency>
    <groupId>com.google.cloud.spark</groupId>
    <artifactId>spark-bigquery-with-dependencies_2.12</artifactId>
    <version>0.17.3</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava-jdk5</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>failureaccess</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>listenablefuture</artifactId>
        </exclusion>
    </exclusions>
</dependency>
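
A short editorial note on why this change plausibly helps: the -with-dependencies artifact bundles and relocates its own gRPC, Netty, and Guava, so the connector no longer depends on whatever versions the cluster or the fat jar happen to provide. Note that the _2.12 suffix assumes a Spark build on Scala 2.12 (for example, a Dataproc 1.5 image with Spark 2.4.x), whereas the pom in the question targets Scala 2.11 / Spark 2.3.4, so those versions would need to move together. With the connector shaded into the job jar, the spark.jars setting in the sample code is also no longer needed; a minimal sketch under those assumptions:

// Minimal sketch, assuming spark-bigquery-with-dependencies_2.12 0.17.3 is
// packaged into the job jar and the cluster runs a Scala 2.12 build of Spark.
// The angle-bracket placeholders are illustrative, as in the question.
SparkSession session = SparkSession.builder().getOrCreate();
Dataset<Row> readDS = session.read().format("bigquery")
        .option("table", "<PROJECT_ID>.<DATASET>.<TABLE>")
        .option("parentProject", "<PROJECT_ID>")
        .load();
readDS.show(1, false);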