如何让 Jackson ObjectMapper 反序列化为泛型类型?

6tdlim6h  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(476)

这是一个有趣的练习:我正在尝试使用 Kafka 和 Scala。我的目标是创建一个简单的消息类型,并把它发送到 Kafka 主题(topic)。下面是我尝试编写的一个带类型参数 [A] 的泛型/可复用序列化器。

import java.util.{Map => jMap}
import scala.reflect.runtime.universe._    
import org.apache.kafka.common.serialization.{Deserializer, Serializer}   
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature._    
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule    

/**
 * JSON-based Kafka serializer/deserializer for values of type `A`, backed by a
 * Jackson `ObjectMapper` with the Scala module registered.
 *
 * @tparam A the value type; its `TypeTag` is used to recover the runtime class
 *           that Jackson needs as the deserialization target
 */
class MySerializer[A : TypeTag]() extends Serializer[A] with Deserializer[A] {

  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  // Tolerate JSON fields that have no counterpart in `A`.
  mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)

  // `A` is a type parameter, not a value, so `A.getClass()` cannot compile.
  // The runtime class is recovered from the TypeTag instead. Only the erased
  // class is resolved here: type arguments of `A` (e.g. the `Foo` in
  // `List[Foo]`) are not passed on to Jackson.
  private val valueClass: Class[A] =
    typeTag[A].mirror.runtimeClass(typeOf[A]).asInstanceOf[Class[A]]

  /** No resources to release. */
  override def close(): Unit = ()

  /** No configuration is read. */
  override def configure(configs: jMap[String, _], isKey: Boolean): Unit = ()

  /**
   * Encodes `subject` as JSON bytes.
   *
   * @param topic   topic the record is destined for (unused)
   * @param subject value to encode
   * @return the JSON encoding of `subject`
   */
  override def serialize(topic: String, subject: A): Array[Byte] =
    mapper.writeValueAsBytes(subject)

  /**
   * Decodes JSON bytes into an `A`.
   *
   * @param topic topic the record came from (unused)
   * @param bytes JSON payload; Kafka may pass null (e.g. for a record without a value)
   * @return the decoded value, or null when `bytes` is null
   */
  override def deserialize(topic: String, bytes: Array[Byte]): A =
    // Java interop: Kafka represents an absent payload as null, so pass it through.
    if (bytes == null) null.asInstanceOf[A]
    else mapper.readValue(bytes, valueClass)
}

反序列化时报错的位置是 ObjectMapper.readValue 的第二个参数。我该怎么做才能让它返回泛型类型 A?
我的sbt:

// sbt build definition for the scalafunplay demo project.
name := "scalafunplay"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // `%%` appends the Scala binary version (here _2.11) to the artifact name, so
  // the Kafka artifact always matches `scalaVersion`. The previous hard-coded
  // `kafka_2.10` was built against Scala 2.10 and is binary-incompatible
  // with a 2.11 project.
  "org.apache.kafka" %% "kafka" % "0.10.2.0",
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.8.7",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.8.7"
)

以下是我的主要应用程序:

package scalafunplay

/** Round-trip demo: serializes an `Asset` and deserializes the resulting bytes. */
object Mistkafer {

  // Declared at object level rather than inside `main`.
  // NOTE(review): a class local to a method is likely to be a problem for the
  // TypeTag / Jackson based serializer (no stable, reflectively constructible
  // class) -- confirm if it ever needs to move back.
  case class Asset(ruid: String)

  /**
   * Runs the round trip and prints the deserialized value.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val test = Asset("Dan The Man")

    val serializer = new MySerializer[Asset]()

    val sampleSerialized = serializer.serialize("test", test)
    // Feed the serialized bytes back in; passing the original `test` object
    // here is a type error, since `deserialize` expects Array[Byte].
    val sampleUnserialized = serializer.deserialize("test", sampleSerialized)

    println("###### RESULT: " + sampleUnserialized)
  }

}
lskq00tm

lskq00tm1#

我决定不使用Jackson。使用java.io字节数组和对象输入/输出流更容易,如下所示:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.{Map => jMap}
import scala.reflect.runtime.universe.TypeTag
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

/**
 * Kafka serializer/deserializer that uses built-in Java object serialization
 * (`ObjectOutputStream` / `ObjectInputStream`).
 *
 * SECURITY NOTE(review): Java deserialization can instantiate arbitrary classes
 * from the payload; only use this on topics whose producers are trusted.
 *
 * @tparam A the value type; instances must be accepted by
 *           `ObjectOutputStream.writeObject` (i.e. be `java.io.Serializable`)
 */
class ObjectSerializer[A : TypeTag]() extends Serializer[A] with Deserializer[A] {

  /** No resources to release. */
  override def close(): Unit = ()

  /** No configuration is read. */
  override def configure(configs: jMap[String, _], isKey: Boolean): Unit = ()

  /**
   * Encodes `subject` with Java serialization.
   *
   * @param topic   topic the record is destined for (unused)
   * @param subject value to encode
   * @return the serialized bytes
   */
  override def serialize(topic: String, subject: A): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val objectOut = new ObjectOutputStream(buffer)
    // Close (and thereby flush) the object stream before reading the buffer,
    // even when writeObject fails.
    try objectOut.writeObject(subject)
    finally objectOut.close()
    buffer.toByteArray
  }

  /**
   * Decodes bytes produced by [[serialize]].
   *
   * @param topic topic the record came from (unused)
   * @param bytes serialized payload; Kafka may pass null (e.g. for a record without a value)
   * @return the decoded value, or null when `bytes` is null
   */
  override def deserialize(topic: String, bytes: Array[Byte]): A =
    // Java interop: an absent payload arrives as null; pass it through rather
    // than failing with a NullPointerException inside the stream.
    if (bytes == null) null.asInstanceOf[A]
    else {
      val objectIn = new ObjectInputStream(new ByteArrayInputStream(bytes))
      // The stream carries no static type information, so the cast is
      // unavoidable and unchecked.
      try objectIn.readObject().asInstanceOf[A]
      finally objectIn.close()
    }
}

相关问题