我可以将avro序列化数据附加到现有的azure blob吗?

2guxujil  于 2021-05-30  发布在  Hadoop
关注(0)|答案(2)|浏览(374)

我在问我能不能,但我也想知道我能不能。
下面是我的场景:我接收小批量的avro序列化消息。我想用avro-serde的一个配置单元表来存储它们,以便以后进行分析。我在azure中运行,并将消息存储在blob中。我尽量避免有很多小斑点(因为我相信这会对Hive产生负面影响)。如果我已经将avro头写入blob,我相信可以用 CloudBlockBlob.PutBlockAsync() . (只要我知道同步标记。)
但是,我检查了两个.net库,这似乎不支持我的方法(我必须一次编写整个avro容器文件)。
http://www.nuget.org/packages/apache.avro/
http://www.nuget.org/packages/microsoft.hadoop.avro/
我采取了正确的方法吗?我在图书馆遗漏了什么吗?
我的问题与这个问题类似(但不同):能否将数据附加到现有的avro数据文件中?

axr492tv

axr492tv1#

简而言之,我是想做错事。
首先,我们决定avro不是在线序列化的合适格式。主要是因为avro希望模式定义出现在每个avro文件中。这给传送的东西增加了很多重量。你仍然可以使用avro,但这不是它的设计目的(它是为hdfs上的大文件而设计的。)
其次,现有的库(对于.net)只支持通过流附加到avro文件。这不能很好地Map到azure块blob(您不希望将块blob作为流打开)。
第三,即使可以绕过前两个,单个avro文件中的所有项也应该共享相同的模式。我们有一组异类项,我们想缓冲、批处理和写入blob。在我们将项写入blob时,试图按类型/模式来分隔这些项增加了许多复杂性。最后,我们选择使用json。

w51jfk4q

w51jfk4q2#

这是可能的。
首先,必须使用cloudappendblob:

CloudAppendBlob appBlob = container.GetAppendBlobReference(
            string.Format("{0}{1}", date.ToString("yyyyMMdd"), ".log"));

appBlob.AppendText(
                string.Format(
                "{0} | Error: Something went wrong and we had to write to the log!!!\r\n",
                dateLogEntry.ToString("o")));

第二步是告诉avro lib不要在append上写头,并在append之间共享相同的同步标记:

var avroSerializer = AvroSerializer.Create<Object>();
        using (var buffer = new MemoryStream())
        {
            using (var w = AvroContainer.CreateWriter<Object>(buffer, Codec.Deflate))
            {
                Console.WriteLine("Init Sample Data Set...");
                var headerField = w.GetType().GetField("header", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
                var header = headerField.GetValue(w);
                var marker = header.GetType().GetField("syncMarker", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
                marker.SetValue(header, new byte[16]);
                using (var writer = new SequentialWriter<Object>(w, 24))
                {
                    // Serialize the data to stream by using the sequential writer
                    for (int i = 0; i < 10; i++)
                    {
                        writer.Write(new Object());
                    }
                }
            }
            Console.WriteLine("Append Sample Data Set...");

            //Prepare the stream for deserializing the data
            using (var w = AvroContainer.CreateWriter<Object>(buffer, Codec.Deflate))
            {
                var isHeaderWritten = w.GetType().GetField("isHeaderWritten", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
                isHeaderWritten.SetValue(w, true);
                var headerField = w.GetType().GetField("header", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
                var header = headerField.GetValue(w);
                var marker = header.GetType().GetField("syncMarker", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
                marker.SetValue(header, new byte[16]);

                using (var writer = new SequentialWriter<Object>(w, 24))
                {
                    // Serialize the data to stream by using the sequential writer
                    for (int i = 10; i < 20; i++)
                    {
                        writer.Write(new Object());
                    }
                }
            }
            Console.WriteLine("Deserializing Sample Data Set...");
        }

相关问题