[更新]有人有类似的问题:https://github.com/sksamuel/avro4s/issues/19
似乎没有好的解决办法。
[更新]
我不认为这与在apachespark中解决依赖性问题是一个重复的问题,就像我不使用scala generic来解决 Decoder
,一切正常:
// This works.
class EnigmaDecoder(props: VerifiableProperties = null) extends Decoder[AdTracking] {
override def fromBytes(bytes: Array[Byte]): AdTracking = {
if (...) null
else AdTracking.parseFrom(...)
}
}
[原文]
我有多种消息类型(例如。 AdTracking
). 它们都有类似的消息操作接口,比如静态成员 AdTracking AdTracking::parseFrom(ByteString)
.
我不想一个接一个地复制消息解析器,所以我用scala中的泛型类型 Package 它。
trait BsParser[T] {
def parseFrom(bs: ByteString): T // T: AdTracking
}
object EnigmaDecoder {
implicit object AdTrackingBsParser extends BsParser[AdTracking] {
override def parseFrom(bs: ByteString): AdTracking = AdTracking.parseFrom(bs)
}
}
class EnigmaDecoder[T >: Null : BsParser](props: VerifiableProperties = null) extends Decoder[T] {
override def fromBytes(bytes: Array[Byte]): T = {
if (...) null
// call static method AdTracking::parseFrom(bs) to build an AdTracking object
else implicitly[BsParser[T]].parseFrom(...)
}
}
还有这个 EnigmaDecoder[AdTracking]
在spark中用作:
val messages = KafkaUtils.createDirectStream[String, AdTracking, StringDecoder, EnigmaDecoder[AdTracking], (String, AdTracking)](
ssc, kafkaParams, fromOffsets, messageHandler)
不幸的是,当我用这个的时候 EgnimaDecoder[AdTracking]
在spark,我犯了这样一个错误:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost, executor driver): java.lang.NoSuchMethodException: EnigmaDecoder.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.getConstructor(Class.java:1825)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:156)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)
我检查了已编译的类,但不知道如何修复它。
Compiled from "EnigmaDecoder.scala"
public class EnigmaDecoder<T> implements kafka.serializer.Decoder<T> {
private final BsParser<T> evidence$2;
public static <T> kafka.utils.VerifiableProperties apply$default$1();
public static <T> kafka.utils.VerifiableProperties $lessinit$greater$default$1();
public static <T> EnigmaDecoder<T> apply(kafka.utils.VerifiableProperties, BsParser<T>);
public T fromBytes(byte[]);
public EnigmaDecoder(kafka.utils.VerifiableProperties, BsParser<T>);
}
有人能帮我吗?
谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!