Exception in thread "main" java.lang.NoClassDefFoundError: org/spark_project/guava/cache/CacheLoader

ghhaqwfi · posted 2021-06-07 · in Kafka

When I try to run my Kafka-Spark project, I get the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/spark_project/guava/cache/CacheLoader
    at org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:73)
    at org.apache.spark.SparkConf.<init>(SparkConf.scala:68)
    at org.apache.spark.SparkConf.<init>(SparkConf.scala:55)

I have tried the following approaches, which have been suggested on forums: 1) Adding these dependencies to my pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.iot.app.kafka</groupId>
<artifactId>iot-kafka-producer</artifactId>
<version>1.0.0</version>
<name>IoT Kafka Producer</name>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-network-common_2.11</artifactId>
        <version>1.6.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.3</version>
    </dependency>

    <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> 
        <version>2.1.0</version> </dependency> -->
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>1.6.2</version>
    </dependency>

    <dependency>
        <groupId>org.spark-project.spark</groupId>
        <artifactId>unused</artifactId>
        <version>1.0.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.6</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.6</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.6.6</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>19.0</version>
    </dependency>

</dependencies>
</project>

Code (the Spark consumer):

package datapipeline;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class CustomerKafkaConsumerThread {
    String broker;
    private static final Pattern SPACE = Pattern.compile(" ");

    public void sparkKafkaConsumer(String topics, String broker) {
        this.broker = broker;
        SparkConf conf = new SparkConf().setAppName("CustomerKafkaConsumerThread").setMaster("local");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

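        // Kafka connection settings: the broker list and the topic set for the direct-stream API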
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", broker);
        Set<String> topicSet = Collections.singleton(topics);

        // Create direct kafka stream with brokers and topics
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicSet);

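        // Extract the message value from each (key, value) record, then split into words and count them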
        JavaDStream<String> lines = messages.map(Tuple2::_2);
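        // Note: this println shows the DStream object's toString, not the streamed records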
        System.out.println("........." + lines);
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)));
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
            .reduceByKey((i1, i2) -> i1 + i2);
        wordCounts.print();

        // Start the computation
        jssc.start();
        jssc.awaitTermination();

    }
}

2) Removing the google-guava jar from the build path in Eclipse and adding it back as an external JAR.
Neither approach helped in my case. Could someone please help me resolve this? Thanks in advance.

tp5buhyn #1

You need to use the same Scala version across all of your dependencies. Try changing spark-streaming-kafka_2.10 to spark-streaming-kafka_2.11:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.3</version>
</dependency>

And use the same Spark version everywhere, for example 1.6.3:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-network-common_2.11</artifactId>
    <version>1.6.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.3</version>
</dependency>    
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.6.3</version>
</dependency>
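
Putting both changes together, every Spark artifact ends up with the same Scala suffix (_2.11) and the same Spark version (1.6.3). Below is a consolidated sketch of the Spark block; note that spark-streaming_2.11 is an addition of mine, listed explicitly because the consumer code instantiates JavaStreamingContext directly (otherwise it is only pulled in transitively by spark-streaming-kafka):

<!-- All Spark artifacts aligned on Scala 2.11 and Spark 1.6.3.
     spark-streaming is listed explicitly (an addition) because the
     consumer code uses JavaStreamingContext directly. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.6.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>1.6.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-network-common_2.11</artifactId>
    <version>1.6.3</version>
</dependency>

After editing, running mvn dependency:tree should confirm that no _2.10 Spark artifacts remain on the classpath.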
