在发送到通道之前删除null flume事件

ujv3wf0j  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(362)

我在flume的一本书中读到,如果在interceptor的intercept方法中,一个事件被返回为null,那么该事件将被丢弃。因此,我创建了一个自定义拦截器,它根据一个条件将事件返回为null,如下所示:

public Event intercept(Event event) {
    // TODO Auto-generated method stub
    Event finalEvent = event;
    check = new String(event.getBody(),Charsets.UTF_8);

    if(check.matches("([0-9]-.+?-.+?-[0-9][0-9]+)")){

        try {
            fileWriter.append(new String(event.getBody(),Charsets.UTF_8)+ "\n");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finalEvent = null;
    }
    System.out.println("Event is : " + finalEvent);
    return finalEvent;
}

拦截器发出空事件,但文件通道仍将其作为空事件传递给hdfs接收器。为什么不取消这个活动??我使用假脱机目录作为源。

zzoitvuj

zzoitvuj1#

让我们看看会发生什么。使用假脱机目录作为源,源调用函数processeventbatch(events),在processeventbatch()中:

events = interceptorChain.intercept(events);//use your custom interceptor
...
eventQueue.add(event); // add user event to queue,even the event == null

如果假脱机目录源使用processevent(),那么拦截器将工作:inside processevent():

event = interceptorChain.intercept(event);
if (event == null) {
  //null event then return !! intercept works!!
  return;
}

因此,您应该修改processeventbatch(),并执行以下操作:

if (event == null){
    //dont add to eventQueue
}
x759pob2

x759pob22#

在我的拦截器类中,方法 intercept(Event event) 包含有关数据如何流动(如问题中所述)的逻辑,该逻辑在正则表达式不满足时返回null intercept(List<Event> events) 空事件被排除在目的服务器之外。以下是 intercept(List<Event> events) 代码:

public List<Event> intercept(List<Event> events) 
    {
          List<Event> interceptedEvents = new ArrayList<Event>(events.size());
          for (Event event : events) 
          {
              // Intercept any event
              Event interceptedEvent = intercept(event);
              if(interceptedEvent!=null)
                  interceptedEvents.add(interceptedEvent);
          }

          return interceptedEvents;
    }

相关问题