row.key and row.value do not work in Spark Structured Streaming code

klh5stk1  asked on 2021-06-06  in Kafka
Follow (0) | Answers (1) | Views (256)

My code below reads Avro-format data from the producer in Spark Structured Streaming, but it fails to compile on row.key and row.value: the symbols key and value cannot be resolved on row. Please help me fix this. I want to read the data in Spark and write it to Cassandra on our Hadoop system, and this is the only way I found to read an Avro source in Spark Structured Streaming. Please let me know if there is any other way to read Avro-format Kafka data from the producer.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.collection.JavaConverters._

object ReadKafkaAvro {

  case class DeserializedFromKafkaRecord(key: String, value: String)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("ReadKafkaAvro")
      .config("spark.master", "local")
      .getOrCreate()

    import spark.implicits._

    val schemaRegistryURL = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = "b24_tx_financial_formatted_clean"
    val subjectValueName = topics + "-value"

    spark.sparkContext.setLogLevel("ERROR")
    val restService = new RestService(schemaRegistryURL)

    val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

    //Use Avro parsing classes to get Avro Schema
    val parser = new Schema.Parser
    val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

    //key schema is typically just string but you can do the same process for the key as the value
    val keySchemaString = "\"string\""
    val keySchema = parser.parse(keySchemaString)

    //Create a map with the Schema registry url.
    //This is the only Required configuration for Confluent's KafkaAvroDeserializer.
    val props = Map("schema.registry.url" -> schemaRegistryURL)
    val client = new CachedSchemaRegistryClient(schemaRegistryURL, 20)

    //Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
    var keyDeserializer: KafkaAvroDeserializer = null
    var valueDeserializer: KafkaAvroDeserializer = null

    //Create structured streaming DF to read from the topic.
    val rawTopicMessageDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "vtorppsdv01.corp.moneris.com:9093,vtorppsdv02.corp.moneris.com:9093,vtorppsdv03.corp.moneris.com:9093")
      .option("subscribe", topics)
       .option("specific.avro.reader", "true")
      .option("startingOffsets", "earliest")
      .option("group_id","b24_ptlf_eim_processing")
      .option("security.protocol","SSL")
      .option("ssl.keystore.location","C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\kafka-eim-dev.jks")
      .option("ssl.keystore.password","BW^1=|sY$j")
      .option("ssl.key.password","BW^1=|sY$j")
      .option("ssl.truststore.location","C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\cpbp-ca-dev.jks")
      .option("ssl.truststore.password","iB>3v$6m@9")//remove for prod
      .load()

    //instantiate the SerDe classes if not already, then deserialize!
    val deserializedTopicMessageDS = rawTopicMessageDF.map{
      row =>
        if (keyDeserializer == null) {
          keyDeserializer = new KafkaAvroDeserializer
          keyDeserializer.configure(props.asJava, true)  //isKey = true
        }
        if (valueDeserializer == null) {
          valueDeserializer = new KafkaAvroDeserializer
          valueDeserializer.configure(props.asJava, false) //isKey = false
        }

        //Pass the Avro schema.
        val deserializedKeyString = keyDeserializer.deserialize(topics, row.key, keySchema).toString //the topic name argument is unused by deserialize(); it is only required by the signature
        val deserializedValueJsonString = valueDeserializer.deserialize(topics, row.value, topicValueAvroSchema).toString

        DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueJsonString)
    }

    val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()

  }
}
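For reference, the "cannot resolve symbol" error comes from the fact that the Kafka source returns untyped Rows whose key and value columns are binary, so there are no key/value members to call on row. A minimal sketch of the same map written against those columns, assuming the deserializer setup above (not verified against this cluster):

    //Sketch: read key/value as the binary columns the Kafka source exposes,
    //instead of the non-existent row.key / row.value members.
    val deserializedTopicMessageDS = rawTopicMessageDF.map { row =>
      if (keyDeserializer == null) {
        keyDeserializer = new KafkaAvroDeserializer
        keyDeserializer.configure(props.asJava, true)   //isKey = true
      }
      if (valueDeserializer == null) {
        valueDeserializer = new KafkaAvroDeserializer
        valueDeserializer.configure(props.asJava, false) //isKey = false
      }

      val keyBytes = row.getAs[Array[Byte]]("key")
      val valueBytes = row.getAs[Array[Byte]]("value")

      DeserializedFromKafkaRecord(
        keyDeserializer.deserialize(topics, keyBytes, keySchema).toString,
        valueDeserializer.deserialize(topics, valueBytes, topicValueAvroSchema).toString)
    }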

lx0bsm1f1#

I found that reading Kafka data in Avro format is not easy. I developed Spark Streaming code with Twitter Bijection instead, but now I am getting a byte-inversion error; suggestions are welcome.

Error : Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): com.twitter.bijection.InversionFailure: Failed to invert: [B@5335860

The new code I am using:

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import io.confluent.kafka.schemaregistry.client.rest.RestService
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object ReadKafkaAvro1 {

  object Injection {

    val schemaRegistryURL = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = "b24_tx_financial_formatted_clean"
    val subjectValueName = topics + "-value"
    val restService = new RestService(schemaRegistryURL)
    val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
    val parser = new Schema.Parser()
    //val schema = parser.parse(getClass.getResourceAsStream("src\\main\\resources\\b24_tx_financial_formatted_clean.avsc"))
    val schema = parser.parse(valueRestResponseSchema.getSchema)
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("ReadKafkaAvro").setMaster("local[*]")
    val streamingCtx = new StreamingContext(conf,Seconds(30))
    val schemaRegistryURL1 = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = Array("b24_tx_financial_formatted_clean")

    streamingCtx.sparkContext.setLogLevel("ERROR")

    val kafkaParms = Map[String,Object]("bootstrap.servers" -> "vtorppsdv01.corp.moneris.com:9093,vtorppsdv02.corp.moneris.com:9093,vtorppsdv03.corp.moneris.com:9093",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "group.id" -> "b24_ptlf_eim_processing" ,
      "auto.offset.reset" -> "earliest",
      "auto.commit.interval.ms" -> "2000",
      "schema.registry.url" -> schemaRegistryURL1,
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "security.protocol" -> "SSL",
      "ssl.keystore.location" -> "C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\kafka-eim-dev.jks",
      "ssl.keystore.password" -> "BW^1=|sY$j",
      "ssl.key.password" -> "BW^1=|sY$j",
      "ssl.truststore.location" -> "C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\cpbp-ca-dev.jks",
      "ssl.truststore.password" -> "iB>3v$6m@9",
      "ssl.keystore.type" -> "JCEKS",
      "ssl.truststore.type" -> "JCEKS",
      "specific.avro.reader" -> "True"
    )

    val inputStream = KafkaUtils.createDirectStream[String,Array[Byte]](streamingCtx,PreferConsistent,Subscribe[String,Array[Byte]](topics,kafkaParms))

    val recordStream = inputStream.map(msg => Injection.injection.invert(msg.value()).get)
    //.map(record => (record.get("AuthorizationTransactionSource"), record.get("AuthorizationTransactionSourceID")))

    inputStream.map(x => (x.key, x.value)).print()

    recordStream.print()

    streamingCtx.start()
    streamingCtx.awaitTermination()

  }
}
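A likely cause of the InversionFailure (an assumption, not something confirmed in this thread) is that the producer serializes with Confluent's KafkaAvroSerializer, which prepends a 5-byte wire-format header (one magic byte plus a 4-byte schema id) to every record, so GenericAvroCodecs.toBinary cannot invert the raw bytes. A minimal sketch of stripping that header before inverting:

    //Sketch, assuming Confluent wire format: byte 0 = magic byte, bytes 1-4 = schema id,
    //and the remaining bytes are the Avro-encoded payload.
    val recordStream = inputStream.map { msg =>
      val payload = msg.value()
      val avroBytes = java.util.Arrays.copyOfRange(payload, 5, payload.length)
      Injection.injection.invert(avroBytes).get
    }

Alternatively, configuring "value.deserializer" as io.confluent.kafka.serializers.KafkaAvroDeserializer (with the schema registry URL already in the Kafka params) yields GenericRecord values directly and avoids the manual inversion.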
