有没有一种方法可以像使用hdfs(github示例)那样,在本地文件系统中扫描特定文件夹中的更改?使用常规路径或具有 hdfs://
似乎可以,但是使用uri file://
在它面前没有。
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
XML_PATH = "file:///home/user/in"
APP_NAME = "StreamingTest"
BATCH_DURATION = 1 # in seconds
if __name__ == "__main__":
sc = SparkContext("local[*]", appName=APP_NAME)
ssc = StreamingContext(sc, BATCH_DURATION)
lines = ssc.textFileStream(XML_PATH).pprint()
ssc.start()
ssc.awaitTermination()
奇怪的是,这似乎适用于特定的文件。什么时候 XML_PATH
已更改为 "file:///home/user/in/test.txt"
如果文件存在,则输出相同。
-------------------------------------------
Time: 2016-01-14 16:04:34
-------------------------------------------
-------------------------------------------
Time: 2016-01-14 16:04:35
-------------------------------------------
但是,当文件在流媒体传输过程中被删除时,应用程序开始抛出异常 16/01/14 16:04:37 WARN FileInputDStream: Error finding new files java.io.FileNotFoundException: File file:/home/user/in/test.txt does not exist
我假设它可以从本地目录读取。
我试着换衣服 XML_PATH
至 /tmp/in
它是hdfs上的一个目录,在运行流时上载同一个文件,这似乎是可行的
-------------------------------------------
Time: 2016-01-14 16:13:12
-------------------------------------------
-------------------------------------------
Time: 2016-01-14 16:13:13
-------------------------------------------
The Project Gutenberg EBook of Ulysses, by James Joyce
subscribe to our email newsletter to hear about new eBooks.
-------------------------------------------
Time: 2016-01-14 16:13:14
-------------------------------------------
暂无答案!
目前还没有任何答案,快来回答吧!