在java服务器端,经过一些处理之后,我通过restfulwebservice将日志数据(json格式)从服务器发布到kafka。
在hdfs侧,我的Flume类型是avro。因此,为了将json(源)解析为avro(目标),我使用morphline和avro模式。
如果发布的数据不适合morphline或avro模式,通常我会得到下面的错误,
原因:com.fasterxml.jackson.core.jsonparseexception:非法的无引号字符((ctrl char,代码10)):必须使用反斜杠转义才能包含在字符串值中
如果我得到这一次,偏移量就不再移动了。简单地说,如果Kafka只得到一次这个错误,它就不能再接收发布的数据了。
为了避免这个错误,我假设有两种解决方案。第一种是在服务器端为大数据端使用的avro模式编写json验证器。我更喜欢的第二种方法是跳过并且不接收为请求的avro模式未格式化的日志数据。但是在跳过一个损坏的数据之后,如果Kafka得到了合适的数据,它就应该将其丢弃。
我认为如果我在flume或kafka配置文件中添加一些参数是可能的。那么,当发布的数据不适合请求的模式或请求的行时,如何跳过sink步骤呢?
1条答案
按热度按时间9bfwbjaz1#
我把问题解决了,
像这样在morphline中添加了try-catch代码块
在
tryRules
我强制代码捕获所有异常。在
rules:
你可以写"command:"
阻塞任何您想要的,如果其中一个抛出除最后一个命令块之外的异常,最后一个命令将运行。记住最后一个是“抓住”。我的意思是,如果第一个命令块失败,最后一个(第二个)命令将运行。若第一个命令运行得很好,最后一个命令就不能工作,因为最后一个命令块的工作方式和catch块类似。所以当代码
readJson {}
在第一个命令块中失败,它抛出一个异常,最后一个命令(catch块)处理它,这样它就不会尝试在kafka主题中接收当前数据,因为它将运行dropRecord {}
.有关详细文档,请访问kitesdk。