Spring Cloud Stream Kafka:找不到 Serde 类 org.apache.kafka.common.serialization.Serde$StringSerde

wztqucjr  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(471)

我正在尝试使用 Spring Cloud Stream 框架构建一个简单的 Kafka Streams 应用程序。我可以连接到流并推送原始数据进行处理。但是,当我尝试处理该流、按键统计事件数量时,运行应用程序会抛出异常:Serde class not found: org.apache.kafka.common.serialization.Serde$StringSerde。我检查了项目引入的依赖库,可以找到 Serde 相关的类,它们并没有缺失。我不知道为什么在运行时它没有被加载!
下面是我的源文件。 com.pgp.learn.kafka.analytics.AnalyticsApplication ```
package com.pgp.learn.kafka.analytics;

import com.pgp.learn.kafka.analytics.model.PageViewEvent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Spring Boot entry point for the page-view analytics demo.
 *
 * <p>Enables the channels declared on {@link AnalyticsBinding} and hosts two components: a
 * scheduled producer of random {@link PageViewEvent}s and a Kafka Streams processor that counts
 * the events per page.
 */
@SpringBootApplication
@EnableBinding(AnalyticsBinding.class)
public class AnalyticsApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(AnalyticsApplication.class, args);
    }

    /**
     * Publishes one randomly generated {@link PageViewEvent} per second to the page-view output
     * channel once the application has started.
     */
    @Component
    public static class PageViewEventSource implements ApplicationRunner {
        private final MessageChannel pageViewOut;
        private final Log log = LogFactory.getLog(getClass());
        // One generator for the lifetime of the bean instead of two new ones on every tick.
        private final Random random = new Random();

        /**
         * @param binding the binding interface that exposes the page-view output channel
         */
        public PageViewEventSource(AnalyticsBinding binding) {
            this.pageViewOut = binding.pageViewsOut();
        }

        /**
         * Schedules the event generator at a fixed rate of one event per second, starting after
         * one second.
         *
         * @param args application arguments (unused)
         */
        @Override
        public void run(ApplicationArguments args) throws Exception {
            List<String> names = Arrays.asList("Peter", "Tom", "Ady", "Nency", "George", "Kevin", "Chelsey", "Thomas");
            List<String> pages = Arrays.asList("About", "Contact", "Blogs", "Gallery", "Music", "Site-map", "News");

            Runnable runner = () -> {
                String rName = names.get(random.nextInt(names.size()));
                String rPage = pages.get(random.nextInt(pages.size()));

                // Roughly half of the events get duration 10 and are dropped by the processor's
                // "duration > 10" filter; the rest get duration 1000.
                PageViewEvent event = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10L : 1000L);

                // Encode the key with an explicit charset so the bytes do not depend on the
                // platform default encoding of the machine running the producer.
                Message<PageViewEvent> message = MessageBuilder
                        .withPayload(event)
                        .setHeader(KafkaHeaders.MESSAGE_KEY,
                                event.getUserId().getBytes(java.nio.charset.StandardCharsets.UTF_8))
                        .build();

                try {
                    this.pageViewOut.send(message);
                    log.info("Sent: " + message);
                } catch (Exception e) {
                    // Catch broadly on purpose: an exception escaping a fixed-rate task cancels
                    // all of its future executions.
                    log.error("Failed to send page view event", e);
                }
            };

            // NOTE(review): the executor is never shut down; acceptable for a demo that runs
            // until the process exits, but confirm before reusing this elsewhere.
            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runner, 1, 1, TimeUnit.SECONDS);
        }
    }

    /**
     * Kafka Streams processor that turns the stream of page views into a running count per page.
     */
    @Component
    public static class PageViewEventProcessor {

        /**
         * Keeps the events whose duration is greater than 10, re-keys them by page and counts
         * them per page. The counts are materialized in the store named
         * {@link AnalyticsBinding#PAGE_COUNT_MV}.
         *
         * @param events page-view events read from the page-view input binding
         * @return a stream of (page, running count) updates sent to the page-count output binding
         */
        @StreamListener
        @SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
        public static KStream<String, Long> process(
                @Input(AnalyticsBinding.PAGE_VIEW_IN_CHANNEL) KStream<String, PageViewEvent> events) {
            return events
                    .filter((key, pageViewEvent) -> pageViewEvent.getDuration() > 10)
                    .map((key, pageViewEvent) -> new KeyValue<>(pageViewEvent.getPage(), 0L))
                    // map() changes the key, so the grouping has to repartition. State the serdes
                    // explicitly so the Long values are not written with whatever default value
                    // serde the binder is configured with.
                    .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
                    .count(Materialized.as(AnalyticsBinding.PAGE_COUNT_MV))
                    .toStream();
        }
    }
}
`com.pgp.learn.kafka.analytics.AnalyticsBinding`
package com.pgp.learn.kafka.analytics;

import com.pgp.learn.kafka.analytics.model.PageViewEvent;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Declares the binding names and channels of the analytics application.
 *
 * <p>The string constants are the binding names referenced by the {@code @Input} /
 * {@code @Output} annotations below and by the stream processor.
 */
public interface AnalyticsBinding {

    /** Binding name of the outbound page-view channel. */
    String PAGE_VIEW_OUT_CHANNEL = "pvout";

    /** Binding name of the inbound page-view stream. */
    String PAGE_VIEW_IN_CHANNEL = "pvin";

    /** Name under which the per-page counts are materialized. */
    String PAGE_COUNT_MV = "pcmvc";

    /** Binding name of the outbound page-count stream. */
    String PAGE_COUNT_OUT = "pcout";

    /** Binding name for an inbound page-count stream; no channel is declared for it here. */
    String PAGE_COUNT_IN = "pcin";

    /** @return the inbound stream of page-view events */
    @Input(PAGE_VIEW_IN_CHANNEL)
    KStream<String, PageViewEvent> pageViewsIn();

    /** @return the message channel used to publish page-view events */
    @Output(PAGE_VIEW_OUT_CHANNEL)
    MessageChannel pageViewsOut();

    /** @return the outbound stream of per-page counts */
    @Output(PAGE_COUNT_OUT)
    KStream<String, Long> pageCountOut();

}
`application.properties`

# NOTE(review): the section labels in this listing ("defaults", "page views out", ...) carry no
# leading '#', so as written they are not comments. Presumably the marker was lost when the
# listing was published; confirm against the real file.
defaults

# Kafka Streams binder settings and broker address.
# NOTE(review): "commit.interval.mms" looks like a typo for "commit.interval.ms"; confirm.
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.mms=1000
# NOTE(review): the two default serdes below name "Serde$StringSerde", while the pcout bindings
# at the end of this file name "Serdes$StringSerde" / "Serdes$LongSerde". This spelling is the
# class the reported "Serde class not found" error refers to. It is left unchanged here because
# it is the subject of the question.
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serde$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serde$StringSerde
spring.cloud.stream.kafka.binder.brokers=54.173.206.255

page views out

# The producer binding "pvout" writes to destination "pvs".
spring.cloud.stream.bindings.pvout.destination=pvs
spring.cloud.stream.bindings.pvout.producer.header-mode=raw

page views in

# The consumer binding "pvin" reads from the same destination "pvs".
spring.cloud.stream.bindings.pvin.destination=pvs
spring.cloud.stream.bindings.pvin.consumer.header-mode=raw

page count out

# The producer binding "pcout" writes to destination "pcs" with String keys and Long values.
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde
`pom.xml`


<!-- Maven build for the analytics demo: Spring Boot parent, Spring Cloud Stream with the Kafka
     and Kafka Streams binders, and the Spring Cloud BOM imported for version management. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
    </parent>

    <groupId>com.pgp.learn.kafka</groupId>
    <artifactId>analytics</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>analytics</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.RC2</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <!-- The Spring Cloud version above is a release candidate, which is served from the
         milestone repository. -->
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>
</project>
vu8f3i0k

vu8f3i0k1#

您的配置属性中有一个输入错误: spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serde$StringSerde 这里的 Serde 后面少了一个 “s”(default.value.serde 也有同样的问题)。应该是 ...serialization.Serdes$StringSerde

相关问题