Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError

u5i3ibmn · posted 2021-06-08 in Kafka

While performing a Spark SQL operation I get the error below. I am running Spark 1.6.3 and have made sure all the Spark jars in my pom.xml are on the same version; the code is pasted below for reference.
I have tried many approaches to parse the JSON data streamed into SparkConsumer, but I am stuck on the following problem. Please help.
Note: there is no version mismatch among the jars; spark-core, spark-streaming, and spark-sql are all 1.6.3. Blog posts say the error stack shown here is caused by a jar version mismatch.


    package datapipeline;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Pattern;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;
    import org.onosproject.net.Device;
    import scala.Tuple2;
    public final class SparkConsumer {
        //private static SparkContext sc = new SparkContext();
        private static final Pattern SPACE = Pattern.compile(" ");
        private static void setLogLevels() {
            boolean log4jInitialized = Logger.getRootLogger().getAllAppenders().hasMoreElements();
            if (!log4jInitialized) {
                // We first log something to initialize Spark's default logging, then we override the
                // logging level.
                Logger.getLogger(SparkConsumer.class).info("Setting log level to [WARN] for streaming example." +
                        " To override add a custom log4j.properties to the classpath.");
                Logger.getRootLogger().setLevel(Level.WARN);
            }
        }
        public static void main(String[] args) throws Exception{
            String jars[]={"C:\\DeviceStreaming-1.0.0.jar"};
        setLogLevels();
            SparkConf sparkConf = new SparkConf().setAppName("CustomerKafkaConsumerThread")
                    .set("spark.local.ip","16.214.240.4:9092")
                    .setMaster("local[*]").setJars(jars);
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(3000));
            JavaSparkContext ctx = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(sparkConf));
            SQLContext sqlContext = new SQLContext(ctx);
            Map<String, Integer> topicMap = new HashMap<>();
            topicMap.put("iot", 10);
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "16.214.240.4:2181", "SparkConsumer", topicMap, StorageLevel.MEMORY_ONLY());
            messages.print();

            JavaDStream<String> json = messages.map(
                    new Function<Tuple2<String, String>, String>() {
                        public String call(Tuple2<String, String> message) {

                            return message._2();
                        }
                    }
                );

            json.foreachRDD(rdd -> {

                //DataFrame df = sqlContext.read().json(rdd);
                DataFrame df=sqlContext.createDataFrame(rdd, Device.class);
                df.show();
                df.printSchema();
            });

            jssc.start();
            jssc.awaitTermination();

        }
    }

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.iot.app.kafka</groupId>
    <artifactId>DeviceStreaming</artifactId>
    <packaging>jar</packaging>
    <version>1.0.0</version>
    <name>DeviceStreaming</name>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.7.1-1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.0</version>
        </dependency>
        <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-network-common_2.11</artifactId> 
            <version>2.1.0</version> </dependency> -->
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-network-common_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-network-common_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.glassfish/javax.json -->
        <dependency>
             <groupId>org.glassfish</groupId>
             <artifactId>javax.json</artifactId>
             <version>1.0.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.spark-project.spark</groupId>
            <artifactId>unused</artifactId>
            <version>1.0.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.4.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/javax.json/javax.json-api -->
        <dependency>
            <groupId>javax.json</groupId>
            <artifactId>javax.json-api</artifactId>
            <version>1.1</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.4.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.4.4</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.10</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>14.0.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.onosproject/onos-api -->
        <dependency>
            <groupId>org.onosproject</groupId>
            <artifactId>onos-api</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8-assembly_2.11</artifactId>
            <version>2.0.0</version>
        </dependency>

        <dependency>
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
             <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>bijection-avro_2.10</artifactId>
            <version>0.9.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.googlecode.json-simple/json-simple -->
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>
    </dependencies>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.10.4</scala.version>
    </properties>
</project>

    Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
        at scala.reflect.internal.util.WeakHashSet.<init>(WeakHashSet.scala:19)
        at scala.reflect.internal.util.WeakHashSet$.apply(WeakHashSet.scala:429)
        at scala.reflect.internal.SymbolTable$perRunCaches$.<init>(SymbolTable.scala:310)
        at scala.reflect.internal.SymbolTable.perRunCaches$lzycompute(SymbolTable.scala:304)
        at scala.reflect.internal.SymbolTable.perRunCaches(SymbolTable.scala:304)
        at scala.reflect.internal.Symbols$class.$init$(Symbols.scala:71)
        at scala.reflect.internal.SymbolTable.<init>(SymbolTable.scala:13)
        at scala.reflect.runtime.JavaUniverse.<init>(JavaUniverse.scala:12)
        at scala.reflect.runtime.package$.universe$lzycompute(package.scala:16)
        at scala.reflect.runtime.package$.universe(package.scala:16)
        at org.apache.spark.sql.types.AtomicType.<init>(AbstractDataType.scala:134)
        at org.apache.spark.sql.types.NumericType.<init>(AbstractDataType.scala:144)
        at org.apache.spark.sql.types.FractionalType.<init>(AbstractDataType.scala:207)

uubf1zoe1#

You are mixing Scala versions in there.

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.10.4</scala.version>
</properties>

This says 2.10, but most (though not all) of your dependencies use the 2.11 binaries.

<artifactId>spark-core_2.11</artifactId>

You need to keep it consistent (preferably on 2.11), because Scala versions are not binary compatible with each other.
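A minimal sketch of what a consistent setup could look like, assuming you stay on Spark 1.6.3 built for Scala 2.11 (the scala.binary.version/spark.version properties and the jackson-module-scala version shown here are illustrative additions, not taken from your POM):

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- pin a single Scala line and reuse it everywhere -->
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>1.6.3</spark.version>
    </properties>

    <!-- reference the properties so the Scala suffix cannot drift -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
        <version>2.4.4</version>
    </dependency>

In your POM the _2.10 artifacts (jackson-module-scala_2.10 and bijection-avro_2.10) plus spark-streaming-kafka-0-8-assembly_2.11 at version 2.0.0 (a Spark 2.x artifact) are the likely offenders. Running mvn dependency:tree and searching for _2.10 will show anything that still drags in the wrong binaries.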
