pyspark 用Python设计数据质量检查应用程序

ztmd8pv5  于 2023-04-29  发布在  Spark
关注(0)|答案(1)|浏览(207)

我正在开发一个应用程序,它可以对输入文件执行数据质量检查,并根据数据中报告的DQ故障捕获计数。我使用的方法是否有意义,或者会推荐更好的方法来做到这一点?
我正在尝试用Python编写一个应用程序,它将捕获数据中的DQ错误并收集计数。我本可以使用Pandas,Numpy来实现这一点,然而,由于数据量巨大~100 GB,我决定通过Spark来实现。这是我的第三个Python应用程序,所以虽然我可以用它写代码,但如果这真的是最佳的方法,我就不知道了。
所以总结一下,我正在阅读多个CSV文件,并在上面创建一个单一的Parquet文件,然后创建一个临时视图,我可以查询以查找DQ问题。然后,我将查询结果捕获到一个变量中,然后将其写入一个列表。此列表稍后用于编写CSV,该CSV将成为 Jmeter 板报告的输入。代码如下。

# Importing required libraries
import time,datetime
from pyspark.sql import SparkSession

# Initiating Spark Session
spark = SparkSession.builder.appName("DQ-checks").getOrCreate()

# Initializing Variables
time1 = datetime.datetime.now()
src_file_01 = r'\All kinds of data files\craigslistVehicles.csv'
target_filename = r'\All kinds of data files\craigslistVehicles.parquet'

# Read the CSV file through Spark
df = spark.read.csv(src_file_01, header="true", inferSchema="true")

# Update code to make it flexible enough to read multiple files

# Write the contents of the CSV file into a Parquet file
df.write.format("parquet").save(target_filename, mode="Overwrite")
print("created a parquet file")

# Create a temporary view over the Parquet file to query data
df2 = spark.read.parquet(target_filename)
df2.createOrReplaceTempView("craigslistVehicles")

# Create a column list from the header of the Spark View
column_list = df2.columns
print(column_list)

# Capturing time before start of the query for benchmarking
time2 = datetime.datetime.now()
result_store = []
# Iterate through all the columns and capture null counts for each column
rule_type = 'Null Check'
results={}
for column_names in column_list:
    query = "Select count(*) from craigslistVehicles where {} is null".format(column_names)
#    print(query)
    df3 = spark.sql(query).collect()
    for i in df3:
        results.update(i.asDict())
        res_in_num=results['count(1)']
    result_store=[rule_type,column_names,res_in_num]
    print (result_store)

# Next Steps - Update code to add more data quality checks based on requirement.

# Next Steps - Insert results of the queries into a spark table that can be used as a log and becomes an input for a dashboard report.

# Capturing time after end of the query for benchmarking
time3 = datetime.datetime.now()
print("Query time is.{}",time3-time2)
print("Total Job run time is.{}",time3-time1)

# Spark Session Stop
spark.stop()

目前,这是可行的。我可以处理1。1 GB文件在一分钟内。
我的问题是-
这个设计有意义吗?如果你必须这么做你会怎么做?有什么明显的地方我可以改变,使代码更干净?

xqkwcwgp

xqkwcwgp1#

您应该在您的环境中考虑以下strategies for data quality:在ETL过程中嵌入质量检查、与警报系统集成、实现记录级和批处理级检查以及创建分析检查代码生成工具。那你就成功了。

相关问题