Writing a DataFrame to a Cassandra table in Java

cwxwcias  posted on 2021-06-10  in  Cassandra

I can't find what I need; there is plenty of code in Scala and Python, but not in Java. Here is what I have:

    import org.apache.log4j.Logger;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    public class CassandraWriter {

        private transient Logger logger = Logger.getLogger(CassandraWriter.class);
        private Dataset<Row> hdfsDF;

        public CassandraWriter(Dataset<Row> dataFrame) {
            hdfsDF = dataFrame;
        }

        public void writeToCassandra(String tableName, String keyspace) {
            logger.info("Writing DataFrame to table: " + tableName);
            hdfsDF.write().format("org.apache.spark.sql.cassandra").mode("overwrite")
                    .option("table", tableName)
                    .option("keyspace", keyspace)
                    .save();
            logger.info("Inserted DataFrame to Cassandra successfully");
        }
    }

The error I get at runtime is:

    Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. Please find packages at http://spark.apache.org/third-party-projects.html

Any ideas?

Answer #1 (tpxzln5u):

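You need to make sure that the Spark Cassandra Connector is included in the resulting jar that you submit.
This can be done by building a so-called fat jar and submitting that. For example, here is an excerpt of a pom.xml that does this: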

    ...
    <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <scala.version>2.11.12</scala.version>
      <spark.version>2.4.4</spark.version>
      <spark.scala.version>2.11</spark.scala.version>
      <scc.version>2.4.1</scc.version>
      <java.version>1.8</java.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_${spark.scala.version}</artifactId>
        <version>${scc.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${spark.scala.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
    ...
    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>3.2.0</version>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          </configuration>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>

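Alternatively, you can supply the Spark Cassandra Connector as a package at submit time via --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.2 (the _2.11 suffix must match the Scala version of your Spark build).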
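If it is unclear whether the connector actually ended up on the classpath, a quick check like the one below may help narrow things down. This is only a sketch and not part of the original answer: the class name ConnectorClasspathCheck and the helper isOnClasspath are made up for illustration. It relies on the assumption that Spark resolves format("org.apache.spark.sql.cassandra") by looking for a class named org.apache.spark.sql.cassandra.DefaultSource, which is how the lookup works in the Spark 2.4 line as far as I know.

```java
// Hypothetical diagnostic helper; the class and method names are illustrative only.
class ConnectorClasspathCheck {

    // Assumption: Spark appends ".DefaultSource" to the format name when it
    // looks up a data source that is not registered under a short name.
    static final String CASSANDRA_SOURCE =
            "org.apache.spark.sql.cassandra.DefaultSource";

    /**
     * Returns true if the named class can be loaded by this class's loader.
     * Returns false if the class itself, or something it depends on, is missing.
     */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: locate and load the class without running
            // its static initializers.
            Class.forName(className, false,
                    ConnectorClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (isOnClasspath(CASSANDRA_SOURCE)) {
            System.out.println("Spark Cassandra Connector found on the classpath");
        } else {
            System.out.println("Spark Cassandra Connector NOT found; "
                    + "build a fat jar or pass the connector via --packages");
        }
    }
}
```

Calling isOnClasspath(CASSANDRA_SOURCE) at the start of the driver's main method, before writeToCassandra, should report the same condition that otherwise surfaces as the ClassNotFoundException above, which makes it easier to tell a packaging problem from a configuration problem.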

