我使用ml管道和各种自定义的基于udf的转换器。我要寻找的是一种序列化/反序列化此管道的方法。
我使用
ObjectOutputStream.write()
但是,每当我尝试反序列化管道时:
java.lang.ClassNotFoundException: org.sparkexample.DateTransformer
datetransformer在哪里是我的自定义转换器。是否有任何方法/接口可以实现正确的序列化?
我发现有
MLWritable
接口可能由我的类实现(datetransformer扩展transfrormer),但是找不到有用的示例。
2条答案
按热度按时间smdncfj31#
如果您使用的是spark 2.x+,则使用defaultparamswritable扩展您的转换器
例如
然后用一个字符串参数创建一个构造函数
最后为成功阅读添加一个同伴类
我在我的生产服务器上工作。我将添加gitlab链接到项目后,当我上传它
iklwldmw2#
简而言之,你不能,至少不容易。
开发人员已经尽力使添加一个新的变压器/估计器变得尽可能困难。基本上所有的东西
org.apache.spark.ml.util.ReadWrite
是私人的(除了MLWritable
以及MLReadable
)因此没有办法使用那里的任何实用方法/类/对象。还有(我肯定你已经发现)绝对没有关于如何做到这一点的文档,但是嘿,好的代码文档本身是不是?!从中挖掘代码
org.apache.spark.ml.util.ReadWrite
以及org.apache.spark.ml.feature.HashingTF
看来你需要重写MLWritable.write
以及MLReadable.read
. 这个DefaultParamsWriter
以及DefaultParamsReader
其中似乎包含实际的save/load实现,正在保存和加载一组元数据:班
时间戳
sparkversion公司
uid编号
参数Map
(可选,额外元数据)
因此,任何实现都至少需要涵盖这些内容,而不需要学习任何模型的转换器可能就可以做到这一点。一个需要拟合的模型也需要在它的实现中保存数据
save/write
-例如,这是LocalLDAModel
是吗(https://github.com/apache/spark/blob/v1.6.3/mllib/src/main/scala/org/apache/spark/ml/clustering/lda.scala#l523)所以学习的模型只是保存为Parquet文件(似乎)作为测试,我从
org.apache.spark.ml.util.ReadWrite
这似乎是必要的,并测试了以下变压器没有做任何有用的。警告:这几乎肯定是错误的做法,而且很可能在将来被打破。我真诚地希望我误解了一些东西,有人会纠正我如何实际创建一个可以序列化/反序列化的转换器
这是spark 1.6.3的版本,如果您使用的是2.x,它可能已经损坏了
那我们需要所有的公用设施
org.apache.spark.ml.util.ReadWrite
https://github.com/apache/spark/blob/v1.6.3/mllib/src/main/scala/org/apache/spark/ml/util/readwrite.scala有了它,你就可以使用
CustomTransformer
在一个Pipeline
保存/加载管道。我在sparkshell中很快测试了它,它似乎可以工作,但肯定不是很好。