Debugging a custom pipeline transformer in Flink

q9yhzks0  posted on 2021-06-21 in Flink

I am trying to implement a custom transformer in Flink, but when I run it, the fit operation never seems to be called. Here is what I have so far:

    class InfoGainTransformer extends Transformer[InfoGainTransformer] {
      import InfoGainTransformer._

      private[this] var counts: Option[collection.immutable.Vector[Map[Key, Double]]] = None

      // here setters for params, as Flink does
    }

    object InfoGainTransformer {
      // ====================================== Parameters =============================================
      // ...
      // ==================================== Factory methods ==========================================
      // ...
      // ========================================== Operations =========================================

      implicit def fitLabeledVectorInfoGain = new FitOperation[InfoGainTransformer, LabeledVector] {
        override def fit(instance: InfoGainTransformer, fitParameters: ParameterMap, input: DataSet[LabeledVector]): Unit = {
          val counts = collection.immutable.Vector[Map[Key, Double]]()
          input.map {
            v =>
              v.vector.map {
                case (i, value) =>
                  println("INSIDE!!!")
                  val key = Key(value, v.label)
                  val cval = counts(i).getOrElse(key, .0)
                  counts(i) + (key -> cval)
              }
          }
        }
      }

      implicit def fitVectorInfoGain[T <: Vector] = new FitOperation[InfoGainTransformer, T] {
        override def fit(instance: InfoGainTransformer, fitParameters: ParameterMap, input: DataSet[T]): Unit = {
          input
        }
      }

      implicit def transformLabeledVectorsInfoGain = {
        new TransformDataSetOperation[InfoGainTransformer, LabeledVector, LabeledVector] {
          override def transformDataSet(
              instance: InfoGainTransformer,
              transformParameters: ParameterMap,
              input: DataSet[LabeledVector]): DataSet[LabeledVector] = input
        }
      }

      implicit def transformVectorsInfoGain[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag] = {
        new TransformDataSetOperation[InfoGainTransformer, T, T] {
          override def transformDataSet(instance: InfoGainTransformer, transformParameters: ParameterMap, input: DataSet[T]): DataSet[T] = input
        }
      }
    }

Then I tried to use it in two ways. Chained into a pipeline:

    val scaler = StandardScaler()
    val polyFeatures = PolynomialFeatures()
    val mlr = MultipleLinearRegression()
    val gain = InfoGainTransformer().setK(2)

    // Construct the pipeline
    val pipeline = scaler
      .chainTransformer(polyFeatures)
      .chainTransformer(gain)
      .chainPredictor(mlr)

    val r = pipeline.predict(dataSet map (_.vector))
    r.print()

And with only my transformer:

    pipeline.fit(dataSet)

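In both cases, when I set a breakpoint inside fitLabeledVectorInfoGain, for example on the input.map line, the debugger stops there. But if I also set a breakpoint inside the nested map, for example below println("INSIDE!!!"), it never stops there.
Does anyone know how I can debug this custom transformer?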

d7v8vwbk (answer #1)

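It seems to be working now. I think what happened is that I had not implemented the FitOperation correctly, because nothing was being saved in the instance state. This is what I do now: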

    implicit def fitLabeledVectorInfoGain = new FitOperation[InfoGainTransformer, LabeledVector] {
      override def fit(instance: InfoGainTransformer, fitParameters: ParameterMap, input: DataSet[LabeledVector]): Unit = {
        // val counts = collection.immutable.Vector[Map[Key, Double]]()
        // Build one count map per labeled vector; the result is a DataSet[Map[Key, Double]].
        val r = input.map {
          v =>
            v.vector.foldLeft(Map.empty[Key, Double]) {
              case (m, (i, value)) =>
                println("INSIDE fit!!!")
                val key = Key(value, v.label)
                val cval = m.getOrElse(key, .0) + 1.0
                m + (key -> cval)
            }
        }
        // Note: for this assignment to compile, counts must be typed
        // Option[DataSet[Map[Key, Double]]] and must not be private[this],
        // unlike the declaration shown in the question.
        instance.counts = Some(r)
      }
    }
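
The per-vector fold used in this fit can be tried outside Flink. The following is a minimal sketch in plain Scala, not the poster's code: Key is a hypothetical stand-in (the post uses Key but never shows its definition), and a plain Seq[(Int, Double)] stands in for the (index, value) pairs of a Flink vector.

```scala
// Hypothetical stand-in for the Key type used in the post (its real definition is not shown).
final case class Key(value: Double, label: Double)

object CountFoldSketch {
  // Counts how often each (feature value, label) pair occurs in one vector,
  // mirroring the foldLeft in the answer. The feature index is ignored, as in the answer.
  def countKeys(features: Seq[(Int, Double)], label: Double): Map[Key, Double] =
    features.foldLeft(Map.empty[Key, Double]) {
      case (m, (_, value)) =>
        val key = Key(value, label)
        m + (key -> (m.getOrElse(key, 0.0) + 1.0))
    }

  def main(args: Array[String]): Unit = {
    // Two features share the value 1.0, so their counts are pooled under one key.
    val counts = countKeys(Seq(0 -> 1.0, 1 -> 1.0, 2 -> 3.0), label = 0.0)
    counts.foreach { case (k, c) => println(s"$k -> $c") }
  }
}
```

Because the index is dropped, equal values in different feature positions end up under the same key. If per-feature counts are what is wanted, the index would presumably have to become part of the key.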

Now the debugger stops at all breakpoints, and the TransformOperation is called as well.
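
A likely explanation, not confirmed in the post, is lazy evaluation. Flink's DataSet transformations only add operators to a plan, and the function passed to map runs when a job is executed through a sink such as print() or collect(). In the first version of fit the mapped result was discarded, so nothing downstream ever needed it. The sketch below is only an analogy in plain Scala, using a lazy Iterator instead of a Flink DataSet, and all names in it are hypothetical.

```scala
object LazyMapSketch {
  // Returns (calls made by the discarded map, calls made by the consumed map, consumed result).
  def run(): (Int, Int, List[Int]) = {
    var discardedCalls = 0
    var consumedCalls = 0

    // Like the first fit: the mapped result is built but never consumed,
    // so the function body (and any breakpoint inside it) is never reached.
    val discarded = Iterator(1, 2, 3).map { x => discardedCalls += 1; x }

    // Like the fixed fit: the result is kept and consumed later.
    val kept = Iterator(1, 2, 3).map { x => consumedCalls += 1; x * 2 }

    // Consuming the iterator plays the role of a Flink sink such as collect().
    val doubled = kept.toList

    (discardedCalls, consumedCalls, doubled)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The same reasoning suggests that breakpoints inside a Flink map function can only be hit once some sink that depends on that map is actually executed.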
