Kafka生产者用SpringKafka模板工厂实现

ghhaqwfi  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(557)

我有一个简单的restapi(方法1),它使用kafka客户机api生成发送到kafka集群的消息。
spring boot rest->producer.send(kafka clients lib)->kafka集群
此外,我还有另一个实现(方法2)
spring boot rest->producer factory implementation(单个配置spring对象)->kafka template send(spring kafka)->kafka集群
我观察到方法2比方法1花费更多的时间。例如,一条消息的方法1需要40毫秒,方法2需要近100毫秒。
我希望使用基于producer-factory的实现来最小化推送消息所花费的时间。有没有想过怎么调整?
实施细则如下:(生产厂家)

  1. @Configuration
  2. public class KafkaConfig {
  3. @Value("${bootstrap.servers}")
  4. String bootStrapServers;
  5. @Bean
  6. public Map<String,Object> configs(){
  7. Map<String, Object> properties = new HashMap<String, Object>();
  8. properties.put("bootstrap.servers", bootStrapServers);
  9. properties.put("acks", "0");
  10. properties.put("retries", 0);
  11. properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  12. properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  13. return properties;
  14. }
  15. @Bean
  16. public ProducerFactory<String,String> factory(){
  17. return new DefaultKafkaProducerFactory<>(configs());
  18. }
  19. @Bean
  20. public KafkaTemplate<String,String> template(){
  21. return new KafkaTemplate<>(factory());
  22. }
  23. }
  1. Controller :
  2. @Autowired
  3. private KafkaTemplate<String,String> template;
  4. public ResponseEntity<String> producer(@PathVariable String topicName, @RequestBody String requestBody) throws JsonProcessingException {
  5. try {
  6. template.send(topicName,requestBody);
  7. } catch (Exception ex) {
  8. logger.error(ex);
  9. } finally {
  10. }
  11. return ResponseEntity.ok().build();
  12. }
tquggr8v

tquggr8v1#

我确实看到了比我预期的更多的开销(与您的结果类似)。我会做一些分析,看看是否可以改进。
框架总是会增加一些开销,但底线是,与所有spring项目一样,如果需要的话,您仍然可以下拉到较低级别的api。

  1. @SpringBootApplication
  2. public class So65791199Application {
  3. public static void main(String[] args) {
  4. SpringApplication.run(So65791199Application.class, args);
  5. }
  6. @Bean
  7. public ApplicationRunner runner(KafkaTemplate<String, String> template,
  8. ProducerFactory<String, String> pf) {
  9. return args -> {
  10. StopWatch watch = new StopWatch();
  11. ListenableFuture<SendResult<String, String>> future = template.send("so65791199", "foo");
  12. future.get(10, TimeUnit.SECONDS);
  13. List<ListenableFuture<SendResult<String, String>>> futures = new LinkedList<>();
  14. watch.start("template");
  15. IntStream.range(0, 10000).forEach(i -> {
  16. futures.add(template.send("so65791199", "foo"));
  17. });
  18. for (ListenableFuture<SendResult<String, String>> fut : futures) {
  19. fut.get(10, TimeUnit.SECONDS);
  20. }
  21. watch.stop();
  22. Producer<String, String> producer = new KafkaProducer<>(pf.getConfigurationProperties());
  23. ProducerRecord<String, String> pr = new ProducerRecord<>("so65791199", 0, null, "foo");
  24. Future<RecordMetadata> fut = producer.send(pr);
  25. fut.get(10, TimeUnit.SECONDS);
  26. watch.start("raw producer");
  27. List<Future<RecordMetadata>> futs = new LinkedList<>();
  28. IntStream.range(0, 10000).forEach(i -> {
  29. futs.add(producer.send(new ProducerRecord<>("so65791199", 0, null, "foo")));
  30. });
  31. for (Future<RecordMetadata> futr : futs) {
  32. futr.get(10, TimeUnit.SECONDS);
  33. }
  34. watch.stop();
  35. producer.close();
  36. System.out.println(watch.prettyPrint());
  37. };
  38. }
  39. @Bean
  40. public NewTopic topic() {
  41. return TopicBuilder.name("so65791199").partitions(1).replicas(1).build();
  42. }
  43. }
  1. StopWatch '': running time = 126595537 ns
  2. ---------------------------------------------
  3. ns % Task name
  4. ---------------------------------------------
  5. 088742103 070% template
  6. 037853434 030% raw producer
展开查看全部

相关问题