我试图理解在多线程应用程序中使用flume rpcclient的正确方法。到目前为止,我发现的信息表明组件是线程安全的,但是flume文档中的示例在错误处理方面掩盖了这个问题。此代码:
public void sendDataToFlume(String data) {
// Create a Flume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
// Send the event
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up and recreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
}
如果有多个线程调用此方法,并且抛出异常,那么当多个线程尝试在异常处理程序中重新创建客户机时,就会出现问题。
sdk的目的是只允许单个线程使用它吗?这个方法应该被同步吗,就像它出现在flume源的log4jappender中一样?我应该把这个代码放在它自己的worker中,并通过一个队列向它传递事件吗?
有没有人有一个rpcclient被多个线程使用的例子(包括错误条件)?
使用“嵌入式代理”会更好吗?多线程友好吗?
1条答案
按热度按时间pbpqsu0x1#
使用embedded agent,除了不知道要做什么之外,您得到的情况是相同的:
您可以停止代理,然后重新启动它—但是您需要一个同步块(或
ReentrantReadWriteLock
,在“读取”线程时不阻塞线程client
字段)。但由于我不是FlumeMaven,我不能告诉你哪一个更好。例子:
此外,示例将丢失事件,因为它不会被发回。某种循环+最大重试可能会达到以下效果:
这是想法,但我不认为这应该是用户(例如:us)的任务,而是客户机或代理中的一个选项,用于存储由于此类错误而无法处理的事件。