I'm using the Java Spark code below to connect to NATS:
SparkSession spark = SparkSession.builder()
        .appName("spark-with-nats")
        .master("local")
        .config("spark.jars",
                "libs/nats-spark-connector-balanced_2.12-1.1.4.jar," + "libs/jnats-2.17.1.jar")
        .config("spark.sql.streaming.checkpointLocation", "tmp/checkpoint")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate();

Dataset<Row> df = spark.readStream()
        .format("nats")
        .option("nats.host", "localhost")
        .option("nats.port", 4222)
        .option("nats.stream.name", "newstream")
        .option("nats.stream.subjects", "newsub")
        .option("nats.durable.name", "cons1")
        .option("nats.msg.ack.wait.secs", 120)
        .load();
The two external jar files used when creating the SparkSession sit under the "libs" folder and are added to the classpath:

.config("spark.jars", "libs/nats-spark-connector-balanced_2.12-1.1.4.jar," + "libs/jnats-2.17.1.jar")
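(As an aside, the post doesn't show how df is consumed. A minimal console sink like the sketch below, my assumption rather than the original code, is enough to sanity-check the stream; both calls throw checked exceptions that the enclosing method must declare.)

// Hypothetical sink, not from the original post; requires
// import org.apache.spark.sql.streaming.StreamingQuery;
StreamingQuery query = df.writeStream()
        .format("console")
        .outputMode("append")
        .start();          // throws java.util.concurrent.TimeoutException
query.awaitTermination(); // throws org.apache.spark.sql.streaming.StreamingQueryException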
This code works fine when I run it from the Eclipse IDE. Now I build a jar with Maven, using this pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.delta/delta-core -->
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-core_2.12</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- *** COMMENT START **********
    <dependency>
        <groupId>external.group</groupId>
        <artifactId>nats-spark-connector-balanced_2.12</artifactId>
        <version>1.1.4</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/libs/nats-spark-connector-balanced_2.12-1.1.4.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>external.group</groupId>
        <artifactId>jnats</artifactId>
        <version>2.17.1</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/libs/jnats-2.17.1.jar</systemPath>
    </dependency>
    *** COMMENT END ********** -->
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>2.5.7</version>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                    <configuration>
                        <mainClass>com.optiva.MinIOTester</mainClass>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
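(Side note on the commented-out system-scoped dependencies: the spring-boot repackage goal leaves system-scope jars out of the fat jar by default. If that route were re-enabled, the plugin's includeSystemScope flag would bundle them in; a sketch, assuming the plugin configuration above:)

<configuration>
    <mainClass>com.optiva.MinIOTester</mainClass>
    <!-- bundle the system-scoped NATS jars into the repackaged jar -->
    <includeSystemScope>true</includeSystemScope>
</configuration>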
When I run the generated jar, putting the libs folder (with the two external jars) on the classpath:

java -cp "../libs/*.jar" -jar spark-learning-0.0.1-SNAPSHOT.jar
I get the error below:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88)
Caused by: java.lang.ClassNotFoundException:
Failed to find data source: nats. Please find packages at
https://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:587)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675)
at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:157)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:144)
at com.test.MinIOTester.sparkNatsTesterNewOnLocal(MinIOTester.java:387)
at com.test.MinIOTester.main(MinIOTester.java:31)
... 8 more
Caused by: java.lang.ClassNotFoundException: nats.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:151)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:661)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:661)
at scala.util.Failure.orElse(Try.scala:224)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:661)
Judging by the error Caused by: java.lang.ClassNotFoundException: nats.DefaultSource, the two external jars added to the classpath via java -cp "../libs/*.jar" are apparently not being used. I tried an absolute path to the external jar folder, and even naming the jars individually, but I still get the same error. What am I missing?
1 Answer
It ran successfully with the spark-submit command, passing the external dependencies on the classpath.
Thanks to @JoachimSauer for the hint that "-cp is ignored when -jar is used, because only the classpath specified in the jar file will be used." In my earlier java -jar command, the -cp option was therefore being ignored.
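For reference, a sketch of the kind of spark-submit invocation this describes; the jar name and lib paths are taken from the question, while the main class (com.test.MinIOTester, per the stack trace) is an assumption. The --jars flag puts the listed jars on both the driver and executor classpaths:

spark-submit \
  --class com.test.MinIOTester \
  --master local \
  --jars libs/nats-spark-connector-balanced_2.12-1.1.4.jar,libs/jnats-2.17.1.jar \
  spark-learning-0.0.1-SNAPSHOT.jar

With --jars supplying the connector, the spark.jars setting inside the code becomes redundant but harmless.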