Kafka Spring Boot

ehxuflar · posted 2021-06-07 in Kafka
Follow (0) | Answers (4) | Views (374)

I want to use Kafka Streams for real-time processing in my Spring Boot project, so I need a Kafka Streams configuration, or I would like to use KStream or KTable, but I could not find an example on the internet.
I have already written a producer and a consumer; now I want to stream in real time.


xqkwcwgp1#

You can use https://start.spring.io/ to select the necessary versions/dependencies and generate/download the project.
Then you can start implementing the KStream API methods (https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/kstream.html).
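
As a minimal sketch of what using the KStream API can look like (the topic names input-topic/output-topic, the String serdes, and the uppercase transformation are placeholders chosen for illustration, not something from this answer):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    public class SimpleStreamApp {

        public static void main(String[] args) {
            // Basic Streams configuration; application.id and bootstrap.servers are required
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-stream-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // Build a topology: read from input-topic, transform each value, write to output-topic
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> source =
                    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
            source.mapValues(value -> value.toUpperCase())
                  .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

            // Start the Streams runtime and close it cleanly on shutdown
            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }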


gxwragnw2#

A simple way to get started with Kafka Streams on Spring Boot:
Bootstrap your project using https://start.spring.io. Select Cloud Stream and Spring for Apache Kafka Streams as dependencies. Here is a link to a pre-configured project template: https://start.spring.io/#!language=java&dependencies=kafka-streams,cloud-stream
Define a KStream bean in your application. As an example, here is a very basic consumer application. It simply consumes data and logs the records from the KStream to standard output.

    import java.util.function.Consumer;

    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;

    @SpringBootApplication
    public class Application {

        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }

        @Bean
        public Consumer<KStream<String, String>> process() {
            return stream -> stream.foreach((key, value) -> System.out.println(key + ":" + value));
        }
    }

In this application, there is a single input binding. Spring will create this binding with the name process-in-0, i.e. the name of the bean function followed by -in-, followed by the ordinal position of the parameter. You use this binding name to set other properties such as the topic name, for example spring.cloud.stream.bindings.process-in-0.destination=my-topic.
For more examples, see the Spring Cloud Stream Kafka Binder reference, Programming Model section.
Configure application.yaml as follows:

    spring:
      cloud:
        stream:
          bindings:
            process-in-0.destination: my-topic
          kafka:
            streams:
              binder:
                applicationId: my-app
                brokers: localhost:9092
                configuration:
                  default:
                    key:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                    value:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
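
To try the consumer out you can publish a few records to my-topic. Below is a minimal sketch using the plain Kafka producer client; the broker address and the use of String keys/values are assumptions that match the configuration above:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TestProducer {

        public static void main(String[] args) {
            // Connect to the same broker the binder is configured with
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // Send a handful of records to the topic bound to process-in-0
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 5; i++) {
                    producer.send(new ProducerRecord<>("my-topic", "key-" + i, "value-" + i));
                }
            }
        }
    }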

lnvxswe23#

Another way to initialize a Kafka Streams application in Spring Boot can be found at
https://gist.github.com/itzg/e3ebfd7aec220bf0522e23a65b1296c8
That approach uses a KafkaStreams bean on which kafkaStreams.start() is called; the bean can be given either a Topology or a StreamsBuilder bean.
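
A minimal sketch of that pattern is shown below. The topic names, serdes, and application id are placeholders chosen for illustration, and the gist itself may wire things differently:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class StreamsTopologyConfig {

        // The topology is declared as its own bean so it can be tested independently
        @Bean
        public Topology topology() {
            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                   .mapValues(String::toUpperCase)
                   .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
            return builder.build();
        }

        // Spring calls start() after creation and close() on shutdown
        @Bean(initMethod = "start", destroyMethod = "close")
        public KafkaStreams kafkaStreams(Topology topology) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "gist-style-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            return new KafkaStreams(topology, props);
        }
    }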


jvidinwx4#

Let me start by saying that if you are new to Kafka Streams, adding Spring Boot on top of it adds another level of complexity, and Kafka Streams has a big learning curve as it is. Here are the basics to get you going. pom:

    <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.1.10.RELEASE</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-api</artifactId>
        <version>${kafka.version}</version>
    </dependency>

Now the configuration objects. The code below assumes you are creating two different stream apps; keep in mind that each one represents its own processing topology:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
    import org.springframework.kafka.core.StreamsBuilderFactoryBean;

    import java.util.HashMap;
    import java.util.Map;

    @Configuration
    public class KafkaStreamConfig {

        @Value("${delivery-stats.stream.threads:1}")
        private int threads;

        @Value("${delivery-stats.kafka.replication-factor:1}")
        private int replicationFactor;

        @Value("${messaging.kafka-dp.brokers.url:localhost:9092}")
        private String brokersUrl;

        // Default StreamsConfig bean picked up by Spring Kafka
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public StreamsConfig kStreamsConfigs() {
            Map<String, Object> config = new HashMap<>();
            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
            setDefaults(config);
            return new StreamsConfig(config);
        }

        // Settings shared by every stream app in this project
        public void setDefaults(Map<String, Object> config) {
            config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
            config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
        }

        // One StreamsBuilderFactoryBean per stream app, each with its own application.id
        @Bean("app1StreamBuilder")
        public StreamsBuilderFactoryBean app1StreamBuilderFactoryBean() {
            Map<String, Object> config = new HashMap<>();
            setDefaults(config);
            config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
            config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
            config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
            config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
            return new StreamsBuilderFactoryBean(config);
        }

        @Bean("app2StreamBuilder")
        public StreamsBuilderFactoryBean app2StreamBuilderFactoryBean() {
            Map<String, Object> config = new HashMap<>();
            setDefaults(config);
            config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app2");
            config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
            config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
            config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
            return new StreamsBuilderFactoryBean(config);
        }
    }

Now the fun part: using the StreamsBuilder to build the app (app1 in this example).

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class App1 {

        @SuppressWarnings("unchecked")
        @Bean("app1StreamTopology")
        public KStream<String, Long> startProcessing(@Qualifier("app1StreamBuilder") StreamsBuilder builder) {
            final KStream<String, Long> toSquare =
                    builder.stream("toSquare", Consumed.with(Serdes.String(), Serdes.Long()));
            toSquare.map((key, value) -> { // do something with each msg, square the values in our case
                return KeyValue.pair(key, value * value);
            }).to("squared", Produced.with(Serdes.String(), Serdes.Long())); // send downstream to another topic
            return toSquare;
        }
    }
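
The configuration above also defines an app2StreamBuilder, but only app1's topology is shown. Purely as an illustration of how the second app could be wired, a hypothetical App2 might look like the sketch below (the topic names and the filtering logic are placeholders, not part of the original answer):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;

    @Component
    public class App2 {

        // Second topology, built from the builder configured with application.id "app2"
        @Bean("app2StreamTopology")
        public KStream<String, Long> startProcessing(@Qualifier("app2StreamBuilder") StreamsBuilder builder) {
            final KStream<String, Long> squared =
                    builder.stream("squared", Consumed.with(Serdes.String(), Serdes.Long()));
            // Example transformation: forward only the larger values to another topic
            squared.filter((key, value) -> value > 100L)
                   .to("big-squares", Produced.with(Serdes.String(), Serdes.Long()));
            return squared;
        }
    }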

Hope this helps.

