pyspark java.lang.ClassNotFoundException: Failed to find data source: iceberg, when using a custom Java catalog

fae0ux8s · posted 2024-01-06 · in Spark

We are using a custom Java catalog with Iceberg. The table is created correctly, but we run into a problem when we insert data.

  public String createCustomTable(String tableName) {
      try {
          TableIdentifier tableIdentifier = TableIdentifier.of(name(), tableName);
          Schema schema = readSchema(tableIdentifier);
          Map<String, String> properties = ImmutableMap.of(
                  TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()
          );
          PartitionSpec partitionSpec = PartitionSpec.builderFor(schema)
                  .identity(getPartitionKeyfromSchema(tableIdentifier.name()))
                  .build();
          String tableLocation = defaultLocation + tableIdentifier.namespace().toString()
                  + "/" + tableIdentifier.name();
          catalog.createTable(tableIdentifier, schema, partitionSpec, tableLocation, properties);
          catalog.loadTable(TableIdentifier.of(name(), tableName));
          return "Table created";
      } catch (Exception e) {
          return e.getMessage();
      }
  }

  public String insertData(String tableName, String csvPath) throws IOException {
      Table icebergTable = catalog.loadTable(TableIdentifier.of(name(), tableName));
      SparkSession spark = SparkSession.builder()
              .config("spark.master", "local")
              .getOrCreate();
      String headerJson = readHeaderJson(tableName);
      LOGGER.info("Header JSON for {}: {}", tableName, headerJson);
      String[] columns = headerJson.split(",");
      Dataset<Row> df = spark.read()
              .option("header", "false")
              .option("inferSchema", "false")
              .option("comment", "#")
              .option("sep", "|")
              .csv(csvPath)
              .toDF(columns);
      LOGGER.info("Actual columns: {}", Arrays.toString(df.columns()));
      for (String col : df.columns()) {
          df = df.withColumn(col, df.col(col).cast("string"));
      }
      df.write().format("iceberg").mode(SaveMode.Append).save(icebergTable.location());
      LOGGER.info("Data inserted successfully into table: {}", tableName);
      return "Data inserted"; // the method declares a String return type, so a return is required
  }

When we run this through a main program in Java, it works perfectly. However, when we package it into a jar and call it from Spark, it gives us this error:

  ERROR:root:Error: An error occurred while calling o0.insertData.
  : java.lang.ClassNotFoundException:
  Failed to find data source: iceberg. Please find packages at
  http://spark.apache.org/third-party-projects.html
  at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:443)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
  at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:852)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
  at com.xyz.catalog.CustomCatalog.insertData(CustomCatalog.java:178)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.base/java.lang.Thread.run(Thread.java:829)
  Caused by: java.lang.ClassNotFoundException: iceberg.DefaultSource
  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
  at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
  at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:656)
  at scala.util.Try$.apply(Try.scala:210)
  at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:656)
  at scala.util.Failure.orElse(Try.scala:221)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:656)
  ... 16 more


Any help would be appreciated.
We are trying to create Iceberg tables and insert data into them using a custom catalog. We expect the insert to work:

  df.write().format("iceberg").mode(SaveMode.Append).save(icebergTable.location());


However, we get the error shown above.


2sbarzqh1#

This error means that the data source you are trying to use (iceberg) is not on Spark's runtime classpath, which suggests you did not include the iceberg-spark-runtime package in your job submission.
The getting-started guide shows an example of how to do this:

  spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.1

This adds the dependency to the Spark job's driver and executors.
A similar --packages argument should be added to your Spark job submission, or to the corresponding property of however you run Spark. See spark.jars.packages in the Spark docs for more information.
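For instance, if the jar is invoked from a PySpark driver script, the submission might look like the sketch below. The script name, jar path, and the Iceberg/Spark/Scala versions here are illustrative assumptions; match the runtime artifact to your actual Spark and Scala build:

```shell
# Illustrative spark-submit; the versions and paths are assumptions.
# Pick the iceberg-spark-runtime artifact matching your Spark/Scala
# versions (e.g. 3.2_2.12 for Spark 3.2 on Scala 2.12).
spark-submit \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.1 \
  --jars /path/to/your-custom-catalog.jar \
  your_driver_script.py
```

With --packages, Spark resolves the dependency (and its transitive dependencies) from Maven Central and ships it to both the driver and the executors, which is what the ClassNotFoundException is complaining about.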


omvjsjqw2#

You are creating a new session instead of using the existing one.

  SparkSession spark = SparkSession.builder()
          .config("spark.master", "local")
          .getOrCreate();

字符串
Either replace it with

  // In the Java API, getActiveSession() returns a scala.Option;
  // SparkSession.active() (Spark 3.x) returns the session directly.
  SparkSession spark = SparkSession.active();


or, if for whatever reason you need a new session, create it with the correct options:

  SparkSession spark = SparkSession.builder()
          .config("spark.master", "local")
          .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.1")
          .getOrCreate();
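Since the error surfaces through py4j, the session is probably first created on the PySpark side. The same property can be set there, as in this sketch (the version string is an assumption; spark.jars.packages only takes effect if it is set before the first SparkSession is created):

```python
# Sketch: configure spark.jars.packages on the Python side, before any
# SparkSession exists, so the Iceberg runtime reaches driver and executors.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    # Version is illustrative; match your Spark/Scala build.
    .config(
        "spark.jars.packages",
        "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.1",
    )
    .getOrCreate()
)
```

If the Java side then reuses the active session instead of building its own, the format("iceberg") lookup resolves against a classpath that already contains the runtime jar.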

