我将Parquet文件读入元组数据集:
val dataset = spark.read.parquet("some-path").as[Tuple2[KeyClass, ValueClass]](Encoders.kryo)
我可以看到它与:
import spark.implicits._
dataset.map(x => s"$x._1 : $x._2").show(false)
+----------------------------+
|value |
+----------------------------+
|(1, 2) |
|(1, 3) |
|(2, 3) |
+----------------------------+
我的keyclass和valueclass实际上是带有其他嵌套类的复杂类(我不能在这里发布exect类和show方法的结果,因为代码是专有的,但下面是它们的结构):
密钥类:
public class KeyClass implements WritableComparable<KeyClass> {
private byte[] field2;
private byte[] field3;
...
public boolean equals(Object o) {...}
public int hashCode() {...}
}
值类:
public class ValueClass implements Writable {
private OtherClass1 field1;
private OtherClass2 field2;
private boolean field3;
private Long field4;
}
我需要按元组中的一个元素对它进行分组:
+----------------------------+
|value |
+----------------------------+
|(1, [2, 3]) |
|(2, 3) |
+----------------------------+
我试过:
val value1 = dataset.groupByKey(_._1)(Encoders.kryo)
val value2 = value1.mapValues(_._2)(Encoders.bean(classOf[ValueClass]))
val value3 = value2.mapGroups({case (key, value) => (key, value.toList)})
或
value1.mapGroups((a, b) => (a, b.map(_._2))).show(false)
我在使用mapgroups方法的行上收到以下异常的异常:
Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience
我还尝试向该方法添加其他编码器:
val value3 = value2.mapGroups((a, b) => (a, b.toArray))(Encoders.tuple(Encoders.bean(classOf[KeyClass]), Encoders.bean(classOf[Array[ValueClass]])))
然后我在同一个方法上看到不同的异常:
Exception in thread "main" java.lang.AssertionError: assertion failed
1条答案
按热度按时间xqkwcwgp1#
groupby操作正确,问题是编码器的使用。我的自定义类是从另一个项目的遗留代码导入的,它们没有正确的编码器,也不遵守为它们创建bean编码器的规则。所以我只需要为每个操作明确指定正确的编码器:
请指出,如果我对编码器的解释是含糊不清或不正确的。