如何在带有kafka解码器的spark中应用scala泛型类型?

9wbgstp7  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(304)

[更新]有人有类似的问题:https://github.com/sksamuel/avro4s/issues/19
似乎没有好的解决办法。
[更新]
我不认为这与在apachespark中解决依赖性问题是一个重复的问题,就像我不使用scala generic来解决 Decoder ,一切正常:

// This works.
class EnigmaDecoder(props: VerifiableProperties = null) extends Decoder[AdTracking] {
  override def fromBytes(bytes: Array[Byte]): AdTracking = {
    if (...)  null
    else AdTracking.parseFrom(...)
  }
}

[原文]
我有多种消息类型(例如。 AdTracking ). 它们都有类似的消息操作接口,比如静态成员 AdTracking AdTracking::parseFrom(ByteString) .
我不想一个接一个地复制消息解析器,所以我用scala中的泛型类型 Package 它。

trait BsParser[T] {
  def parseFrom(bs: ByteString): T  // T: AdTracking
}

object EnigmaDecoder {
  implicit object AdTrackingBsParser extends BsParser[AdTracking] {
    override def parseFrom(bs: ByteString): AdTracking = AdTracking.parseFrom(bs)
  }
}

class EnigmaDecoder[T >: Null : BsParser](props: VerifiableProperties = null) extends Decoder[T] {
  override def fromBytes(bytes: Array[Byte]): T = {
    if (...) null
    // call static method AdTracking::parseFrom(bs) to build an AdTracking object
    else implicitly[BsParser[T]].parseFrom(...)
  }
}

还有这个 EnigmaDecoder[AdTracking] 在spark中用作:

val messages = KafkaUtils.createDirectStream[String, AdTracking, StringDecoder, EnigmaDecoder[AdTracking], (String, AdTracking)](
    ssc, kafkaParams, fromOffsets,  messageHandler)

不幸的是,当我用这个的时候 EgnimaDecoder[AdTracking] 在spark,我犯了这样一个错误:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost, executor driver): java.lang.NoSuchMethodException: EnigmaDecoder.<init>(kafka.utils.VerifiableProperties)
    at java.lang.Class.getConstructor0(Class.java:3082)
    at java.lang.Class.getConstructor(Class.java:1825)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:156)
    at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)

我检查了已编译的类,但不知道如何修复它。

Compiled from "EnigmaDecoder.scala"
public class EnigmaDecoder<T> implements kafka.serializer.Decoder<T> {
  private final BsParser<T> evidence$2;
  public static <T> kafka.utils.VerifiableProperties apply$default$1();
  public static <T> kafka.utils.VerifiableProperties $lessinit$greater$default$1();
  public static <T> EnigmaDecoder<T> apply(kafka.utils.VerifiableProperties, BsParser<T>);
  public T fromBytes(byte[]);
  public EnigmaDecoder(kafka.utils.VerifiableProperties, BsParser<T>);
}

有人能帮我吗?
谢谢!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题