Scala: HTTP client times out when used on a DataFrame returned by a Hive query

Posted by w8f9ii69 on 2021-06-25 in Hive

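This seems to be a very strange and specific problem, and it has me stumped.
When I work with a DataFrame produced by querying a Hive table with spark.sql("select * from table"), I get a timeout exception whenever I try to use an HTTP client inside a transformation or action step on that DataFrame.
Example: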

import scalaj.http._
import org.apache.spark.sql.SparkSession

object Example {

  def postDoc(doc: String): Unit = {
    val resp = Http("https://example.com/endpoint")
      .postData(doc)
      .header("content-type", "application/json")
      .asString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    import spark.implicits._
    val df = spark.sql("select id, json_doc from some_table")
    df.map(r => r.getAs[String]("json_doc")).foreach(postDoc _)
  }
}

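I can, however, use the HTTP client with a DataFrame that I create manually, i.e. Seq((1, "{\"a\": 1}")).toDF("id", "json_doc").foreach(postDoc _).
I have also tried creating a temporary table and selecting from it with spark.sql. That works when the temporary table comes from a DataFrame I created manually, but not when it originates from a Hive table.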
Part of my build.sbt:

```
scalaVersion := "2.11.12"

libraryDependencies ++= {
  val sparkVersion = "2.1.3"
  Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
    "org.scalaj" %% "scalaj-http" % "2.4.2"
  )
}
```

Stack trace:

java.net.SocketTimeoutException: Read timed out
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at sun.net.www.protocol.http.HttpURLConnection$10.run(HttpURLConnection.java:1950)
at sun.net.www.protocol.http.HttpURLConnection$10.run(HttpURLConnection.java:1945)
at java.security.AccessController.doPrivileged(Native Method)
at sun.net.www.protocol.http.HttpURLConnection.getChainedException(HttpURLConnection.java:1944)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1514)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:352)
at scalaj.http.HttpRequest.scalaj$http$HttpRequest$$doConnection(Http.scala:367)
at scalaj.http.HttpRequest.exec(Http.scala:343)
at scalaj.http.HttpRequest.asString(Http.scala:492)
at com.gm.avalanche.collect.Collector$.postDoc(Collector.scala:34)
at com.gm.avalanche.collect.Collector$$anonfun$sqlTest$2.apply(Collector.scala:71)
at com.gm.avalanche.collect.Collector$$anonfun$sqlTest$2.apply(Collector.scala:71)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735)
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1593)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getInputStream(HttpsURLConnectionImpl.java:268)
at scalaj.http.HttpRequest.scalaj$http$HttpRequest$$doConnection(Http.scala:365)
... 17 more
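
The trace above shows scalaj-http delegating to java.net.HttpURLConnection, with the exception raised while reading the response. As a rough illustration only, the sketch below shows how connect and read timeouts are set at that JDK level. The object name `TimeoutSketch`, the helper `configure`, and the millisecond values are hypothetical and are not part of the original post. No network traffic occurs here, because `openConnection()` only builds the connection object.

```scala
import java.net.{HttpURLConnection, URL}

object TimeoutSketch {
  // Hypothetical helper: builds (but does not open) a connection with
  // explicit timeouts. The read timeout is the limit that surfaces as
  // "java.net.SocketTimeoutException: Read timed out" when exceeded.
  def configure(url: String, connectMs: Int, readMs: Int): HttpURLConnection = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    conn.setConnectTimeout(connectMs) // max time to establish the TCP connection
    conn.setReadTimeout(readMs)       // max time to wait for response data
    conn
  }
}
```

With scalaj-http itself, the corresponding knob should be `.timeout(connTimeoutMs = ..., readTimeoutMs = ...)` on the request built in `postDoc`. Which values are appropriate depends on the endpoint and is not something the post specifies.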

5jvtdoz2


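It turned out that the socket was being closed remotely by the ingress controller in front of the Kubernetes environment running the Elasticsearch instance. The controller was set to its default one-minute timeout.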
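
One way to check for this kind of remote close is to record how long each request runs before it fails. The following is a minimal sketch, assuming nothing beyond the Scala standard library. The names `Timing` and `timed` are hypothetical and do not come from the original answer.

```scala
import scala.util.Try

object Timing {
  // Runs `body` and returns its outcome (result or exception) together
  // with the elapsed wall-clock time in milliseconds.
  def timed[A](body: => A): (Try[A], Long) = {
    val start = System.nanoTime()
    val result = Try(body)
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    (result, elapsedMs)
  }
}
```

Wrapping the call in `postDoc`, for example `Timing.timed(Http(...).asString)`, and logging the elapsed time for failed requests would show whether failures cluster around a fixed duration. Failures close to 60,000 ms would be consistent with the one-minute ingress default described above, though that alone would not prove the cause.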
