- Structure: when a complete MapReduce program runs in distributed mode, there are three kinds of instance processes:
- MRAppMaster: responsible for scheduling the whole program's execution and coordinating its state.
- MapTask: responsible for the data-processing flow of the map phase.
- ReduceTask: responsible for the entire data-processing flow of the reduce phase.
- Running mechanism and workflow
- When an MR job starts, the MRAppMaster process is launched first. Based on the task-allocation plan formed when the client submitted the job (a set of split rules derived from the input data and the submitted parameters), MRAppMaster computes the number of MapTask instances required, then requests machines from the cluster and launches that many MapTask processes.
- After a MapTask starts, it processes the data within its assigned split: it reads data from the client-specified input directory, invokes the map method once per line to run the user's logic, collects the kv pairs emitted by the map method into an in-memory buffer, and repeatedly spills the buffered kv pairs to disk files, partitioned by key.
- Once MRAppMaster observes that all MapTasks have completed, it launches the number of ReduceTask processes configured by the client and tells each ReduceTask which range (partition) of data it should process.
- After a ReduceTask starts, it fetches the map output files from the machines where the MapTasks ran, performs a merge sort locally, groups kv pairs that share the same key, invokes the reduce method on each group to run the user's logic and collect the output kv pairs, and finally writes the results to the target files through the client-specified OutputFormat. (A minimal sketch of the map/reduce side of this flow follows this list.)
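To make the flow above concrete, here is a minimal WordCount-style sketch (a hypothetical example added for illustration, not part of the original notes): the map method is invoked once per input line and emits kv pairs into the framework's buffer via context.write; after the shuffle, the reduce method is invoked once per key group.

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Called once per input line: split the line into words and emit (word, 1).
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            if (token.isEmpty()) continue;
            word.set(token);
            // Collected into the in-memory buffer, then spilled to disk by partition.
            context.write(word, ONE);
        }
    }
}

// Called once per key group: kv pairs with the same key arrive together after the merge sort.
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        // Written to the target files via the job's OutputFormat.
        context.write(key, new IntWritable(sum));
    }
}
```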
- How the number of parallel MapTasks is decided
- The number of MapTasks is decided on the client side at job-submission time, based on the data to be processed and the parameters supplied with the job.
- The basic split logic is implemented by the getSplits() method of the FileInputFormat implementation class. Source code:
```java
/**
 * Generate the list of files and make them into FileSplits.
 * @param job the job context
 * @throws IOException
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  Stopwatch sw = new Stopwatch().start();
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file : files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                  blkLocations[blkIndex].getHosts(),
                  blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                  blkLocations[blkIndex].getHosts(),
                  blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                blkLocations[0].getCachedHosts()));
      }
    } else {
      // Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  return splits;
}
```
```java
public static final String SPLIT_MAXSIZE =
    "mapreduce.input.fileinputformat.split.maxsize";
public static final String SPLIT_MINSIZE =
    "mapreduce.input.fileinputformat.split.minsize";

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}

/**
 * Get the lower bound on split size imposed by the format.
 * @return the number of bytes of the minimal split for this format
 */
protected long getFormatMinSplitSize() {
  return 1;
}

/**
 * Get the minimum split size.
 * @param job the job
 * @return the minimum number of bytes that can be in a split
 */
public static long getMinSplitSize(JobContext job) {
  return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}

/**
 * Get the maximum split size.
 * @param context the job to look at.
 * @return the maximum number of bytes a split can include
 */
public static long getMaxSplitSize(JobContext context) {
  return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
}
```
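As a quick sanity check of the getSplits() loop above, the following standalone sketch (hypothetical; it reimplements computeSplitSize and the slop loop locally rather than calling Hadoop) counts how many splits a file of a given length produces. Note that SPLIT_SLOP is 1.1 in the Hadoop source, so a trailing remainder of up to 10% of splitSize is folded into the last split instead of producing an extra tiny split.

```java
// Standalone simulation of the split computation above (not Hadoop code itself).
public class SplitSizeDemo {
    private static final double SPLIT_SLOP = 1.1; // same value as in FileInputFormat

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    static int countSplits(long length, long blockSize, long minSize, long maxSize) {
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        int splits = 0;
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits++; // the remainder (up to 1.1 * splitSize) becomes the last split
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long blockSize = 128 * mb;
        // Defaults: minSize = 1, maxSize = Long.MAX_VALUE -> splitSize = blockSize
        System.out.println(countSplits(300 * mb, blockSize, 1L, Long.MAX_VALUE)); // 3 splits
        System.out.println(countSplits(129 * mb, blockSize, 1L, Long.MAX_VALUE)); // 1 split: 129/128 <= 1.1
        // Capping maxSize at 64 MB forces splitSize down to 64 MB:
        System.out.println(countSplits(300 * mb, blockSize, 1L, 64 * mb));        // 5 splits
    }
}
```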
From the source code above, we can see:
- In FileInputFormat, the split size is computed as Math.max(minSize, Math.min(maxSize, blockSize)); it is therefore determined by the following values:
- minSize: defaults to 1; can be set with the parameter mapreduce.input.fileinputformat.split.minsize.
- maxSize: defaults to Long.MAX_VALUE; can be set with the parameter mapreduce.input.fileinputformat.split.maxsize.
- With both parameters at their defaults, split size = blockSize = 128 MB (the default HDFS block size in Hadoop 2.x). A driver-side sketch for overriding these parameters follows this list.
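For example, to change the split size from a job driver, you could set these two parameters through the static helpers on FileInputFormat (a hedged sketch; the job name and sizes are placeholders):

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitConfigDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "split-config-demo");

        // Cap maxSize below the block size to get MORE, smaller splits (64 MB here):
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
        // Or raise minSize above the block size to get FEWER, larger splits:
        // FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);

        // These helpers simply set mapreduce.input.fileinputformat.split.maxsize /
        // mapreduce.input.fileinputformat.split.minsize on the job configuration.
    }
}
```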