nats-spark-connector with Java giving an error

aydmsdu9 posted on 2023-11-22 in Apache

I am using nats-spark-connector (https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced) to connect to NATS JetStream, and Spark Java code to consume and process the messages.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

private static void sparkNatsTester() {
        SparkSession spark = SparkSession.builder()
                .appName("spark-with-nats")
                .master("local")
//              .config("spark.logConf", "false")
                // connector and NATS client jars, loaded from the local libs folder
                .config("spark.jars",
                        "libs/nats-spark-connector-balanced_2.12-1.1.4.jar," + "libs/jnats-2.17.1.jar")
//              .config("spark.executor.instances", "2")
//              .config("spark.cores.max", "4")
//              .config("spark.executor.memory", "2g")
                .getOrCreate();
        System.out.println("sparkSession : " + spark);

        // define the streaming source; nothing is consumed until the query starts
        Dataset<Row> df = spark.readStream()
                .format("nats")
                .option("nats.host", "localhost")
                .option("nats.port", 4222)
                .option("nats.stream.name", "my_stream")
                .option("nats.stream.subjects", "my_sub")
                // wait for an ack before resending a message (value in seconds; set to 1 here)
                .option("nats.msg.ack.wait.secs", 1)
                //.option("nats.num.listeners", 2)
                // Each listener will fetch 10 messages at a time
                //.option("nats.msg.fetch.batch.size", 10)
                .load();
        System.out.println("Successfully read nats stream !");

        StreamingQuery query;
        try {
            // write each micro-batch to the console and block until the query stops
            query = df.writeStream()
                      .outputMode("append")
                      .format("console")
                      .start();
            query.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

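The SparkSession object and "Successfully read nats stream !" are printed successfully, then the two status lines below are printed, and then an exception is thrown.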

Successfully read nats stream !
Status change nats: connection opened
Status change nats: connection closed


The exception is: Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
The full exception is as follows:

Exception in thread "stream execution thread for [id = 3ac2d1ac-4876-4c2a-a501-9f94e7e11300, runId = f72897c4-180d-4272-abe2-df9f3838e54b]" org.apache.spark.sql.streaming.StreamingQueryException: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
=== Streaming Query ===
Identifier: [id = 3ac2d1ac-4876-4c2a-a501-9f94e7e11300, runId = f72897c4-180d-4272-abe2-df9f3838e54b]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource org.apache.spark.sql.execution.streaming.ConsoleTable$@16cfe41b, 3ac2d1ac-4876-4c2a-a501-9f94e7e11300, Append
+- StreamingExecutionRelation natsconnector.spark.NatsStreamingSource@4b0f9a63, [subject#3, dateTime#4, content#5]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
    at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
    at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
    at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
    at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:548)
    at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1915)
    at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1911)
    at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
    at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1917)
    at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1876)
    at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1835)
    at org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.list(CheckpointFileManager.scala:315)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.listBatches(HDFSMetadataLog.scala:327)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:265)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:253)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:249)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:239)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:311)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:289)
    ... 4 more


Below is the pom.xml snippet for the Spark-related dependencies:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <spark.version>3.5.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>


I set the property below before calling the method above; without it, the code fails with an error:

System.setProperty("hadoop.home.dir", "C:\\Program Files\\Hadoop\\winutils-master\\hadoop-3.3.1\\");


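In addition, I have set the HADOOP_HOME environment variable and added its bin folder to Path, as suggested in earlier questions.
In the Maven dependencies I can see that the Hadoop-related jars are version 3.3.4. I tried to match that version with the winutils-master Hadoop version, but I still get the same error. Please tell me how to fix this error.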
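As a rough diagnostic for the setup described above: NativeIO$Windows.access0 is a native method, so an UnsatisfiedLinkError on it usually suggests that the JVM could not load a matching hadoop.dll. The sketch below only checks whether winutils.exe and hadoop.dll exist under the bin folder of a Hadoop home directory. The class and method names (HadoopHomeCheck, missingNativeFiles) are hypothetical and not part of Spark, Hadoop, or the connector, and passing this check does not prove that the DLL matches the Hadoop jar version.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spark, Hadoop, or nats-spark-connector.
final class HadoopHomeCheck {
    // Native files assumed to be needed under <hadoop home>/bin on Windows.
    private static final String[] REQUIRED = {"winutils.exe", "hadoop.dll"};

    // Returns the names of the required files that are not present under hadoopHome/bin.
    static List<String> missingNativeFiles(Path hadoopHome) {
        List<String> missing = new ArrayList<>();
        Path bin = hadoopHome.resolve("bin");
        for (String name : REQUIRED) {
            if (!Files.isRegularFile(bin.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Prefer an explicit argument, then the system property, then the environment variable.
        String home = args.length > 0
                ? args[0]
                : System.getProperty("hadoop.home.dir", System.getenv("HADOOP_HOME"));
        if (home == null) {
            System.out.println("Neither hadoop.home.dir nor HADOOP_HOME is set for this JVM.");
            return;
        }
        System.out.println("Hadoop home: " + home);
        System.out.println("Missing under bin: " + missingNativeFiles(Paths.get(home)));
    }
}
```

Running it with the Hadoop home path as the only argument prints which of the two files are missing; an empty list means both files were found.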


dtcbnfnu #1

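I had not restarted the system after adding the HADOOP_HOME environment variable and adding its bin folder to Path. After a restart, the error above went away! However, I then hit a different error, java.lang.NoSuchMethodError, which was caused by a Spark version mismatch and was resolved once I changed the Spark version to <spark.version>3.3.3</spark.version>. I was finally able to get messages from NATS JetStream into my Spark code!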
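One plausible reason the restart mattered is that a process reads its environment variables when it is launched, so an IDE or terminal that was already open keeps the old HADOOP_HOME and Path values. The sketch below prints what the running JVM actually sees. The class and method names (EnvSeenByJvm, pathContainsDir) are hypothetical, and the check is only an approximation of how Windows resolves DLLs.

```java
import java.io.File;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.regex.Pattern;

// Hypothetical helper that reports the environment as seen by the current JVM.
final class EnvSeenByJvm {
    // True if one of the entries in a PATH-style string refers to the given directory.
    static boolean pathContainsDir(String pathValue, String dir) {
        if (pathValue == null || dir == null) {
            return false;
        }
        Path wanted = Paths.get(dir).normalize();
        for (String entry : pathValue.split(Pattern.quote(File.pathSeparator))) {
            if (entry.isEmpty()) {
                continue;
            }
            try {
                if (Paths.get(entry).normalize().equals(wanted)) {
                    return true;
                }
            } catch (InvalidPathException e) {
                // Skip entries that are not valid paths on this platform (for example, quoted entries).
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String hadoopHome = System.getenv("HADOOP_HOME");
        System.out.println("HADOOP_HOME seen by this JVM: " + hadoopHome);
        if (hadoopHome != null) {
            String bin = Paths.get(hadoopHome, "bin").toString();
            System.out.println("PATH contains " + bin + ": "
                    + pathContainsDir(System.getenv("PATH"), bin));
        }
        // The JVM searches these directories when loading native libraries.
        System.out.println("java.library.path: " + System.getProperty("java.library.path"));
    }
}
```

If this prints null for HADOOP_HOME from inside the IDE but the variable is set in the system settings, relaunching the IDE or terminal (or rebooting, as was done here) is the likely remedy.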
