Cassandra class not found

col17t5w  posted on 2021-06-08  in Kafka

I am using Spark to fetch data from Kafka and insert it into Cassandra. My program is:

public static void fetchAndValidateData() {
    SparkConf sparkConf = new SparkConf().setAppName("name")
            .set("spark.cassandra.connection.host", "127.0.0.1")
            .set("spark.cleaner.ttl", "3600");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
    Map<String,String> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", "127.0.0.1");
    kafkaParams.put("group.id", App.GROUP);
    JavaPairReceiverInputDStream<String, EventLog> messages =
            KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
                    kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
    JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() {
        @Override
        public EventLog call(Tuple2<String, EventLog> tuple2) {
            return tuple2._2();
        }
    });
    lines.foreachRDD(rdd -> { javaFunctions(rdd).writerBuilder("test", "event_log", mapToRow(EventLog.class)).saveToCassandra(); });
    jssc.start();
    try {
        jssc.awaitTermination();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    jssc.stop();
    jssc.close();
}

My spark-submit command is:

C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2 --class "com.jtv.spark.atnt.App" --master local[4] target\spark.atnt-0.0.1-SNAPSHOT.jar

My pom file is:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.jtv</groupId>
  <artifactId>spark.atnt</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spark.atnt</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <build>
    <plugins>  
       <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <executions>
              <execution>
                  <phase>package</phase>
                  <goals>
                      <goal>single</goal>
                  </goals>
              </execution>
          </executions>
          <configuration>
              <descriptorRefs>
                  <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
          </configuration>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.11</artifactId>
        <version>1.5.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.11</artifactId>
      <version>1.6.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>javax.json</groupId>
        <artifactId>javax.json-api</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
    </dependency>    
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

I am getting a java.lang.ClassNotFoundException: com.datastax.spark.connector.japi.CassandraJavaUtil error.
How can I solve it?

Edit:
I found the cause of the problem. It is org.apache.kafka:kafka_2.10:0.8.0. When I add <scope>provided</scope> to it, mvn package fails with:

Failed to execute goal org.apache.maven.plugins:maven-assembly-plugin:2.2-beta-5:single (default) on project spark.atnt: Failed to create assembly: Failed to resolve dependencies for project: com.jtv:spark.atnt:jar:0.0.1-SNAPSHOT: Could not transfer artifact com.sun.jdmk:jmxtools:jar:1.2.1 from/to java.net (https://maven-repository.dev.java.net/nonav/repository): Cannot access https://maven-repository.dev.java.net/nonav/repository with type legacy using the available connector factories: BasicRepositoryConnectorFactory

When I remove it, my spark-submit command fails with java.lang.ClassNotFoundException: com.datastax.spark.connector.japi.CassandraJavaUtil.

mefy6pfw  #1

The simplest way to solve this is to package the Cassandra library inside your jar file.
To do that, you can use the Maven Assembly plugin in your pom.xml:

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>

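This plugin packages all dependencies into the jar file. If you want to keep certain dependencies (for example Spark) out of the package, add the tag <scope>provided</scope> to them. For example: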

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.5.2</version>
    <scope>provided</scope>
</dependency>

Note that if you use the assembly plugin above, you will get two jar files in the target folder. To use the full jar (the one with dependencies included), run:

C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2 --class "com.jtv.spark.atnt.App" --master local[4] target\spark.atnt-0.0.1-SNAPSHOT-jar-with-dependencies.jar
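
Before submitting, it can help to confirm that the connector class actually ended up in the assembled jar. The following is a minimal sketch using only the JDK's java.util.jar API. The class name JarClassCheck and the helper containsClass are made up for illustration, and the default jar path is simply the file name used in the command above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.jar.JarFile;

// Hypothetical helper (not part of Spark or the Cassandra connector):
// checks whether a jar file contains the .class entry for a given class.
final class JarClassCheck {

    /** Returns true if the jar at jarPath has an entry for the fully qualified className. */
    static boolean containsClass(String jarPath, String className) throws IOException {
        // Class entries are stored with '/' separators and a ".class" suffix.
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed default: the assembly jar name from the spark-submit command.
        String jarPath = args.length > 0
                ? args[0]
                : "target/spark.atnt-0.0.1-SNAPSHOT-jar-with-dependencies.jar";
        String className = "com.datastax.spark.connector.japi.CassandraJavaUtil";

        if (!Files.exists(Paths.get(jarPath))) {
            System.out.println("Jar not found: " + jarPath);
            return;
        }
        String verdict = containsClass(jarPath, className) ? "is packaged in" : "is MISSING from";
        System.out.println(className + " " + verdict + " " + jarPath);
    }
}
```

If the class is reported missing from the jar-with-dependencies file, check whether the Cassandra connector dependency in pom.xml is marked provided or otherwise excluded from the assembly.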
