如何配置apacheflume来删除被ignorepattern属性忽略的文件

epfja78i  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(498)

我有数据进入一个spooldir,我拿起它使用flume和转发它进一步的一些处理。
有些文件不是必需的,所以我在flume中使用igonorepattern属性来避免被拾取。
但问题是,我收到的必需文件和非必需文件数量相等,而且我无法控制传入的数据,因此我必须接受进入spooldir的任何内容。
由于我有相当多的这些不需要的文件,我没有足够的磁盘空间来存储它们很长一段时间。因此,我想知道是否有一种方法可以让flume像对所有人一样自动删除这些文件 .COMPLETED 文件(是的,我正在删除flume拾取的文件)

iswrvxsc

iswrvxsc1#

flume假脱机目录源无法删除忽略的文件。它立即/从不仅删除已处理的文件。
有三种方法可以解决这个问题。
首先,您可以显式地解决问题(使用shell脚本或任何其他可以找到忽略模式的文件并将其删除的小程序)。我认为这不是一个好办法。
其次,您可以通过实现flume源接口来编写自己的定制假脱机目录源。对于这种小问题,需要付出很大的努力和严峻的挑战。
第三,滥用解决方案,可以使用morphline拦截器。在flume用户指南的这一部分中提到了morphline拦截器。另外,您可能想看看morphline引用
拦截器从源代码获取事件,进行一些处理,最后将其转发到您所知道的通道。
如果选择第三种解决方案,则必须使用kite-sdk。您必须使用flume-env.sh将cloudera的kite morphlines核心依赖项添加到flume\u类路径,或者只需在$apache\u flume\u home/lib中添加jar
在此解决方案中,Flume配置示例为:

a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
a1.sources.src-1.interceptors = morph

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /spool/dir
a1.sources.src-1.fileHeader = true
a1.sources.src-1.ignoredPattern = 'whatever'

a1.sources.src-1.interceptors.morph.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.src-1.interceptors.morph.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.src-1.interceptors.morph.morphlineId = morphline1

a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = ch-1
a1.sinks.k1.sink.directory = /roll/dir

然后可以创建一个定制的morpline拦截器文件$apache\u flume\u home/conf/morpline.conf
在这个conf文件中,您可以处理what-if-you-want,只需小心记录对象是否返回到子进程。
这也不是一个好的解决方案,但是您可以编写java代码来执行flume事务期间的任何进程。在每个事件中,您可以检查目录,如果您不需要该文件,则可以将其删除(必须确保运行java进程的用户在此目录中具有权限)

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      {
        readJson { }
      }
      {
        java {
          imports : """
            import java.io.File;
            import java.io.IOException;
          """
          code : """
            try {
                // This code from my flume agent, you may want to use it, but it is not necessary
                // JsonNode rootNode = (JsonNode) record.getFirstValue(Fields.ATTACHMENT_BODY);    

                // You can traverse in the relevant directory
                // and find the ignored pattern manually
                // then you can delete it with java code

                //Second part of my code
                //String rootNodeStr = rootNode.toString();
                //record.put("rootNodeStr", rootNodeStr.getBytes(StandardCharsets.UTF_8));
              }
            } catch (IOException e) {
              logger.error("So sad",e);
            }
            return child.process(record);
          """
        }
      }
      {
        setValues {
          _attachment_body : "@{rootNodeStr}"
        }
      }
    ]
  }
]

我希望这会有帮助。

相关问题