type参数化数据流

9jyewag0  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(359)

可以吗 DStreamtype parameter 什么?
如果是,怎么做?
当我尝试的时候 lazy val qwe = mStream.mapWithState(stateSpec)myDStream: DStream[(A, B)] (类参数),我得到:

value mapWithState is not a member of org.apache.spark.streaming.dstream.DStream[(A, B)]
    lazy val qwe = mStream.mapWithState(stateSpec)
uurity8g

uurity8g1#

spark api的实质子集需要隐式 ClassTags (请参阅scala:什么是typetag以及如何使用它?) PairDStreamFunctions.mapWithState 也不例外。检查类定义:

class PairDStreamFunctions[K, V](self: DStream[(K, V)])
  (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])

以及:

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType] = {
  ...
}

如果要创建一个在泛型对上操作的函数,请使用 mapWithState 你至少应该提供 ClassTags 为了 KeyType 以及 ValueType 类型:

def foo[T : ClassTag, U : ClassTag](
  stream: DStream[(T, U)], f: StateSpec[T, U, Int, Int]) = stream.mapWithState(f)

如果 StateType 以及 MappedType 你也需要参数化 ClassTags 对于这些:

def bar[T : ClassTag, U : ClassTag, V : ClassTag,  W : ClassTag](
  stream: DStream[(T, U)], f: StateSpec[T, U, V, W]) = stream.mapWithState(f)

相关问题