Bulk insert into HBase: ConsumerRecord is not serializable

k4emjkb1  posted on 2021-06-07  in  Kafka

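I have a Kafka client that polls records from a topic and stores them as consumerRecords: ConsumerRecords[String, String]. I want to iterate over each record and write (offset, value) as (k, v) to an HBase table. I am trying to parallelize these records with Spark so that I can map them to an RDD for a bulk insert into HBase.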

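import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val hbaseTable: String = "/app/raphattack/TEST"
val conf: Configuration = HBaseConfiguration.create()
// A single connection is enough; the Admin handle was never used
val connection: Connection = ConnectionFactory.createConnection(conf)
val table: Table = connection.getTable(TableName.valueOf(hbaseTable))

// Configure the job to emit HFiles for the target table
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat2.configureIncrementalLoadMap(job, table)

val spark: SparkSession = SparkSession.builder.enableHiveSupport.getOrCreate
// consumerRecords is the ConsumerRecords[String, String] returned by the Kafka poll
val records: RDD[ConsumerRecord[String, String]] = spark.sparkContext.parallelize(consumerRecords.asScala.toSeq)

// Row key = offset, cell cf:c1 = record value
val rdd: RDD[(ImmutableBytesWritable, KeyValue)] = records.map(record => {
  val kv: KeyValue = new KeyValue(Bytes.toBytes(record.offset()), "cf".getBytes(), "c1".getBytes(), s"${record.value}".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(record.offset())), kv)
})

rdd.saveAsNewAPIHadoopFile("/tmp/test", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)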

I get the following exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it. 
Exception during serialization: java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
        - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = test, partition = 0, offset = 14691347, timestamp = 0, producer = null, key = 1, value = {"id":1.0,"name":"test"}))

Is it possible to make the ConsumerRecord object serializable? If not, how can I iterate over the records without sacrificing write speed to HBase?
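For context on the exception above: Spark's default serializer is Java serialization, which rejects any object whose class does not implement java.io.Serializable. The sketch below reproduces that behaviour without Spark or Kafka. `FakeRecord` and `javaSerialize` are hypothetical names made up for illustration, with `FakeRecord` standing in for ConsumerRecord. It also shows that a plain `(offset, value)` tuple serializes without trouble, which suggests one possible workaround: extract the two fields on the driver before parallelizing.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in for ConsumerRecord: a plain class that, like
// ConsumerRecord, does not implement java.io.Serializable.
final class FakeRecord(val offset: Long, val value: String)

// Attempts plain Java serialization, the mechanism Spark uses by default.
def javaSerialize(obj: AnyRef): Try[Array[Byte]] = Try {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(obj) finally out.close()
  bytes.toByteArray
}

val fakeRecords: Seq[FakeRecord] =
  Seq(new FakeRecord(14691347L, """{"id":1.0,"name":"test"}"""))

// Serializing the record object itself fails with NotSerializableException.
val direct: Try[Array[Byte]] = javaSerialize(fakeRecords.head)

// Extracting the fields into a tuple first yields a serializable value.
val pairs: Seq[(Long, String)] = fakeRecords.map(r => (r.offset, r.value))
val extracted: Try[Array[Byte]] = javaSerialize(pairs.head)
```

Applied to the code in the question, this would mean building a `Seq[(Long, String)]` from `consumerRecords` on the driver and passing that to `parallelize`, so that no ConsumerRecord instance ever has to be shipped to the executors. This is an assumption about what fits the use case, not something the asker or the answer below states.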

2ul0zpep  #1

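I was trying to do the same thing in a unit test.
Spark's default Java serializer only handles classes that implement java.io.Serializable, which ConsumerRecord does not, so you need to set a different serializer (Kryo) on the SparkConf: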

.set("spark.serializer", classOf[KryoSerializer].getName)
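As a minimal sketch of where that setting could go, assuming the SparkSession is built the same way as in the question: the serializer has to be configured before the SparkContext is created, because `getOrCreate` will not change the serializer of a context that already exists. The name `sparkConf` is only illustrative.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession

// Switch Spark from Java serialization to Kryo, which does not require
// classes to implement java.io.Serializable.
val sparkConf: SparkConf = new SparkConf()
  .set("spark.serializer", classOf[KryoSerializer].getName)

// Pass the configuration in before the session (and its context) is created.
val spark: SparkSession = SparkSession.builder
  .config(sparkConf)
  .enableHiveSupport
  .getOrCreate
```

The same property can also be supplied at submit time with `--conf spark.serializer=org.apache.spark.serializer.KryoSerializer`, which avoids hard-coding it in the application.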
