我正在开发一个流脚本,它应该在文件到达hdfs后立即提取文件,聚合它们并将它们写到其他地方。
在这里,我无法让写入工作-它创建了元数据文件夹,但没有实际的写入发生。在10多个文件(结构都相同)中,只有一个是编写的,我不知道为什么
有人能帮我吗?
from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
import argparse, sys
from pyspark.sql import *
from pyspark.sql.functions import *
from datetime import datetime
from pyspark.sql.functions import lit
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.functions import udf, input_file_name, lower
from pyspark.streaming import StreamingContext
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
now = datetime.now()
# create a contexit that supports hive
def create_session(appname):
spark_session = SparkSession\
.builder\
.appName(appname)\
.enableHiveSupport()\
.getOrCreate()
return spark_session
### START MAIN ###
if __name__ == '__main__':
spark_session = create_session('streaming_monitor')
ssc = StreamingContext(spark_session, 1)
print('start')
print(datetime.now())
myschema = StructType([
StructField('text', StringType())
])
#only files after stream starts
df = spark_session\
.readStream\
.option('newFilesOnly', 'true')\
.option('header', 'true')\
.schema(myschema)\
.text('hdfs://nameservice/user/user1/streamtest/')\
.withColumn("FileName", input_file_name())
output = df.createOrReplaceTempView('log')
#hive_dump = spark_session.sql("select '" + str(now) + "' as timestamp, FileName, did_it_error, solution, text from log")
output = df\
.writeStream\
.format("csv")\
.queryName('logsmonitor')\
.option("checkpointLocation", "file:///home/user1/analytics/logs/chkpoint_dir")\
.start('hdfs://nameservice/user/user1/streamtest/output/')\
.awaitTermination()
1条答案
按热度按时间4si2a6ki1#
您在这里看到的是,spark streaming读取的文件必须以原子方式放入源文件夹。否则,文件将在创建后立即读取(并且没有任何内容)。spark不会对文件中的更新数据执行操作,而是只查看一次文件。
如果您
停止流媒体作业
删除检查点目录(或将所有输入文件重命名为新的唯一名称)
将所有文件移到源文件夹中
等待移动结束
启动流媒体应用程序
当然,如果您想让此作业连续运行并添加越来越多的文件,这将不是一个解决方案,但真正的秘密在于将文件以原子方式一次放入文件夹中。
我并不完全熟悉hdfs,但通常这种原子性可以通过将数据写入另一个文件夹,然后将其移动到源文件夹中来实现。
以下是有关输入源的文档中的参考:
“文件源-读取作为数据流写入目录的文件。文件将按文件修改时间的顺序进行处理。如果设置了latestfirst,则顺序将颠倒。支持的文件格式有text、csv、json、orc、parquet。请参阅datastreamreader接口的文档以获取更为最新的列表,以及每种文件格式支持的选项。请注意,文件必须以原子方式放置在给定的目录中,在大多数文件系统中,这可以通过文件移动操作实现。”