Spark Cassandra Java Integration Issue

xqkwcwgp asked on 2022-11-05, tagged Cassandra

I am new to both Spark and Cassandra.
I am trying to implement aggregation over Cassandra data using Spark with Java.
I am unable to fetch the Cassandra data in my code. I have read through several discussions and found that there are compatibility issues between Spark and the spark-cassandra-connector. I tried many ways to fix the problem, but none of them worked.
Please find my pom.xml below (please don't mind the extra dependencies; I need to figure out which library is causing the problem):

<?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>IBeatCassPOC</groupId>
<artifactId>ibeatCassPOC</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>

    <!--CASSANDRA START-->
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-mapping</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-extras</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.sparkjava</groupId>
        <artifactId>spark-core</artifactId>
        <version>2.5.4</version>
    </dependency>

    <!--https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10-->
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>2.0.0-M3</version>
    </dependency>
    <!--CASSANDRA END-->
    <!-- Kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
    </dependency>

    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.2</version>
    </dependency>

    <!-- Spark -->

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>

    <!-- Logging -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>

    <!-- Spark-Kafka -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>
    <!-- Jackson -->
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.9.13</version>
    </dependency>

    <!-- Google Collection Library -->
    <dependency>
        <groupId>com.google.collections</groupId>
        <artifactId>google-collections</artifactId>
        <version>1.0-rc2</version>
    </dependency>

    <!--UA Detector dependency for AgentType in PageTrendLog-->
    <dependency>
        <groupId>net.sf.uadetector</groupId>
        <artifactId>uadetector-core</artifactId>
        <version>0.9.12</version>
    </dependency>
    <dependency>
        <groupId>net.sf.uadetector</groupId>
        <artifactId>uadetector-resources</artifactId>
        <version>2013.12</version>
    </dependency>

    <dependency>
        <groupId>com.esotericsoftware</groupId>
        <artifactId>kryo</artifactId>
        <version>3.0.3</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.twitter4j</groupId>
        <artifactId>twitter4j-stream</artifactId>
        <version>4.0.4</version>
    </dependency>

    <!-- MongoDb Java Connector -->
    <!-- <dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId>
        <version>2.13.0</version> </dependency> -->

</dependencies>
</project>

Java source code used to fetch the data:

import com.datastax.spark.connector.japi.CassandraJavaUtil;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import java.util.ArrayList;

public class ReadCassData {
    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("Spark-Cassandra Integration");
        sparkConf.setMaster("local[4]");
        sparkConf.set("spark.cassandra.connection.host", "stagingServer22");
        sparkConf.set("spark.cassandra.connection.port", "9042");

        sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
        sparkConf.set("spark.cassandra.read.timeout_ms", "200000");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        String keySpaceName = "testKeyspace";
        String tableName = "testTable";

        CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(javaSparkContext).cassandraTable(keySpaceName, tableName);
        System.out.println("Cassandra Count" + cassandraRDD.cassandraCount());
        final ArrayList<CassandraRow> data = new ArrayList<CassandraRow>();

        cassandraRDD.reduce(new Function2<CassandraRow, CassandraRow, CassandraRow>() {
            public CassandraRow call(CassandraRow v1, CassandraRow v2) throws Exception {
                System.out.println("hello");
                System.out.println(v1 + " ____ " + v2);
                data.add(v1);
                data.add(v2);
                return null;
            }
        });
        System.out.println("data Size -" + data.size());

    }
}

The exception encountered is:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost): java.lang.NoSuchMethodError: org.apache.spark.TaskContext.getMetricsSources(Ljava/lang/String;)Lscala/collection/Seq;
        at org.apache.spark.metrics.MetricsUpdater$.getSource(MetricsUpdater.scala:20)
        at org.apache.spark.metrics.InputMetricsUpdater$.apply(InputMetricsUpdater.scala:56)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:329)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I have a Cassandra cluster deployed at a remote location, and the Cassandra version in use is 3.9.
Please advise which dependencies are compatible. I cannot change my Cassandra version (currently 3.9). Please suggest which Spark / spark-cassandra-connector versions to use so that map-reduce jobs against the database execute successfully.


r55awzrz1#

I have connected Spark with Cassandra using the spark-cassandra-connector in Scala, with version 1.6.0 of the connector.
Below is my working code:

import com.datastax.driver.dse.graph.GraphResultSet
import com.spok.util.LoggerUtil
import com.datastax.spark.connector._
import org.apache.spark._

object DseSparkGraphFactory extends App {

  val dseConn = {
    LoggerUtil.info("Connecting with DSE Spark Cluster....")
    val conf = new SparkConf(true)
      .setMaster("local[*]")
      .setAppName("test")
      .set("spark.cassandra.connection.host", "Ip-Address")
    val sc = new SparkContext(conf)
    val rdd = sc.cassandraTable("spokg_test", "Url_p")
    rdd.collect().map(println)
  }
}

enxuqcxy2#

Please refer to the Cassandra Spark Connector documentation for the connector version that matches the Spark version in your environment. It should be 1.5, 1.6 or 2.0.
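For reference, a minimal sketch of what a matched set of Maven coordinates could look like, assuming Spark 1.6.x built for Scala 2.10; the exact versions below are illustrative, not taken from this answer:

<!-- Spark and the connector should come from matching minor lines,
     and every artifact must use the same Scala suffix (_2.10 here). -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.6.0</version>
</dependency>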


jmo0nnb33#

The following POM worked for me:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>

    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.6</version>
        <scope>system</scope>
        <systemPath>D:\Jars\tools-1.6.0.jar</systemPath>
    </dependency>
</dependencies>

Check it out. I successfully ingested streaming data from Kafka into Cassandra with it. Similarly, you can pull the data into a JavaRDD.
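As a rough sketch of that last point (not code from this answer; the host, keyspace, table and class names are placeholders), this reads a Cassandra table into a JavaRDD through the connector's Java API and brings only a small sample back to the driver, instead of mutating a driver-side list inside a distributed operation, which does not work once tasks run on executors:

import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

public class CassandraToJavaRdd {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("cassandra-to-javardd")
                .setMaster("local[2]")
                .set("spark.cassandra.connection.host", "127.0.0.1"); // placeholder host

        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the whole table as CassandraRow objects.
        CassandraJavaRDD<CassandraRow> rows =
                javaFunctions(sc).cassandraTable("my_keyspace", "my_table"); // placeholder names

        // Bring a small sample back to the driver for inspection.
        List<CassandraRow> sample = rows.take(10);
        for (CassandraRow row : sample) {
            System.out.println(row);
        }

        System.out.println("total rows: " + rows.count());
        sc.stop();
    }
}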
