从Spark写入Cassandra时出现空指针异常

dy1byipe  于 2022-11-05  发布在  Cassandra
关注(0)|答案(1)|浏览(196)

我正在使用spark-cassandra-connector-2.4.0-s_2.11将数据从spark写入数据库集群上的Cassandra。
我在从Spark向Cassandra写入数据时遇到了java.lang.NullPointerException。这在记录很少的情况下工作正常。
但当我尝试加载**~ 1.5亿**条记录时出现问题。
有人能帮我找到根本原因吗?
下面是代码片段:

val paymentExtractCsvDF = spark
                          .read
                          .format("csv")
                          .option("header", "true")
                          .load(/home/otl/extract/csvout/Payment)

    paymentExtractCsvDF.printSchema()

root
 |-- BAN: string (nullable = true)
 |-- ENT_SEQ_NO: string (nullable = true)
 |-- PYM_METHOD: string (nullable = true)

case class Payment(account_number: String, entity_sequence_number: String, payment_type: String)
val paymentResultDf = paymentExtractCsvDF.map(row => Payment(row.getAs("BAN"),
        row.getAs("ENT_SEQ_NO"),
        row.getAs("PYM_METHOD"))).toDF()

var paymentResultFilterDf = paymentResultDf
                            .filter($"account_number".isNotNull || $"account_number" != "")
                            .filter($"entity_sequence_number".isNotNull || $"entity_sequence_number" != "")

paymentResultFilterDf
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode("append")
  .options(Map( "table" -> "cassandratable", "keyspace" -> "cassandrakeyspace"))
  .save()

下面是我遇到的例外:

Failed to write statements to cassandrakeyspace.cassandratable. The
latest exception was
  An unexpected error occurred server side on /10.18.15.198:9042: java.lang.NullPointerException

Please check the executor logs for more exceptions and information

    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$3.apply(TableWriter.scala:243)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$3.apply(TableWriter.scala:241)
    at scala.Option.map(Option.scala:146)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:241)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/11/22 01:12:17 INFO CoarseGrainedExecutorBackend: Got assigned task 1095
19/11/22 01:12:17 INFO Executor: Running task 39.1 in stage 21.0 (TID 1095)
19/11/22 01:12:17 INFO ShuffleBlockFetcherIterator: Getting 77 non-empty blocks including 10 local blocks and 67 remote blocks
19/11/22 01:12:17 INFO ShuffleBlockFetcherIterator: Started 7 remote fetches in 3 ms
19/11/22 01:12:17 INFO ShuffleBlockFetcherIterator: Getting 64 non-empty blocks including 8 local blocks and 56 remote blocks
19/11/22 01:12:17 INFO ShuffleBlockFetcherIterator: Started 7 remote fetches in 1 ms
19/11/22 01:12:17 ERROR Executor: Exception in task 7.0 in stage 21.0 (TID 1012)
nfs0ujit

nfs0ujit1#

看起来你的数据框中的关键字段为空值。问题可能出在你的过滤条件中。我想你应该这样做:

var paymentResultFilterDf = paymentResultDf
                        .filter($"account_number".isNotNull && $"account_number" != "")
                        .filter($"entity_sequence_number".isNotNull && $"entity_sequence_number" != "")

相关问题