我使用scala和flink 1.0-snapshot对数据集执行leftouterjoin,得到以下异常:
11:54:15,921 INFO org.apache.flink.runtime.taskmanager.Task - CHAIN DataSource (at com.mycompany.FlinkTest$.main(FlinkTest.scala:99) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at select('date as 'date,'dsCode as 'dsCode,'datatype as 'datatype,'quote as 'quote,'name as 'name)) (1/1) switched to FAILED with exception.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:241)
at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:81)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
我使用一个简单的scala case类作为数据集的类型:
case class RawValue(date: String, dsCode: String, datatype: String, quote: Double, name: String)
我使用以下方法生成case类示例:
def getRawValuesFromZipFile(fileName: String) : Array[RawValue]
我通过以下方式初始化环境并创建数据集[rawvalue]:
val env = ExecutionEnvironment.createLocalEnvironment(4)
val rawValues = env.fromCollection(getRawValuesFromZipFile("filename.zip"))
rawValues.print
我怀疑是序列化问题导致了这个错误,我使用scala2.10.5和java7系统库来编译这个项目。我使用的是eclipse,项目是由示例项目生成脚本生成的。
任何关于解决问题的帮助或提示都将不胜感激:-)谢谢,丹尼尔
1条答案
按热度按时间jjjwad0x1#
这个
env.fromCollection()
调用可能并不真正适合您的用例。如果数据变得很大,它就会中断,因为数据是随作业一起传送的。在工作节点上不并行读取数据。你可以看看这个:https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#read-压缩文件,看看它是否适用于你的情况。它只支持gzip,但也许您可以用这种格式压缩数据。