通过avro反序列化c中的flume事件

vsikbqxv  于 2021-06-03  发布在  Flume
关注(0)|答案(1)|浏览(329)

我已经建立了一个flume服务,它可以监视netcat或者跟踪一个以exec作为源的日志,诸如此类。im使用内存作为通道,avro作为接收器(文档中指定了节俭,但在flume 1.3或1.4中似乎不起作用)
我已经设置了一个c#socket服务器来接收消息,我得到了一个字节流。如果我使用encoding.utf8.getstring(buffer)读取它们,那么我可以看到如下内容:

"\0\0\0\0\0\0\0\0\00�����Tt������5\ne\0�����Tt������5\ne\0\0appendBatch\0\0�\0�127.0.0.1 - - [12/Nov/2013:22:42:50 +0000] \"GET /docs/appdev/index.html HTTP/1.1\" 200 7645\0�127.0.0.1 - - [12/Nov/2013:22:44:07 +0000] \"GET /docs/appdev/introduction.html HTTP/1.1\" 200 8619\0�127.0.0.1 - - [12/Nov/2013:22:44:09 +0000] \"GET /docs/appdev/installation.html HTTP/1.1\" 200 9045\0�127.0.0.1 - - [12/Nov/2013:22:44:12 +0000] \"GET /docs/appdev/deployment.html HTTP/1.1\" 200 18800\0�127.0.0.1 - - [12/Nov/2013:22:49:07 +0000] \"GET /docs/appdev/source.html HTTP/1.1\" 200 24554\0�127.0.0.1 - - [12/Nov/2013:22:50:38 +0000] \"GET /docs/appdev/processes.html HTTP/1.1\" 200 30743\0�127.0.0.1 - - [12/Nov/2013:22:51:39 +0000] \"GET /docs/appdev/sample/ HTTP/1.1\" 200 1852\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:22:51:48 +0000] \"GET /sample HTTP/1.1\" 404 963\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:22:51:48 +0000] \"GET /favicon.ico HTTP/1.1\" 200 21630\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:23:02:13 +0000] \"GET /sample HTTP/1.1\" 404 963\0"

所以很明显,我是在传递数据,但我希望正确地反序列化它,而不是进行某种regex提取。我可以看到有一个官方的avroc#库,还有一个微软hadoop库,它有反序列化库。我创建了一个要反序列化的本地对象:

[DataContract]
public class AvroEvent
{
    [DataMember]
    public byte[] Body { get; set; }
}

尝试反序列化:

client = serverSocket.EndAccept(result);
  var myNetworkStream = new NetworkStream(client);
  myNetworkStream.Read(buffer, 0, size);
  var avro = new AvroSerializer(typeof(AvroEvent));
  var deser = avro.Deserialize(myNetworkStream);

然后我得到这个错误:

System.InvalidOperationException was unhandled
  HResult=-2146233079
  Message=Unexpected number of bytes.
  Source=Microsoft.Hadoop.Avro

我几乎可以肯定这一切都是以错误的方式进行的,我肯定人们会告诉我不要使用c#,但我在google上的资源已经差不多用完了,所以如果其他人真的这么做了,并为我指明了正确的方向,我会非常感激
托比

lg40wkob

lg40wkob1#

flume使用rpc机制来传递事件。如果选择了avro,那么flume依赖于avro-rpc,而微软的avro库不支持avro-rpc(如新内容中所述),因为它只打算用作序列化框架。
从技术上讲 Deserialize() 方法期望流具有以下数据(以位为单位): 11[size of byte array encoded in variable-length zig zag][actual byte] (*)
您收到的错误可能是因为收到的数据具有不同的wire格式。

  • 开始 1 是必需的,因为库的版本0.8.4951.5418将每个类型封装在null(0)和类型(1)的并集中,因此第一个 1 记录在案 1 是为了这个领域 Body . 在最新版本1.1.0.5中可以配置此行为。

相关问题