Unable to insert data into HBase using Apache Flume

kx7yvsdv posted on 2021-06-03 in Hadoop

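I am using Flume to insert log file data into an HBase table, but nothing is being inserted into the table. The Flume agent configuration is as follows: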

agent1.sources = tail
agent1.channels = memoryChannel
agent1.sinks = loggerSink sink1
agent1.sources.tail.type = exec
agent1.sources.tail.command = tail -f /usr/local/jarsfortest/LogsForTest/generatingLogs-app.logs
agent1.sources.tail.channels = memoryChannel

agent1.sinks.loggerSink.channel = memoryChannel
agent1.sinks.loggerSink.type = logger

agent1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
agent1.sinks.sink1.channel = memoryChannel
agent1.sinks.sink1.table = testFlume
agent1.sinks.sink1.columnFamily = log
agent1.sinks.sink1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# agent1.sinks.sink1.serializer.regex = [a-zA-Z0-9]*[^C][a-zA-Z0-9]*[^C][a-zA-Z0-9]*

agent1.sinks.sink1.serializer.regex =[a-zA-Z0-9]*^C[a-zA-Z0-9]*^C[a-zA-Z0-9]*
agent1.sinks.sink1.serializer.colNames = id, no_fill_reason, bid

agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 1000

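The agent above starts successfully, but the log file data is not inserted into HBase. A line of the log file looks like `id0^Cok^C10`, i.e. the fields are separated by control characters. Please help. Thanks in advance.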
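Before tuning the regex it helps to confirm which control character actually separates the fields: in caret notation `^C` is the byte 0x03 (ETX), while Hive-style files typically use `^A` (0x01). `cat -v` on the log shows this directly; the minimal Java sketch below does the same for the first line of a file. `ShowControlChars` is a hypothetical helper, not part of Flume, and it reads the file as ISO-8859-1 so that every byte decodes without error.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class ShowControlChars {
    /** Renders control characters in caret notation: 0x01 -> "^A", 0x03 -> "^C", 0x7F -> "^?". */
    static String caret(String s) {
        StringBuilder out = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (c < 0x20) {
                out.append('^').append((char) (c + '@'));
            } else if (c == 0x7f) {
                out.append("^?");
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java ShowControlChars <log file>");
            return;
        }
        // ISO-8859-1 maps every byte to exactly one char, so decoding never fails.
        List<String> lines = Files.readAllLines(Paths.get(args[0]), StandardCharsets.ISO_8859_1);
        if (!lines.isEmpty()) {
            System.out.println(caret(lines.get(0)));
        }
    }
}
```

Run it against the file tailed by the exec source (`/usr/local/jarsfortest/LogsForTest/generatingLogs-app.logs`); output such as `id0^Cok^C10` would confirm 0x03 as the separator, while `id0^Aok^A10` would mean the regex has to match 0x01 instead.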

ioekq8ef (answer #1)

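I can help you narrow down the root cause.
Stop the flume-ng process with Ctrl+C.
Look closely at the output. It contains three kinds of metrics:
Shutdown metrics for type: SOURCE
Shutdown metrics for type: CHANNEL
Shutdown metrics for type: SINK
For example: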

Shutdown Metric for type: SOURCE, name: r1. source.start.time == 1483838106878
Shutdown Metric for type: SOURCE, name: r1. source.stop.time == 1483838118766
Shutdown Metric for type: SOURCE, name: r1. src.append-batch.accepted == 0
Shutdown Metric for type: SOURCE, name: r1. src.append-batch.received == 0
Shutdown Metric for type: SOURCE, name: r1. src.append.accepted == 0
Shutdown Metric for type: SOURCE, name: r1. src.append.received == 0
Shutdown Metric for type: SOURCE, name: r1. src.events.accepted == 141
Shutdown Metric for type: SOURCE, name: r1. src.events.received == 147
Shutdown Metric for type: SOURCE, name: r1. src.open-connection.count == 0

Shutdown Metric for type: CHANNEL, name: c1. channel.start.time == 1483838106874
Shutdown Metric for type: CHANNEL, name: c1. channel.stop.time == 1483838118789
Shutdown Metric for type: CHANNEL, name: c1. channel.capacity == 1000000
Shutdown Metric for type: CHANNEL, name: c1. channel.current.size == 141
Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.attempt == 147
Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.success == 141
Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.attempt == 31
Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.success == 26

Shutdown Metric for type: SINK, name: k1. sink.start.time == 1483838108891
Shutdown Metric for type: SINK, name: k1. sink.stop.time == 1483838118758
Shutdown Metric for type: SINK, name: k1. sink.batch.complete == 0
Shutdown Metric for type: SINK, name: k1. sink.batch.empty == 4
Shutdown Metric for type: SINK, name: k1. sink.batch.underflow == 1
Shutdown Metric for type: SINK, name: k1. sink.connection.closed.count == 1
Shutdown Metric for type: SINK, name: k1. sink.connection.creation.count == 0
Shutdown Metric for type: SINK, name: k1. sink.connection.failed.count == 0
Shutdown Metric for type: SINK, name: k1. sink.event.drain.attempt == 26
Shutdown Metric for type: SINK, name: k1. sink.event.drain.sucess == 0

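In the example above, the problem is in the sink stage: the source accepted 141 events and the channel stored all 141 (channel.event.put.success), the sink attempted to drain 26 events, but its drain success count is 0, so the sink wrote nothing.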
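The reasoning above can be automated roughly as follows: parse the shutdown-metric lines and report the first stage whose success counter is zero. This is a rough heuristic, not a Flume tool; `ShutdownMetrics`, `parse` and `stalledStage` are hypothetical names, and the line format is assumed to be exactly the one printed in the example (including the `sucess` spelling of the sink counter).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ShutdownMetrics {
    // Assumed format: "Shutdown Metric for type: SINK, name: k1. sink.event.drain.sucess == 0"
    private static final Pattern LINE = Pattern.compile(
            "Shutdown Metric for type: (\\w+), name: ([^.]+)\\. (\\S+) == (\\d+)");

    /** Parses metric lines into a map keyed "TYPE/name/metric"; other lines are skipped. */
    static Map<String, Long> parse(List<String> lines) {
        Map<String, Long> metrics = new HashMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line.trim());
            if (m.matches()) {
                metrics.put(m.group(1) + "/" + m.group(2) + "/" + m.group(3),
                        Long.parseLong(m.group(4)));
            }
        }
        return metrics;
    }

    /** Heuristic: the first stage whose success counter is 0 is where events stop. */
    static String stalledStage(Map<String, Long> m, String source, String channel, String sink) {
        if (m.getOrDefault("SOURCE/" + source + "/src.events.accepted", 0L) == 0) {
            return "SOURCE";
        }
        if (m.getOrDefault("CHANNEL/" + channel + "/channel.event.put.success", 0L) == 0) {
            return "CHANNEL";
        }
        // The counter name is spelled "sucess" in the output shown above.
        if (m.getOrDefault("SINK/" + sink + "/sink.event.drain.sucess", 0L) == 0) {
            return "SINK";
        }
        return "NONE";
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "Shutdown Metric for type: SOURCE, name: r1. src.events.accepted == 141",
                "Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.success == 141",
                "Shutdown Metric for type: SINK, name: k1. sink.event.drain.attempt == 26",
                "Shutdown Metric for type: SINK, name: k1. sink.event.drain.sucess == 0");
        System.out.println(stalledStage(parse(sample), "r1", "c1", "k1")); // prints SINK
    }
}
```

With the four lines from the example it reports `SINK`, the same conclusion as the manual reading. For the question's agent the names would be `tail`, `memoryChannel` and `sink1` instead of `r1`, `c1` and `k1`.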

scyqe7ek (answer #2)

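The cause is probably that the regular expression does not match. Follow these steps to debug it:
1) Start only one agent, with these options:

flume-ng agent -n $1 -c ../../config/conf/ -f ../../config/conf/$1.conf -Xmx3g -Xdebug -Xrunjdwp:transport=dt_socket,address=1044,server=y,suspend=y \
  --classpath ../lib/*:../../config/conf/zoo.cfg:../.

Once you start the script, the JVM pauses and prints a message such as "Listening for transport dt_socket at address: 1044" (suspend=y holds startup until a debugger attaches).
2) Create an Eclipse "Remote Java Application" debug configuration that connects to your server's host name and port (e.g. 1044).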
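3) The getActions method is responsible for turning each event into the rows (Put operations) written to HBase.
Put a breakpoint in the getActions method of the event serializer (RegexHbaseEventSerializer):
```java
// Excerpt from RegexHbaseEventSerializer, not a standalone class: the fields
// inputPattern, payload, charset, colNames, rowKeyIndex, cf, depositHeaders
// and headers are set by the serializer's configure()/initialize() methods.
public List<Row> getActions() throws FlumeException {
    List<Row> actions = Lists.newArrayList();

    Matcher m = this.inputPattern.matcher(new String(this.payload, this.charset));
    if (!m.matches()) {
        // Silent drop #1: the event does not match the regex, so no Put is created.
        return Lists.newArrayList();
    }

    if (m.groupCount() != this.colNames.size()) {
        // Silent drop #2: the number of capture groups differs from the number of colNames.
        return Lists.newArrayList();
    }
    try {
        byte[] rowKey;
        if (this.rowKeyIndex < 0) {
            rowKey = getRowKey(); // no row-key column configured: generate a key
        } else {
            rowKey = m.group(this.rowKeyIndex + 1).getBytes(Charsets.UTF_8);
        }
        Put put = new Put(rowKey);

        // One cell per capture group, skipping the group used as the row key.
        for (int i = 0; i < this.colNames.size(); ++i) {
            if (i != this.rowKeyIndex) {
                put.add(this.cf, this.colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8));
            }
        }
        // Optionally store the event headers as extra columns.
        if (this.depositHeaders) {
            for (Map.Entry<String, String> entry : this.headers.entrySet()) {
                put.add(this.cf, entry.getKey().getBytes(this.charset),
                        entry.getValue().getBytes(this.charset));
            }
        }
        actions.add(put);
    } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
    }
    return actions;
}
```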
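The two early `return` statements in getActions can also be checked offline, without attaching a debugger. The sketch below is a re-creation under stated assumptions, not Flume code: `RegexCheck` and `wouldInsert` are hypothetical names, it assumes `^C` in the log is the single control character 0x03 (ETX), and it assumes the serializer compiles the regex with `Pattern.DOTALL` and splits `colNames` on commas; verify both against your Flume version.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    // Assumption: "^C" in the log stands for the single control character 0x03 (ETX).
    static final char ETX = 3;
    static final String SAMPLE = "id0" + ETX + "ok" + ETX + "10";

    /**
     * Mirrors the two early returns in getActions(): the whole event must match,
     * and the number of capture groups must equal the number of colNames.
     */
    static boolean wouldInsert(String regex, String colNames, String line) {
        Matcher m = Pattern.compile(regex, Pattern.DOTALL).matcher(line);
        if (!m.matches()) {
            return false; // getActions() would return an empty list here
        }
        return m.groupCount() == colNames.split(",").length;
    }

    public static void main(String[] args) {
        String cols = "id, no_fill_reason, bid";
        // From the question: "^" is a start-of-input anchor and there are no groups.
        String original = "[a-zA-Z0-9]*^C[a-zA-Z0-9]*^C[a-zA-Z0-9]*";
        // Commented-out variant: "[^C]" matches the separator, but there are still no groups.
        String commented = "[a-zA-Z0-9]*[^C][a-zA-Z0-9]*[^C][a-zA-Z0-9]*";
        // Hypothetical fix: one capture group per column, "\x03" as the separator.
        String fixed = "([a-zA-Z0-9]*)\\x03([a-zA-Z0-9]*)\\x03([a-zA-Z0-9]*)";

        System.out.println("original:  " + wouldInsert(original, cols, SAMPLE));  // false
        System.out.println("commented: " + wouldInsert(commented, cols, SAMPLE)); // false
        System.out.println("fixed:     " + wouldInsert(fixed, cols, SAMPLE));     // true
    }
}
```

With the question's regex both checks fail: `^` means "start of input", not Ctrl+C, and without parentheses `groupCount()` is 0, never 3. The `[^C]` variant passes the first check but is still dropped by the second. Note that Flume reads the .conf file with `java.util.Properties`, which treats a backslash as an escape character, so the `\x03` in the fixed pattern would probably need its backslash doubled when written into the agent configuration.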
