Hadoop

Hadoop系统的基本组成架构包含两个部分:分布式存储和并行计算两部分

  • HDFS

NameNode作为分布式存储的主控节点,用以存储和管理分布式文件系统的元数据,同时使用DataNode作为实际存储大规模数据从节点。

  • MapReduce

Hadoop使用JobTracker作为MapReduce框架的主控节点,用来管理和调度作业的执行,用TaskTracker管理每个计算从节点上任务的执行。

HDFS

Hadoop Distributed File System (HDFS)

旨在存储大量信息,通常为PB。不支持随机读写

  • 单文件TB PB/千万级文件数
  • 顺序访问 提高大规模数据访问效率,支持顺序读,随机访问负载高
  • 一次写多次读访问文件,不支持更新,允许文件尾部添加。

块结构文件系统来完成的。单个文件被拆分为固定大小的块,这些块存储在集群上。由多个块组成的文件通常不会将所有块存储在同一台机器上。 为了确保可靠性,块在集群中存在副本。复制因子默认是3,每个块在集群中存在3份。通常64MB为一块

数据块越大,减少寻址频度和时间开销

  • NameNote 保存文件系统的元数据(数据块与文件名映射表,数据块副本位置)和命名空间(系统目录结构),处理访问文件的请求
  • DataNode 存储组成文件的块

NameNode和DataNode进程可以在一台机器上运行,但HDFS集群通常由运行NameNode进程的专用服务器和可能有数千台运行DataNode进程的计算机组成。

NameNode

metadata 包括 文件名,文件权限以及每个文件的每个块的位置。保存在内存。 NameNode还跟踪块的复制因子,确保机器故障不会导致数据丢失。

由于NameNode是单点故障,因此可以使用辅助NameNode生成主NameNode内存结构的快照,从而降低NameNode失败时数据丢失的风险。 当DataNode失败时,NameNode将复制丢失的块以确保每个块满足最小复制因子

FsImage EditLog

SecondNameNode

  • 解决EditLog变大,启动慢的问题

    SecondNameNode 每隔一段时间拉取合并NameNode里的FsImage EditLog,这段时间的更新写入到NameNode的EditLog. new合并结束后,新FsImage发送到NameNode覆盖旧的FsImage,EditLog.new覆盖EditLog

  • 如果NameNode在合并时间段内发生故障,系统会丢数据。

DataNote

读过程

写过程

复制流程 数据接收者会向发送者发送确认包

恢复数据

NameNode

DataNode

数据

MD5 sha1

局限

高级操作

  • archive 压缩

  • balancer DataNode数据块副本均匀分布

  • distcp hdfs拷贝数据

MapReduce

Mapreduce并行模型

Mapreduce执行过程

inputformat

负责以什么样的格式输入数据,InputFormat可以验证作业数据的输入形式和格式;将输入数据分割为若干个逻辑意义上的InputSplits。RecordReader负责从数据分块中读取数据记录转化为键值对

split

split基本上和hdfs的基本存储块block同样大小,一个split对应一个map,你可以把它当做map的单位块来理解,投喂进map的时候必须要这样的格式

Block and InputSplit

map

处理一堆键值对,mapper会顺序单独处理每个键值对,产生若干个output键值对

shuffle

分为map端操作和reduce端操作

map

  • map任务的输出默认是放在本地磁盘的临时缓冲目录中,每当缓冲区到达上限80%的时候,就会启动一个Spill(溢写)进程,它的作用是把内存里的map task的结果写入到磁盘。
  • Partitioner 分区 按照key分别映射给不同的reduce,默认是hash(key)%R哈希函数分散到各个reducer去
  • combiner (optional 自定义 <’a’,1>和<’a’,1> -> <’a’,2>) 减轻网络压力,提高程序效率。 Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。 Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。
  • Sort 默认升序
  • Merge 每次溢写会在磁盘上生成一个溢写文件,当Map输出结果真的很大,存在多个溢写文件(<’a’,1>和<’a’,1> -> <’a’,<1,1» )将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中如果client设置过Combiner,也会使用Combiner来合并相同的key。

reduce

reduce从JobTracker那里获取map task是否完成的信息,得到通知,Shuffle的后半段过程开始启动。

  • copy Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求从不同的map端 TaskTracker获取map task的输出文件
  • spill merge combiner sort -> 最终的那个文件
  • 当Reducer的输入文件已定,整个Shuffle才最终结束。默认情况下,这个文件是存放于磁盘中

reduce

唯一key包含不唯一的值,reducer为每个唯一key合并它们的value,产生若干键值对

mapreduce 优化

MapReduce Job Optimization

  • 配置优化
  • Set mapred.compress.map.output to true to enable LZO compression
  • 任务量大,块大小提高,更长时间的任务>1min,reduce任务的数量等于或略小于集群中reduce槽的数量。
  • Combiner

数据输入小文件处理

  • 合并小文件:对小文件进行归档(har)、自定义inputformat将小文件存储成sequenceFile文件
  • 采用ConbinFileInputFormat来作为输入,解决输入端大量小文件场景。
  • 对于大量小文件Job,可以开启JVM重用

map

  • 增大环形缓冲区大小,缓冲区溢写的比例
  • 减少对溢写文件的merge次数
  • 采用combiner提前合并,减少 I/O

reduce

  • 合理设置map和reduce数:两个都不能设置太少,也不能设置太多。太少,会导致task等待,延长处理时间;太多,会导致 map、reduce任务间竞争资源,造成处理超时等错误。
  • 设置map、reduce共存:调整slowstart.completedmaps参数,使map运行到一定程度后,reduce也开始运行,减少reduce的等待时间。
  • 规避使用reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。
  • 增加每个reduce去map中拿数据的并行数
  • 集群性能可以的前提下,增大reduce端存储数据内存的大小。

IO

  • 采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZOP压缩编码器。
  • 使用SequenceFile二进制文件

YARN

https://www.ibm.com/developerworks/cn/data/library/bd-yarn-intro/index.html

局限性

经典 MapReduce 的最严重的限制主要关系到可伸缩性、资源利用和对与 MapReduce 不同的工作负载的支持。在 MapReduce 框架中,作业执行受两种类型的进程控制:

一个称为 JobTracker 的主要进程,它协调在集群上运行的所有作业,分配要在 TaskTracker 上运行的 map 和 reduce 任务。 许多称为 TaskTracker 的下级进程,它们运行分配的任务并定期向 JobTracker 报告进度。

  • 单个 JobTracker 导致的可伸缩性瓶颈
  • Hadoop 设计为仅运行 MapReduce 作业。随着替代性的编程模型(比如 Apache Giraph 所提供的图形处理)的到来, 除 MapReduce 外,越来越需要为可通过高效的、公平的方式在同一个集群上运行并共享资源的其他编程模型提供支持。

改变

我们减少了单个 JobTracker 的职责,将部分职责委派给 TaskTracker,因为集群中有许多 TaskTracker。 在新设计中,这个概念通过将 JobTracker 的双重职责(集群资源管理和任务协调)分开为两种不同类型的进程来反映。

不再拥有单个 JobTracker,一种新方法引入了一个集群管理器,它惟一的职责就是跟踪集群中的活动节点和可用资源,并将它们分配给任务。 对于提交给集群的每个作业,会启动一个专用的、短暂的 JobTracker 来控制该作业中的任务的执行。有趣的是,短暂的 JobTracker 由在从属节点上运行的 TaskTracker 启动。 因此,作业的生命周期的协调工作分散在集群中所有可用的机器上。得益于这种行为,更多工作可并行运行,可伸缩性得到了显著提高。

yarn

(资源管理,任务调度,任务监控)

  • ResourceManager 代替集群管理器 资源管理
  • ApplicationMaster 代替一个专用且短暂的 JobTracker 任务调度,任务监控
  • NodeManager 代替 TaskTracker
  • 一个分布式应用程序代替一个 MapReduce 作业

在 YARN 架构中,一个全局 ResourceManager 以主要后台进程的形式运行,它通常在专用机器上运行,在各种竞争的应用程序之间仲裁可用的集群资源。 ResourceManager 会追踪集群中有多少可用的活动节点和资源,协调用户提交的哪些应用程序应该在何时获取这些资源。 ResourceManager 是惟一拥有此信息的进程,所以它可通过某种共享的、安全的、多租户的方式制定分配(或者调度)决策(例如,依据应用程序优先级、队列容量、ACLs、数据位置等)。

在用户提交一个应用程序时,一个称为 ApplicationMaster 的轻量型进程实例会启动来协调应用程序内的所有任务的执行。 这包括监视任务,重新启动失败的任务,推测性地运行缓慢的任务,以及计算应用程序计数器值的总和。 这些职责以前分配给所有作业的单个 JobTracker。 ApplicationMaster 和属于它的应用程序的任务,在受 NodeManager 控制的资源容器中运行。

NodeManager 是 TaskTracker 的一种更加普通和高效的版本。没有固定数量的 map 和 reduce slots, NodeManager 拥有许多动态创建的资源容器。容器的大小取决于它所包含的资源量,比如内存、CPU、磁盘和网络 IO。目前,仅支持内存和 CPU (YARN-3)。 未来可使用 cgroups 来控制磁盘和网络 IO。一个节点上的容器数量,由配置参数与专用于从属后台进程和操作系统的资源以外的节点资源总量(比如总 CPU 数和总内存)共同决定。

有趣的是,ApplicationMaster 可在容器内运行任何类型的任务。例如,MapReduce ApplicationMaster 请求一个容器来启动 map 或 reduce 任务,而 Giraph ApplicationMaster 请求一个容器来运行 Giraph 任务。您还可以实现一个自定义的 ApplicationMaster 来运行特定的任务,进而发明出一种全新的分布式应用程序框架,改变大数据世界的格局。您可以查阅 Apache Twill,它旨在简化 YARN 之上的分布式应用程序的编写。

一个可运行任何分布式应用程序的集群

ResourceManager、NodeManager 和容器都不关心应用程序或任务的类型。所有特定于应用程序框架的代码都转移到它的 ApplicationMaster, 以便任何分布式框架都可以受 YARN 支持 — 只要有人为它实现了相应的 ApplicationMaster

  • 向yarn提交应用程序
  • ResourceManager接收处理客户端请求,为应用程序分配一个容器,与容器内NodeManager通信,启动ApplicationMaster。
  • ApplicationMaster创建后向ResourceManager注册,ResourceManager可以了解应用程序状态。
  • ApplicationMaster向ResourceManager申请资源,成功后,与容器内NodeManager通信,启动Tasks。
  • Tasks通过RPC反馈状态到ApplicationMaster。Task失败会ApplicationMaster重启
  • Tasks完成,ApplicationMaster向ResourceManager注销,关闭自己。ApplicationMaster失败会ResourceManager重启,知道task完成。