I am using Spring Boot 2.7.0 with a Spring Cloud microservices stack. I'm trying to send a notification through Kafka and am getting the error below.
Error:
2022-06-12 13:18:51.114 INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Instantiated an idempotent producer.
2022-06-12 13:18:51.179 INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.1.1
2022-06-12 13:18:51.180 INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 97671528ba54a138
2022-06-12 13:18:51.181 INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1655020131179
2022-06-12 13:18:51.221 INFO [order-service,,] 21889 --- [ad | producer-2] org.apache.kafka.clients.Metadata : [Producer clientId=producer-2] Cluster ID: tK0r-yw2TY-UsYDTFnMFsQ
2022-06-12 13:18:51.223 INFO [order-service,,] 21889 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2] ProducerId set to 8004 with epoch 0
2022-06-12 13:18:51.332 INFO [order-service,,] 21889 --- [ad | producer-2] org.apache.kafka.clients.Metadata : [Producer clientId=producer-2] Resetting the last seen epoch of partition t-order-0 to 0 since the associated topicId changed from null to 0H45cT8iQniqxpT2hWBtiQ
2022-06-12 13:18:51.372 ERROR [order-service,52ea0644f93a80da,a074556857b1af66] 21889 --- [o-auto-1-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] threw exception
java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:954) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914) ~[kafka-clients-3.1.1.jar:na]
at brave.kafka.clients.TracingProducer.send(TracingProducer.java:129) ~[brave-instrumentation-kafka-clients-5.13.9.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:993) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:403) ~[spring-kafka-2.8.6.jar:2.8.6]
at com.example.orderservice.service.OrderService.placeOrder(OrderService.java:98) ~[classes/:na]
at com.example.orderservice.service.OrderService$$FastClassBySpringCGLIB$$35786a76.invoke(<generated>) ~[classes/:na]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.20.jar:5.3.20]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.20.jar:5.3.20]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708) ~[spring-aop-5.3.20.jar:5.3.20]
at com.example.orderservice.service.OrderService$$EnhancerBySpringCGLIB$$fd989b0.placeOrder(<generated>) ~[classes/:na]
at com.example.orderservice.controller.OrderController.lambda$0(OrderController.java:33) ~[classes/:na]
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692) ~[na:na]
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[na:na]
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) ~[na:na]
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[na:na]
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[na:na]
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[na:na]
Order-service
package com.example.orderservice.service;

import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.transaction.Transactional;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

import com.example.orderservice.dto.InventoryResponse;
import com.example.orderservice.dto.OrderDto;
import com.example.orderservice.dto.OrderLineItemsDto;
import com.example.orderservice.dto.OrderRequest;
import com.example.orderservice.model.Order;
import com.example.orderservice.model.OrderLineItems;
import com.example.orderservice.repository.OrderRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import brave.Span;
import brave.Tracer;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
@Transactional
public class OrderService {
    private static final String INVENTORY_SERVICE_URI = "http://inventory-service/api/inventory";

    @Autowired
    private OrderRepository orderRepository;
    @Autowired
    private WebClient.Builder webClientBuilder;
    @Autowired
    private Tracer tracer;
    @Autowired
    private StreamBridge streamBridge;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private ObjectMapper objectMapper;

    public String placeOrder(OrderRequest orderRequest) {
        log.info("OrderService | placeOrder method called ");
        List<OrderLineItems> orderLineItems = orderRequest.getOrderLineItemsDtoList()
                .stream()
                .map(this::mapToDto)
                .collect(Collectors.toList());

        Order order = new Order();
        order.setOrderNumber(UUID.randomUUID().toString());
        order.setOrderLineItemsList(orderLineItems);

        List<String> skuCodes = order.getOrderLineItemsList()
                .stream()
                .map(orderLineItem -> orderLineItem.getSkuCode())
                .collect(Collectors.toList());

        Span inventoryServiceLookup = tracer.nextSpan().name("InventoryServiceLookup");
        try (Tracer.SpanInScope isLookup = tracer.withSpanInScope(inventoryServiceLookup.start())) {
            inventoryServiceLookup.tag("call", "inventory-service");

            // Call Inventory Service, and place order if product is in stock
            boolean allProductsInStock = webClientBuilder.build()
                    .get()
                    .uri(INVENTORY_SERVICE_URI, uriBuilder -> uriBuilder.queryParam("skuCode", skuCodes).build())
                    .retrieve()
                    .bodyToMono(InventoryResponse[].class)
                    .map(e -> Arrays.stream(e))
                    .block()
                    .allMatch(InventoryResponse::isInStock);

            if (allProductsInStock) {
                orderRepository.save(order);

                // send notification to RabbitMQ
                streamBridge.send("notificationEventSupplier-out-0", this.getMessage(order));

                // Kafka
                String orderStr = getJsonString(new OrderDto(order.getOrderNumber()));
                kafkaTemplate.send("t-order", orderStr);
                return "Order Placed Successfully";
            } else {
                throw new IllegalArgumentException("Product is not in the stock, please try again later");
            }
        } finally {
            inventoryServiceLookup.flush();
        }
    }

    private OrderLineItems mapToDto(OrderLineItemsDto orderLineItemsDto) {
        OrderLineItems orderLineItems = new OrderLineItems();
        orderLineItems.setPrice(orderLineItemsDto.getPrice());
        orderLineItems.setQuantity(orderLineItemsDto.getQuantity());
        orderLineItems.setSkuCode(orderLineItemsDto.getSkuCode());
        return orderLineItems;
    }

    private Message<OrderDto> getMessage(Order order) {
        return MessageBuilder.withPayload(new OrderDto(order.getOrderNumber())).build();
    }

    private String getJsonString(OrderDto orderDto) {
        String json = null;
        try {
            json = objectMapper.writeValueAsString(orderDto);
        } catch (JsonProcessingException e) {
            log.error("JsonProcessingException | e", e);
        }
        return json;
    }
}
application.properties
spring.application.name=order-service
#MySQL DB
spring.datasource.url=jdbc:mysql://localhost:3306/order-service
spring.datasource.username=root
spring.datasource.password=Password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect
spring.jpa.hibernate.ddl-auto=update
#spring.jpa.show-sql=true
spring.sql.init.mode=always
#spring.jpa.properties.hibernate.format_sql=true
server.port=0
#Eureka
#eureka.instance.prefer-ip-address=true
eureka.instance.hostname=localhost
eureka.client.serviceUrl.defaultZone=http://eureka:password@localhost:8761/eureka
eureka.client.register-with-eureka=true
eureka.client.fetch-registry=true
#spring.cloud.discovery.enabled=true
management.health.circuitbreakers.enabled=true
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
#Resilience4j Properties
resilience4j.circuitbreaker.instances.inventory.registerHealthIndicator=true
resilience4j.circuitbreaker.instances.inventory.event-consumer-buffer-size=10
resilience4j.circuitbreaker.instances.inventory.slidingWindowType=COUNT_BASED
resilience4j.circuitbreaker.instances.inventory.slidingWindowSize=5
resilience4j.circuitbreaker.instances.inventory.failureRateThreshold=50
resilience4j.circuitbreaker.instances.inventory.waitDurationInOpenState=5s
resilience4j.circuitbreaker.instances.inventory.permittedNumberOfCallsInHalfOpenState=3
resilience4j.circuitbreaker.instances.inventory.automaticTransitionFromOpenToHalfOpenEnabled=true
#Resilience4J Timeout Properties
resilience4j.timelimiter.instances.inventory.timeout-duration=3s
#Resilience4J Retry Properties
resilience4j.retry.instances.inventory.max-attempts=3
resilience4j.retry.instances.inventory.wait-duration=5s
#Spring Cloud Stream Kafka Properties
spring.cloud.stream.output-bindings=notificationEventSupplier
spring.cloud.stream.bindings.notificationEventSupplier-out-0.destination=notification-events
spring.sleuth.integration.enabled=true
#Zipkin Properties
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.probability=1
1 Answer
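You have not defined a StringSerializer anywhere (at least not in the code shown), so the KafkaTemplate is configured with ByteArraySerializer by default.
You can use objectMapper.writeValueAsBytes instead of objectMapper.writeValueAsString, and switch all related methods and the Kafka generic value type to byte[].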
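
To illustrate why the exception surfaces only at send time, here is a minimal, dependency-free sketch of the mechanism. The types below (Serializer, PassThroughBytesSerializer, Producer) are hypothetical stand-ins written for this example, not the real Kafka or Spring classes. The sketch assumes the real failure works the same way: the template's String value type is only a compile-time promise, and the cast to byte[] happens inside the serializer at runtime.

```java
import java.nio.charset.StandardCharsets;

public class SerializerMismatchDemo {

    /** Hypothetical stand-in for a generic value serializer; not the real Kafka interface. */
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Stand-in for a byte-array serializer: it returns its input unchanged. */
    static final class PassThroughBytesSerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    /** Stand-in producer: V is erased at runtime, so nothing checks it before serialize(). */
    static final class Producer<V> {
        private final Serializer<V> valueSerializer;

        Producer(Serializer<V> valueSerializer) {
            this.valueSerializer = valueSerializer;
        }

        byte[] send(String topic, V value) {
            return valueSerializer.serialize(topic, value);
        }
    }

    /** Wires a byte[] serializer behind a Producer<String>, as a mismatched configuration would. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static Producer<String> misconfiguredProducer() {
        return new Producer(new PassThroughBytesSerializer());
    }

    /** Returns true if sending a String through the byte[] serializer throws ClassCastException. */
    public static boolean failsWithString(String json) {
        try {
            misconfiguredProducer().send("t-order", json);
            return false;
        } catch (ClassCastException e) {
            // The compiler-generated bridge method casts the String to byte[] and fails here.
            return true;
        }
    }

    /** The approach from the answer: hand the producer bytes, with a matching byte[] value type. */
    public static byte[] sendAsBytes(String json) {
        Producer<byte[]> producer = new Producer<>(new PassThroughBytesSerializer());
        return producer.send("t-order", json.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        String json = "{\"orderNumber\":\"42\"}";
        System.out.println("String value fails: " + failsWithString(json));
        System.out.println("byte[] value length: " + sendAsBytes(json).length);
    }
}
```

In the actual service, the equivalent change would be declaring KafkaTemplate<String, byte[]> and passing the result of objectMapper.writeValueAsBytes(...) to send, as the answer describes. Alternatively, if the value should remain a String and the template comes from Spring Boot's auto-configuration, setting the spring.kafka.producer.value-serializer property to org.apache.kafka.common.serialization.StringSerializer is the usual way to define the serializer explicitly.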