My code below gives an error on row.key and row.value when reading Avro-format data from the producer in Spark Structured Streaming; it fails because the symbols key and value cannot be found on row. Please help me resolve this. I want to read the data in Spark and write it to Cassandra on our Hadoop system, and this is the only way I have found to read Avro source data in Spark Structured Streaming. Please let me know if there is any other way to read Avro-format Kafka data from the producer.
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.avro.Schema
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import scala.reflect.runtime.universe._
import scala.collection.JavaConverters._
object ReadKafkaAvro {

  case class DeserializedFromKafkaRecord(key: String, value: String)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("ReadKafkaAvro")
      .config("spark.master", "local")
      .getOrCreate()

    import spark.implicits._
    val schemaRegistryURL = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = "b24_tx_financial_formatted_clean"
    val subjectValueName = topics + "-value"

    spark.sparkContext.setLogLevel("ERROR")

    val restService = new RestService(schemaRegistryURL)
    val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

    // Use the Avro parsing classes to get the Avro schema.
    val parser = new Schema.Parser
    val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

    // The key schema is typically just a string, but you can follow the same process for the key as for the value.
    val keySchemaString = "\"string\""
    val keySchema = parser.parse(keySchemaString)

    // Create a map with the schema registry URL.
    // This is the only required configuration for Confluent's KafkaAvroDeserializer.
    val props = Map("schema.registry.url" -> schemaRegistryURL)
    val client = new CachedSchemaRegistryClient(schemaRegistryURL, 20)

    // Declare the SerDe vars before the Spark structured streaming map, to avoid a non-serializable class exception.
    var keyDeserializer: KafkaAvroDeserializer = null
    var valueDeserializer: KafkaAvroDeserializer = null

    // Create a structured streaming DF to read from the topic.
    val rawTopicMessageDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "vtorppsdv01.corp.moneris.com:9093,vtorppsdv02.corp.moneris.com:9093,vtorppsdv03.corp.moneris.com:9093")
      .option("subscribe", topics)
      .option("specific.avro.reader", "true")
      .option("startingOffsets", "earliest")
      .option("group_id", "b24_ptlf_eim_processing")
      .option("security.protocol", "SSL")
      .option("ssl.keystore.location", "C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\kafka-eim-dev.jks")
      .option("ssl.keystore.password", "BW^1=|sY$j")
      .option("ssl.key.password", "BW^1=|sY$j")
      .option("ssl.truststore.location", "C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\cpbp-ca-dev.jks")
      .option("ssl.truststore.password", "iB>3v$6m@9") // remove for prod
      .load()
    // Instantiate the SerDe classes if not already instantiated, then deserialize!
    val deserializedTopicMessageDS = rawTopicMessageDF.map {
      row =>
        if (keyDeserializer == null) {
          keyDeserializer = new KafkaAvroDeserializer
          keyDeserializer.configure(props.asJava, true) // isKey = true
        }
        if (valueDeserializer == null) {
          valueDeserializer = new KafkaAvroDeserializer
          valueDeserializer.configure(props.asJava, false) // isKey = false
        }

        // Pass the Avro schema. The topic name is actually unused in the deserializer's
        // source code; it is only required by the method signature.
        val deserializedKeyString = keyDeserializer.deserialize(topics, row.key, keySchema).toString // error: key is not a member of Row
        val deserializedValueJsonString = valueDeserializer.deserialize(topics, row.value, topicValueAvroSchema).toString // error: value is not a member of Row

        DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueJsonString)
    }
    val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}
1 Answer
lx0bsm1f1#
I found that reading Avro-format data from Kafka is not easy. I had first developed Spark streaming code using Twitter Bijection, but I ran into an inverted bytes error with that approach.
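A note on the compile error in the question first: rawTopicMessageDF is an untyped DataFrame, so the function passed to map receives a Row, and a Row has no key or value members. One way around it, sketched below (untested; it assumes the same rawTopicMessageDF, props, topics, schemas, and deserializer vars as in the question), is to select the two binary Kafka columns into a typed Dataset before mapping:

// Select the Kafka key and value columns (both binary in the Kafka source) into a
// typed Dataset, so the map function receives byte arrays instead of Rows.
val deserializedTopicMessageDS = rawTopicMessageDF
  .selectExpr("key", "value")
  .as[(Array[Byte], Array[Byte])]
  .map { case (key, value) =>
    if (keyDeserializer == null) {
      keyDeserializer = new KafkaAvroDeserializer
      keyDeserializer.configure(props.asJava, true) // isKey = true
    }
    if (valueDeserializer == null) {
      valueDeserializer = new KafkaAvroDeserializer
      valueDeserializer.configure(props.asJava, false) // isKey = false
    }
    // Note: the key may be null on some topics; guard before calling toString in real code.
    val deserializedKeyString = keyDeserializer.deserialize(topics, key, keySchema).toString
    val deserializedValueJsonString = valueDeserializer.deserialize(topics, value, topicValueAvroSchema).toString
    DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueJsonString)
  }

Separately, options meant for the underlying Kafka consumer (security.protocol and the ssl.* settings) have to be prefixed with kafka. (for example kafka.security.protocol) when using the Spark Kafka source; without the prefix they never reach the consumer.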
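On the question's last point, whether there is another way to read Avro-format Kafka data: if you can use Spark 2.4 or later, the external spark-avro module ships a from_avro column function. One caveat is that from_avro expects plain Avro bytes, while messages produced with Confluent's serializer start with a 5-byte wire-format header (a magic byte plus a 4-byte schema id) that has to be stripped first. A rough sketch, assuming Spark 2.4 with spark-avro on the classpath and the rawTopicMessageDF and topicValueAvroSchema from the question (in Spark 3 the function moved to org.apache.spark.sql.avro.functions):

import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.{col, expr}

val parsedDF = rawTopicMessageDF
  // Drop the 5-byte Confluent wire-format header before handing the bytes to from_avro.
  .select(expr("substring(value, 6, length(value) - 5)").as("avroBytes"))
  // topicValueAvroSchema.toString is the JSON form of the schema fetched from the registry.
  .select(from_avro(col("avroBytes"), topicValueAvroSchema.toString).as("record"))
  .select("record.*")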
The new code I am using: