How to use the Kafka 0.8 log4j appender

atmip9wb · asked 2021-06-07 · in Kafka
Follow (0) | Answers (3) | Views (454)

I am trying to get the Kafka 0.8 log4j appender working, but I cannot make it work. I want my application to send logs directly to Kafka via the log4j appender.
Here is my log4j.properties. I could not find a suitable encoder, so I just configured it to use the default encoder (i.e., I commented out that line).

    log4j.rootLogger=INFO, stdout, KAFKA
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
    log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
    log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
    log4j.appender.KAFKA.layout.ConversionPattern=%-5p: %c - %m%n
    log4j.appender.KAFKA.BrokerList=hnode01:9092
    log4j.appender.KAFKA.Topic=DKTestEvent
    # log4j.appender.KAFKA.SerializerClass=kafka.log4j.AppenderStringEncoder

Here is my sample application.

    import org.apache.log4j.Logger;
    import org.apache.log4j.BasicConfigurator;
    import org.apache.log4j.PropertyConfigurator;

    public class HelloWorld {
        static Logger logger = Logger.getLogger(HelloWorld.class.getName());

        public static void main(String[] args) {
            PropertyConfigurator.configure(args[0]);
            logger.info("Entering application.");
            logger.debug("Debugging!.");
            logger.info("Exiting application.");
        }
    }

I compile with Maven. I included kafka_2.8.2-0.8.0 and log4j 1.2.17 in my pom.xml.
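For reference, the dependencies mentioned above would correspond to roughly these Maven coordinates (a sketch only; the group/artifact IDs are assumptions inferred from the names "kafka_2.8.2-0.8.0" and "log4j 1.2.17" and may not match the actual pom.xml):

```xml
<!-- Sketch: coordinates inferred from the dependency names above -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.8.2</artifactId>
  <version>0.8.0</version>
</dependency>
<dependency>
  <groupId>log4j</groupId>
  <artifactId>log4j</artifactId>
  <version>1.2.17</version>
</dependency>
```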
I get these errors:

    INFO [main] (Logging.scala:67) - Verifying properties
    INFO [main] (Logging.scala:67) - Property metadata.broker.list is overridden to hnode01:9092
    INFO [main] (Logging.scala:67) - Property serializer.class is overridden to kafka.serializer.StringEncoder
    INFO [main] (HelloWorld.java:14) - Entering application.
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 0 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 1 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 2 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 3 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 4 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 5 for 1 topic(s) Set(DKTestEvent)
    .
    .
    .
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 60 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 61 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 62 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 63 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 64 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 65 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 66 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 67 for 1 topic(s) Set(DKTestEvent)
    .
    .
    .
    INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 534 for 1 topic(s) Set(DKTestEvent)
    ERROR [main] (Logging.scala:67) -
    ERROR [main] (Logging.scala:67) -
    ERROR [main] (Logging.scala:67) -
    ERROR [main] (Logging.scala:67) -
    ERROR [main] (Logging.scala:67) -
    ERROR [main] (Logging.scala:67) -
    java.lang.StackOverflowError
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
        at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
        at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
        at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
        at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
        at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
        at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
        at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
        at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
        at org.apache.log4j.Category.callAppenders(Category.java:206)
        at org.apache.log4j.Category.forcedLog(Category.java:391)
        at org.apache.log4j.Category.error(Category.java:322)
        at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
        at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
        at kafka.utils.Utils$.swallow(Utils.scala:189)
        at kafka.utils.Logging$class.swallowError(Logging.scala:105)
        at kafka.utils.Utils$.swallowError(Utils.scala:46)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96)
        at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
        at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
        at org.apache.log4j.Category.callAppenders(Category.java:206)
        at org.apache.log4j.Category.forcedLog(Category.java:391)
        at org.apache.log4j.Category.info(Category.java:666)
        at kafka.utils.Logging$class.info(Logging.scala:67)
        at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
        at kafka.utils.Utils$.swallow(Utils.scala:187)
        at kafka.utils.Logging$class.swallowError(Logging.scala:105)
        at kafka.utils.Utils$.swallowError(Utils.scala:46)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96)
        at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
        at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
        .
        .
        .

If I don't terminate the program, the errors above keep repeating.
Please let me know if I have missed something.


plupiseo1#

I think Jonas has already identified the problem: the Kafka producer's own log messages are also being written to the Kafka appender, which causes an infinite loop and, eventually, a stack overflow (no pun intended). You can configure all Kafka logs to go to a different appender. The following sends them to stdout:

    log4j.logger.kafka=INFO, stdout

So you should include the following in your log4j properties:

    log4j.rootLogger=INFO, stdout, KAFKA
    log4j.logger.kafka=INFO, stdout
    log4j.logger.HelloWorld=INFO, KAFKA
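One log4j 1.x detail worth checking on top of this (my addition, not part of the answer above): because the root logger also has the KAFKA appender attached, events from the `kafka` logger still propagate up to the root's appenders unless additivity is disabled. A minimal sketch:

```properties
# Sketch: keep kafka.* events out of the root logger's appenders
# (log4j 1.x additivity flag; assumes the configuration shown above)
log4j.logger.kafka=INFO, stdout
log4j.additivity.kafka=false
```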

ddarikpa2#

Try setting the appender to asynchronous, like this:

    log4j.appender.KAFKA.ProducerType=async

That seems to make sense; it goes into an infinite loop because the Kafka producer has its own logging...
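Combined with the configuration from the question, the appender section might look like this (a sketch; it assumes the 0.8 KafkaLog4jAppender accepts a ProducerType setting as this answer suggests, and reuses the broker and topic values from the question):

```properties
log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
log4j.appender.KAFKA.BrokerList=hnode01:9092
log4j.appender.KAFKA.Topic=DKTestEvent
# Assumed property: send asynchronously so the appender does not block the logging thread
log4j.appender.KAFKA.ProducerType=async
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%-5p: %c - %m%n
```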


gc0ot86w3#

I was able to produce events through log4j with Kafka 0.8.2.2. Here is my log4j configuration:

    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
    <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
        <appender name="console" class="org.apache.log4j.ConsoleAppender">
            <param name="Target" value="System.out" />
            <layout class="org.apache.log4j.PatternLayout">
                <param name="ConversionPattern" value="%-5p %c{1} - %m%n" />
            </layout>
        </appender>
        <appender name="fileAppender" class="org.apache.log4j.RollingFileAppender">
            <param name="Threshold" value="INFO" />
            <param name="MaxBackupIndex" value="100" />
            <param name="File" value="/tmp/agna-LogFile.log" />
            <layout class="org.apache.log4j.PatternLayout">
                <param name="ConversionPattern" value="%d %-5p [%c{1}] %m %n" />
            </layout>
        </appender>
        <appender name="kafkaAppender" class="kafka.producer.KafkaLog4jAppender">
            <param name="Topic" value="kafkatopic" />
            <param name="BrokerList" value="localhost:9092" />
            <param name="syncSend" value="true" />
            <layout class="org.apache.log4j.PatternLayout">
                <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n" />
            </layout>
        </appender>
        <logger name="org.apache.kafka">
            <level value="error" />
            <appender-ref ref="console" />
        </logger>
        <logger name="com.example.kafkaLogger">
            <level value="debug" />
            <appender-ref ref="kafkaAppender" />
        </logger>
        <root>
            <priority value="debug" />
            <appender-ref ref="console" />
        </root>
    </log4j:configuration>

Here is the source code:

    package com.example;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.json.simple.JSONArray;
    import org.json.simple.JSONObject;

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class JsonProducer {
        static Logger defaultLogger = LoggerFactory.getLogger(JsonProducer.class);
        static Logger kafkaLogger = LoggerFactory.getLogger("com.example.kafkaLogger");

        public static void main(String args[]) {
            JsonProducer obj = new JsonProducer();
            String str = obj.getJsonObjAsString();
            // Use the logger
            kafkaLogger.info(str);
            try {
                // Construct and send message
                obj.constructAndSendMessage();
            } catch (InterruptedException e) {
                defaultLogger.error("Caught interrupted exception " + e);
            } catch (ExecutionException e) {
                defaultLogger.error("Caught execution exception " + e);
            }
        }

        private String getJsonObjAsString() {
            JSONObject obj = new JSONObject();
            obj.put("name", "John");
            obj.put("age", new Integer(55));
            obj.put("address", "123 MainSt, Palatine, IL");
            JSONArray list = new JSONArray();
            list.add("msg 1");
            list.add("msg 2");
            list.add("msg 3");
            obj.put("messages", list);
            return obj.toJSONString();
        }

        private void constructAndSendMessage() throws InterruptedException, ExecutionException {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            boolean sync = false;
            String topic = "kafkatopic";
            String key = "mykey";
            String value = "myvalue1 mayvalue2 myvalue3";
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, key, value);
            if (sync) {
                producer.send(producerRecord).get();
            } else {
                producer.send(producerRecord);
            }
            producer.close();
        }
    }

The whole project is available at the following link:
https://github.com/ypant/kafka-json-producer.git

