Kafka java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')

Posted by z3yyvxxp on 2022-12-11 in Apache

I am using Spring Boot 2.7.0 with the Spring Cloud microservices stack. When I try to send a notification through Kafka, I get the error below.
Error:

2022-06-12 13:18:51.114  INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2] Instantiated an idempotent producer.
2022-06-12 13:18:51.179  INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.1
2022-06-12 13:18:51.180  INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 97671528ba54a138
2022-06-12 13:18:51.181  INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1655020131179
2022-06-12 13:18:51.221  INFO [order-service,,] 21889 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Cluster ID: tK0r-yw2TY-UsYDTFnMFsQ
2022-06-12 13:18:51.223  INFO [order-service,,] 21889 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2] ProducerId set to 8004 with epoch 0
2022-06-12 13:18:51.332  INFO [order-service,,] 21889 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Resetting the last seen epoch of partition t-order-0 to 0 since the associated topicId changed from null to 0H45cT8iQniqxpT2hWBtiQ
2022-06-12 13:18:51.372 ERROR [order-service,52ea0644f93a80da,a074556857b1af66] 21889 --- [o-auto-1-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] threw exception

java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:954) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914) ~[kafka-clients-3.1.1.jar:na]
    at brave.kafka.clients.TracingProducer.send(TracingProducer.java:129) ~[brave-instrumentation-kafka-clients-5.13.9.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:993) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:403) ~[spring-kafka-2.8.6.jar:2.8.6]
    at com.example.orderservice.service.OrderService.placeOrder(OrderService.java:98) ~[classes/:na]
    at com.example.orderservice.service.OrderService$$FastClassBySpringCGLIB$$35786a76.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.20.jar:5.3.20]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.20.jar:5.3.20]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708) ~[spring-aop-5.3.20.jar:5.3.20]
    at com.example.orderservice.service.OrderService$$EnhancerBySpringCGLIB$$fd989b0.placeOrder(<generated>) ~[classes/:na]
    at com.example.orderservice.controller.OrderController.lambda$0(OrderController.java:33) ~[classes/:na]
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[na:na]

Order-service

package com.example.orderservice.service;

import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.transaction.Transactional;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

import com.example.orderservice.dto.InventoryResponse;
import com.example.orderservice.dto.OrderDto;
import com.example.orderservice.dto.OrderLineItemsDto;
import com.example.orderservice.dto.OrderRequest;
import com.example.orderservice.model.Order;
import com.example.orderservice.model.OrderLineItems;
import com.example.orderservice.repository.OrderRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import brave.Span;
import brave.Tracer;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
@Transactional
public class OrderService {

    private static final String INVENTORY_SERVICE_URI = "http://inventory-service/api/inventory";

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private WebClient.Builder webClientBuilder;

    @Autowired
    private Tracer tracer;

    @Autowired
    private StreamBridge streamBridge;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    public String placeOrder(OrderRequest orderRequest) {
        log.info("OrderService | placeOrder method called ");
        List<OrderLineItems> orderLineItems = orderRequest.getOrderLineItemsDtoList()
                .stream()
                .map(this::mapToDto)
                .collect(Collectors.toList());

        Order order = new Order();
        order.setOrderNumber(UUID.randomUUID().toString());
        order.setOrderLineItemsList(orderLineItems);

        List<String> skuCodes = order.getOrderLineItemsList()
                .stream()
                .map(orderLineItem -> orderLineItem.getSkuCode())
                .collect(Collectors.toList());

        Span inventoryServiceLookup = tracer.nextSpan().name("InventoryServiceLookup");

        try (Tracer.SpanInScope isLookup = tracer.withSpanInScope(inventoryServiceLookup.start())){
            inventoryServiceLookup.tag("call", "inventory-service");

            // Call Inventory Service, and place order if product is in stock
            boolean allProductsInStock = webClientBuilder.build()
                    .get()
                    .uri(INVENTORY_SERVICE_URI, uriBuilder -> uriBuilder.queryParam("skuCode", skuCodes).build())
                    .retrieve()
                    .bodyToMono(InventoryResponse[].class)
                    .map(e -> Arrays.stream(e))
                    .block()
                    .allMatch(InventoryResponse::isInStock);

            if(allProductsInStock){
                orderRepository.save(order);

                // send notification to RabbitMQ
                streamBridge.send("notificationEventSupplier-out-0", this.getMessage(order));

                // Kafka
                String orderStr = getJsonString(new OrderDto(order.getOrderNumber()));
                kafkaTemplate.send("t-order", orderStr);

                return "Order Placed Successfully";
            } else {
                throw new IllegalArgumentException("Product is not in the stock, please try again later");
            }

        }finally {
            inventoryServiceLookup.flush();
        }
    }

    private OrderLineItems mapToDto(OrderLineItemsDto orderLineItemsDto) {
        OrderLineItems orderLineItems = new OrderLineItems();
        orderLineItems.setPrice(orderLineItemsDto.getPrice());
        orderLineItems.setQuantity(orderLineItemsDto.getQuantity());
        orderLineItems.setSkuCode(orderLineItemsDto.getSkuCode());
        return orderLineItems;
    }

    private Message<OrderDto> getMessage(Order order){
        return MessageBuilder.withPayload(new OrderDto(order.getOrderNumber())).build();
    }

    private String getJsonString(OrderDto orderDto) {
        String json = null;
        try {
            json = objectMapper.writeValueAsString(orderDto);
        } catch (JsonProcessingException e) {
            log.error("JsonProcessingException | e", e);
        }
        return json;
    }

}

application.properties

spring.application.name=order-service

#MySQL DB
spring.datasource.url=jdbc:mysql://localhost:3306/order-service
spring.datasource.username=root
spring.datasource.password=Password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect

spring.jpa.hibernate.ddl-auto=update
#spring.jpa.show-sql=true
spring.sql.init.mode=always
#spring.jpa.properties.hibernate.format_sql=true

server.port=0

#Eureka
#eureka.instance.prefer-ip-address=true
eureka.instance.hostname=localhost
eureka.client.serviceUrl.defaultZone=http://eureka:password@localhost:8761/eureka

eureka.client.register-with-eureka=true
eureka.client.fetch-registry=true
#spring.cloud.discovery.enabled=true


management.health.circuitbreakers.enabled=true
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

#Resilience4j Properties
resilience4j.circuitbreaker.instances.inventory.registerHealthIndicator=true
resilience4j.circuitbreaker.instances.inventory.event-consumer-buffer-size=10
resilience4j.circuitbreaker.instances.inventory.slidingWindowType=COUNT_BASED
resilience4j.circuitbreaker.instances.inventory.slidingWindowSize=5
resilience4j.circuitbreaker.instances.inventory.failureRateThreshold=50
resilience4j.circuitbreaker.instances.inventory.waitDurationInOpenState=5s
resilience4j.circuitbreaker.instances.inventory.permittedNumberOfCallsInHalfOpenState=3
resilience4j.circuitbreaker.instances.inventory.automaticTransitionFromOpenToHalfOpenEnabled=true

#Resilience4J Timeout Properties
resilience4j.timelimiter.instances.inventory.timeout-duration=3s

#Resilience4J Retry Properties
resilience4j.retry.instances.inventory.max-attempts=3
resilience4j.retry.instances.inventory.wait-duration=5s

#Spring Cloud Stream Kafka Properties
spring.cloud.stream.output-bindings=notificationEventSupplier
spring.cloud.stream.bindings.notificationEventSupplier-out-0.destination=notification-events
spring.sleuth.integration.enabled=true

#Zipkin Properties
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.probability=1

c3frrgcw #1

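You have not defined a StringSerializer anywhere (at least not in the code shown).
As a result, the KafkaTemplate here ends up using ByteArraySerializer for values, which is why ByteArraySerializer.serialize appears at the top of the stack trace and the String payload cannot be cast to byte[] ([B).
You can use objectMapper.writeValueAsBytes instead of objectMapper.writeValueAsString, and switch all related methods and the generic Kafka value type to byte[] (for example, KafkaTemplate<String, byte[]>).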
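To illustrate why the compiler does not catch this and why switching to byte[] helps, here is a minimal sketch in plain Java with no Kafka dependency. The Serializer interface, PassThroughBytesSerializer, the send helper and the sample payload are all hypothetical stand-ins made up for this example; they are not Kafka's or Spring's classes. The sketch assumes the real failure follows the same pattern: generic types are erased at runtime, so a String value only fails once it reaches a serializer that expects byte[].

```java
import java.nio.charset.StandardCharsets;

public class SerializerMismatchDemo {

    // Hypothetical stand-in for a serializer contract; NOT Kafka's real interface.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for a byte-array serializer: it passes byte[] through unchanged.
    static class PassThroughBytesSerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    // Simulates a producer whose declared value type V does not have to match
    // the serializer configured at runtime. Generics are erased, so the
    // unchecked cast below always succeeds and the compiler cannot help.
    @SuppressWarnings("unchecked")
    static <V> byte[] send(Serializer<?> configured, String topic, V value) {
        Serializer<V> serializer = (Serializer<V>) configured;
        // The serializer's compiler-generated bridge method casts value to byte[] here.
        return serializer.serialize(topic, value);
    }

    // Passing a String to the byte[] serializer reproduces the ClassCastException.
    static boolean stringValueFails() {
        try {
            send(new PassThroughBytesSerializer(), "t-order", "{\"orderNumber\":\"42\"}");
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Converting the JSON to bytes first matches what the serializer expects.
    static byte[] bytesValueWorks() {
        byte[] payload = "{\"orderNumber\":\"42\"}".getBytes(StandardCharsets.UTF_8);
        return send(new PassThroughBytesSerializer(), "t-order", payload);
    }

    public static void main(String[] args) {
        System.out.println("String value fails: " + stringValueFails());
        System.out.println("byte[] value bytes: " + bytesValueWorks().length);
    }
}
```

In the question's code, the analogous change would be producing the JSON with objectMapper.writeValueAsBytes and declaring the template's value type as byte[], as suggested above. The alternative is to make sure the producer is actually configured with a StringSerializer for values; where that is set depends on how the KafkaTemplate is created, which the question does not show.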
