I am using the nats-spark-connector (https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced) to connect to NATS JetStream and consume and process messages with Spark Java code.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

private static void sparkNatsTester() {
    SparkSession spark = SparkSession.builder()
            .appName("spark-with-nats")
            .master("local")
            // .config("spark.logConf", "false")
            .config("spark.jars",
                    "libs/nats-spark-connector-balanced_2.12-1.1.4.jar," + "libs/jnats-2.17.1.jar")
            // .config("spark.executor.instances", "2")
            // .config("spark.cores.max", "4")
            // .config("spark.executor.memory", "2g")
            .getOrCreate();
    System.out.println("sparkSession : " + spark);
    Dataset<Row> df = spark.readStream()
            .format("nats")
            .option("nats.host", "localhost")
            .option("nats.port", 4222)
            .option("nats.stream.name", "my_stream")
            .option("nats.stream.subjects", "my_sub")
            // seconds to wait for an ack before the server resends a message
            .option("nats.msg.ack.wait.secs", 1)
            // .option("nats.num.listeners", 2)
            // Each listener will fetch 10 messages at a time
            // .option("nats.msg.fetch.batch.size", 10)
            .load();
    System.out.println("Successfully read nats stream !");
    StreamingQuery query;
    try {
        query = df.writeStream()
                .outputMode("append")
                .format("console")
                .start();
        query.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
It successfully prints the Spark session object and "Successfully read nats stream !", then prints the two lines below, and then throws an exception:
Successfully read nats stream !
Status change nats: connection opened
Status change nats: connection closed
The exception is caused by java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z. The full exception is as follows:
Exception in thread "stream execution thread for [id = 3ac2d1ac-4876-4c2a-a501-9f94e7e11300, runId = f72897c4-180d-4272-abe2-df9f3838e54b]" org.apache.spark.sql.streaming.StreamingQueryException: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
=== Streaming Query ===
Identifier: [id = 3ac2d1ac-4876-4c2a-a501-9f94e7e11300, runId = f72897c4-180d-4272-abe2-df9f3838e54b]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource org.apache.spark.sql.execution.streaming.ConsoleTable$@16cfe41b, 3ac2d1ac-4876-4c2a-a501-9f94e7e11300, Append
+- StreamingExecutionRelation natsconnector.spark.NatsStreamingSource@4b0f9a63, [subject#3, dateTime#4, content#5]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:548)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1915)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1911)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1917)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1876)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1835)
at org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.list(CheckpointFileManager.scala:315)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.listBatches(HDFSMetadataLog.scala:327)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:265)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:253)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:249)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:239)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:311)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:289)
... 4 more
Below is the pom.xml snippet for the Spark-related dependencies:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <spark.version>3.5.0</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
I set the following property before executing the above method; without it, it throws an error:
System.setProperty("hadoop.home.dir", "C:\\Program Files\\Hadoop\\winutils-master\\hadoop-3.3.1\\");
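For context, a minimal sketch of how that property is wired in, assuming the local winutils install path shown above; the call has to run before the first SparkSession (and hence Hadoop's NativeIO) is initialized:

public static void main(String[] args) {
    // Assumption: winutils is installed at this path, as in the question.
    // hadoop.home.dir must be set before Spark touches Hadoop's native I/O,
    // i.e. before SparkSession.builder()...getOrCreate() runs.
    System.setProperty("hadoop.home.dir",
            "C:\\Program Files\\Hadoop\\winutils-master\\hadoop-3.3.1\\");
    sparkNatsTester(); // builds the SparkSession and starts the stream
}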
Also, I have set the HADOOP_HOME environment variable and added its bin folder to Path, as suggested in earlier questions.
In the Maven dependencies I can see that the Hadoop-related jar version is 3.3.4. I tried to match it with the winutils-master Hadoop version, but I still get the same error. Please tell me how to fix this error.
1 Answer
I had not restarted my system after adding the HADOOP_HOME environment variable and adding its bin folder to Path. After a restart, the above error went away! However, I then got another error, java.lang.NoSuchMethodError, which was caused by a Spark version mismatch and was resolved after I changed the Spark version to <spark.version>3.3.3</spark.version>. I was finally able to fetch messages from NATS JetStream into my Spark code!
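For reference, a minimal sketch of the corresponding pom.xml change; this assumes the rest of the properties block stays exactly as in the question, and that the connector jar (nats-spark-connector-balanced_2.12-1.1.4.jar, built for Scala 2.12 per its name) expects an older Spark than 3.5.0:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <!-- downgraded from 3.5.0 to resolve the NoSuchMethodError version mismatch -->
    <spark.version>3.3.3</spark.version>
</properties>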