博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mapreduce结构及运行机制
阅读量:6979 次
发布时间:2019-06-27

本文共 4047 字,大约阅读时间需要 13 分钟。

  hot3.png

  • 结构
    一个完整的mapreduce程序在分布式运行时有三类实例进程:
  1. MRAppMaster:负责整个程序的过程调度和状态协调。
  2. MapTask:负责map阶段的数据处理流程。
  3. ReduceTask:负责reduce阶段的整个数据处理流程。
  • 运行机制及流程
  1. mr启动时,先启动MRAppMaster进程,MRAppMaster启动后根据客户端job提交形成的任务分配策略(根据待处理数据及传递的参数形成一定的分片规则),计算出需要的maptask的实例数量,然后向集群申请机器启动相应数量的maptask进程。
  2. maptask启动后,根据切片范围进行数据处理。(即从客户端指定的目录读取数据,逐行调用map方法,进行逻辑处理,并将map方法处理后的kv对收集到缓存,最后再将缓存中的kv对根据k分区,不断溢写到磁盘文件)
  3. MRAppMaster监控到所有maptask处理完成后,根据客户端设定的reducetask的启动个数,启动相应数量的reducetask进程,并告知每个reducetask要处理的数据范围。
  4. reducetask启动后,从若干maptask运行所在机器上取到map输出文件,在本地进行重组归并排序,然后按照相同的key的kv为一组,调用reduce方法进行逻辑运算并收集输出结果kv。最后,调用客户端指定的outputformat将结果输出到目标文件。
  • maptask并行数量的决定机制
  1. maptask的数量是由客户端提交job时决定的。即根据待处理的数据以及job提交时指定的参数决定。
  2. 切片的基本逻辑是由FlieInputFormat实现类的getSplits()方法完成。源码:
/** * Generate the list of files and make them into FileSplits. * @param job the job context * @throws IOException */public List
getSplits(JobContext job) throws IOException { Stopwatch sw = new Stopwatch().start(); long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List
splits = new ArrayList
(); List
files = listStatus(job); for (FileStatus file : files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file) .getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); splits.add(makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { // Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } return splits;}
public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";  public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {	return Math.max(minSize, Math.min(maxSize, blockSize));  }    /**   * Get the lower bound on split size imposed by the format.   * @return the number of bytes of the minimal split for this format   */  protected long getFormatMinSplitSize() {    return 1;  }    /**   * Get the minimum split size   * @param job the job   * @return the minimum number of bytes that can be in a split   */  public static long getMinSplitSize(JobContext job) {    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);  }    /**   * Get the maximum split size.   * @param context the job to look at.   * @return the maximum number of bytes a split can include   */  public static long getMaxSplitSize(JobContext context) {    return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);  }

通过源码可以看出:

  • 在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由下面几个值来运算决定。
  • minSize:默认值1,可以通过参数mapreduce.input.fileinputformat.split.minsize指定
  • maxSize:默认值Long.MAX_VALUE,可以通过参数mapreduce.input.fileinputformat.split.maxsize指定
  • 在都取默认值情况下切片大小=blocksize=128M。

转载于:https://my.oschina.net/riseee/blog/1601646

你可能感兴趣的文章
配套自测连载(二)
查看>>
linux下set和eval的使用小案例精彩解答
查看>>
为什么很多人努力了却死一地
查看>>
开放产品开发(OPD):Archi 汉化工具下载
查看>>
VS code for python开发利器
查看>>
高性能的MySQL(1)锁和MVCC
查看>>
如何用VDP备份虚拟机
查看>>
虚拟机安装 Windows 10 9926 预览版 “准备就绪”...... 故障
查看>>
FTP服务器的防火墙通用设置规则
查看>>
遍历系统文本全文
查看>>
《人人都能看懂经济学》读书笔记
查看>>
Linux文本比较命令:diff
查看>>
Android开发实践:JNI函数签名生成器
查看>>
危机!测试工程师真的要小心了
查看>>
MySQL 高可用MMM
查看>>
Centos6.2_X86_64 _LNMP安装全程实录
查看>>
我的友情链接
查看>>
eclipse插件安装方法
查看>>
Android帧缓冲区(Frame Buffer)硬件抽象层(HAL)模块Gralloc的实现原理分析(1)...
查看>>
Javascript中的字符串链接和Array.join()方法时间效率对比
查看>>