I want to do real-time processing with Kafka Streams in my Spring Boot project, so I need a Kafka Streams configuration, or I want to use KStreams/KTables, but I could not find examples on the internet. I have already written a producer and a consumer; now I want to stream in real time.
xqkwcwgp1#
You can use https://start.spring.io/ to select the necessary versions/dependencies accordingly and generate/download the project. Then you can start implementing the KStream API methods (https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/kstream.html).
gxwragnw2#
An easy way to get started with Kafka Streams on Spring Boot: bootstrap your project using https://start.spring.io. Select Cloud Stream and Spring for Apache Kafka Streams as dependencies. Here is a link to a pre-configured project template: https://start.spring.io/#!language=java&dependencies=kafka-streams,cloud-stream. Define a KStream bean in your application. For example, this is a very basic consumer application: it simply consumes data and logs the records from the KStream to standard output.
```java
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public java.util.function.Consumer<KStream<String, String>> process() {
        return stream -> stream.foreach((key, value) -> {
            System.out.println(key + ":" + value);
        });
    }
}
```
In this application, we defined a single input binding. Spring creates this binding with the name `process-in-0`, i.e. the name of the bean function followed by `-in-`, followed by the ordinal position of the parameter. You can use this binding name to set other properties such as the topic name, for example `spring.cloud.stream.bindings.process-in-0.destination=my-topic`. See more examples in the Spring Cloud Stream Kafka Binder reference, "Programming Model" section. Configure `application.yaml` as follows:
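If the app should also produce to an output topic, the same functional style works with `java.util.function.Function`, which gives you both an input binding (`process-in-0`) and an output binding (`process-out-0`). The following is a sketch, not from the original answer; the bean name and transformation are illustrative:

```java
import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UppercaseProcessor {

    // Bean name "process" yields bindings process-in-0 (input) and
    // process-out-0 (output); map each to a topic in configuration, e.g.
    //   spring.cloud.stream.bindings.process-out-0.destination=out-topic
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> process() {
        return input -> input.mapValues(value -> value.toUpperCase());
    }
}
```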
```yaml
spring:
  cloud:
    stream:
      bindings:
        process-in-0.destination: my-topic
      kafka:
        streams:
          binder:
            applicationId: my-app
            brokers: localhost:9092
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
```
lnvxswe23#
Another way to initialize a Kafka Streams application in Spring Boot can be found at https://gist.github.com/itzg/e3ebfd7aec220bf0522e23a65b1296c8. This approach uses a KafkaStreams bean that calls kafkaStreams.start(), and it can be supplied with either a Topology or a StreamsBuilder bean.
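A rough sketch of that style (assuming spring-boot and kafka-streams are on the classpath; the topic names, application id, and class name here are illustrative, not taken from the gist):

```java
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StreamsLifecycleConfig {

    // Build the topology with a StreamsBuilder, then hand the resulting
    // Topology to a KafkaStreams instance that this config starts itself.
    @Bean
    public Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // illustrative pass-through
        return builder.build();
    }

    // destroyMethod = "close" lets Spring shut the streams down cleanly
    // when the application context closes.
    @Bean(destroyMethod = "close")
    public KafkaStreams kafkaStreams(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        return streams;
    }
}
```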
jvidinwx4#
Let me preface this by saying that if you are new to Kafka Streams, adding Spring Boot on top of it adds another level of complexity, and Kafka Streams has a big learning curve as it is. Here are the basics to get you going. pom:
```xml
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.10.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${kafka.version}</version>
</dependency>
```
Now the configuration object. The code below assumes you are creating two stream apps; keep in mind that each app represents its own processing topology:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaStreamConfig {

    @Value("${delivery-stats.stream.threads:1}")
    private int threads;

    @Value("${delivery-stats.kafka.replication-factor:1}")
    private int replicationFactor;

    @Value("${messaging.kafka-dp.brokers.url:localhost:9092}")
    private String brokersUrl;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
        setDefaults(config);
        return new StreamsConfig(config);
    }

    public void setDefaults(Map<String, Object> config) {
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
    }

    @Bean("app1StreamBuilder")
    public StreamsBuilderFactoryBean app1StreamBuilderFactoryBean() {
        Map<String, Object> config = new HashMap<>();
        setDefaults(config);
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
        config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
        return new StreamsBuilderFactoryBean(config);
    }

    @Bean("app2StreamBuilder")
    public StreamsBuilderFactoryBean app2StreamBuilderFactoryBean() {
        Map<String, Object> config = new HashMap<>();
        setDefaults(config);
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app2");
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
        config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
        return new StreamsBuilderFactoryBean(config);
    }
}
```
Now the fun part: using the StreamsBuilder to build an app (app1 in this example).
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class App1 {

    @SuppressWarnings("unchecked")
    @Bean("app1StreamTopology")
    public KStream<String, Long> startProcessing(@Qualifier("app1StreamBuilder") StreamsBuilder builder) {
        final KStream<String, Long> toSquare =
                builder.stream("toSquare", Consumed.with(Serdes.String(), Serdes.Long()));
        toSquare.map((key, value) -> {
            // do something with each msg, square the values in our case
            return KeyValue.pair(key, value * value);
        }).to("squared", Produced.with(Serdes.String(), Serdes.Long())); // send downstream to another topic
        return toSquare;
    }
}
```
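One way to sanity-check a topology like this without a running broker is Kafka's TopologyTestDriver from the kafka-streams-test-utils artifact. The following sketch is not part of the original answer; it rebuilds the same squaring topology and uses the 1.x/2.x-era test-utils API (ConsumerRecordFactory/OutputVerifier), matching the old versions in the pom above:

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

public class App1TopologyTest {

    public static void main(String[] args) {
        // Rebuild the same squaring topology as App1
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Long> toSquare =
                builder.stream("toSquare", Consumed.with(Serdes.String(), Serdes.Long()));
        toSquare.map((key, value) -> KeyValue.pair(key, value * value))
                .to("squared", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            ConsumerRecordFactory<String, Long> factory =
                    new ConsumerRecordFactory<>("toSquare", new StringSerializer(), new LongSerializer());
            driver.pipeInput(factory.create("toSquare", "k", 3L));
            // Verify the record came out on "squared" with the value squared
            OutputVerifier.compareKeyValue(
                    driver.readOutput("squared", new StringDeserializer(), new LongDeserializer()),
                    "k", 9L);
        }
    }
}
```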
Hope this helps.