Is there a way to add a delimiter to strings in Hadoop?

vuktfyat asked on 2021-05-29 in Hadoop

I am thinking of editing each line of a file in Hadoop to add a delimiter. Is it worthwhile to do this in Hadoop, given that my files are very large?
Example:
Input file:

001012489MAR01856400004400
001012489FEB01856400004400

The output file would be:

0010|12489|MAR|018564|0000|44|00
0010|12489|FEB|018564|0000|44|00

How can I achieve this? I have searched many blogs but have not found an approach.


0s7z1bwu 1#

Another approach is to use Hive, which takes care of the programming part for you.
1) Create a Hive tmp table pointing to the HDFS location of the raw data file:

CREATE EXTERNAL TABLE tmp (raw String)
LOCATION '<hdfs_path>';
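
To sanity-check that the external table picks up the raw file, a quick query helps (a minimal sketch; tmp and the two sample rows come from the question above):

-- Preview the raw, undelimited lines through the external table
SELECT raw FROM tmp LIMIT 2;
-- 001012489MAR01856400004400
-- 001012489FEB01856400004400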

2) Create the formatted data table, delimited by pipes:

CREATE TABLE formatted_data (
  col1 string, col2 string,
  col3 string, col4 string,
  col5 string, col6 string, col7 string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|';

3) Insert the data from the tmp table into the formatted data table:

INSERT INTO formatted_data
SELECT substr(raw, 1, 4), substr(raw, 5, 5), substr(raw, 10, 3),
       substr(raw, 13, 6), substr(raw, 19, 4), substr(raw, 23, 2),
       substr(raw, 25, 2)
FROM tmp;
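
Note that Hive's substr(str, pos, len) takes a 1-based start position and a length (the field width here), not an end offset. A quick check of a single field:

-- substr(str, pos, len): pos is 1-based, len is a character count
SELECT substr('001012489MAR01856400004400', 5, 5);   -- returns '12489'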

4) Verify the HDFS file behind the formatted_data table:

hadoop fs -cat /hive/warehouse/formatted_data/000000_0
16/08/30 10:47:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

0010|12489|MAR|018564|0000|44|00
0010|12489|FEB|018564|0000|44|00
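
Equivalently, the rows can be read back through Hive itself rather than from the raw warehouse file (a minimal check against the formatted_data table defined above):

-- Each column now holds one fixed-width field, pipe-delimited on disk
SELECT * FROM formatted_data LIMIT 2;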

icnyk63a 2#

This can be achieved with a MapReduce or Spark job (using substring()):
MapReduce (Java): in this case you only need the mapper, mapping each input line to its delimited counterpart:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Delimeters {

    public static class DelimetersMapper extends Mapper<Object, Text, Text, NullWritable> {

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            // 001012489FEB01856400004400
            String line = value.toString();
            String lineWithDelimeter = line.substring(0, 4) + "|" + line.substring(4, 9)
                    + "|" + line.substring(9, 12) + "|" + line.substring(12, 18)
                    + "|" + line.substring(18, 22) + "|" + line.substring(22, 24)
                    + "|" + line.substring(24, 26);

            System.out.println(lineWithDelimeter); // 0010|12489|MAR|018564|0000|44|00

            // NullWritable avoids the trailing tab that an empty Text value would add
            context.write(new Text(lineWithDelimeter), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Add-Delimeters-to-flat-file");

        job.setJarByClass(Delimeters.class);
        job.setMapperClass(DelimetersMapper.class);
        job.setNumReduceTasks(0); // map-only job: no shuffle or reduce phase needed
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Delete the output directory if it already exists, so the job can rerun
        FileSystem fs = null;
        Path dstFilePath = new Path(args[1]);
        try {
            fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }

        job.waitForCompletion(true);
    }
}

Spark (Scala):

import org.apache.spark.{SparkConf, SparkContext}

object delimeter {
  def main(args: Array[String]) {

    val inputFile = args(0)

    val conf = new SparkConf().setAppName("Add-Delimeters-to-flat-file").setMaster("local")
    val sc = new SparkContext(conf)

    val txtFileLines = sc.textFile(inputFile)

    // Slice each fixed-width line and rejoin the fields with pipes
    val fields = txtFileLines.map(line => line.substring(0, 4) + "|" + line.substring(4, 9)
      + "|" + line.substring(9, 12) + "|" + line.substring(12, 18)
      + "|" + line.substring(18, 22) + "|" + line.substring(22, 24)
      + "|" + line.substring(24, 26))

    fields.foreach(x => println(x))

    fields.saveAsTextFile(args(1))
  }
}

Update:
You can use a file:/// URI to make Hadoop look at the local filesystem for the source (the same rule applies to Spark):

hadoop jar <app.jar> <package.classname> <file:///path/to/local/dir> </path/to/hdfs/>

Example:

[cloudera@quickstart Desktop]$ hadoop jar hadoop-stack.jar so.Delimeters file:///home/cloudera/Desktop/test.txt /user/cloudera/delim
[cloudera@quickstart Desktop]$ hadoop fs -cat /user/cloudera/delim/*
0010|12489|FEB|018564|0000|44|00
0010|12489|MAR|018564|0000|44|00

Alternatively, you can keep the source file in HDFS and have the application itself delete it after successful processing:

int exitcode = job.waitForCompletion(true) ? 0 : -1;

// Remove the source only after the job has finished successfully
if (exitcode == 0) {
    try {
        Path sourcePath = new Path(args[0]);
        fs = sourcePath.getFileSystem(conf);
        if (fs.exists(sourcePath))
            fs.delete(sourcePath, true);
    } catch (IOException e1) {
        e1.printStackTrace();
    }
}

Yet another option is to create an Oozie workflow that runs the delimiter-adding application against the target directory and then a shell script that deletes the source file/directory at the end.
