Below is a simple example of writing a Spark DataFrame to Apache Ignite, which fails with the error shown further down.
The use case is straightforward: read a file, convert it to a DataFrame, write it to Ignite, and later read the same data back from Ignite.
build.sbt
name := "spark-word-count"
version := "0.1"
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
  "org.apache.ignite" % "ignite-spark" % "2.8.1"
)
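For completeness: the build.sbt above omits the Spark artifacts themselves, which the code below compiles against. The full dependency list I would expect looks roughly like this (the Spark version is an assumption on my part and has to match what ignite-spark 2.8.1 was built against):

libraryDependencies ++= Seq(
  // Spark itself; version assumed, marked Provided for a spark-submit deployment
  "org.apache.spark" %% "spark-core" % "2.4.5" % Provided,
  "org.apache.spark" %% "spark-sql"  % "2.4.5" % Provided,
  // Ignite's Spark integration (the artifact from my actual build.sbt)
  "org.apache.ignite" % "ignite-spark" % "2.8.1"
)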
WordCount.scala
val linesRDD = sparkSession.sparkContext.textFile(filesPath)
val filesProcessed = sparkSession.sparkContext.wholeTextFiles(filesPath).count()

val filteredLinesRDD = if (cacheType.isDefined) {
  getFilteredRDD(linesRDD).persist(cacheType.get)
} else {
  getFilteredRDD(linesRDD)
}

val processedLines = filteredLinesRDD.count()

sparkSession.createDataFrame(getPhraseSizesRDD(filteredLinesRDD))
  .toDF("Phrase", "Size")
  .write.format(FORMAT_IGNITE)
  .option(OPTION_CONFIG_FILE, igniteConfigFile)
  .option(OPTION_TABLE, "PhraseSize")
  .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "Phrase")
  .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1")
  .option(OPTION_STREAMER_ALLOW_OVERWRITE, "true")
  .mode(SaveMode.Append)
  .save()
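For reference, the read-back step I intend to run afterwards (execution never gets that far) looks roughly like this, using the same IgniteDataFrameSettings constants as the write path:

// Read the same table back from Ignite; never reached because the write fails
val phraseSizeDF = sparkSession.read
  .format(FORMAT_IGNITE)
  .option(OPTION_CONFIG_FILE, igniteConfigFile)
  .option(OPTION_TABLE, "PhraseSize")
  .load()
phraseSizeDF.show()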
Error stack trace:
Exception in thread "main" java.lang.NoClassDefFoundError: org/h2/index/BaseIndex
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.ignite.internal.util.IgniteUtils.inClassPath(IgniteUtils.java:1727)
at org.apache.ignite.internal.IgniteComponentType.inClassPath(IgniteComponentType.java:160)
at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.initializeDefaultSpi(IgnitionEx.java:2480)
at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.initializeConfiguration(IgnitionEx.java:2328)
at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1697)
at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1117)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:637)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:578)
at org.apache.ignite.Ignition.getOrStart(Ignition.java:412)
at org.apache.ignite.spark.IgniteContext.ignite(IgniteContext.scala:150)
at org.apache.ignite.spark.IgniteContext.<init>(IgniteContext.scala:63)
at org.apache.ignite.spark.IgniteContext$.apply(IgniteContext.scala:192)
at org.apache.ignite.spark.impl.IgniteRelationProvider.igniteContext(IgniteRelationProvider.scala:248)
at org.apache.ignite.spark.impl.IgniteRelationProvider.createRelation(IgniteRelationProvider.scala:102)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
Excluding ignite-indexing resolves the error above, but then I end up with a different one:
libraryDependencies ++= Seq(
  ("org.apache.ignite" % "ignite-spark" % "2.8.1")
    .exclude("org.apache.ignite", "ignite-indexing")
    .exclude("com.h2database", "h2"),
  "com.h2database" % "h2" % "1.4.197"
)
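The exception below suggests the opposite direction: ignite-indexing has to be on the classpath for the CREATE TABLE statement the DataFrame writer issues. Following that suggestion instead of excluding the module would presumably look like this (untested sketch; it brings back the original H2 conflict unless the H2 version on the classpath is compatible):

libraryDependencies ++= Seq(
  "org.apache.ignite" % "ignite-spark"    % "2.8.1",
  // re-added explicitly, as the exception message suggests
  "org.apache.ignite" % "ignite-indexing" % "2.8.1"
)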
Error:
20/06/01 23:29:41 ERROR WordCountMain: Error executing WordCountJobMain: WordCountJob is failure
class org.apache.ignite.IgniteException: Failed to execute query because indexing is disabled (consider adding module ignite-indexing to classpath or moving it from 'optional' to 'libs' folder).
at org.apache.ignite.internal.processors.query.GridQueryProcessor.checkxEnabled(GridQueryProcessor.java:2217)
at org.apache.ignite.internal.processors.query.GridQueryProcessor.querySqlFields(GridQueryProcessor.java:2388)
at org.apache.ignite.internal.processors.query.GridQueryProcessor.querySqlFields(GridQueryProcessor.java:2323)
at org.apache.ignite.internal.processors.query.GridQueryProcessor.querySqlFields(GridQueryProcessor.java:2296)
at org.apache.ignite.spark.impl.QueryHelper$.createTable(QueryHelper.scala:64)
at org.apache.ignite.spark.impl.IgniteRelationProvider.createRelation(IgniteRelationProvider.scala:160)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at main.scala.WordCountJob$.doRun(WordCountJob.scala:43)
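My current suspicion (an assumption, not verified): some other artifact on the classpath evicts H2 to a release newer than 1.4.197, where the internals Ignite links against (such as org.h2.index.BaseIndex) are no longer binary-compatible, which would explain the first NoClassDefFoundError. If that is the case, pinning H2 in build.sbt should help:

// force the H2 version Ignite 2.8.1 expects; 1.4.197 matches the version pinned above
dependencyOverrides += "com.h2database" % "h2" % "1.4.197"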