将kafka主题中的所有数据复制到接收器(文件或配置单元表)的最推荐方法是什么?

mccptt67  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(164)

我正在使用kafka使用者api将kafka主题中的所有数据复制到一个配置单元表中。为了做到这一点,我使用hdfs作为中间步骤。我使用一个唯一的组id并将偏移量重置为“earliest”,以便从一开始就获取所有数据,并在执行之后忽略提交。然后,我迭代kafka主题中的记录,并将每个记录保存到hdfs中的临时文件中。然后我使用spark从hdfs读取数据,并使用日期作为文件名将其保存到一个parquet文件中。然后在配置单元表中创建一个带有日期的分区,最后将文件作为分区加载到parquet中。
正如您在下面的代码中看到的,我使用了几个中间步骤,这使得我的代码远远不是最佳的。这是从Kafka主题复制所有数据的最推荐的方法吗?我做了一些研究,到目前为止,这是我设法得到工作的变通办法,然而,随着记录量每天增加,我的执行时间达到了可容忍的极限(从20分钟到6小时,两周)。
代码如下:

def start( lowerDate: String, upperDate: String )={

    // Configurations for kafka consumer
    val conf = ConfigFactory.parseResources("properties.conf")
    val brokersip = conf.getString("enrichment.brokers.value")
    val topics_in = conf.getString("enrichment.topics_in.value")

    // Crea la sesion de Spark
    val spark = SparkSession
      .builder()
      .master("yarn")
      .appName("ParaTiUserXY")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._

    val properties = new Properties
    properties.put("key.deserializer", classOf[StringDeserializer])
    properties.put("value.deserializer", classOf[StringDeserializer])
    properties.put("bootstrap.servers", brokersip)
    properties.put("auto.offset.reset", "earliest")
    properties.put("group.id", "ParaTiUserXYZZ12345")

    //Schema para transformar los valores del topico de Kafka a JSON
    val my_schema = new StructType()
        .add("longitudCliente", StringType)
        .add("latitudCliente", StringType)
        .add("dni", StringType)
        .add("alias", StringType)
        .add("segmentoCliente", StringType)
        .add("timestampCliente", StringType)
        .add("dateCliente", StringType)
        .add("timeCliente", StringType)
        .add("tokenCliente", StringType)
        .add("telefonoCliente", StringType)

    val consumer = new KafkaConsumer[String, String](properties)
    consumer.subscribe( util.Collections.singletonList("parati_rt_geoevents")   )

    val fs = {
      val conf = new Configuration()
      FileSystem.get(conf)
    }

    val temp_path:Path = new Path("hdfs:///tmp/s70956/tmpstgtopics")
    if( fs.exists(temp_path)){
      fs.delete(temp_path, true)
    }

    while(true)
    {
        val records=consumer.poll(100)
        for (record<-records.asScala){
            val data = record.value.toString
            //println(data)
            val dataos: FSDataOutputStream = fs.create(temp_path)
            val bw: BufferedWriter = new BufferedWriter( new OutputStreamWriter(dataos, "UTF-8"))
            bw.append(data)
            bw.close
            val data_schema = spark.read.schema(my_schema).json("hdfs:///tmp/s70956/tmpstgtopics")
            val fechaCliente = data_schema.select("dateCliente").first.getString(0)

            if( fechaCliente < upperDate && fechaCliente >= lowerDate){
                data_schema.select("longitudCliente", "latitudCliente","dni", "alias", 
                "segmentoCliente", "timestampCliente", "dateCliente", "timeCliente",
                "tokenCliente", "telefonoCliente")
                   .coalesce(1).write.mode(SaveMode.Append).parquet("/desa/landing/parati/xyuser/" + fechaCliente)

            }
            else if( fechaCliente < lowerDate){
                //
            }
            else if( fechaCliente >= upperDate){
              break;
            }

        }
   }

      consumer.close()

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题