Kafka consumer throws java.lang.OutOfMemoryError: Direct buffer memory

krcsximq · asked 2021-06-07 · Kafka

I am using a single-node Kafka broker (0.10.2) and a single-node ZooKeeper (3.4.9). I have one consumer server (single core, 1.5 GB RAM). Whenever I run a process with 5 or more threads, my consumer's threads get killed after throwing these exceptions.
Exception 1:
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)
Exception 2:
Uncaught exception in kafka coordinator heartbeat thread | topic1:
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)
I googled and applied the JVM parameters mentioned below, but the same exceptions still occur:
-XX:MaxDirectMemorySize=768m
-Xms512m
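
A minimal sketch of how such flags are applied when launching the consumer (the jar name is a placeholder, not the actual application):

    java -Xms512m -XX:MaxDirectMemorySize=768m -jar consumer-app.jar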
How can I resolve this? Is any other JVM parameter tuning required?
My Kafka consumer code is:

import com.mongodb.DBObject
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.regex.Pattern

// Polls Kafka and dispatches each record to a handler class/method configured per topic,
// batching messages where configured and committing offsets synchronously.
class KafkaPollingConsumer implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
    private static final String TAG = "[KafkaPollingConsumer]"
    private final KafkaConsumer<String, byte[]> kafkaConsumer
    private Map<TopicPartition, OffsetAndMetadata> currentOffsetsMap = new HashMap<>()
    List topicNameList
    Map kafkaTopicConfigMap = new HashMap<String, Object>()
    Map kafkaTopicMessageListMap = new HashMap<String, List>()
    Boolean isRebalancingTriggered = false
    private final Long REBALANCING_SLEEP_TIME = 1000

    public KafkaPollingConsumer(String serverType, String groupName, String topicNameRegex, Integer batchSize, Integer maxPollTime, Integer requestTime) {
        logger.debug("{} [Constructor] [Enter] Thread Name {} serverType {} groupName {} topicNameRegex {}", TAG, Thread.currentThread().getName(), serverType, groupName, topicNameRegex)
        logger.debug("Populating properties for kafka consumer")
        logger.debug("BatchSize {}", batchSize)
        Properties kafkaConsumerProperties = new Properties()
        kafkaConsumerProperties.put("group.id", groupName)
        kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        kafkaConsumerProperties.put("value.deserializer", "com.custom.kafkaconsumerv2.deserializer.CustomObjectDeserializer")
        switch (serverType) {
            case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString():
                kafkaConsumerProperties.put("bootstrap.servers", ConfigLoader.conf.kafkaServer.priority.kafkaNode)
                kafkaConsumerProperties.put("enable.auto.commit", ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit)
                kafkaConsumerProperties.put("auto.offset.reset", ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset)
                break
            case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Bulk.toString():
                kafkaConsumerProperties.put("bootstrap.servers", ConfigLoader.conf.kafkaServer.bulk.kafkaNode)
                kafkaConsumerProperties.put("enable.auto.commit", ConfigLoader.conf.kafkaServer.bulk.consumer.enable.auto.commit)
                kafkaConsumerProperties.put("auto.offset.reset", ConfigLoader.conf.kafkaServer.bulk.consumer.auto.offset.reset)
                kafkaConsumerProperties.put("max.poll.records", 1)
                kafkaConsumerProperties.put("max.poll.interval.ms", 600000)
                kafkaConsumerProperties.put("request.timeout.ms", 600005)
                break
            default:
                throw new IllegalArgumentException("Invalid server type")
        }
        logger.debug("{} [Constructor] KafkaConsumer properties populated {}", TAG, kafkaConsumerProperties.toString())
        kafkaConsumer = new KafkaConsumer<String, byte[]>(kafkaConsumerProperties)
        topicNameList = topicNameRegex.split(Pattern.quote('|'))
        logger.debug("{} [Constructor] Kafka topic list {}", TAG, topicNameList.toString())
        logger.debug("{} [Constructor] Exit", TAG)
    }

    private class HandleRebalance implements ConsumerRebalanceListener {
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            logger.error('{} In onPartitionsAssigned setting isRebalancingTriggered to false', TAG)
            isRebalancingTriggered = false
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            logger.error("{} In onPartitionsRevoked setting isRebalancingTriggered to true", TAG)
            isRebalancingTriggered = true
            publishAllKafkaTopicBatchMessages()
            commitOffset()
        }
    }

    private class AsyncCommitCallBack implements OffsetCommitCallback {
        @Override
        void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
        }
    }

    @Override
    void run() {
        logger.debug("{} Starting Thread ThreadName {}", TAG, Thread.currentThread().getName())
        populateKafkaConfigMap()
        initializeKafkaTopicMessageListMap()
        String topicName
        String consumerClassName
        String consumerMethodName
        Boolean isBatchJob
        Integer batchSize = 0
        final Thread mainThread = Thread.currentThread()
        // Shutdown hook: wake the consumer so poll() exits, then wait for the main loop to finish
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                logger.error("{} gracefully shutting down thread {}", TAG, mainThread.getName())
                kafkaConsumer.wakeup()
                try {
                    mainThread.join()
                } catch (InterruptedException exception) {
                    logger.error("{} Error : {}", TAG, exception.getStackTrace().join("\n"))
                }
            }
        })
        kafkaConsumer.subscribe(topicNameList, new HandleRebalance())
        try {
            while (true) {
                logger.debug("{} Starting Consumer with polling time in ms 100", TAG)
                ConsumerRecords kafkaRecords
                if (isRebalancingTriggered == false) {
                    kafkaRecords = kafkaConsumer.poll(100)
                } else {
                    logger.error("{} in rebalancing going to sleep", TAG)
                    Thread.sleep(REBALANCING_SLEEP_TIME)
                    continue
                }
                for (ConsumerRecord record : kafkaRecords) {
                    if (isRebalancingTriggered == true) {
                        break
                    }
                    currentOffsetsMap.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1))
                    topicName = record.topic()
                    DBObject kafkaTopicConfigDBObject = kafkaTopicConfigMap.get(topicName)
                    consumerClassName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                    consumerMethodName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                    isBatchJob = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                    logger.debug("Details about Message")
                    logger.debug("Thread {}", mainThread.getName())
                    logger.debug("Topic {}", topicName)
                    logger.debug("Partition {}", record.partition().toString())
                    logger.debug("Offset {}", record.offset().toString())
                    logger.debug("className {}", consumerClassName)
                    logger.debug("methodName {}", consumerMethodName)
                    logger.debug("isBatchJob {}", isBatchJob.toString())
                    Object message = record.value()
                    logger.debug("message {}", message.toString())
                    if (isBatchJob == true) {
                        prepareMessagesBatch(topicName, message)
                        //batchSize = Integer.parseInt(kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY).toString())
                        //logger.debug("batchSize {}",batchSize.toString())
                    } else {
                        publishMessageToNonBatchConsumer(consumerClassName, consumerMethodName, message)
                    }
                    //publishMessageToConsumers(consumerClassName,consumerMethodName,isBatchJob,batchSize,message,topicName)
                    //try {
                    //    kafkaConsumer.commitAsync(currentOffsetsMap,new AsyncCommitCallBack())
                    logger.debug("{} Committing Messages to Kafka", TAG)
                    //}
                    /*catch(Exception exception){
                        kafkaConsumer.commitSync(currentOffsetsMap)
                        currentOffsetsMap.clear()
                        logger.error("{} Error while committing async so committing in sync {}",TAG,exception.getStackTrace().join("\n"))
                    }*/
                }
                commitOffset()
                publishAllKafkaTopicBatchMessages()
            }
        }
        catch (InterruptException exception) {
            logger.error("{} In InterruptException", TAG)
            logger.error("{} In Exception exception message {}", TAG, exception.getMessage())
            logger.error("{} Exception {}", TAG, exception.getStackTrace().join("\n"))
        }
        catch (WakeupException exception) {
            logger.error("{} In WakeUp Exception", TAG)
            logger.error("{} In Exception exception message {}", TAG, exception.getMessage())
            logger.error("{} Exception {}", TAG, exception.getStackTrace().join("\n"))
        }
        catch (Exception exception) {
            logger.error("{} In Exception", TAG)
            logger.error("{} In Exception exception message {}", TAG, exception.getMessage())
            logger.error("{} Exception {}", TAG, exception.getStackTrace().join("\n"))
        }
        finally {
            logger.error("{} In finally committing remaining offset ", TAG)
            publishAllKafkaTopicBatchMessages()
            //kafkaConsumer.commitSync(currentOffsetsMap)
            kafkaConsumer.close()
            logger.error("{} Exiting Consumer", TAG)
        }
    }

    private void commitOffset() {
        logger.debug("{} [commitOffset] Enter", TAG)
        logger.debug("{} currentOffsetsMap {}", TAG, currentOffsetsMap.toString())
        if (currentOffsetsMap.size() > 0) {
            kafkaConsumer.commitSync(currentOffsetsMap)
            currentOffsetsMap.clear()
        }
        logger.debug("{} [commitOffset] Exit", TAG)
    }

    private void publishMessageToConsumers(String consumerClassName, String consumerMethodName, Boolean isBatchJob, Integer batchSize, Object message, String topicName) {
        logger.debug("{} [publishMessageToConsumer] Enter", TAG)
        if (isBatchJob == true) {
            publishMessageToBatchConsumer(consumerClassName, consumerMethodName, batchSize, message, topicName)
        } else {
            publishMessageToNonBatchConsumer(consumerClassName, consumerMethodName, message)
        }
        logger.debug("{} [publishMessageToConsumer] Exit", TAG)
    }

    private void publishMessageToNonBatchConsumer(String consumerClassName, String consumerMethodName, Object message) {
        logger.debug("{} [publishMessageToNonBatchConsumer] Enter", TAG)
        executeConsumerMethod(consumerClassName, consumerMethodName, message)
        logger.debug("{} [publishMessageToNonBatchConsumer] Exit", TAG)
    }

    private void publishMessageToBatchConsumer(String consumerClassName, String consumerMethodName, Integer batchSize, Object message, String topicName) {
        logger.debug("{} [publishMessageToBatchConsumer] Enter", TAG)
        List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
        consumerMessageList.add(message)
        if (consumerMessageList.size() == batchSize) {
            logger.debug("{} [publishMessageToBatchConsumer] Pushing Messages In Batches", TAG)
            executeConsumerMethod(consumerClassName, consumerMethodName, consumerMessageList)
            consumerMessageList.clear()
        }
        kafkaTopicMessageListMap.put(topicName, consumerMessageList)
        logger.debug("{} [publishMessageToBatchConsumer] Exit", TAG)
    }

    private void populateKafkaConfigMap() {
        logger.debug("{} [populateKafkaConfigMap] Enter", TAG)
        KafkaTopicConfigDBService kafkaTopicConfigDBService = KafkaTopicConfigDBService.getInstance()
        topicNameList.each { topicName ->
            DBObject kafkaTopicDBObject = kafkaTopicConfigDBService.findByTopicName(topicName)
            kafkaTopicConfigMap.put(topicName, kafkaTopicDBObject)
        }
        logger.debug("{} [populateKafkaConfigMap] kafkaConfigMap {}", TAG, kafkaTopicConfigMap.toString())
        logger.debug("{} [populateKafkaConfigMap] Exit", TAG)
    }

    private void initializeKafkaTopicMessageListMap() {
        logger.debug("{} [initializeKafkaTopicMessageListMap] Enter", TAG)
        topicNameList.each { topicName ->
            kafkaTopicMessageListMap.put(topicName, [])
        }
        logger.debug("{} [initializeKafkaTopicMessageListMap] kafkaTopicMessageListMap {}", TAG, kafkaTopicMessageListMap.toString())
        logger.debug("{} [initializeKafkaTopicMessageListMap] Exit", TAG)
    }

    private void executeConsumerMethod(String className, String methodName, def messages) {
        try {
            logger.debug("{} [executeConsumerMethod] Enter", TAG)
            logger.debug("{} [executeConsumerMethod] className {} methodName {} messages {}", TAG, className, methodName, messages.toString())
            Class.forName(className)."$methodName"(messages)
        } catch (Exception exception) {
            logger.error("{} [{}] Error while executing method : {} of class: {} with params : {} - {}", TAG, Thread.currentThread().getName(), methodName,
                    className, messages.toString(), exception.getStackTrace().join("\n"))
        }
        logger.debug("{} [executeConsumerMethod] Exit", TAG)
    }

    private void publishAllKafkaTopicBatchMessages() {
        logger.debug("{} [publishAllKafkaTopicBatchMessages] Enter", TAG)
        String consumerClassName = null
        String consumerMethodName = null
        kafkaTopicMessageListMap.each { topicName, messageList ->
            if (messageList != null && messageList.size() > 0) {
                DBObject kafkaTopicDBObject = kafkaTopicConfigMap.get(topicName)
                consumerClassName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                consumerMethodName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                logger.debug("{} Pushing message in topic {} className {} methodName {} ", TAG, topicName, consumerClassName, consumerMethodName)
                executeConsumerMethod(consumerClassName, consumerMethodName, messageList)
                messageList.clear()
                kafkaTopicMessageListMap.put(topicName, messageList)
            }
        }
        logger.debug("{} [publishAllKafkaTopicBatchMessages] Exit", TAG)
    }

    private void prepareMessagesBatch(String topicName, Object message) {
        logger.debug("{} [prepareMessagesBatch] Enter", TAG)
        logger.debug("{} [prepareMessagesBatch] preparing batch for topic {}", TAG, topicName)
        logger.debug("{} [prepareMessagesBatch] preparing batch for message {}", TAG, message.toString())
        List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
        consumerMessageList.add(message)
        kafkaTopicMessageListMap.put(topicName, consumerMessageList)
    }
}


ddhy6vgd · Answer #1

Kafka consumers control how much of a data backlog they take on through the following two parameters:
max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances in order to reassign the partitions to another member.
The default value is 300000 (5 minutes).
max.poll.records
The maximum number of records returned in a single call to poll().
The default value is 500.
Leaving these two parameters at their defaults instead of sizing them to your requirements can let a single poll fetch more data than the consumer can process with the available resources, leading to OutOfMemoryError or, at times, failure to commit the consumer offset. It is therefore always recommended to set max.poll.records and max.poll.interval.ms explicitly.
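
To see why this matters on a 1.5 GB host, here is a rough back-of-envelope sketch (the partition count below is invented purely for illustration; 1 MB is the max.partition.fetch.bytes default in this client version):

    // Worst-case bytes staged in memory by a single poll(); illustrative numbers only
    int assignedPartitions = 50                 // hypothetical partition assignment
    long maxPartitionFetchBytes = 1L << 20      // 1 MB, the max.partition.fetch.bytes default
    long worstCasePerPoll = assignedPartitions * maxPartitionFetchBytes
    println "Worst case staged per poll: ${worstCasePerPoll / (1 << 20)} MB"   // ~50 MB before processing begins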
In your code, the KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() branch sets neither of these two parameters, which may be the cause of the OutOfMemory problem during polling.
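
As an illustrative sketch only (it mirrors what your Bulk branch already does; the two values are placeholders that must be sized to your real message size and processing latency), the Priority case could bound its polls the same way:

    case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() :
        kafkaConsumerProperties.put("bootstrap.servers", ConfigLoader.conf.kafkaServer.priority.kafkaNode)
        kafkaConsumerProperties.put("enable.auto.commit", ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit)
        kafkaConsumerProperties.put("auto.offset.reset", ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset)
        // New: bound how many records one poll() may return and how long processing may take
        kafkaConsumerProperties.put("max.poll.records", 100)        // illustrative value
        kafkaConsumerProperties.put("max.poll.interval.ms", 300000) // illustrative value
        break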
