I have a Kafka consumer that polls records from a topic and stores them as `consumerRecords: ConsumerRecords[String, String]`. I want to iterate over each record and write `(offset, value)` as `(k, v)` to an HBase table. I am trying to parallelize the records through Spark so I can map them to an `RDD` for a bulk insert into HBase.
val hbaseTable: String = "/app/raphattack/TEST"
val conf: Configuration = HBaseConfiguration.create()
val admin: Admin = ConnectionFactory.createConnection(conf).getAdmin
val connection: Connection = ConnectionFactory.createConnection(admin.getConfiguration)
val table: Table = connection.getTable(TableName.valueOf(hbaseTable))
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat2.configureIncrementalLoadMap(job, table)
val spark: SparkSession = SparkSession.builder.enableHiveSupport.getOrCreate
val records: RDD[ConsumerRecord[String, String]] = spark.sparkContext.parallelize(consumerRecords.toSeq)
val rdd: RDD[(ImmutableBytesWritable, KeyValue)] = records.map(record => {
val kv: KeyValue = new KeyValue(Bytes.toBytes(record.offset()), "cf".getBytes(), "c1".getBytes(), s"${record.value}".getBytes())
(new ImmutableBytesWritable(Bytes.toBytes(record.offset())), kv)
})
rdd.saveAsNewAPIHadoopFile("/tmp/test", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
I am hitting an exception:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it.
Exception during serialization: java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = test, partition = 0, offset = 14691347, timestamp = 0, producer = null, key = 1, value = {"id":1.0,"name":"test"}))
Is it possible to make the `ConsumerRecord` object serializable? If not, how can I iterate over the records without sacrificing write speed to HBase?
1 Answer
I was trying to do the same thing in a unit test. What you actually need is to set a serializer on the SparkConf.
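The answer is truncated here, but the usual serializer to set is Spark's Kryo serializer, which can handle classes that do not implement `java.io.Serializable`. A minimal sketch of both approaches, assuming spark-core and kafka-clients are on the classpath; the `SerializablePairs` object and `FakeRecord` class are hypothetical stand-ins used to keep the sketch self-contained:

```scala
// Option 1 (what the answer hints at): configure Kryo on the SparkConf
// before building the SparkSession, and register ConsumerRecord:
//
//   val sparkConf = new SparkConf()
//     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//     .registerKryoClasses(Array(classOf[ConsumerRecord[String, String]]))
//
// Option 2: avoid shipping ConsumerRecord to the executors at all by
// extracting plain (offset, value) pairs on the driver first; tuples of
// Long and String are already serializable, so the default Java
// serializer handles them without any extra configuration.
object SerializablePairs {
  // Hypothetical stand-in for ConsumerRecord, to keep this runnable
  // without Kafka on the classpath.
  final case class FakeRecord(offset: Long, value: String)

  val consumerRecords: Seq[FakeRecord] =
    Seq(FakeRecord(14691347L, """{"id":1.0,"name":"test"}"""))

  // Extract only the fields that will be written to HBase.
  val pairs: Seq[(Long, String)] =
    consumerRecords.map(r => (r.offset, r.value))
  // spark.sparkContext.parallelize(pairs) would then serialize cleanly,
  // and the map to (ImmutableBytesWritable, KeyValue) can work on the
  // Long/String pair instead of the ConsumerRecord itself.
}
```

With option 2, the existing `records.map(...)` block changes only in that it reads `record._1` for the offset and `record._2` for the value.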