Flume抄写器源码,如何设置旧框尺寸?

knpiaxh1  于 2021-06-03  发布在  Flume
关注(0)|答案(1)|浏览(524)

我试图让flume与scribe一起工作,使用scribesource,得到以下异常,flume在几分钟内停止接收数据(大约每秒1 mb)。
有没有办法我可以设置的框架大小Flume,以便它可以接受抄写交通。约束条件是不能在划线侧改变它。。。

2014-06-10 19:40:40,405 WARN org.apache.thrift.server.THsHaServer: Exception while invoking!
org.apache.thrift.transport.TTransportException: Frame size (23757404) larger than max length (16384000)!
    at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:137)
    at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
    at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
    at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:478)
    at org.apache.thrift.server.Invocation.run(Invocation.java:18)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
blmhpbnm

blmhpbnm1#

这是由于flume scribe源使用tframetransport.factory()造成的,后者使用默认值16mb创建。
如果scribesource使用tframedtransport.factory(thriftframesize),那么这将被解决。。。

//Added to accept custom frame size
private int thriftFrameSize = DEFAULT_THRIFT_FRAME_SIZE;

private SourceCounter sourceCounter;

@Override
public void configure(Context context) {
  port = context.getInteger("port", port);

  workers = context.getInteger("workerThreads", DEFAULT_WORKERS);
  if (workers <= 0) {
    workers = DEFAULT_WORKERS;
  }

  if (sourceCounter == null) {
    sourceCounter = new SourceCounter(getName());
  }

  thriftFrameSize = context.getInteger("thriftFrameSize", DEFAULT_THRIFT_FRAME_SIZE);
  if (thriftFrameSize <= 0) {
     thriftFrameSize = DEFAULT_THRIFT_FRAME_SIZE;
  }
}

private class Startup extends Thread {

  public void run() {
    try {
      Scribe.Processor processor = new Scribe.Processor(new Receiver());
      TNonblockingServerTransport transport = new TNonblockingServerSocket(port);
      THsHaServer.Args args = new THsHaServer.Args(transport);

      args.workerThreads(workers);
      args.processor(processor);
      args.transportFactory(new TFramedTransport.Factory(thriftFrameSize));

相关问题