java—处理序列化框架的不兼容版本更改

d6kp6zgx  于 2021-06-03  发布在  Hadoop
关注(0)|答案(3)|浏览(268)

问题描述

我们有一个hadoop集群,在该集群上存储使用kryo(序列化框架)序列化为字节的数据。我们以前使用的kryo版本是从官方版本2.21中派生出来的,用于将我们自己的补丁程序应用到我们在使用kryo时遇到的问题上。当前的kryo版本2.22也修复了这些问题,但解决方案不同。因此,我们不能仅仅更改我们使用的kryo版本,因为这意味着我们将无法再读取已经存储在hadoop集群上的数据。为了解决这个问题,我们想运行一个hadoop作业
读取存储的数据
反序列化与旧版本kryo一起存储的数据
用kryo的新版本序列化还原的对象
将新的序列化表示写回数据存储
问题是,在一个java程序中使用同一类的两个不同版本(更准确地说,在hadoop作业的mapper类中)并不是一件小事。

简言之问题

如何在一个hadoop作业中反序列化和序列化具有相同序列化框架的两个不同版本的对象?

相关事实概述

我们的数据存储在hadoopcdh4集群上,用kryo版本2.21.2-1序列化
我们想用kryo版本2.22序列化数据,该版本与我们的版本不兼容
我们用apachemaven构建hadoop作业jar

可能的(和不可能的)方法

(1) 重命名包

我们想到的第一种方法是使用maven shade插件的重定位功能重命名kryo分支中的包,并使用不同的工件id发布它,这样我们就可以在转换作业项目中依赖这两个工件。然后我们将示例化一个旧版本和新版本的kryo对象,并使用旧版本进行反序列化,使用新版本再次序列化该对象。
问题
我们没有在hadoop作业中显式地使用kryo,而是通过我们自己的库的多个层来访问它。对于这些库中的每一个,都有必要
重命名涉及的包和
创建具有不同组或工件id的版本
为了让事情变得更加混乱,我们还使用了其他第三方库提供的kryo序列化程序,我们必须对这些程序执行相同的操作。

(2) 使用多个类加载器

我们提出的第二种方法是在包含转换作业的maven项目中完全不依赖kryo,而是从每个版本的jar加载所需的类,这些类存储在hadoop的分布式缓存中。序列化对象将如下所示:

public byte[] serialize(Object foo, JarClassLoader cl) {
    final Class<?> kryoClass = cl.loadClass("com.esotericsoftware.kryo.Kryo");
    Object k = kryoClass.getConstructor().newInstance();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final Class<?> outputClass = cl.loadClass("com.esotericsoftware.kryo.io.Output");

    Object output = outputClass.getConstructor(OutputStream.class).newInstance(baos);
    Method writeObject = kryoClass.getMethod("writeObject", outputClass, Object.class);
    writeObject.invoke(k, output, foo);
    outputClass.getMethod("close").invoke(output);
    baos.close();
    byte[] bytes = baos.toByteArray();
    return bytes;
}

问题
虽然这种方法可以示例化一个未配置的kryo对象并序列化/还原一些对象,但是我们使用了更复杂的kryo配置。这包括几个定制的序列化程序、注册的类id等等。例如,我们无法找到一种方法来为类设置自定义序列化程序,而不获取noclassdeffounderror-以下代码不起作用:

Class<?> kryoClass = this.loadClass("com.esotericsoftware.kryo.Kryo");
Object kryo = kryoClass.getConstructor().newInstance();
Method addDefaultSerializer = kryoClass.getMethod("addDefaultSerializer", Class.class, Class.class);
addDefaultSerializer.invoke(kryo, URI.class, URISerializer.class); // throws NoClassDefFoundError

最后一行是

java.lang.NoClassDefFoundError: com/esotericsoftware/kryo/Serializer

因为 URISerializer 类引用kryo的 Serializer 类并尝试使用自己的类装入器(即系统类装入器)装入它,而系统类装入器不知道 Serializer 班级。

(3) 使用中间序列化

目前最有希望的方法似乎是使用独立的中间序列化,例如使用gson或类似的json,然后运行两个独立的作业:
kryo:2.21.2-ourpatchbranch in 我们的常规存储->临时存储中的json
临时存储中的json->kryo:2-22 in 我们的普通商店
问题
这种解决方案的最大问题是,它将处理的数据的空间消耗大致翻了一番。此外,我们还需要另一种序列化方法,这种方法在所有数据上都不会出现问题,我们需要首先对此进行研究。

bn31dyow

bn31dyow1#

对于2,您可以创建两个jar文件,其中包含序列化程序以及新版本和旧版本序列化程序的所有依赖项,如下所示。然后创建一个map reduce作业,将每个版本的代码加载到一个单独的类装入器中,并在中间添加一些粘合代码,用旧代码反序列化,然后用新代码序列化。
您必须小心,域对象与粘合代码加载在同一个类加载器中,要序列化/反序列化的代码依赖于与粘合代码相同的类加载器,以便它们都可以看到相同的域对象类。

jogvjijk

jogvjijk2#

我将使用多类加载器方法。
(包重命名也会起作用。它看起来确实很难看,但这是一个一次性的黑客,所以美丽和正确可以退居二线。中间序列化似乎有风险——使用kryo是有原因的,使用不同的中间形式会否定这个原因。
总体设计将是:

child classloaders:      Old Kryo     New Kryo   <-- both with simple wrappers
                                \       /
                                 \     /
                                  \   /
                                   \ /
                                    |
default classloader:    domain model; controller for the re-serialization

在默认类加载器中加载域对象类
用修改过的kryo版本和 Package 器代码加载jar。 Package 器有一个静态的“main”方法,只有一个参数:要反序列化的文件名。从默认类加载器通过反射调用main方法:

Class deserializer = deserializerClassLoader.loadClass("com.example.deserializer.Main");
    Method mainIn = deserializer.getMethod("main", String.class);
    Object graph = mainIn.invoke(null, "/path/to/input/file");

此方法:
将文件反序列化为一个对象图
将对象放置到共享空间中。threadlocal是一种简单的方法,或者将其返回到 Package 器脚本。
当调用返回时,用一个简单的 Package 器将新的序列化框架加载到第二个jar中。 Package 器有一个静态“main”方法和一个参数来传递要序列化的文件名。从默认类加载器通过反射调用main方法:

Class serializer = deserializerClassLoader.loadClass("com.example.serializer.Main");
    Method mainOut = deserializer.getMethod("main", Object.class, String.class);
    mainOut.invoke(null, graph, "/path/to/output/file");

此方法
从threadlocal检索对象
序列化对象并将其写入文件
注意事项
在代码片段中,为每个对象序列化和反序列化创建一个类加载器。您可能只想加载一次类加载器,发现主要方法并循环文件,例如:

for (String file: files) {
    Object graph = mainIn.invoke(null, file + ".in");
    mainOut.invoke(null, graph, file + ".out");
}

域对象是否引用了任何kryo类?如果是这样,您就有困难:
如果引用只是一个类引用,例如调用一个方法,那么第一次使用该类将把两个kryo版本中的一个加载到默认的类加载器中。这可能会导致问题,因为序列化或反序列化的一部分可能是由错误版本的kryo执行的
如果引用用于示例化任何kryo对象并将引用存储在域模型(类或示例成员)中,那么kryo实际上将在模型中序列化它自己的一部分。这可能会破坏这种方法的交易。
无论哪种情况,您的第一种方法都应该是检查这些引用并消除它们。一种确保您已经这样做的方法是确保默认类加载器没有访问任何kryo版本的权限。如果域对象以任何方式引用kryo,则引用将失败(如果直接引用类,则会出现classnotfounderror,如果使用反射,则会出现classnotfoundexception)。

mccptt67

mccptt673#

我想到的最简单的方法就是使用一个额外的java应用程序为您进行转换。因此,您可以将二进制数据发送到辅助java应用程序(简单的本地套接字可以很好地实现这一点),这样就不必摆弄类加载器或包。
唯一要考虑的是中间表示。您可能希望使用另一种序列化机制,或者如果时间不是问题,您可能希望使用java的内部序列化。
使用第二个java应用程序可以避免处理临时存储和在内存中执行所有操作。
一旦你有了这些套接字+第二个应用程序代码,你就会发现很多情况下这是很方便的。
此外,还可以使用jgroups构建本地集群,并省去使用sockets的麻烦。jgroups是我所知道的最简单的通信api。只要形成一个逻辑通道,检查谁加入。最好的是,它甚至可以在同一个jvm中工作,这使得测试变得很容易,如果远程完成,就可以将不同的物理服务器绑定在一起,就像对本地应用程序一样。
另一个可供选择的变量是使用zeromq及其ipc(进程间通信)协议。

相关问题