I am using spark-cassandra-connector-2.4.0-s_2.11 to write data from Spark to a Cassandra cluster.
I am hitting a java.lang.NullPointerException when writing data from Spark to Cassandra. The job works fine with a small number of records,
but the problem appears when I try to load **~150 million** records.
Can someone help me find the root cause?
Here is the code snippet:
val paymentExtractCsvDF = spark
  .read
  .format("csv")
  .option("header", "true")
  .load("/home/otl/extract/csvout/Payment")
paymentExtractCsvDF.printSchema()
root
|-- BAN: string (nullable = true)
|-- ENT_SEQ_NO: string (nullable = true)
|-- PYM_METHOD: string (nullable = true)
case class Payment(account_number: String, entity_sequence_number: String, payment_type: String)
val paymentResultDf = paymentExtractCsvDF.map(row => Payment(row.getAs("BAN"),
row.getAs("ENT_SEQ_NO"),
row.getAs("PYM_METHOD"))).toDF()
var paymentResultFilterDf = paymentResultDf
.filter($"account_number".isNotNull || $"account_number" != "")
.filter($"entity_sequence_number".isNotNull || $"entity_sequence_number" != "")
paymentResultFilterDf
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode("append")
  .options(Map("table" -> "cassandratable", "keyspace" -> "cassandrakeyspace"))
  .save()
Here is the exception I am getting:
Failed to write statements to cassandrakeyspace.cassandratable. The
latest exception was
An unexpected error occurred server side on /10.18.15.198:9042: java.lang.NullPointerException
Please check the executor logs for more exceptions and information
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$3.apply(TableWriter.scala:243)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$3.apply(TableWriter.scala:241)
at scala.Option.map(Option.scala:146)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:241)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/11/22 01:12:17 INFO CoarseGrainedExecutorBackend: Got assigned task 1095
19/11/22 01:12:17 INFO Executor: Running task 39.1 in stage 21.0 (TID 1095)
19/11/22 01:12:17 INFO ShuffleBlockFetcherIterator: Getting 77 non-empty blocks including 10 local blocks and 67 remote blocks
19/11/22 01:12:17 INFO ShuffleBlockFetcherIterator: Started 7 remote fetches in 3 ms
19/11/22 01:12:17 INFO ShuffleBlockFetcherIterator: Getting 64 non-empty blocks including 8 local blocks and 56 remote blocks
19/11/22 01:12:17 INFO ShuffleBlockFetcherIterator: Started 7 remote fetches in 1 ms
19/11/22 01:12:17 ERROR Executor: Exception in task 7.0 in stage 21.0 (TID 1012)
1 Answer
It looks like you have null values in the key fields of your DataFrame. The problem is most likely in your filter conditions. I think you should do something like this:
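A sketch of the corrected filters, using && so both conditions must hold, and Spark's =!= Column operator instead of Scala's != (everything else is unchanged from your snippet, including the spark.implicits._ import that makes $ available):

val paymentResultFilterDf = paymentResultDf
  // && instead of ||: a key field must be both non-null AND non-empty to pass.
  // =!= is Spark's Column inequality. Scala's != compares the Column object
  // itself to "", which is always true, so your original filter kept every row.
  .filter($"account_number".isNotNull && $"account_number" =!= "")
  .filter($"entity_sequence_number".isNotNull && $"entity_sequence_number" =!= "")

With || and !=, each predicate reduces to "isNotNull OR true", so rows with null keys slip through to the connector, and that is most likely the server-side java.lang.NullPointerException you are seeing from Cassandra.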