我在尝试运行flink流媒体应用程序时出现以下错误。
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of com.test.SwissProt: no suitable constructor found, can not deserialize from Object value (missing default constructor or creator, or perhaps need to add/enable type information?)
at [Source: [B@681c6d54; line: 1, column: 12]
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:261)
at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:1456)
at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1012)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1203)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:314)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:148)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3789)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2920)
at com.test.SwissProtDeserializationSchema.deserialize(SwissProtDeserializationSchema.scala:17)
at com.test.SwissProtDeserializationSchema.deserialize(SwissProtDeserializationSchema.scala:9)
at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
at java.lang.Thread.run(Thread.java:745)
在scala中,我认为当您创建一个case类时,会创建默认构造函数吗?我不明白这个错误。请帮帮我!
我有以下scala对象:
运行flink流的主scala对象
package com.test
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
object Run {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val rawStream = env.addSource(new FlinkKafkaConsumer09("XML", new SwissProtDeserializationSchema,properties))
rawStream.print
env.execute()
}
}
描述输入的case类
package com.test
case class SwissProt (name: String,
address: String,
phoneNumber: String,
cellPhoneNumber: String
) {
}
最后,反序列化类将kafka事件提取到我的case类对象中
package com.test
import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema
import com.fasterxml.jackson.dataformat.xml.XmlMapper
class SwissProtDeserializationSchema extends AbstractDeserializationSchema[SwissProt]{
private var xmlMapper: XmlMapper = null
override def deserialize(bytes: Array[Byte]): SwissProt = {
if (xmlMapper == null) {
xmlMapper = new XmlMapper()
}
xmlMapper.readValue(bytes, classOf[SwissProt])
}
}
1条答案
按热度按时间ne5o7dgx1#
您只需要注册defaultscalamodule。所以