Kafka Streams exception: Could not find a public no-argument constructor for org.apache.kafka.common.serialization.Serdes$WrapperSerde

Asked by jecbmhm3 on 2021-06-08 in Kafka

I am getting the following error stack trace when using Kafka Streams.

Update: As per @matthias-j-sax, I have implemented my own Serdes with a default constructor for WrapperSerde, but I am still getting the following exception:

```
org.apache.kafka.streams.errors.StreamsException: stream-thread [streams-request-count-4c239508-6abe-4901-bd56-d53987494770-StreamThread-1] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:836)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class myapps.serializer.Serdes$WrapperSerde
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde (StreamsConfig.java:972)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init> (AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init> (ProcessorContextImpl.java:42)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init> (StreamTask.java:136)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:405)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:369)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks (StreamThread.java:354)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks (TaskManager.java:148)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks (TaskManager.java:107)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned (StreamThread.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete (ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded (AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup (AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java:290)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce (KafkaConsumer.java:1149)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1115)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:827)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)
Caused by: java.lang.NullPointerException
    at myapps.serializer.Serdes$WrapperSerde.configure (Serdes.java:30)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde (StreamsConfig.java:968)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init> (AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init> (ProcessorContextImpl.java:42)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init> (StreamTask.java:136)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:405)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:369)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks (StreamThread.java:354)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks (TaskManager.java:148)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks (TaskManager.java:107)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned (StreamThread.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete (ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded (AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup (AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java:290)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce (KafkaConsumer.java:1149)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1115)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:827)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)
```

Here is my use case:
I will be getting JSON responses as the input to the stream, and I want to count the requests whose status code is not 200. Initially, I went through the Kafka Streams documentation (both the official docs and Confluent's) and implemented the WordCountDemo, which worked fine. Then I tried to write this code, but I got this exception. I am very new to Kafka Streams; I went through the stack trace but could not understand the context, hence I came here for help!
Here is my code, `LogCount.java`:
```
package myapps;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import myapps.serializer.JsonDeserializer;
import myapps.serializer.JsonSerializer;
import myapps.Request;

public class LogCount {

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-request-count");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    JsonSerializer<Request> requestJsonSerializer = new JsonSerializer<>();
    JsonDeserializer<Request> requestJsonDeserializer = new JsonDeserializer<>(Request.class);
    Serde<Request> requestSerde = Serdes.serdeFrom(requestJsonSerializer, requestJsonDeserializer);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, requestSerde.getClass().getName());
    final StreamsBuilder builder = new StreamsBuilder();

    KStream<String, Request> source = builder.stream("streams-requests-input");
    source.filter((k, v) -> v.getHttpStatusCode() != 200)
            .groupByKey()
            .count()
            .toStream()
            .to("streams-requests-output", Produced.with(Serdes.String(), Serdes.Long()));
    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);
    final CountDownLatch latch = new CountDownLatch(1);

    System.out.println(topology.describe());
    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });

    try {
        streams.cleanUp();
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
}

}
```
`JsonDeserializer.java`:
```
package myapps.serializer;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;

public class JsonDeserializer<T> implements Deserializer<T> {

private Gson gson = new Gson();
private Class<T> deserializedClass;

public JsonDeserializer(Class<T> deserializedClass) {
    this.deserializedClass = deserializedClass;
}

public JsonDeserializer() {
}

@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, ?> map, boolean b) {
    if(deserializedClass == null) {
        deserializedClass = (Class<T>) map.get("serializedClass");
    }
}

@Override
public T deserialize(String s, byte[] bytes) {
     if(bytes == null){
         return null;
     }

     return gson.fromJson(new String(bytes),deserializedClass);

}

@Override
public void close() {

}

}
```
`JsonSerializer.java`:
```
package myapps.serializer;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.Charset;
import java.util.Map;

public class JsonSerializer<T> implements Serializer<T> {

private Gson gson = new Gson();

@Override
public void configure(Map<String, ?> map, boolean b) {

}

@Override
public byte[] serialize(String topic, T t) {
    return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
}

@Override
public void close() {

}

}
```

As mentioned earlier, I will be getting JSON as input with the following structure:
```
{"requestID": "1f6b2409", "protocol": "http", "host": ".com", "method": "get", "httpStatusCode": "200", "userAgent": "curl%2f7.54.0"}
```
The corresponding `Request.java` file looks like this:

```
package myapps;

public final class Request {
private String requestID;
private String protocol;
private String host;
private String method;
private int httpStatusCode;
private String userAgent;

public String getRequestID() {
    return requestID;
}
public void setRequestID(String requestID) {
    this.requestID = requestID;
}
public String getProtocol() {
    return protocol;
}
public void setProtocol(String protocol) {
    this.protocol = protocol;
}
public String getHost() {
    return host;
}
public void setHost(String host) {
    this.host = host;
}
public String getMethod() {
    return method;
}
public void setMethod(String method) {
    this.method = method;
}
public int getHttpStatusCode() {
    return httpStatusCode;
}
public void setHttpStatusCode(int httpStatusCode) {
    this.httpStatusCode = httpStatusCode;
}
public String getUserAgent() {
    return userAgent;
}
public void setUserAgent(String userAgent) {
    this.userAgent = userAgent;
}

}
```

EDIT: When I exit `kafka-console-consumer.sh`, it says `Processed a total of 0 messages`.
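For reference, a typical invocation to inspect the output topic would look like the sketch below (the broker address and topic name are taken from the code above; since the output values are `Long` counts, the deserializers must be set explicitly):

```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-requests-output \
    --from-beginning \
    --property print.key=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
```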

3pvhb19x 1#

The `requestSerde.getClass().getName()` approach did not work for me. I had to provide my own `WrapperSerde` implementation in an inner class. You probably need to do the same, like this:

```
public class MySerde extends Serdes.WrapperSerde<Request> {
    // public no-argument constructor, so Kafka Streams can instantiate the Serde reflectively
    public MySerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(Request.class));
    }
}
```
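Once such a class exists, it can be registered by name in the config (a sketch; `MySerde` refers to the class above):

```
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MySerde.class.getName());
```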

chhkpiq4 2#

As the error indicates, `Serdes$WrapperSerde` is missing a public no-argument constructor:

Could not find a public no-argument constructor

The problem is this construct:

```
Serde<Request> requestSerde = Serdes.serdeFrom(requestJsonSerializer, requestJsonDeserializer);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, requestSerde.getClass().getName());
```

`Serdes.serdeFrom(...)` returns a `WrapperSerde` that does not have an empty default constructor. Hence, you cannot pass this class into the `StreamsConfig`. You can use `Serdes` created this way only if you pass the objects into the corresponding API calls (i.e., to overwrite the default `Serde` for certain operators).
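For example, the `requestSerde` object from the question could be passed directly to the operator that needs it instead of being set as the default (a sketch; `Consumed` is `org.apache.kafka.streams.kstream.Consumed`):

```
KStream<String, Request> source = builder.stream("streams-requests-input",
        Consumed.with(Serdes.String(), requestSerde)); // pass the Serde instance, not its class name
```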
To make it work (i.e., to be able to set the Serde in the config), you would need to implement a proper class that implements the `Serde` interface.
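A minimal sketch of such a class, assuming the `JsonSerializer`/`JsonDeserializer` from the question (the class name `RequestSerde` is made up for illustration):

```
package myapps.serializer;

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import myapps.Request;

public class RequestSerde implements Serde<Request> {

    private final JsonSerializer<Request> serializer = new JsonSerializer<>();
    private final JsonDeserializer<Request> deserializer = new JsonDeserializer<>(Request.class);

    // the public no-argument constructor is what StreamsConfig needs for reflective instantiation
    public RequestSerde() { }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<Request> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<Request> deserializer() {
        return deserializer;
    }
}
```

It can then be set in the config as `props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, RequestSerde.class.getName());`.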
