I am reading data from a Kafka topic and storing it to HDFS with PySpark. When I run the code, I am trying to get the following directory structure in HDFS:
user/hadoop/OUTPUT_FINAL1/year/month/day/hour/ with csv/ (all CSV files as Parquet), images/ (all image files), and videos/ (all video files)
However, when I run my code, the directory structure I actually see in HDFS looks like this:
user/hadoop/OUTPUT_FINAL1/year/month/day/hour/
csv/file_type=csv/ (csv data) and file_type=image/ (image data)
My code does not produce the desired directory structure, and it does not write the video files at all.
The CSV files need to be written with the same names but with a .parquet extension. Images need to be written with a .jpg or .png extension, and likewise videos with .mp4 or .mov.
Here is my code:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyarrow.hdfs as hdfs
import os

# Configure logging
logging.getLogger("py4j").setLevel(logging.ERROR)
logging.getLogger("org.apache.kafka").setLevel(logging.WARN)

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("KafkaToHDFS") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
    .getOrCreate()

# Kafka broker configuration
kafka_server = "sandbox-hdf.hortonworks.com:6667"
kafka_topic = "airbnbdata-final"

# Read data from Kafka topic
df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_server) \
    .option("subscribe", kafka_topic) \
    .load()

# Define a function to create the output directory
def create_directory(directory):
    current_time = spark.sql("SELECT current_timestamp()").collect()[0][0]
    # Extract year, month, day, and hour components
    year_str = current_time.strftime("%Y")
    month_str = current_time.strftime("%m")
    day_str = current_time.strftime("%d")
    hour_str = current_time.strftime("%H")
    fulldirectory = directory + "/" + year_str + "/" + month_str + "/" + day_str + "/" + hour_str + "/"
    return fulldirectory

# Define a function to process each batch
def process_batch(df, fulldirectory):
    schema = StructType().add("file_type", StringType()).add("file_name", StringType()).add("data", StringType())
    # Extract the key, value, and other columns
    extracted_df = df.selectExpr(
        "CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp", "timestampType"
    )
    # Parse the JSON string in the value column
    extracted_df = extracted_df.withColumn("value_json", from_json(col("value"), schema))
    # Extract fields from the value_json column
    extracted_df = extracted_df.withColumn("file_type", col("value_json.file_type"))
    extracted_df = extracted_df.withColumn("file_name", col("value_json.file_name"))
    extracted_df = extracted_df.withColumn("data", col("value_json.data"))
    extracted_df.printSchema()
    return extracted_df

# Define a custom function to write each file to HDFS
def write_file(row, fulldirectory):
    fs = hdfs.connect(host="sandbox-hdp.hortonworks.com", port=8020)
    file_name = row["file_name"]
    data = row["data"]
    hdfs_path = fulldirectory + file_name
    with fs.open(hdfs_path, "wb") as f:
        f.write(data)

# Define a function to process images and videos
def process_images_and_videos(df, fulldirectory):
    # Filter the dataframe to get only images and videos
    image_df = df.filter(col("file_type").isin(["jpg", "png"]))
    video_df = df.filter(col("file_type").isin(["mp4", "mov"]))
    # Write images to HDFS
    image_df.foreach(lambda row: write_file(row, fulldirectory + "images/"))
    # Write videos to HDFS
    video_df.foreach(lambda row: write_file(row, fulldirectory + "videos/"))

# Configure the directory path and create the output directory
directory_path = "hdfs://sandbox-hdp.hortonworks.com:8020/user/hadoop/OUTPUT_FINAL1"
fulldirectory = create_directory(directory_path)

# Process the batch
processed_df = process_batch(df, fulldirectory)

# Write each batch to HDFS as Parquet
processed_df.write \
    .format("parquet") \
    .mode("append") \
    .partitionBy("file_type", "file_name") \
    .parquet(fulldirectory + "csv/")

# Process images and videos
process_images_and_videos(processed_df, fulldirectory)

# Stop the SparkSession
spark.stop()
I run the code over SSH in the Ambari sandbox environment with the following command:
spark-submit --jars /tmp/Airbnb_Data/jsr305-3.0.2.jar,/tmp/Airbnb_Data/snappy-java-1.1.8.4.jar,/tmp/Airbnb_Data/kafka-clients-3.3.2.jar,/tmp/Airbnb_Data/spark-sql-kafka-0-10_2.11-2.4.0.jar,/tmp/Airbnb_Data/commons-logging-1.1.3.jar,/tmp/Airbnb_Data/spark-streaming-kafka-0-10_2.12-3.4.0.jar,/tmp/Airbnb_Data/hadoop-client-api-3.3.4.jar,/tmp/Airbnb_Data/lz4-java-1.8.0.jar,/tmp/Airbnb_Data/hadoop-client-runtime-3.3.4.jar,/tmp/Airbnb_Data/scala-library-2.12.17.jar,/tmp/Airbnb_Data/spark-tags_2.12-3.4.0.jar,/tmp/Airbnb_Data/spark-token-provider-kafka-0-10_2.12-3.4.0.jar,/tmp/Airbnb_Data/slf4j-api-1.7.36.jar KafkaToHDFS.py
Here is the directory structure I want:
user/hadoop/OUTPUT_FINAL1/
├── year/
│   ├── month/
│   │   ├── day/
│   │   │   ├── hour/
│   │   │   │   ├── csv/
│   │   │   │   │   └── <csv parquet files>
│   │   │   │   ├── images/
│   │   │   │   │   └── <image files>
│   │   │   │   └── videos/
│   │   │   │       └── <video files>
2 Answers
h7wcgrx31#
Spark cannot write out binary image or video data.
Calling pyarrow functions inside a Spark foreach is not really using Spark's filesystem writers. Also, I believe pyarrow was released after HDP 2.6.5 reached end of support, so I don't think that approach will work there.
As for the directory structure, let Spark do it for you by partitioning on individual date columns, rather than collecting the batch on the driver and extracting a single element.
You will then get data laid out as:
year=xxxx/month=xx/day=xx/hour=xx/filetype=xxx/
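As a minimal sketch of that idea (assuming the extracted_df produced by your process_batch, and deriving the date parts from each record's Kafka timestamp rather than from the driver clock):

from pyspark.sql.functions import col, year, month, dayofmonth, hour

# Derive the partition columns from each record's Kafka timestamp
partitioned_df = (
    extracted_df
    .withColumn("year", year(col("timestamp")))
    .withColumn("month", month(col("timestamp")))
    .withColumn("day", dayofmonth(col("timestamp")))
    .withColumn("hour", hour(col("timestamp")))
)

# Let Spark build the directory tree instead of concatenating paths yourself
(partitioned_df.write
    .mode("append")
    .partitionBy("year", "month", "day", "hour", "file_type")
    .parquet("hdfs://sandbox-hdp.hortonworks.com:8020/user/hadoop/OUTPUT_FINAL1"))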
As for the requirement that images be written with a .jpg or .png extension, and likewise videos with .mp4 or .mov: HDFS is not a good place to store files like that. Use blob storage for media assets.
x759pob22#
Looking at your code, I see a few things that need to be addressed to get the desired directory structure and file extensions. Here are the changes you can make (a rough sketch covering all three follows this list):
1. Modify the write_file function: update write_file so that each file is saved with the appropriate extension.
2. Update the process_images_and_videos function: modify process_images_and_videos so that image and video files are saved in their respective directories.
3. Change the partitioning used when writing the CSV files: adjust the partitioning of the Parquet write so that the output matches the desired directory structure.
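Here is a rough sketch of what those three changes could look like; treat it as an illustration rather than a drop-in fix. It reuses the fulldirectory, processed_df, and pyarrow HDFS connection from your script, and it assumes file_name arrives without an extension.

from pyspark.sql.functions import col
import pyarrow.hdfs as hdfs

# 1. write_file: append an extension derived from file_type before writing
EXTENSIONS = {"jpg": ".jpg", "png": ".png", "mp4": ".mp4", "mov": ".mov"}

def write_file(row, target_dir):
    fs = hdfs.connect(host="sandbox-hdp.hortonworks.com", port=8020)
    file_name = row["file_name"]
    ext = EXTENSIONS.get(row["file_type"], "")
    if ext and not file_name.endswith(ext):
        file_name += ext
    with fs.open(target_dir + file_name, "wb") as f:
        # the JSON schema declares data as a string, so encode it for the binary write
        f.write(row["data"].encode("utf-8"))

# 2. process_images_and_videos: route each media type into its own subdirectory
def process_images_and_videos(df, fulldirectory):
    image_df = df.filter(col("file_type").isin(["jpg", "png"]))
    video_df = df.filter(col("file_type").isin(["mp4", "mov"]))
    image_df.foreach(lambda row: write_file(row, fulldirectory + "images/"))
    video_df.foreach(lambda row: write_file(row, fulldirectory + "videos/"))

# 3. CSV output: write only the csv rows and drop the file_type/file_name
#    partitioning so no file_type=.../file_name=... directories are created
csv_df = processed_df.filter(col("file_type") == "csv")
csv_df.write.mode("append").parquet(fulldirectory + "csv/")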
With these modifications, your code should create the desired directory structure in HDFS and save the files with the appropriate extensions.