我有一个为flume编写的拦截器代码如下:
public Event intercept(Event event) {
byte[] xmlstr = event.getBody();
InputStream instr = new ByteArrayInputStream(xmlstr);
//TransformerFactory factory = TransformerFactory.newInstance(TRANSFORMER_FACTORY_CLASS,TRANSFORMER_FACTORY_CLASS.getClass().getClassLoader());
TransformerFactory factory = TransformerFactory.newInstance();
Source xslt = new StreamSource(new File("removeNs.xslt"));
Transformer transformer = null;
try {
transformer = factory.newTransformer(xslt);
} catch (TransformerConfigurationException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
Source text = new StreamSource(instr);
OutputStream ostr = new ByteArrayOutputStream();
try {
transformer.transform(text, new StreamResult(ostr));
} catch (TransformerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
event.setBody(ostr.toString().getBytes());
return event;
}
我正在用removens.xslt文件从源xml中删除命名空间。这样我就可以把这些数据存储到hdfs中,然后放到hive中。我的拦截器运行时抛出以下错误:
ERROR org.apache.flume.source.jms.JMSSource: Unexpected error processing events
java.lang.NullPointerException
at test.intercepter.App.intercept(App.java:59)
at test.intercepter.App.intercept(App.java:82)
at org.apache.flume.interceptor.InterceptorChain.intercept(InterceptorChain.java:62)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:146)
at org.apache.flume.source.jms.JMSSource.doProcess(JMSSource.java:258)
at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:54)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:745)*
你能告诉我问题出在哪里吗?
1条答案
按热度按时间pgpifvop1#
我找到了解决办法。问题只不过是
new File("removeNs.xslt")
. 它无法找到位置,因为我不知道在哪里保存这个文件,但后来我得到了flume代理路径,但一旦我重新启动flume代理,它就会删除我保存在flume代理目录中的所有文件。所以我修改了代码,并将文件材料保存到java代码中。