Kafka error: Caused by: java.lang.IllegalStateException: Recursive update at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1760)

Posted by sigwle7e on 2022-11-21 in Apache

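I am learning Apache Kafka and Confluent Kafka from the tutorial at https://udemy.com/course/kafka-streams-real-time-stream-processing-master-class/learn/lecture/14243830#questions, and I am trying to read an Avro stream from a topic and send the data to different topics based on certain conditions.
Error: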

[2022-11-01 12:35:41,529] (org.apache.kafka.streams.KafkaStreams) - ERROR stream-client [PosFanout-78308e33-2baa-49a8-a181-22e9c5deea42] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.  
com.google.common.util.concurrent.ExecutionError: java.lang.ExceptionInInitializerError
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049) ~[guava-30.1.1-jre.jar:?]
    at com.google.common.cache.LocalCache.get(LocalCache.java:3962) ~[guava-30.1.1-jre.jar:?]
    at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3985) ~[guava-30.1.1-jre.jar:?]
    at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4946) ~[guava-30.1.1-jre.jar:?]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getDatumReader(AbstractKafkaAvroDeserializer.java:231) ~[kafka-avro-serializer-7.2.2.jar:?]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:404) ~[kafka-avro-serializer-7.2.2.jar:?]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:152) ~[kafka-avro-serializer-7.2.2.jar:?]
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:53) ~[kafka-avro-serializer-7.2.2.jar:?]
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66) ~[kafka-streams-avro-serde-7.2.2.jar:?]
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38) ~[kafka-streams-avro-serde-7.2.2.jar:?]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58) ~[kafka-streams-3.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) ~[kafka-streams-3.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:204) ~[kafka-streams-3.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:128) ~[kafka-streams-3.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:304) ~[kafka-streams-3.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:985) ~[kafka-streams-3.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1070) ~[kafka-streams-3.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:947) ~[kafka-streams-3.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:735) ~[kafka-streams-3.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588) ~[kafka-streams-3.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550) ~[kafka-streams-3.3.1.jar:?]
Caused by: java.lang.ExceptionInInitializerError
    at java.lang.Class.forName0(Native Method) ~[?:?]
    at java.lang.Class.forName(Class.java:398) ~[?:?]
    at org.apache.avro.util.ClassUtils.forName(ClassUtils.java:95) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.util.ClassUtils.forName(ClassUtils.java:72) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.specific.SpecificData.lambda$getClass$2(SpecificData.java:256) ~[avro-1.11.0.jar:1.11.0]
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[?:?]
    at org.apache.avro.specific.SpecificData.getClass(SpecificData.java:254) ~[avro-1.11.0.jar:1.11.0]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getSpecificReaderSchema(AbstractKafkaAvroDeserializer.java:290) ~[kafka-avro-serializer-7.2.2.jar:?]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getReaderSchema(AbstractKafkaAvroDeserializer.java:264) ~[kafka-avro-serializer-7.2.2.jar:?]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.access$000(AbstractKafkaAvroDeserializer.java:49) ~[kafka-avro-serializer-7.2.2.jar:?]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$1.load(AbstractKafkaAvroDeserializer.java:65) ~[kafka-avro-serializer-7.2.2.jar:?]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$1.load(AbstractKafkaAvroDeserializer.java:60) ~[kafka-avro-serializer-7.2.2.jar:?]
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[guava-30.1.1-jre.jar:?]
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[guava-30.1.1-jre.jar:?]
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[guava-30.1.1-jre.jar:?]
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[guava-30.1.1-jre.jar:?]
    ... 21 more
Caused by: java.lang.IllegalStateException: Recursive update
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1760) ~[?:?]
    at org.apache.avro.specific.SpecificData.getClass(SpecificData.java:254) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.specific.SpecificData.getForSchema(SpecificData.java:161) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.specific.SpecificDatumWriter.<init>(SpecificDatumWriter.java:43) ~[avro-1.11.0.jar:1.11.0]
    at guru.learningjournal.kafka.examples.types.PosInvoice.<clinit>(PosInvoice.java:1346) ~[classes/:?]
    at java.lang.Class.forName0(Native Method) ~[?:?]
    at java.lang.Class.forName(Class.java:398) ~[?:?]
    at org.apache.avro.util.ClassUtils.forName(ClassUtils.java:95) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.util.ClassUtils.forName(ClassUtils.java:72) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.specific.SpecificData.lambda$getClass$2(SpecificData.java:256) ~[avro-1.11.0.jar:1.11.0]
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[?:?]
    at org.apache.avro.specific.SpecificData.getClass(SpecificData.java:254) ~[avro-1.11.0.jar:1.11.0]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getSpecificReaderSchema(AbstractKafkaAvroDeserializer.java:290) ~[kafka-avro-serializer-7.2.2.jar:?]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getReaderSchema(AbstractKafkaAvroDeserializer.java:264) ~[kafka-avro-serializer-7.2.2.jar:?]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.access$000(AbstractKafkaAvroDeserializer.java:49) ~[kafka-avro-serializer-7.2.2.jar:?]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$1.load(AbstractKafkaAvroDeserializer.java:65) ~[kafka-avro-serializer-7.2.2.jar:?]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$1.load(AbstractKafkaAvroDeserializer.java:60) ~[kafka-avro-serializer-7.2.2.jar:?]
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[guava-30.1.1-jre.jar:?]
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[guava-30.1.1-jre.jar:?]
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[guava-30.1.1-jre.jar:?]
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[guava-30.1.1-jre.jar:?]
    ... 21 more
[2022-11-01 12:35:41,602] (guru.learningjournal.kafka.examples.PosFanoutApp) - INFO Stopping Stream

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>guru.learningjournal.kafka.examples</groupId>
    <artifactId>16-pos-fanout-avro</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <java.version>11</java.version>
    </properties>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-streams-avro-serde</artifactId>
            <version>7.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.19.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Maven Compiler Plugin-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- Maven Avro plugin for generating pojo-->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                            <imports>
                                <import>${project.basedir}/src/main/resources/schema/LineItem.avsc</import>
                                <import>${project.basedir}/src/main/resources/schema/DeliveryAddress.avsc</import>
                            </imports>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

MainApp.java

package guru.learningjournal.kafka.examples;

import guru.learningjournal.kafka.examples.types.PosInvoice;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Properties;

public class PosFanoutApp {
    private static final Logger logger = LogManager.getLogger();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, PosInvoice> KS0 = builder.stream(AppConfigs.posTopicName,
            Consumed.with(AppSerdes.String(), AppSerdes.PosInvoice()));

        KS0.filter((k, v) -> AppConfigs.DELIVERY_TYPE_HOME_DELIVERY.equalsIgnoreCase(v.getDeliveryType().toString()))
            .to(AppConfigs.shipmentTopicName, Produced.with(AppSerdes.String(), AppSerdes.PosInvoice()));

        KS0.filter((k, v) -> AppConfigs.CUSTOMER_TYPE_PRIME.equalsIgnoreCase(v.getCustomerType().toString()))
            .mapValues(RecordBuilder::getNotification)
            .to(AppConfigs.notificationTopic, Produced.with(AppSerdes.String(), AppSerdes.Notification()));

        KS0.mapValues(RecordBuilder::getMaskedInvoice)
            .flatMapValues(RecordBuilder::getHadoopRecords)
            .to(AppConfigs.hadoopTopic, Produced.with(AppSerdes.String(), AppSerdes.HadoopRecord()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Stopping Stream");
            streams.close();
        }));
    }
}

The source code is here: https://github.com/javaHelper/Apache-Kafka---Real-time-Stream-Processing-Master-Class-/tree/main/16-pos-fanout-avro

jyztefdp1#

Reading the stack trace... Avro is looking up a class.

Caused by: java.lang.ExceptionInInitializerError
    at java.lang.Class.forName0(Native Method) ~[?:?]
    at java.lang.Class.forName(Class.java:398) ~[?:?]
    at org.apache.avro.util.ClassUtils.forName

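Given the error Recursive update, you would check whether anything in the schema is "recursively defined"...
Looking at schema/PosInvoice.avsc in the repository, you can see that the DeliveryAddress field is of type DeliveryAddress.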
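
For context, here is a minimal sketch of how a "Recursive update" can surface wrapped in an ExceptionInInitializerError, the same shape as the trace in the question. It is not Avro's code: RecursiveUpdateDemo, CLASS_CACHE, getClassFor and GeneratedRecord are hypothetical names, and it assumes Java 9 or later (on Java 8 the nested call may hang instead of throwing). A cache loads a class inside computeIfAbsent, and that class's static initializer calls back into the same cache for the same key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RecursiveUpdateDemo {
    // Hypothetical stand-in for a name -> Class cache.
    static final Map<String, Class<?>> CLASS_CACHE = new ConcurrentHashMap<>();

    static Class<?> getClassFor(String name) {
        return CLASS_CACHE.computeIfAbsent(name, n -> {
            try {
                // Class.forName(String) also runs the class's static initializer.
                return Class.forName(n);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        });
    }

    // Hypothetical stand-in for a generated record class.
    static class GeneratedRecord {
        static {
            // Re-enters the cache for the key that is still being computed.
            getClassFor(GeneratedRecord.class.getName());
        }
    }

    // Returns a short description of what the outer lookup ran into.
    static String demonstrate() {
        try {
            getClassFor(GeneratedRecord.class.getName());
            return "no error";
        } catch (ExceptionInInitializerError e) {
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demonstrate());
    }
}
```

The nested computeIfAbsent finds the bin already reserved by the outer call on the same thread and throws, which is why the trace shows computeIfAbsent twice with the class's static initializer (<clinit>) in between.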

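Solution: your Avro field names should not be capitalized. The Avro parser gets confused about which one is the class and which one is the field name. More specifically, it tries to generate a Java class such as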

class PosInvoice extends SpecificRecord {
    private DeliveryAddress DeliveryAddress;  // won't compile
}

Also, if you want to cross-reference Avro types, I suggest using .avdl files rather than .avsc.
