Flume RpcClient and multithreading

qgelzfjb  posted on 2021-06-03  in Flume

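I'm trying to understand the correct way to use the Flume RpcClient in a multithreaded application. The information I've found so far suggests that the component is thread-safe, but the example in the Flume documentation glosses over the issue when it comes to error handling. This code: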

public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = RpcClientFactory.getDefaultInstance(hostname, port);
      // Use the following method to create a thrift client (instead of the above line):
      // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }
  }

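If multiple threads call this method and an exception is thrown, there will be a problem when several threads try to recreate the client in the exception handler at the same time.
Is the SDK intended to be used by a single thread only? Should this method be synchronized, as it appears to be in the Log4jAppender that ships with the Flume source? Should I put this code in its own worker and pass events to it through a queue?
Does anyone have an example of an RpcClient being used by multiple threads (including error conditions)?
Would I be better off using the "embedded agent"? Is that multithreading-friendly?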

pbpqsu0x  #1

With the embedded agent you end up in the same situation, except that it is not clear what to do in the handler:

try {
  agent.put(event);
} catch (EventDeliveryException e) {
  // ???      
}

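You could stop the agent and restart it, but you would need a synchronized block (or a ReentrantReadWriteLock, so that threads are not blocked while they only "read" the client field). But since I am not a Flume expert, I can't tell you which one is better.
Example: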

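import java.nio.charset.Charset;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

class MyClass {
  private final ReentrantReadWriteLock lock;
  private final Lock readLock;
  private final Lock writeLock;
  private RpcClient client;
  private final String hostname;
  private final Integer port;

  // Constructor
  MyClass(String hostname, Integer port) {
    this.hostname = Objects.requireNonNull(hostname, "hostname");
    this.port = Objects.requireNonNull(port, "port");
    this.lock = new ReentrantReadWriteLock();
    this.readLock = this.lock.readLock();
    this.writeLock = this.lock.writeLock();
    this.client = buildClient();
  }

  private RpcClient buildClient() {
    return RpcClientFactory.getDefaultInstance(hostname, port);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    RpcClient failed = null;

    // Send the event
    readLock.lock(); // lock for reading 'client'
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      failed = client; // remember which instance failed
    } finally {
      readLock.unlock();
    }

    if (failed != null) {
      // ReentrantReadWriteLock cannot upgrade a read lock to a write lock
      // (the thread would block forever), so the read lock is released first.
      writeLock.lock(); // lock for reading/writing 'client'
      try {
        // Recreate the client only if no other thread has already replaced it
        if (client == failed) {
          client.close();
          client = buildClient();
        }
      } finally {
        writeLock.unlock();
      }
    }
  }
}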
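For comparison, the simpler alternative mentioned above (a synchronized method) might look like the sketch below. To keep it self-contained, `EventSender` is a hypothetical stand-in for the one `RpcClient` call used here, and the `Supplier` stands in for `RpcClientFactory.getDefaultInstance(hostname, port)`; neither name comes from Flume. With the real client, the event type would be `Event` and the caught exception `EventDeliveryException`.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the part of Flume's RpcClient used in this thread.
interface EventSender {
  void append(String event) throws Exception;
  void close();
}

class SynchronizedSender {
  // Assumption: creates a fresh client, e.g. by wrapping RpcClientFactory.getDefaultInstance(...)
  private final Supplier<EventSender> factory;
  private EventSender client;

  SynchronizedSender(Supplier<EventSender> factory) {
    this.factory = factory;
    this.client = factory.get();
  }

  // Only one thread at a time can send or rebuild the client.
  // Returns true if the event was delivered, false if the client had to be recreated.
  public synchronized boolean send(String event) {
    try {
      client.append(event);
      return true;
    } catch (Exception e) {
      // clean up and recreate the client; the event itself is not resent here
      client.close();
      client = factory.get();
      return false;
    }
  }
}
```

This trades throughput for simplicity: every `send` call is serialized, whereas the read/write lock only serializes the rebuild.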

Also, the example loses the event, because it is never resent. Some kind of loop with a maximum number of retries might do the trick:

int i = 0;
for (; i < maxRetry; ++i) {
  try {
    client.append(event);
    break;
  } catch (EventDeliveryException e) {
    // clean up and recreate the client
    client.close();
    client = null;
    client = RpcClientFactory.getDefaultInstance(hostname, port);
    // Use the following method to create a thrift client (instead of the above line):
    // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  }
}
if (i == maxRetry) {
  logger.error("flume client is offline, losing events {}", event);
}

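That's the idea, but I don't think this should be the job of the user (i.e. us). It should rather be an option in the client or the agent to store events that could not be processed because of such errors.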
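The other option raised in the question, a dedicated worker fed through a queue, could be sketched roughly as follows. Only the worker thread ever touches the client, so no lock is needed around it. `EventSink`, `QueueingSender`, the queue capacity and `maxRetry` are all hypothetical names and parameters chosen for illustration; `EventSink` stands in for `RpcClient`, and the `Supplier` for `RpcClientFactory.getDefaultInstance(hostname, port)`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Hypothetical stand-in for the part of Flume's RpcClient used in this thread.
interface EventSink {
  void append(String event) throws Exception;
  void close();
}

class QueueingSender implements Runnable {
  private final BlockingQueue<String> queue;
  private final Supplier<EventSink> factory; // assumption: builds a fresh client
  private final int maxRetry;

  QueueingSender(Supplier<EventSink> factory, int capacity, int maxRetry) {
    this.factory = factory;
    this.queue = new LinkedBlockingQueue<>(capacity);
    this.maxRetry = maxRetry;
  }

  // Called from any thread; returns false when the queue is full.
  boolean offer(String event) {
    return queue.offer(event);
  }

  // Runs on the single worker thread until that thread is interrupted.
  @Override
  public void run() {
    EventSink client = factory.get();
    try {
      while (!Thread.currentThread().isInterrupted()) {
        client = deliver(client, queue.take());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the flag and stop
    } finally {
      client.close();
    }
  }

  // Tries up to maxRetry times, recreating the client after each failure.
  // Returns the client instance to use for the next event.
  private EventSink deliver(EventSink client, String event) {
    for (int i = 0; i < maxRetry; ++i) {
      try {
        client.append(event);
        return client;
      } catch (Exception e) {
        client.close();
        client = factory.get();
      }
    }
    System.err.println("client is offline, losing event " + event);
    return client;
  }
}
```

A caller would start it with `new Thread(sender).start()` and have producer threads call `sender.offer(...)`. Events still in the queue when the worker stops are dropped in this sketch.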
