Cannot get the desired directory structure when writing data from a Kafka topic to HDFS with PySpark

zsohkypk posted on 2023-06-20 in HDFS

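I am reading data from a Kafka topic and storing it in HDFS with PySpark. When I run the code, I am trying to get the following directory structure in HDFS:
user/hadoop/OUTPUT_FINAL1/year/month/day/hour/ containing csv/ (all CSV files, as Parquet), images/ (all image files), and videos/ (all video files)
However, when I run my code, the directory structure I see in HDFS looks like this:
user/hadoop/OUTPUT_FINAL1/year/month/day/hour/
csv/file_type=csv/ (CSV data) and file_type=image/ (image data)
My code does not produce the desired directory structure, and it does not write the video files at all.
The CSV files need to be written under the same name with a .parquet extension. Images need to be written with a .jpg or .png extension, and likewise .mp4 or .mov for videos.
Here is my code.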

import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyarrow.hdfs as hdfs
import os

# Configure logging
logging.getLogger("py4j").setLevel(logging.ERROR)
logging.getLogger("org.apache.kafka").setLevel(logging.WARN)

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("KafkaToHDFS") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
    .getOrCreate()

# Kafka broker configuration
kafka_server = "sandbox-hdf.hortonworks.com:6667"
kafka_topic = "airbnbdata-final"

# Read data from Kafka topic
df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_server) \
    .option("subscribe", kafka_topic) \
    .load()

# Define a function to create the output directory
def create_directory(directory):
    current_time = spark.sql("SELECT current_timestamp()").collect()[0][0]

    # Extract year, month, day, and hour components
    year_str = current_time.strftime("%Y")
    month_str = current_time.strftime("%m")
    day_str = current_time.strftime("%d")
    hour_str = current_time.strftime("%H")
    fulldirectory = directory + "/" + year_str + "/" + month_str + "/" + day_str + "/" + hour_str + "/"
    return fulldirectory

# Define a function to process each batch
def process_batch(df, fulldirectory):
    schema = StructType().add("file_type", StringType()).add("file_name", StringType()).add("data", StringType())

    # Extract the key, value, and other columns
    extracted_df = df.selectExpr(
        "CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp", "timestampType"
    )

    # Parse the JSON string in the value column
    extracted_df = extracted_df.withColumn("value_json", from_json(col("value"), schema))

    # Extract fields from the value_json column
    extracted_df = extracted_df.withColumn("file_type", col("value_json.file_type"))
    extracted_df = extracted_df.withColumn("file_name", col("value_json.file_name"))
    extracted_df = extracted_df.withColumn("data", col("value_json.data"))

    extracted_df.printSchema()
    return extracted_df

# Define a custom function to write each file to HDFS
def write_file(row, fulldirectory):
    fs = hdfs.connect(host="sandbox-hdp.hortonworks.com", port=8020)
    file_name = row["file_name"]
    data = row["data"]

    hdfs_path = fulldirectory + file_name
    with fs.open(hdfs_path, "wb") as f:
        f.write(data)

# Define a function to process images and videos
def process_images_and_videos(df, fulldirectory):
    # Filter the dataframe to get only images and videos
    image_df = df.filter(col("file_type").isin(["jpg", "png"]))
    video_df = df.filter(col("file_type").isin(["mp4", "mov"]))

    # Write images to HDFS
    image_df.foreach(lambda row: write_file(row, fulldirectory + "images/"))

    # Write videos to HDFS
    video_df.foreach(lambda row: write_file(row, fulldirectory + "videos/"))

# Configure the directory path and create the output directory
directory_path = "hdfs://sandbox-hdp.hortonworks.com:8020/user/hadoop/OUTPUT_FINAL1"
fulldirectory = create_directory(directory_path)

# Process the batch
processed_df = process_batch(df, fulldirectory)

# Write each batch to HDFS as Parquet
processed_df.write \
    .format("parquet") \
    .mode("append") \
    .partitionBy("file_type", "file_name") \
    .parquet(fulldirectory + "csv/")

# Process images and videos
process_images_and_videos(processed_df, fulldirectory)

# Stop the SparkSession
spark.stop()

I run the code over SSH in the Ambari sandbox environment with the following command:

spark-submit --jars /tmp/Airbnb_Data/jsr305-3.0.2.jar,/tmp/Airbnb_Data/snappy-java-1.1.8.4.jar,/tmp/Airbnb_Data/kafka-clients-3.3.2.jar,/tmp/Airbnb_Data/spark-sql-kafka-0-10_2.11-2.4.0.jar,/tmp/Airbnb_Data/commons-logging-1.1.3.jar,/tmp/Airbnb_Data/spark-streaming-kafka-0-10_2.12-3.4.0.jar,/tmp/Airbnb_Data/hadoop-client-api-3.3.4.jar,/tmp/Airbnb_Data/lz4-java-1.8.0.jar,/tmp/Airbnb_Data/hadoop-client-runtime-3.3.4.jar,/tmp/Airbnb_Data/scala-library-2.12.17.jar,/tmp/Airbnb_Data/spark-tags_2.12-3.4.0.jar,/tmp/Airbnb_Data/spark-token-provider-kafka-0-10_2.12-3.4.0.jar,/tmp/Airbnb_Data/slf4j-api-1.7.36.jar KafkaToHDFS.py

This is the directory structure I want:

user/hadoop/OUTPUT_FINAL1/
    ├── year/
    │   ├── month/
    │   │   ├── day/
    │   │   │   ├── hour/
    │   │   │   │   ├── csv/
    │   │   │   │   │   └── <csv parquet files>
    │   │   │   │   ├── images/
    │   │   │   │   │   └── <image files>
    │   │   │   │   └── videos/
    │   │   │   │       └── <video files>
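To make the target layout concrete, here is a stdlib-only sketch that computes the directory for a single record. The file_type-to-folder mapping is an assumption based on the values the code above filters on (csv, jpg, png, mp4, mov), and target_dir is a hypothetical helper name. It takes the hour from the record's own timestamp (for example the Kafka timestamp column) rather than from one current_timestamp() call on the driver, so each record is filed under its own hour.

```python
from datetime import datetime

# Assumed mapping from the question's file_type values to target folders
SUBDIR_BY_TYPE = {
    "csv": "csv",
    "jpg": "images",
    "png": "images",
    "mp4": "videos",
    "mov": "videos",
}


def target_dir(base: str, ts: datetime, file_type: str) -> str:
    """Return <base>/YYYY/MM/DD/HH/<csv|images|videos> for one record.

    ts is meant to be the record's own Kafka timestamp, so the record is
    filed under the hour it belongs to rather than the hour the job ran.
    """
    subdir = SUBDIR_BY_TYPE.get(file_type.lower())
    if subdir is None:
        raise ValueError(f"unsupported file_type: {file_type!r}")
    return f"{base.rstrip('/')}/{ts:%Y/%m/%d/%H}/{subdir}"


print(target_dir("/user/hadoop/OUTPUT_FINAL1", datetime(2023, 6, 20, 14, 5), "png"))
# /user/hadoop/OUTPUT_FINAL1/2023/06/20/14/images
```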
Answer 1, by h7wcgrx3

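Spark has no built-in writer for raw binary files such as images or videos.
Calling pyarrow functions inside a Spark foreach is not really using Spark's filesystem writers. Also, I believe pyarrow was released after HDP 2.6.5 reached end of support, so I don't think that functionality will work there.
As for the directory structure, let Spark build it for you: partition on separate date columns (year, month, day, hour) derived from each record's timestamp, instead of collecting a query result and extracting a single element as create_directory does.
You then get year=xxxx/month=xx/day=xx/hour=xx/file_type=xxx/ data directories.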
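Spark's DataFrameWriter.partitionBy names each directory level column=value, in the order the columns are passed; that is where both the year=xxxx/... layout above and the question's csv/file_type=.../ folders come from. The stdlib-only sketch below reproduces that naming so the resulting paths can be checked without a cluster. partition_path and date_parts are hypothetical helpers, and the sketch ignores Spark's escaping of special characters in values and its handling of nulls. In PySpark the date columns would typically be derived with something like date_format(col("timestamp"), "yyyy") before calling partitionBy("year", "month", "day", "hour", "file_type").

```python
from datetime import datetime
from typing import Mapping, Sequence


def date_parts(ts: datetime) -> dict:
    """Zero-padded year/month/day/hour strings, like date_format with yyyy/MM/dd/HH."""
    return {
        "year": f"{ts:%Y}",
        "month": f"{ts:%m}",
        "day": f"{ts:%d}",
        "hour": f"{ts:%H}",
    }


def partition_path(base: str, partition_cols: Sequence[str], row: Mapping[str, object]) -> str:
    """Build base/col1=v1/col2=v2/... the way partitionBy lays out directories.

    Simplified: Spark additionally escapes special characters in values and
    writes null values as __HIVE_DEFAULT_PARTITION__.
    """
    levels = [f"{col}={row[col]}" for col in partition_cols]
    return "/".join([base.rstrip("/")] + levels)


row = {**date_parts(datetime(2023, 6, 20, 14, 5)), "file_type": "csv", "file_name": "listings"}
print(partition_path("/user/hadoop/OUTPUT_FINAL1", ["year", "month", "day", "hour", "file_type"], row))
# /user/hadoop/OUTPUT_FINAL1/year=2023/month=06/day=20/hour=14/file_type=csv
```

With the question's partitionBy("file_type", "file_name") under .../csv/, the same function gives .../csv/file_type=csv/file_name=listings, which is the nesting reported in the question.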
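Regarding "images need to be written with a .jpg or .png extension, and likewise .mp4 or .mov for videos":
HDFS is not a good place to store files like these. Use blob storage for media assets.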

Answer 2, by x759pob2

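Looking at your code, I see a few issues that need to be fixed to get the desired directory structure and file extensions. Here are the changes you can make:
1. Modify the write_file function: update it so that it saves files with the appropriate extension. You can change it as follows: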

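import base64

import pyarrow.hdfs as hdfs  # legacy API; newer pyarrow releases use pyarrow.fs.HadoopFileSystem


def write_file(row, target_dir):
    fs = hdfs.connect(host="sandbox-hdp.hortonworks.com", port=8020)
    file_type = row["file_type"].lower()
    file_name = row["file_name"]

    # Give media files their extension (.jpg, .png, .mp4, .mov) if it is missing
    if not file_name.lower().endswith("." + file_type):
        file_name += "." + file_type

    # target_dir is already .../images or .../videos, so don't nest file_type again
    hdfs_path = f"{target_dir}/{file_name}"

    # Assumes the producer base64-encoded the raw bytes into the JSON "data" string
    data = base64.b64decode(row["data"])
    with fs.open(hdfs_path, "wb") as f:
        f.write(data)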

2. Update the process_images_and_videos function: change it so that image and video files are saved in their own directories. Update the function as follows:

def process_images_and_videos(df, fulldirectory):
    # Filter the dataframe to get only images and videos
    image_df = df.filter(col("file_type").isin(["jpg", "png"]))
    video_df = df.filter(col("file_type").isin(["mp4", "mov"]))

    # Write images to HDFS
    image_df.foreach(lambda row: write_file(row, f"{fulldirectory}/images"))

    # Write videos to HDFS
    video_df.foreach(lambda row: write_file(row, f"{fulldirectory}/videos"))

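3. Change the partitioning used for the CSV files: adjust the partitioning logic used when writing the CSV files so that it matches the desired directory structure. Update the code as follows: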

# Write only the CSV rows, as Parquet, directly under .../csv/
# (no partitionBy, so no file_type=... sub-directories are created)
processed_df.filter(col("file_type") == "csv") \
    .write \
    .mode("append") \
    .parquet(f"{fulldirectory}/csv")

With these changes, your code should create the desired directory structure in HDFS and save files with the appropriate extensions.
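Both write_file versions in this thread open a new HDFS connection for every row. A common way to cut that overhead is DataFrame.foreachPartition, whose function receives an iterator over one partition's rows, so a single connection can serve the whole partition. This still bypasses Spark's own writers, as the first answer points out; it only reduces connection overhead. The sketch below injects the connect and per-row write steps so it can be tested without HDFS; make_partition_writer is a hypothetical helper, not part of PySpark.

```python
from typing import Any, Callable, Iterable


def make_partition_writer(
    connect: Callable[[], Any],
    write_row: Callable[[Any, Any], None],
) -> Callable[[Iterable[Any]], None]:
    """Return a function to pass to DataFrame.foreachPartition.

    connect() runs at most once per partition, and the handle it returns is
    reused for every row, instead of opening one connection per row.
    """

    def write_partition(rows: Iterable[Any]) -> None:
        fs = None
        for row in rows:
            if fs is None:
                fs = connect()  # opened lazily: empty partitions connect nothing
            write_row(fs, row)

    return write_partition


# Local check with fakes standing in for HDFS
opened = []
written = []


def fake_connect():
    opened.append("conn")
    return "conn"


writer = make_partition_writer(fake_connect, lambda fs, row: written.append((fs, row)))
writer(iter(["a.jpg", "b.png", "c.jpg"]))
print(len(opened), written)
```

On a cluster it would be used along the lines of image_df.foreachPartition(make_partition_writer(lambda: hdfs.connect(host="sandbox-hdp.hortonworks.com", port=8020), write_one)), where write_one(fs, row) is a hypothetical variant of write_file that takes the open connection as a parameter.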
