MapReduce program to merge multiple XML files into a single XML file

pgccezyw, posted on 2021-05-29 in Hadoop

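I am new to HDInsight and Hadoop MapReduce concepts. I am trying to merge multiple XML files into a single XML file using a MapReduce program. My intention is to merge each XML file into the target XML file by wrapping it in start and end tags named after the file. For example, the XML files below should be merged into a single XML file as shown.
Input XML files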

<xml><a></a></xml>
<xml><b></b></xml>
<xml><c></c></xml>

Output XML file

<xml>
 <File1Name><xml><a></a></xml></File1Name>
 <File2Name><xml><b></b></xml></File2Name>
 <File3Name><xml><c></c></xml></File3Name>
</xml>

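Question 1: Is it possible to map one XML file to each mapper and emit a key-value pair, with the file name as the key and, as the value, the XML file wrapped in start and end tags named after the file, and then use a reducer to merge all the XML into a single context and write the output XML shown above?
Question 2: How can I get the file name as the key in the mapper code?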
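
The plan described in the question can be checked without Hadoop. The following is a minimal plain-Java sketch, not a MapReduce job: MergePlanSketch, mapOne, and reduceAll are hypothetical names, mapOne stands in for a mapper that emits (file name, wrapped XML), and reduceAll stands in for a single reducer. It assumes every file name is already a valid XML element name.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for the map/reduce plan in the question (no Hadoop involved).
final class MergePlanSketch {

    // "Map" step: wrap one file's XML in start and end tags named after the file.
    // Assumption: fileName is a valid XML element name (no spaces, no leading digit).
    static String mapOne(String fileName, String xml) {
        return "<" + fileName + ">" + xml + "</" + fileName + ">";
    }

    // "Reduce" step: concatenate all wrapped values under one root element,
    // ordered by key so the output is deterministic.
    static String reduceAll(Map<String, String> wrappedByFileName) {
        StringBuilder out = new StringBuilder("<xml>\n");
        for (String wrapped : new TreeMap<>(wrappedByFileName).values()) {
            out.append(" ").append(wrapped).append("\n");
        }
        return out.append("</xml>").toString();
    }

    public static void main(String[] args) {
        Map<String, String> emitted = new TreeMap<>();
        emitted.put("File1Name", mapOne("File1Name", "<xml><a></a></xml>"));
        emitted.put("File2Name", mapOne("File2Name", "<xml><b></b></xml>"));
        emitted.put("File3Name", mapOne("File3Name", "<xml><c></c></xml>"));
        System.out.println(reduceAll(emitted));
    }
}
```

In a real job, a single output file generally requires all mapper output to reach one reducer, for example by configuring one reduce task.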

ubof19bj #1

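Answer 1: I would not recommend sending only one XML file to each mapper unless the files are larger than 1 GB. Instead, you can send a list of XML locations to the mapper, then open each location in the mapper code and extract the data into the output.
Answer 2: If you are using Azure Blob storage, you can list all the blobs in the container and assign them to input splits.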

How to create your list of InputSplits:

ArrayList<InputSplit> ret = new ArrayList<InputSplit>();

/* Do this for each path we receive. Creates a directory of splits in this order,
   where s = input path: (s1,1),(s2,1)…(sN,1),(s1,2)…(sN,2)…(sN,3) etc. */
for (int i = numMinNameHashSplits; i <= Math.min(numMaxNameHashSplits, numNameHashSplits - 1); i++) {
    for (Path inputPath : inputPaths) {
        // One split per (input path, hash slot) pair
        ret.add(new ParseDirectoryInputSplit(inputPath.toString(), i));
        System.out.println(i + " " + inputPath.toString());
    }
}
return ret;
  } // closes the enclosing method (not shown)
} // closes the enclosing class (not shown)
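
The ordering produced by the nested loops above can be seen in isolation. This is a small plain-Java sketch with hypothetical names (SplitOrderSketch, DirSlot, enumerate); DirSlot stands in for ParseDirectoryInputSplit, and the 0-based slot numbering is an assumption taken from the loop's upper bound of numNameHashSplits - 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SplitOrderSketch {

    // Hypothetical stand-in for ParseDirectoryInputSplit: one (directory, hash slot) pair.
    static final class DirSlot {
        final String directory;
        final int slot;

        DirSlot(String directory, int slot) {
            this.directory = directory;
            this.slot = slot;
        }

        @Override
        public String toString() {
            return "(" + directory + "," + slot + ")";
        }
    }

    // Enumerate splits slot by slot: every input path for the first slot,
    // then every input path for the next slot, and so on.
    static List<DirSlot> enumerate(List<String> inputPaths, int minSlot, int maxSlot, int numSlots) {
        List<DirSlot> splits = new ArrayList<>();
        for (int slot = minSlot; slot <= Math.min(maxSlot, numSlots - 1); slot++) {
            for (String path : inputPaths) {
                splits.add(new DirSlot(path, slot));
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        // Two input paths and three slots; the requested maximum (5) is capped at numSlots - 1.
        System.out.println(enumerate(Arrays.asList("s1", "s2"), 0, 5, 3));
    }
}
```

Each input directory therefore appears once per hash slot, so several map tasks can share one directory, each handling only the blobs that hash to its own slot.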

Once the List<InputSplit> is assembled, each InputSplit is handed to a RecordReader class, which reads each key/value pair and passes it to the map task. The record reader is initialized from the InputSplit, which carries a string representing the location of a "folder" of invoices in blob storage, and it lists all blobs within that folder (the blobs variable below). The Java code below shows the record reader being created for each hash slot and the resulting list of blobs in that location.

public class ParseDirectoryFileNameRecordReader
        extends RecordReader<IntWritable, Text> {

    private int nameHashSlot;
    private int numNameHashSlots;
    private Path myDir;
    private Path currentPath;
    private Iterator<ListBlobItem> blobs;
    private int currentLocation;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        myDir = ((ParseDirectoryInputSplit) split).getDirectoryPath();

        // getNameHashSlot tells us which slot this record reader is responsible for
        nameHashSlot = ((ParseDirectoryInputSplit) split).getNameHashSlot();

        // Gets the total number of hash slots
        numNameHashSlots = getNumNameHashSplits(context.getConfiguration());

        // Gets the input credentials for the storage account assigned to this record reader
        String inputCreds = getInputCreds(context.getConfiguration());

        // Break up the directory path to get the container and account names
        String[] authComponents = myDir.toUri().getAuthority().split("@");
        String accountName = authComponents[1].split("\\.")[0];
        String containerName = authComponents[0];
        String accountKey = Utils.returnInputkey(inputCreds, accountName);
        System.out.println("This mapper is assigned the following account: " + accountName);

        StorageCredentials creds = new StorageCredentialsAccountAndKey(accountName, accountKey);
        CloudStorageAccount account = new CloudStorageAccount(creds);
        CloudBlobClient client = account.createCloudBlobClient();
        CloudBlobContainer container = client.getContainerReference(containerName);

        // List every blob under the directory (flat listing, no extra listing details)
        blobs = container.listBlobs(myDir.toUri().getPath().substring(1) + "/",
                true, EnumSet.noneOf(BlobListingDetails.class), null, null).iterator();
        currentLocation = -1;
    }
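
The string handling in initialize can be tried on its own with java.net.URI. The sketch below assumes a directory URI of the form wasb://container@account.blob.core.windows.net/path, which matches the way the code above splits the authority on "@"; the class name WasbUriSketch and the example container and account names are made up for illustration.

```java
import java.net.URI;

final class WasbUriSketch {

    // Container name: the part of the authority before '@'.
    static String containerName(URI dir) {
        return dir.getAuthority().split("@")[0];
    }

    // Account name: the first dot-separated label of the host part after '@'.
    static String accountName(URI dir) {
        return dir.getAuthority().split("@")[1].split("\\.")[0];
    }

    // Blob prefix used for listing: the path without its leading '/', plus a trailing '/'.
    static String blobPrefix(URI dir) {
        return dir.getPath().substring(1) + "/";
    }

    public static void main(String[] args) {
        // Hypothetical example URI; the container and account names are invented.
        URI dir = URI.create("wasb://invoices@exampleaccount.blob.core.windows.net/2014/04");
        System.out.println(containerName(dir) + " " + accountName(dir) + " " + blobPrefix(dir));
    }
}
```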

Once initialized, the record reader passes the next key to the map task. This is controlled by the nextKeyValue method, which is called each time the map task needs its next record. The Java code below demonstrates this.

    // Checks whether the next key/value is assigned to this task or to another mapper.
    // If it is assigned to this task, the location is passed to the mapper; otherwise return false.
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        while (blobs.hasNext()) {
            ListBlobItem currentBlob = blobs.next();

            // Returns a number between 1 and the number of hash slots. If it matches the number
            // assigned to this mapper and the blob length is greater than 0, return the path to
            // the map function.
            if (doesBlobMatchNameHash(currentBlob) && getBlobLength(currentBlob) > 0) {
                String[] pathComponents = currentBlob.getUri().getPath().split("/");

                // Strip the leading "/<container>" segment from the blob path
                String pathWithoutContainer =
                        currentBlob.getUri().getPath().substring(pathComponents[1].length() + 1);

                currentPath = new Path(myDir.toUri().getScheme(),
                        myDir.toUri().getAuthority(), pathWithoutContainer);

                currentLocation++;
                return true;
            }
        }
        return false;
    }
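
The helper doesBlobMatchNameHash is not shown in the answer. One way such a check could work is to hash the blob name into a fixed number of slots and compare the result with the reader's own slot. The sketch below is an assumption, not the original implementation: NameHashSketch, slotFor, and matches are hypothetical names, and slots are numbered from 0 here.

```java
final class NameHashSketch {

    // Map a blob name to a slot in [0, numSlots). floorMod keeps the result
    // non-negative even when hashCode() is negative.
    static int slotFor(String blobName, int numSlots) {
        return Math.floorMod(blobName.hashCode(), numSlots);
    }

    // A reader responsible for mySlot accepts a blob only if the blob hashes to that slot.
    static boolean matches(String blobName, int mySlot, int numSlots) {
        return slotFor(blobName, numSlots) == mySlot;
    }

    public static void main(String[] args) {
        String[] names = {"invoices/a.xml", "invoices/b.xml", "invoices/c.xml"};
        for (String name : names) {
            System.out.println(name + " -> slot " + slotFor(name, 4));
        }
    }
}
```

Because every name hashes to exactly one slot, each blob in a directory is picked up by exactly one of the readers that share that directory.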

The logic in the map function is then simply as follows, with inputStream giving access to the entire XML content of the blob:

Path inputFile = new Path(value.toString());
FileSystem fs = inputFile.getFileSystem(context.getConfiguration());

//Input stream contains all data from the blob in the location provided by Text
FSDataInputStream inputStream = fs.open(inputFile);
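
Turning such a stream into one XML string can be sketched with standard Java I/O. In the sketch below, a ByteArrayInputStream stands in for the stream returned by fs.open(inputFile); ReadStreamSketch and readAll are hypothetical names, and UTF-8 content is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

final class ReadStreamSketch {

    // Drain an InputStream into a String, assuming the bytes are UTF-8 text.
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        while ((n = in.read(chunk)) != -1) {
            buffer.write(chunk, 0, n);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the blob stream; try-with-resources closes it after reading.
        byte[] xml = "<xml><a></a></xml>".getBytes(StandardCharsets.UTF_8);
        try (InputStream in = new ByteArrayInputStream(xml)) {
            System.out.println(readAll(in));
        }
    }
}
```

Reading a whole file into memory suits small XML files like those in the question; very large files would need a streaming approach instead.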

Resources:
http://www.andrewsmoll.com/3-hacks-for-hadoop-and-hdinsight-clusters/ "Hack 3"
http://blogs.msdn.com/b/mostlytrue/archive/2014/04/10/merging-small-files-on-hdinsight.aspx
