I am saving a Spark (2.3) Structured Streaming DataFrame into a Hive table through a custom sink implementation. The code is as follows.
val df = spark.readStream.format("socket").option("host", "localhost").option("port", 19191).load().as[String]

val query = df.map { s =>
  // Parse each comma-separated line into (eid, name, salary, designation).
  val records = s.split(",")
  assert(records.length >= 4)
  (records(0).toInt, records(1), records(2), records(3))
}

query.selectExpr("_1 as eid", "_2 as name", "_3 as salary", "_4 as designation")
  .writeStream
  .format("hive-streaming")
  .option("metastore", ".....")
  .option("db", "test")
  .option("table", "test_employee")
  .option("checkpointLocation", "/checkpoints/employee/checkpoint")
  .queryName("socket-hive-streaming")
  .start()
This fails at runtime with the following error.
ERROR streaming.MicroBatchExecution: Query socket-hive-streaming [id = ......, runId = ......] terminated with error
java.lang.RuntimeException: Offsets committed out of order: 1 followed by 0
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.streaming.TextSocketSource.commit(socket.scala:146)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$4.apply(MicroBatchExecution.scala:356)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$4.apply(MicroBatchExecution.scala:355)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:355)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:338)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
1 Answer
There are two ways to resolve your problem:
- Remove/clean the checkpoint /checkpoints/employee/checkpoint on your machine (a sketch of doing this programmatically follows this list).
- Use another source that maintains offsets, such as Kafka (see the sketch after the explanation below).
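For the first option, a minimal sketch of clearing the checkpoint programmatically, assuming it lives on the filesystem configured for your Spark session (deleting the directory by hand works just as well):

import org.apache.hadoop.fs.Path

// Hypothetical cleanup snippet: resolve the filesystem that owns the
// checkpoint path from the session's Hadoop configuration...
val checkpointPath = new Path("/checkpoints/employee/checkpoint")
val fs = checkpointPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
// ...and delete it recursively so the restarted query no longer sees the
// previously committed offset.
fs.delete(checkpointPath, true)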
You hit this problem because the socket source does not maintain any offset information itself. When you restart the job that reads input from the socket, the first thing it does is try to recover its state from /checkpoints/employee/checkpoint, where it finds that the last committed offset is 1. When you then send more messages to the socket, the restarted source starts counting from scratch and reports its current offset as 0. Committing 0 after 1 is out of order, so the job throws this exception.
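For the second option, here is a minimal sketch of swapping the socket source for Kafka; the broker address localhost:9092 and the topic name employee are assumptions for illustration, and the spark-sql-kafka-0-10 artifact must be on the classpath. Because the Kafka source tracks offsets durably, recovery from the checkpoint stays consistent across restarts:

// Assumed: a Kafka broker at localhost:9092 and a topic "employee" carrying
// the same comma-separated records that were previously sent over the socket.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "employee")
  .load()
  .selectExpr("CAST(value AS STRING)") // Kafka delivers bytes; decode to text
  .as[String]
// The rest of the query (map, selectExpr, writeStream) stays unchanged.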