Flume truncates characters when I use logger as the sink type: it shows only the first 20 characters and ignores the rest

ubby3x7f  posted on 2021-06-03  in Flume

Here is my test configuration (a netcat source plus a logger sink that writes to the console):

#START OF CONFIG FILE
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#====END OF CONFIG FILE

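Now I run the agent with this configuration:

$ bin/flume-ng agent --conf conf --conf-file conf/netcat_dump.conf --name a1 -Dflume.root.logger=DEBUG,console

Then I connect with netcat and type the following text:

$ netcat localhost 4444
This is First Event sent to Flume through netcat

Now, if you look at the Flume console, you will see a truncated log line: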

2013-11-25 15:33:20,862  ---- Event: { headers:{} body: 54 68 69 73 20 69 73 20 46 69 72 73 74 20 45 76 This is First Ev }
2013-11-25 15:33:20,862  ---- Events processed = 1

Note: I tried most of the channel parameters, but none of them helped.
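Decoding the hex bytes from the logged line shows exactly how much of the body was printed. The following is only a sketch for checking the log by hand; the class name HexBodyDecoder is hypothetical and is not part of Flume.

```java
import java.nio.charset.StandardCharsets;

class HexBodyDecoder {
    // Converts a space-separated hex dump such as "54 68 69" back into bytes.
    static byte[] decode(String hexDump) {
        String[] pairs = hexDump.trim().split("\\s+");
        byte[] out = new byte[pairs.length];
        for (int i = 0; i < pairs.length; i++) {
            out[i] = (byte) Integer.parseInt(pairs[i], 16);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hex bytes copied from the logged event body
        byte[] body = decode("54 68 69 73 20 69 73 20 46 69 72 73 74 20 45 76");
        // prints "16 bytes: This is First Ev"
        System.out.println(body.length + " bytes: "
                + new String(body, StandardCharsets.US_ASCII));
    }
}
```

The count comes out at 16 bytes rather than 20 characters, which matters when looking for the setting that controls the cutoff.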

ev7lccsx #1

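Your output is working as expected: the default logger sink truncates the body content to 16 bytes. I don't believe you can override this behavior without creating your own custom LoggerSink, because the current LoggerSink has no configuration parameters. I modified the existing LoggerSink (shown below) and named it AdvancedLoggerSink (a bit of a misnomer, since it isn't all that advanced).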
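AdvancedLoggerSink adds a configuration parameter named maxBytes, which sets how many bytes of the event body are written to the log. The default is still 16 bytes, but you can now override it with any value you want. If you set it to 0, the entire log message is printed.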
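The maxBytes rule can be tried out without a running agent. This is a minimal sketch, not Flume code: the class MaxBytesSketch and its methods are hypothetical, the sample body is an assumed input, and the fallback for negative values follows the configure method of the class in this answer.

```java
import java.nio.charset.StandardCharsets;

class MaxBytesSketch {
    static final int DEFAULT_MAX_BYTES = 16;

    // Negative values fall back to the default; 0 means "the whole body".
    static int limitFor(int configured, int bodyLength) {
        int effective = configured < 0 ? DEFAULT_MAX_BYTES : configured;
        return effective == 0 ? bodyLength : effective;
    }

    // Returns the part of the body that would be visible for a given maxBytes.
    static String visibleText(String body, int configured) {
        byte[] bytes = body.getBytes(StandardCharsets.US_ASCII);
        int n = Math.min(bytes.length, limitFor(configured, bytes.length));
        return new String(bytes, 0, n, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        String body = "This is First Event"; // assumed sample input
        System.out.println(visibleText(body, 16)); // prints "This is First Ev"
        System.out.println(visibleText(body, 0));  // prints "This is First Event"
    }
}
```

Treating 0 as "no limit" keeps the default behaviour unchanged for anyone who does not set the parameter.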
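To get this working, download the Flume binaries and build a jar file that contains the AdvancedLoggerSink class. When compiling and building the jar, you need to include the following Flume jars, which are in the lib directory of the Flume binary download:
flume-ng-configuration-1.4.0.jar
flume-ng-core-1.4.0.jar
flume-ng-sdk-1.4.0.jar
slf4j-api-1.6.1.jar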
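Assuming you build a jar named advancedLoggerSink.jar, place it in a directory named lib inside the Flume plugins directory. The plugins directory defaults to $FLUME_HOME/plugins.d, but you can create it anywhere. The directory structure should look like this: plugins.d/advanced-logger-sink/lib/advancedLoggerSink.jar (make sure the jar goes in a directory named 'lib'; for more on the plugin directory layout, see the Flume User Guide at http://flume.apache.org/flumeuserguide.html).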
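The layout described above can be prepared with a few lines of Java. This is a rough sketch: PluginLayout is a hypothetical helper, and it only creates the directories; the jar still has to be copied in by hand.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class PluginLayout {
    // Expected location of the sink jar under a given plugins.d root.
    static Path jarPath(Path pluginsRoot) {
        return pluginsRoot.resolve("advanced-logger-sink")
                          .resolve("lib")
                          .resolve("advancedLoggerSink.jar");
    }

    // Creates plugins.d/advanced-logger-sink/lib if it does not exist yet.
    static Path createLibDir(Path pluginsRoot) throws IOException {
        return Files.createDirectories(jarPath(pluginsRoot).getParent());
    }

    public static void main(String[] args) throws IOException {
        // A temporary directory stands in for the real plugins.d location.
        Path root = Files.createTempDirectory("flume-demo").resolve("plugins.d");
        Path lib = createLibDir(root);
        System.out.println("lib directory ready: " + Files.isDirectory(lib));
        System.out.println("Copy the jar to: " + jarPath(root));
    }
}
```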
To run the Flume agent, use the following command:

flume-ng agent --plugins-path /path/to/your/plugins.d --conf /conf/directory --conf-file /conf/logger.flume --name a1 -Dflume.root.logger=INFO,console

Note how I specified the plugins path (the path where the plugins.d directory lives). Flume will automatically load AdvancedLoggerSink from the plugins.d directory.
Here is the AdvancedLoggerSink class:

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventHelper;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdvancedLoggerSink extends AbstractSink implements Configurable {

    // Number of body bytes logged when maxBytes is not configured
    private static final int DEFAULT_MAX_BYTES = 16;

    // Configured limit; 0 means log the entire event body
    private int maxBytesProp;

    private static final Logger logger = LoggerFactory
            .getLogger(AdvancedLoggerSink.class);

    @Override
    public void configure(Context context) {
        // Read maxBytes from the sink configuration; negative values fall
        // back to the default
        int configured = context.getInteger("maxBytes", DEFAULT_MAX_BYTES);
        this.maxBytesProp = configured < 0 ? DEFAULT_MAX_BYTES : configured;
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;

        try {
            transaction.begin();
            event = channel.take();

            if (event != null) {
                if (logger.isInfoEnabled()) {
                    logger.info("Event: " + EventHelper.dumpEvent(
                                    event,
                                    this.maxBytesProp == 0 ? event.getBody().length : this.maxBytesProp
                                ));
                }
            } else {
                // No event found, request back-off semantics from the sink
                // runner
                result = Status.BACKOFF;
            }
            transaction.commit();
        } catch (Exception ex) {
            transaction.rollback();
            throw new EventDeliveryException("Failed to log event: " + event,
                    ex);
        } finally {
            transaction.close();
        }

        return result;
    }
}

Your configuration file should then look like this:


# example.conf: A single-node Flume configuration

# Name the components on this agent

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink

a1.sinks.k1.type = AdvancedLoggerSink

# maxBytes is the maximum number of bytes to output for the body of the event.
# The default is 16 bytes. If you set maxBytes to 0 then the entire record will
# be output.

a1.sinks.k1.maxBytes = 0

# Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

axkjgtzd #2

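The answer above by sarus works, except that a1.sinks.k1.type has to be changed to the fully qualified class name, including the package name. Also, for Flume 1.6.0, copy the compiled jar into the lib folder under the Flume installation path. You can also use System.out.println instead of the logger, like this: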

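if (event != null) {
    // Pass the full body length so nothing is truncated
    System.out.println(EventHelper.dumpEvent(event, event.getBody().length));
    status = Status.READY;
} else {
    // No event available: ask the sink runner to back off
    System.out.println("Event is null");
    status = Status.BACKOFF;
}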
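The point about the fully qualified class name can be illustrated without Flume. The sketch below assumes that a custom sink type is resolved by loading the class by name, which is an assumption about Flume's internals; java.util.ArrayList stands in for a packaged sink such as a hypothetical com.example.flume.AdvancedLoggerSink.

```java
class ClassNameLookup {
    // Returns true if a class with exactly this name can be loaded.
    static boolean canLoad(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A simple name is not enough once the class lives in a package.
        System.out.println(canLoad("ArrayList"));           // prints "false"
        // The fully qualified name resolves.
        System.out.println(canLoad("java.util.ArrayList")); // prints "true"
    }
}
```

A class declared without a package statement, like the one in the first answer, has a fully qualified name equal to its simple name, so the short form works there.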
