当您每天收到包含所有数据(旧数据和新数据)的xml或csv文件时,如何只处理新数据

bogh5gae  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(356)

我每天都会收到一个包含所有数据(旧数据和新数据)的xml或csv文件。例如,如果昨天.xml包含3条记录,那么今天.xml包含4条记录(3条旧记录和1条新记录)。
我只关心最后一行(新行),因为我只想处理新数据,旧数据每天都在处理。
使用spark和kafka实现这一点的最佳方法是什么?数据示例:

OpportunityNo, OpprotunityTitle,Field
--- yesterday data----
Row1:1,OppTit1,IT
Row2:2,OppTit2,HEALTH
Row3:3,OppTit3,Finance
-------today data---------
Row4:4,OppTit4,Engineering

附加说明:
这个文件很大。i、 e.处理该文件的成本很高,特别是它可能在today.xml中包含与昨天的数据相关的第二行的更新,但由于该文件发生了更新,因此应将其视为今天的更新。
我需要将新的(和更新的)数据写入数据库(作为接收器)。
所有这些的目的都是向有相同专业的人推荐新的记录。i、 e如果我的专业是it,那么在登录时,我会得到推荐机会1,一旦我打开这个机会1,我会被移到一个历史推荐表中,这样,任何新的数据都会被插入到一个推荐表中,一旦打开就会移到一个历史推荐表中,用户可以在那里得到并看到它。
如果除了处理xml之外没有别的方法,我想知道您对如何设计它的建议。
下面是我的代码,但是它把每个新文件上的所有数据都放到目录下,我只想得到新的数据而不是所有的数据。

import org.apache.log4j._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ StructType, StructField, IntegerType, DoubleType, StringType, TimestampType, DateType }
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.Trigger

object Demo {
    def main(args: Array[String]) {
            Logger.getLogger("org").setLevel(Level.ERROR)

            val conf = new SparkConf()
            conf.set("spark.app.name", "GrantAnalytics")
            conf.set("spark.master", "local")
            val sc = new SparkContext(conf)

            val spark = SparkSession.builder().appName("GrantAnalytics").master("local[*]").getOrCreate()

            spark.conf.set("spark.sql.shuffle.partitions", 5)
            val schema = new StructType(Array(
              new StructField("OpportunityID", IntegerType, true),
              new StructField("OpportunityTitle", StringType, true),
              new StructField("OpportunityNumber", StringType, true),
              new StructField("CFDANumbers", DoubleType, true),
              new StructField("CategoryOfFundingActivity", StringType, true)))

            val streamingDF = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).option("header", "true").format("csv").load("C:/datasets/output/*.csv")

            val query = streamingDF.select(concat(col("OpportunityID"), lit("~"), col("OpportunityTitle"), lit("~"), col("OpportunityNumber"), lit("~"), col("CFDANumbers"), lit("~"), col("CategoryOfFundingActivity")).alias("value")).writeStream.format("kafka").outputMode(OutputMode.Update()).option("kafka.bootstrap.servers", "localhost:9092").option("topic", "grants").option("checkpointLocation", "C:/deleteme/kafka/").start()
            query.awaitTermination()
  }
}
vpfxa7rd

vpfxa7rd1#

如果你想和Kafka一起工作,那么在设计你的应用程序的时候,最好考虑到个人和独立的事件。
这意味着,您应该将xml拆分成行,并通过数据/时间戳对其进行过滤,以将每条消息发送给kafka。在kafka中,每条消息都只包含示例数据中的一行。最后,在第二天之后,Kafka应该包含以下四个信息:

1,OppTit1,IT
2,OppTit2,HEALTH
3,OppTit3,Finance
4,OppTit4,Engineering

另外,一定要为Kafka的信息应用一个有用的键。
一旦数据作为单个事件在kafka中可用,就可以使用spark消费和处理仅引用当前日期的新消息,因为以前消费的消息将不会再次被消费。

相关问题