HDFS 如何获取某个日期后的所有hive文件以进行s3上载(python)

1sbrub3j  于 2022-12-09  发布在  HDFS
关注(0)|答案(2)|浏览(186)

我正在写一个程序,每天从一个特定的数据库上传我们所有的hive表到s3。但是,这个数据库包含了很多年前的记录,对于完整的副本/distcp来说太大了。
我希望搜索HDFS中包含数据库的整个目录,仅获取last_modified_date在指定(输入)日期之后的文件
然后,我将这些匹配文件的完整distcp传递给s3。(如果我只需要将匹配文件的路径/名称复制到一个单独的文件中,然后从这个额外的文件中传递distcp,也可以。)
在网上查找时,我发现可以使用-t标志按文件的最后修改日期对文件进行排序,因此我从以下内容开始:hdfs dfs -ls -R -t <path_to_db>,但这还不够。它打印了500000个文件,我还需要弄清楚如何修剪输入日期之前的文件...

**EDIT:**我正在写一个Python脚本,很抱歉一开始没有说明!
**EDIT pt 2:**我应该注意到我需要遍历几千个,甚至几十万个文件。我已经写了一个基本的脚本试图解决我的问题,但它需要一个令人难以置信的长时间来运行。需要一种方法来加快进程....

vddsk6oq

vddsk6oq1#

我不确定你是否使用Java,但这里有一个example of what can do:。我做了一些小的修改来使用lastModified。

import java.io.*;
import java.util.*;
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

// For Date Conversion from long to human readable.
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class FileStatusChecker {
    public static void main (String [] args) throws Exception {
        try{
            FileSystem fs = FileSystem.get(new Configuration());
            String hdfsFilePath = "hdfs://My-NN-HA/Demos/SparkDemos/inputFile.txt";
            FileStatus[] status = fs.listStatus(new Path(hdfsFilePath));  // you need to pass in your hdfs path

            for (int i=0;i<status.length;i++){
                long lastModifiedTimeLong = status[i].lastModified();
                Date lastModifiedTimeDate = new Date(lastModifiedTimeLong);
                DateFormat df = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss Z");
                System.out.println("The file '"+ hdfsFilePath + "' was accessed last at: "+ df.format(lastModifiedTimeDate));
            }
        }catch(Exception e){
            System.out.println("File not found");
            e.printStackTrace();
        }
    }
}

它将使您能够创建一个文件列表,并使用它们做“事情”。

6kkfgxo0

6kkfgxo02#

您可以使用WebHDFS提取完全相同的信息:https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html
这可能更适合与Python一起使用。
Examples
文件/目录的状态提交HTTP GET请求。

curl -i  "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS"
 The client receives a response with a FileStatus JSON object:

HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked

{
  "FileStatus":
  {
    "accessTime"      : 0,
    "blockSize"       : 0,
    "group"           : "supergroup",
    "length"          : 0,             //in bytes, zero for directories
    "modificationTime": 1320173277227,
    "owner"           : "webuser",
    "pathSuffix"      : "",
    "permission"      : "777",
    "replication"     : 0,
    "type"            : "DIRECTORY"    //enum {FILE, DIRECTORY}
  }
}

列出目录提交HTTP GET请求。

curl -i  "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS"
 The client receives a response with a FileStatuses JSON object:

HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 427

{
  "FileStatuses":
  {
    "FileStatus":
    [
      {
        "accessTime"      : 1320171722771,
        "blockSize"       : 33554432,
        "group"           : "supergroup",
        "length"          : 24930,
        "modificationTime": 1320171722771,
        "owner"           : "webuser",
        "pathSuffix"      : "a.patch",
        "permission"      : "644",
        "replication"     : 1,
        "type"            : "FILE"
      },
      {
        "accessTime"      : 0,
        "blockSize"       : 0,
        "group"           : "supergroup",
        "length"          : 0,
        "modificationTime": 1320895981256,
        "owner"           : "szetszwo",
        "pathSuffix"      : "bar",
        "permission"      : "711",
        "replication"     : 0,
        "type"            : "DIRECTORY"
      },
      ...
    ]
  }
}

相关问题