我在flume的一本书中读到,如果在interceptor的intercept方法中,一个事件被返回为null,那么该事件将被丢弃。因此,我创建了一个自定义拦截器,它根据一个条件将事件返回为null,如下所示:
public Event intercept(Event event) {
// TODO Auto-generated method stub
Event finalEvent = event;
check = new String(event.getBody(),Charsets.UTF_8);
if(check.matches("([0-9]-.+?-.+?-[0-9][0-9]+)")){
try {
fileWriter.append(new String(event.getBody(),Charsets.UTF_8)+ "\n");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
finalEvent = null;
}
System.out.println("Event is : " + finalEvent);
return finalEvent;
}
拦截器发出空事件,但文件通道仍将其作为空事件传递给hdfs接收器。为什么不取消这个活动??我使用假脱机目录作为源。
2条答案
按热度按时间zzoitvuj1#
让我们看看会发生什么。使用假脱机目录作为源,源调用函数processeventbatch(events),在processeventbatch()中:
如果假脱机目录源使用processevent(),那么拦截器将工作:inside processevent():
因此,您应该修改processeventbatch(),并执行以下操作:
x759pob22#
在我的拦截器类中,方法
intercept(Event event)
包含有关数据如何流动(如问题中所述)的逻辑,该逻辑在正则表达式不满足时返回nullintercept(List<Event> events)
空事件被排除在目的服务器之外。以下是intercept(List<Event> events)
代码: