java.lang.classcastexception:class model-在主题中以消息形式发送json对象时,springboot kafka集成中出现错误

fae0ux8s  于 2021-08-20  发布在  Java
关注(0)|答案(1)|浏览(351)

我是Kafka新手,我面临mymodel类用户的以下问题[请求处理失败;嵌套异常为org.apache.kafka.common.errors.serializationexception:无法将class model.user的值转换为value.serializer]中指定的class org.apache.kafka.common.serialization.stringserializer,根本原因为java.lang.classcastexception:无法将class model.user转换为class java.lang.string(model.user位于加载器'应用';java.lang.string位于org.apache.kafka.common.serialization.stringserializer.serialize(stringserializer.java:28)~[kafka-clients-2.7.1.jar:na]处的加载程序“bootstrap”的模块java.base中*
我怀疑这是由于在Kafkanconfiguration中错误导入了stringserializer和jsonserializer
1-Kafka形象

package config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.connect.json.JsonSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import com.fasterxml.jackson.databind.ser.std.StringSerializer;

import model.User;

@Configuration
public class KafkaConfiguration {

    @Bean
    public ProducerFactory<String,User> producerFactory()
    {
        Map<String,Object> config=new HashMap<>();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String,User> kafkaTemplate()
    {
        return new KafkaTemplate<>(producerFactory());
    }

}

2-用户资源类

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import model.User;

@RestController
@RequestMapping("kafka")
public class UserResource {

    @Autowired
    KafkaTemplate<String,User> kafkatemplate;
    public static final String TOPIC="Kafka_Example";

    @GetMapping("/publish/{name}")
    public String postMessage(@PathVariable("name") final String name)
    {

    kafkatemplate.send(TOPIC,new User(name,"Technology",12000L));

    return "Published successfully";
    }
}

3-用户类

package model;

public class User {

    private String name;
    private String dept;
    private long salary;

    public User(String name, String dept, long salary) {
        super();
        this.name = name;
        this.dept = dept;
        this.salary = salary;
    }

    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getDept() {
        return dept;
    }
    public void setDept(String dept) {
        this.dept = dept;
    }
    public long getSalary() {
        return salary;
    }
    public void setSalary(long salary) {
        this.salary = salary;
    }

}

有人能告诉我哪里出了问题吗?是不是与进口有关(如果是,正确的是什么)?
谢谢

06odsfpq

06odsfpq1#

您的代码正确,但您导入了错误的stringserializer,请在导入下面使用

org.apache.kafka.common.serialization.StringSerializer
org.springframework.kafka.support.serializer.JsonSerializer

相关问题