分布式计算存储

hadoop HDFS mapreduce yarn

大量化(10TB)                                                                                                                                  多样化:
结构化 excel表,关系型数据库My SQL
半结构化 具有一定的层次结构,介于结构化和非结构化之间 XML文件,JSON文件,电
子邮件(标题,正文,附件)
非结构化(没有固定格式)文本文件,图片,音乐
大数据技术要面对的基本问题,也是最核心的问题:就是是海量数据如何可靠存储和高效计算。

1.2

GOOGLE的三驾马车

GFS:The Google File SYstem

MapReduce:Simplifed Data Processing On Large Clusters

Bigtable: A Distributed Storage System for Structured Data 

GFS Master :节点管理所有文件系统元数据,包括命名空间、访问控制信息、文件和块的映射信息以及当前块的位置信息

GFS存储的文件被分割成固定大小的块,每个块会被复制到多个块服务器上(可靠性) 块的冗余默认为3

GFS Master还管理着系统范围内的活动,比如块服务器之间的数据迁移等

GFS Master 与每个块服务器通信(发送心跳包),发送指令、获取状态

副本位置选择的策略要满足两大目标:最大化数据可靠性和可用性

MapReduce 采用“分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个子节点共同完成,然后整合各个子节点的中间结果,得到最终的结算结果。简而言之,MapReduce就是“分散任务,汇总结果”

1.3Hadoop的概述

1.Hadoop是什么?

    1.Hadoop是一个由Apache基金会所开发的分布式系统的基础框架

    2.主要解决海量数据的存储和海量数据的分析计算问题

    3.广义上来说,Hadoop通常是指一个更为广泛的概念—Hadoop生态圈

2.Hadoop根据是Google三篇论文实现

    HDFS ————》GFS

    MapReduce—————》MapReduce

    HBase——》BigTable

3.HDFS

HDFS:是Hadoop的项目的核心子项目,是分布式计算中数据存储管理的基础

1.4Hadoop的优势

(1)高可靠性:Hadoop底层维护多个数据副本,所以即使Hadoop某个计算元素或存储出现故障,也不会导致数据的丢失

(2)高扩展性:在集群间分配任务数据,可方便的拓展数以千计的节点

  (3)高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务处理速度

(4)高容错性:能够自动将失败的任务重新分配

1.5Hadoop的适用场景

Hadoop的生产环境     

1.HDFS架构

(1)NameNode:存储文件的元数据,如文件名、文件目录结构、文件属性(生成时间、副本数、文件权限),以及每个文件的块列表所在的DataNode等。

 (2)DataNode:在本地文件系统存储文件块数据,以及块数据的校验。

 (3)Secondary NameNode:每隔一段时间对NameNode元数据备份。(不是热备)

2.Yarn架构概述

ResourceManager(RM):整个集群资源(内存、cpu等)的老大

NodeManager(NM):单个节点服务器资源老大

ApplicationMaster:单个任务运行的老大(1)负责数据的切分(2)为应用程序申请资源并且分配给内部任务(3)任务的监控与容错 

Container:容器,相当于一台独立的服务器,里面封装了任务运行所需要的资源,如内存、cpu、磁盘、网络等。

说明:

1.客户端可以有多个

2.集群上可以运行多个ApplicationMaster

3.每个NodeManager上可以有多个Container

3.MapReduce架构概述

MapReduce将计算分为两个阶段:map和reduce

Map阶段并行处理输入数据

Reduce阶段对Map结果进行汇总

安装环境‘

1.用来将一个docker镜像从docker /cg/images/hadoop_node.tar.gz   

docker load < /cg/images/hadoop_node.tar.gz

2.

docker run --name master --privileged --ulimit nofile=65535:65535 --hostname master --ip 172.18.0.2 --add-host=slave1:172.18.0.3  --add-host=slave2:172.18.0.4 --add-host=slave3:172.18.0.5 -itd -v /cgsrc:/cgsrc:ro -v /headless/course/:/course hadoop_node /service_start.sh

docker run --name slave1 --privileged --ulimit nofile=65535:65535 --hostname slave1 --ip 172.18.0.3 --add-host=master:172.18.0.2  --add-host=slave2:172.18.0.4 --add-host=slave3:172.18.0.5  -itd -v /cgsrc:/cgsrc:ro hadoop_node /service_start.sh

docker run --name slave2 --privileged --ulimit nofile=65535:65535 --hostname slave2 --ip 172.18.0.4 --add-host=master:172.18.0.2 --add-host=slave1:172.18.0.3  --add-host=slave3:172.18.0.5 -itd -v /cgsrc:/cgsrc:ro hadoop_node /service_start.sh

docker run --name slave3 --privileged --ulimit nofile=65535:65535 --hostname slave3 --ip 172.18.0.5 --add-host=master:172.18.0.2 --add-host=slave1:172.18.0.3  --add-host=slave2:172.18.0.4 -itd -v /cgsrc:/cgsrc:ro hadoop_node /service_start.sh

docker run --name master --privileged --ulimit nofile=65535:65535 --hostname master --ip 172.18.0.2 --add-host=slave1:172.18.0.3 --add-host=slave2:172.18.0.4 --add-host=slave3:172.18.0.5 -itd -v /cgsrc:/cgsrc:ro -v /headless/course/:/course hadoop_node /service_start.sh 

--name master:master容器的名字

--privileged 赋予容器特权模式,允许它访问主机上所有设备

--utimit nofile=65535:65535设置文件描述的软限制和硬限制   

--hostname master --ip 172.18.0.2 --add-host=slave1:172.18.0.3 --add-host=slave2:172.18.0.4 --add-host=slave3:172.18.0.5    hostname容器的主机名为master,老大,slave

-itd

2.查看启动的docker

docker ps

3.docker ps -a查看所有容器    

    

4.进入master容器

docker exec -it --privileged master /bin/bash

安装java环境:

1创建文件夹java

mkdir /usr/local/java

2.复制     cp被复制文件的地址 复制到的地址

3。将压缩包 ***解压

tar -zxvf jdk-8u171-linux-x64.tar.gz

第三周

配置分布模式

      hdfs  NameNode(1个)   DataNode(多个)   SecondaryNameNode(1个)(尽量不跟NameNode放在同一个服务器上)

Yarn:ResourceManager(1个)    NodeManager(多个)    ResourceManager很消耗内存,尽量不要和NameNode,SecondaryNode配置在同一台机器上

hadoop1 hadoop2 hadoop3 hadoop4
NameNode DataNode DataNode

DataNode

SecondaryNameNode

NodeManager NodeManager NodeManager NodeManager

ping ip地址:ICMP协议,测试两台计算机之间的连通性(OSI第三层的协议)

SSH

Secure Shell(安全外壳协议)

一台计算机 ----》 一台计算机(一般情况下需要密码)

公钥(锁),私钥(钥匙)

寄一个箱子给彭文璟,但是箱子比较私密,中途不想让任何人打开

1.彭文璟造一把锁(公钥)和一把钥匙(私钥)。

2.他把锁给魏佳星,(把锁给服务器)钥匙彭文璟自己藏好了

3.魏佳星就把箱子用这把锁,锁住(用公钥加密)

4.中途没有人可以打开

5.只有有钥匙的人(有私钥)才能打开

魏佳星:服务器

彭文璟:客户端   

1.生成公钥和私钥的命令

ssh-keygen -t rsa
known_hosts 记录ssh访问过的计算机的公钥
id_rsa 生成的私钥
id_rsa.pub 生成的公钥
authorized_keys 存放授权过的无密码登录服务器的公钥

2.将公钥文件追加到另一个文件authorized_keys中

cat ./id_rsa.pub >> ./authorized_keys
scp ~/.ssh/id_rsa.pub root@slave1:/root
scp ~/.ssh/id_rsa.pub root@slave2:/root
scp ~/.ssh/id_rsa.pub root@slave3:/root

从自己本机(master)把~/.ssh/id_rsa.pub路径下的文件复制root@slave1:/root(slave1这台机器上,用的登录的账户root,在slave1这台机器/root这个路径下)

cat ~/id_rsa.pub >> ~/.ssh/authorized_keys

将~id_rsa.pub 文件的内容追加到~/.ssh/authorized_keys这个文件的末尾

任务:slave1免密登录到master

1.在slave1里面ssh master  不需要密码

2.slave1公钥给master

3.给了master 公钥放到slave1的~/.sshauthorized_keys里面

4.slave1里面的使用ssh master

安装Hadoop环境

1.将hadoop- 3.4.0 复制到user local下

cp /cgsrc/hadoop-3.4.0.tar.gz /usr/local/

2.tar 用于打包、解包、压缩和解压完压缩文件

-zxvf

-z使用gzip解压

x 解压文件

v 显示解压过程

f 指定文件名

tar -zxvf hadoop-2.7.1.tar.gz

3.重命名

mv 原本的名字 新的名字

4.配置环境变量,vim ~/.bashrc

让环境变量生效,source ~/.bashrc

5.查看是否安装好hadoop

hadoop version

1. bin:存放操作命令,具体包含如下图(hdfs,mapred,yarn)

2.etc:所有配置文件

3.include:头文件

4.lib:本地库(native库)压缩的动态链接库

5.libexec:拓展库

6.sbin:集群相关的命令

7.share:学习的资料,文档

配置集群环境

在hadoop下面的etc 的配置文件

需要配置的文件有:wokers、core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml、hadooop-evn.sh

1.workers

里面写了谁可以放DataNode,数据节点

master slave1 slave2 slave3

2.core-site.xml

<configuration>
    <!-- 默认文件系统的内部URI地址 -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://master:9000</value>
    </property>
    <!-- hadoop的临时工作目录(默认一个月就会删掉) -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>file:/usr/local/hadoop/tmp</value>
    </property>
</configuration>

3.hdfs-site.xml

<configuration>
    <!--secondary访问地址 -->
	<property>
		<name>dfs.namenode.secondary.http-address</name>
		<value>master:50090</value>
	</property>
    <!-- HDFS设置数据存储份数 -->
	<property>
		<name>dfs.replication</name>
		<value>3</value>
	</property>
    <!-- namenode的实际存储地址 -->
	<property>
		<name>dfs.namenode.name.dir</name>
		<value>file:/usr/local/hadoop/tmp/dfs/name</value>
	</property>
    <!--datanode的实际存储地址 -->
	<property>
		<name>dfs.datanode.data.dir</name>
		<value>file:/usr/local/hadoop/tmp/dfs/data</value>
	</property>
</configuration>

4.配置mapreduce的配置文件

<configuration>
    <!-- mapreduce程序运行在yarn上 -->
	<property>
		<name>mapreduce.framework.name</name>
		<value>yarn</value>
	</property>
    <!-- 任务运行的历史的服务端地址(内部通讯端口) --> 
	<property>
		<name>mapreduce.jobhistory.address</name>
		<value>master:10020</value>
	</property>
     <!-- 任务运行的历史的服务器web端地址 -->
	<property>
		<name>mapreduce.jobhistory.webapp.address</name>
		<value>master:19888</value>
	</property>
</configuration>

5.yarn-site.xml

<configuration>	
    <!-- Resource Manager的主机名 -->
	<property>
		<name>yarn.resourcemanager.hostname</name>
		<value>master</value>
	</property>
    <-- NodeManager提供的辅助服务,运行MapReduce必配-->
	<property>
		<name>yarn.nodemanager.aux-services</name>
		<value>mapreduce_shuffle</value>
	</property>
    <-- NodeManager监控本地磁盘的健康-->
	<property>
  	<name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
  	<value>98.5</value>
	</property>
</configuration>

6.配置hadoop-env.sh

export JAVA_HOME=/usr/local/java/jdk1.8.0_171
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop

配置slave节点

-z用gzip解压压缩文件

-x

-c

tar-zcf 压缩文件的路径和名字 原本将被压缩的文件

tar -zcf ~/hadoop.master.tar.gz ./hadoop 

找一下启动命令是不是在hadoop/sbin里面

启动hdfs 

start-dfs.sh

master 上有namenode、 sencondary namenode,(datanode(多个)取决于workers里面有没有localhost)

slave1,2,3都有且只有datanode

访问172.18.0.2:9870

启动

启动yarn

start-yarn.sh

ResourceManager(1个)、NodeManager(多个)

172.18.0.2.8088

测试集群环境

启动hdfs   start-dfs.sh    关闭stop-dfs.sh

启动yarn   start-yarn.sh    关闭stop-yarn.sh

启动顺序 stop -dfs.sh --> start-yarn.sh

关闭顺序stop-yarn.sh--->stop-dfs.sh

测试hdfs上传文件

1.在一台机器上创建一个文档.txt

2.创建一个hdfs的文件夹

3.将文件上传到hdfs系统

hadoop fs -put 本地文件地址和名字 上传到hdfs的地址

4.查看文件上传成功

hadoop fs -cat 文件地址和文件名

hadoop fs -cat 文件地址和文件名

逻辑地址:hdfs系统  /pInput/pengwenjng.txt

物理地址:hdfs-site.xml里面记录了实际的物理地址(datanode的实际存放地址/usr/local/hadoop/tmp/dfs/data)

注意:/mInput/pengwenjng.txt     hdfs默认存3份   slave1,slave2,slave3,master(分配机制只存储了3台)

怎么去看

在availablity下面有的机器上例如上图()去执行如下步骤

最后的地址:/usr/local/hadoop/tmp/dfs/data/current/BP-1172179653-172.18.0.2-1774316468148/current/finalized/subdir0/subdir0

通过实际的物理地址查看文件

QQ邮箱 3451292220@qq,com (逻辑地址)登录到qq邮箱的网站,未读邮件

物理地址:(开发需要知道)

测试上传大文件

1.本地大文件上传到hdfs系统

2.把本地大文件上传到hdfs系统

3.需要知道大文件hdfs系统是怎么存放的

所有的文件都是按照块存储,大文件可能分为多个块进行存储如上图存储(例如上图,分为了2个块分别进行存放)

hdfs系统里面的/bigFiles/jdk-8u171-linux-x64.tar.gz   逻辑地址

物理地址,真正存放的路径 /usr/local/hadoop/tmp/dfs/data

通过物理地址查看文件(去两个块都存在的机器,)

cd /usr/local/hadoop/tmp/dfs/data/current/BP-1172179653-172.18.0.2-1774316468148/current/finalized/subdir0/subdir0

先到这个路径,可以看到两个新的块,将两个块合并

cat blk_1073741826 >> tmp.tar.gz

cat blk_1073741827 >> tmp.tar.gz

解压压缩包tar -zxvf tmp.tar.gz

测试wordcount

1.

2.hadoop-mapreduce-examples-3.4.0.jar wordcount /pInput/pengwenjing.txt /pOutput2

hadoop jar   jar包的位置     jar 里面的那个方法   hdfs输入路径   hdfs输出路径(一定不是之前的)

3.日志

.

多了一个pOutput2这个文件

mapreduce里面成功的记录到_SUCCESS  

如下图:可以看到统计了单词出现的频率

可以研究一下8088这个端口查看整个

4,第二章面试重点

1.常用端口号

hadoop3.X

HDFS NameNode内部通常端口 8020/9000/9820
HDFS NameNode 对用户查询的端口 9870
Yarn查看任务运行情况的 8088
历史服务器 19888

hadoop2.X

HDFS NameNode内部通常端口 8022/9000
HDFS NameNode对用户的查询端口 50070
Yarn查看任务运行情况的 8088
历史服务器 19888

2.常用的配置文件

3.x  core-site.xml    hdfs-site.xml    yarn-site.xml    mapred-site.xml  workers   /hadoop/etc/hadoop

2.x   core-site.xml    hdfs-site.xml    yarn-site.xml    mapred-site.xml  slaves

HDFS

一HDFS概述

1.1HDFS产生背景

    随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。HDFS只是分布式文件管理系统中的一种。

1.2HDFS定义

    HDFS(Hadoop Distributed File System),它是一个文件系统,用来存储文件,通过目录树来定位文件;其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。

     HDFS的使用场景:适合一次写入,多次读出的场景,且不支持文件的修改。适合用来做分析,并不适合用来做网盘应用。

1.3HDFS优缺点

优点:

(1)高容错性

数据自动保存多个副本(3个)。它通过增加副本的形式,提高容错性。

某个副本丢失后,它可以自动恢复。

(2)适合处理大数据

1.数据规模:能够处理数据规模达到GB、TB、甚至PB级别的数据。

2.文件规模:能够处理百万规模以上的文件数量,数量相当之大。

(3)可构建在廉价机器上,通过多副本机制,提高可靠性。

缺点:

(1)不适合低延时数据访问,比如毫秒级的存储数据,是做不到的。

(2)无法高效的对大量小文件进行存储

         存储大量小文件的话,他会占用NameNode大量的内存来存储文件目录和块信息。这样是不可取的,因为NameNode的内存总是有限的;(理解:书包含目录和具体内容)

         小文件的存储寻址时间会超过读取时间,它违反了HDFS的设计目标

(3)不支持并发写入(同时写第一行),文件随机修改。

        一个文件只能有一个写,不允许多个线程同时写;

        仅支持数据append(追加),不支持文件的随机修改。

1.4HDFS组织架构

NameNode(nn):就是Master(老板),它是一个主管、管理者。

  1.       管理HDFS的名称空间
  2.      配置副本策略;
  3.      管理数据块(Block)映射信息;
  4.      处理客户端的读写请求

DataNode:就是Slave.NameNode下达命令,DataNode执行实际的操作。

  1.       存储实际的数据块
  2.       执行数据块的读写操作

Client:就是客户端。

  1.      文件切分。文件上传HDFS的时候,Client将文件切分成为一个一个的Block,然后进行上传;
  2.      跟NameNode交互,获取文件的位置信息。
  3.      与DataNode交互,读取或者写入数据;
  4.      Client提供一些命令来管理HDFS,比如NameNode格式化
  5.      Client可以通过一些命令来访问HDFS,比如对HDFS增删改查操作

Secondary NameNode:并非NameNode的热备,当NameNode挂掉的时候,它不能马上替换NameNode并提供服务

  1.      辅助NameNode,分担其工作量,比如定期合并Fsimage和Edits,并推送给NameNode
  2.      在紧急情况下,可辅助恢复NameNode

内存  硬盘

Fsimage(硬盘):为了防止断电数据丢失,

Edits:

Fsimage1+Edits+Edits2同步到NameNode

1.5HDFS文件块大小(面试重点)

HDFS中文件在物理上是分块存储(Block),块的大小可以通过配合参数(dfs.blocksize)来规定默认大小,在Hadoop新版里(2.x)是128M,老版本是64M

block1    block2   block3    block4    ......      block66

如果寻址时间约为10ms,即查找到目标block的时间约为10ms

寻址时间为传输时间的1%时,则为最佳状态。因此,上面的传输时间=10ms/0.01=1000ms=1s

目前磁盘的输出速率普遍为100MB/s     块大小:100M/s  *  1s =100M     128M

普通机械硬盘  80M/s - 90M/s                块大小:80M/s  *  1s =80M   80M-90M    64M,128M

固态硬盘     200M/s-300M/s                  块大小:200M/s  *  1s =200M   200M-300M    256M

思考:为什么块的大小不能设置太小,也不能设置太大?

(1)HDFS的块设置太小,会增加寻址时间,程序一直在找块的开始位置

(2)如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始新的位置所需要的时间(寻址时间)。导致程序在处理这块数据时,会非常慢

总结:HDFS块的大小设置主要取决于磁盘传输的速率

1.6HDFS的shell操作(10分)

1.基本语法

bin/hadoop fs 具体的命令

bin/hdfs dfs    具体的命令

2.常用的操作命令

(0)启动Hadoop集群

sbin/start-dfs.sh  启动hdfs

sbin/start-yarn.sh  启动yarn

(1)-help 输出这个命令的参数

(2)-ls:显示目录信息

hadoop fs -ls

(3)-mkdir:在HDFS上创建目录

(4)moveFromLocal:从本地剪切粘贴到hdfs

(5).-appendToFile:追加一个文件到已经存在的文件的末尾

1.要有一个本地文件

touch文件名(创建一个空文件)

vi文件名(编辑这个文件)

cat文件名(查看这个文件)

2.把本地文件追加到已经存在的HDFS系统里面的文件的末尾

hadoop fs -appendToFile 本地文件和地址 HDFS已经存在的文件路径以及文件名

(6).-cat 文件名:查看文件

(7). -chgrp, -chmod,  -chown:Linux文件系统中的用法一样,修改文件所属权限(ch代表change,grp代表group,mod代表mode,own代表所有者)

-chgrp[R]组名 文件或目录名:改变文件所属的组别

chown 改变文件夹所属者的命令

hadoop fs -chown文件所有者:组别的名字 hdfs的文件或者文件夹       

chmod [用户类别][操作符][权限]  文件/目录

1.用户类别

u:所有者(user)

g:所属者组(group)

o:其他用户(others)

a:所有用户(all,默认值)

2.操作符

+:添加权限

-:移除权限

=:直接设置权限(覆盖所有权限)

3.权限

r:读

w:写

x:执行

原本的权限 语句 新的权限
rw- chmod u+x script.sh rwx
rw- chmod g-w data.txt r--
rwx chmod o=r file.txt r--

二数字模式

chmod [数字组合] 文件/目录

1.权限对应数字

r=4

w=2

x=1

-(无权限)  =0

2.数字组合规则

将三类用户(所有者,组,其他用户)的权限,得到三位数字

第一位:所有者权限

第二位:组权限

第三位:其他用户权限

例1:rwxr-xr--

所有者:rwx 4+2+1=7

组:       r-x  4+0+1=5

其他用户:r--  4+0+0=4

例2: rw-r--r--

所有者:rw- 4+2+0=6

组:       r--  4+0+0=4

其他用户:r--    4+0+0=4

hadoop fs -chmod 666 文件名:将这个文件的三类用户都给了读,写的权限

(8). -copyFromLocal: 从本地文件系统中拷贝到HDFS路径去。

hadoop fs -copyFromLocal  本地文件 HDFS的路径(要放的路径下)

(9)-copyToLocal:从HDFS拷贝到本地

(10)-cp:从HDFS的一个路径拷贝到HDFS的另一个路径

(11)-mv:在HDFS目录中移动文件

(12)-get等同于copyToLocal,就是从HDFS下载到本地

(13)-put:等同于copyFromLocal

hadoop fs -put 本地文件 上传到HDFS的系统位置

(14)-tail:显示一个文件的末尾

(15)-rm:删除文件或者文件夹

删文件夹及文件夹下面所有的文件:

hadoop fs -rm -r HDFS文件夹路径及名字

(16)-du统计文件夹的大小信息

(17)-setrep:设置HDFS文件的副本数量

如果想要在web端可以直接修改文件副本数,需要将文件的other给到写的权限

Maven:版本管理器,jdk1.8,xml文件

(18)创建空文件(作业)

hadoop fs -touchz 空文件的名字

1.7HDFS客户端操作

集群(集群,master,slave1,slave2,slave3)

windows(不属于集群的机器)  (hdfs客户端代码   ) ---》

1.pom.xml文件

<!--dependencies整个项目的依赖,maven会去找junit 4.12-->
<dependencies>
    <!-- junit 单元测试-->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <!--log4j打印日志, -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>
    <!-- hadoop的客户端-->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>

2.log4j.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- 类型名为Console,名称为必须属性 -->
        <Appender type="Console" name="STDOUT">
            <!-- 布局为PatternLayout的方式,
            输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
            <Layout type="PatternLayout"
                    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
        </Appender>
 
    </Appenders>
 
    <Loggers>
        <!-- 可加性为false -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>
 
        <!-- root loggerConfig设置 -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>
 
</Configuration>
package org.pengwenjing;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;

public class HdfsClient {
    @Test
    public void testmkdir() throws IOException, InterruptedException {
        //1.创建HDFS客户端,传入uri,配置文件,用户,请求连接172.18.0.2(namenode所在的ip)的集群
        FileSystem fileSystem = FileSystem.get(URI.create("hdfs://172.18.0.2:9000"),new Configuration(),"root");
        //2.第一步连接已经成功,操作集群,所以去创建一个叫xiyouji的文件夹
        fileSystem.mkdirs(new Path("/xi you ji"));
        //3.关闭连接
        fileSystem.close();
    }
}

API操作

public void testCopyFromLocalFile() throws URISyntaxException, IOException, InterruptedException {
        //1.连接集群的name node地址
        URI uri=new URI("hdfs://172.18.0.2:9000");
        //创建一个配置文件
        Configuration configuration=new Configuration();
        //登录用户
        String user="root";
        //获取到客户端对象
        FileSystem fileSystem = FileSystem.get(uri,configuration,user);
        //2.copyFronLocalFile从本地文件上传到HDFS系统的方法
        //参数1:表示要不要删掉本地的原始数据,参数2:是否可以覆盖
        //参数3:要上传的本地路径的地址  , 参数4:HDFS参数的地址
        fileSystem.copyFromLocalFile(false,true,new Path("/root/IdeaProjects/HdfsDemo/src/main/resources/pengwenjingApi.txt"),new Path("/xiyouji"));
        //3.关闭资源
        fileSystem.close();
    }
package org.pengwenjing;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HDFSClientV1 {
    private FileSystem fileSystem;

    @Before
    public void init() throws URISyntaxException, IOException, InterruptedException {
        //1.连接HDFS
        URI uri=new URI("hdfs://172.18.0.2:9000");
        Configuration configuration=new Configuration();
        String user="root";
        //get方法里面可能没有三个参数
        fileSystem = FileSystem.get(uri,configuration,user);
    }

    @After
    public void close() throws IOException {
        //3.
        fileSystem.close();
    }

    @Test
    public void testCopyFromLocalFile() throws URISyntaxException, IOException, InterruptedException {

        //2
        fileSystem.copyFromLocalFile(false,true,new Path("/root/IdeaProjects/HdfsDemo/src/main/resources/pengwenjingApi.txt"),new Path("/xiyouji"));

    }
}

hdfs参数优先级

参数优先级排序:(1)客户端代码中设置的值>(2)ClassPath下用户自定义配置文件   >  (3)服务器的自定义配置(hadoop/etc/hadoop/xxx-site.xml)>(4)服务器的默认配置(xxx-default.xml)

hdfs文件下载

public void testCopyToLocalFile() throws IOException {
        //从HDFS系统下载文件到本地
        //参数1 delsrc:源文件是否删除  ,参数2  src:HDFS里面要下载的文件路径
        //参数3 dst:文件要下载到本地的什么路径下 参数4:useRawLocalFileSystem:是否开启文件校验
        //参数4,如果不开启校验,下载的文件会多一个crc结尾的校验文件;选择了true,就不会生成crc校验文件
        fileSystem.copyToLocalFile(false,
                new Path("/wangzherongyao/fayu/maKeboluo.txt"),
                new Path("/root/IdeaProjects/HdfsDemo/src/main/resources/"),false);
    }

作业:测试两个boolean类型,如果修改成true会怎么样

HDFS删除

//hdfs里面的删除
public void testDelete() throws IOException {
        //参数1:要删除的路径  参数2:是否递归删除
        fileSystem.delete(new Path("/xiyouji/pengwenjingApi.txt"), false);
    }

文件名字的修改

原本名字:

操作步骤:

 @Test
    public void testRename() throws IOException {
        //文件名字的修改
        //参数1:HDFS系统里面原本文件的路径和名字,参数2:hdfs系统里面修改后的路径和名字
        fileSystem.rename(new Path("/wangzherongyao/fayu/luban.txt"),
                new Path("/wangzherongyao/fayu/ludan.txt"));
    }

结果图:

移动文件:

原本:

移后:

HDFS文件详情查看

查看文件名称、权限、长度、块信息

查看根目录的:

输出结果:

查看/bigFiles的:

输出结果

代码注释:

 public void testListFiles() throws IOException {
        //listFiles返回该目录下所有文件和子目录的详细信息,包括文件的长度,块大小,备份数,修改时间,所有者,权限
        //参数1:HDFS系统文件夹的位置以及名字  ,参数2:boolean recursive:是否递归遍历子目录,设置为true时,会返回路径下所有子目录中的文件,false时仅返回当前目录下的直接文件。
        //左边:右边返回的所有文件的信息都存储在listFiles对象里面
        RemoteIterator<LocatedFileStatus> listFiles=fileSystem.listFiles(new Path("/bigFiles"),true);
        //遍历listFiles
        while(listFiles.hasNext()){
            //获取到listFiles里面具体的某条信息,存储在status对象里面。
            LocatedFileStatus status=listFiles.next();
            //输出详细信息
            //文件名称
            System.out.println(status.getPath().getName());
            //文件长度
            System.out.println(status.getLen());
            //文件权限
            System.out.println(status.getPermission());
            //文件分组
            System.out.println(status.getGroup());
            System.out.println("---------beautiful-----------");
            
            //获取存储块的信息,同一个文件的所有块信息都存储在blockLocations对象里面
            BlockLocation[] blockLocations=status.getBlockLocations();
            //对于大文件,一个文件有多个块的情况。所以我们需要遍历blockLocations,去获取具体某一块的信息
            for(BlockLocation blockLocation:blockLocations){
                //获取存储这个块的host(一般是有多少个备份就有多少个节点)
                String[] hosts=blockLocation.getHosts();
                遍历所有节点的host
                for(String host:hosts){
                    System.out.println(host);
                }
            }
            System.out.println("++++++++yitiaoxinxijieshu++++++++");
        }

判断是否为文件:

结果:

代码注释:

 public void testListStatus() throws IOException {
        //获取在hdfs系统的/wangzherongyao/fayu路径下,所有文件以及文件夹的状态
        FileStatus[] listStatus=fileSystem.listStatus(new Path("/wangzherongyao/fayu"));
        //遍历listStatus
        for(FileStatus fileStatus:listStatus){
            //isFile()方法,判断是不是文件
            if(fileStatus.isFile()){
                System.out.println("file:"+fileStatus.getPath().getName());
            }else{
                System.out.println("wenjianjia"+fileStatus.getPath().getName());
            }

        }
    }

查看是否为文件夹

判断文件是否存在

File file = new File("/");

if(file.exists()){

    System.out.println("cunzai");

}

1.8 hdfs写数据流程(面试重点)

(1)客户端通过Distributed FileSystem 模块向NameNode请求上传文件,Namenode检查目录文件是都已经存在,父目录是否存在

(2)NameNode返回是否可以上传

(3)客户端请求第一个Block上传到哪几个DataNode服务器上

(4)NameNode返回3个DataNode节点,分别为dn1、dn2、dn3

(5)客户端通过FSDataOutStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成

(6)dn1,dn2,dn3逐级应答客户端

(7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每上传以一个packet会放入一个应答队列等待应答

(8)当一个block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器(重复执行3-7 )

网络拓扑-节点距离计算

在HDFS写数据的过程中,NameNode会选择距离待上传数据最近距离的DataNode接收数据。那么这个最近距离怎么计算?  

机架感知(副本存储节点选择)

机架感知(replication副本存储节点xuanze)数据可靠性、传输速率快

副本节点选择:

第一个副本client所处的节点上,如果客户端在集群外,随机选一个。(上传速度最快)

第二个副本在另一个机架的随机一个节点。(数据可靠性)

第三个副本在第二个副本所在机架的随机节点(速度)

1.9hdfs读取数据流程

(1)客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的NameNode地址

(2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据

(3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)

(4)客户端以Packet为单位接受,先在本地缓存,然后写入目标文件 

1.10namenode (nn)和secondarynamenode(2nn)

NameNode中元数据存储在哪儿?(内存,磁盘)

NameNode的元数据(磁盘+内存):FsImage存储数据+edits追加修改(关机或者checkpoint这两个合并到fsimage,开机的时候就会将fsimage加载到内存)

第一阶段:NameNode启动

(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志(edits)和镜像文件(Fsimage)到内存。

(2)客户端对元数据进行增删改的请求

(3)NameNode记录操作日志,更新滚动日志

(4)NameNode在内存中对元数据进行增增删改

第二阶段:Secondary NameNode工作

(1)Secondary NameNode询问NameNode是否需要checkPoint。直接带回NameNode是否检查结果

(2)Secondary NameNode请求执行CheckPoint

(3)NameNode滚动正在写的Edits日志

(4)将滚动前的编辑日志和镜像文件拷贝到SecondaryNameNode

(5)SecondaryNameNode加载编辑日志和镜像文件到内存,并合并。

(6)生成新的镜像文件fsimage.chkpoint

(7)拷贝fsimage.chkpoint到NameNode

(8)NameNode将fsimage.chkpoint重新命名为fsimage

1.11DataNode工作机制

(1)一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳

(2)DataNode启动后向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息()

(3)心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个块数据。如果超过10分钟没有收到某个DataNode心跳,则认为该节点不可用(datanode跟NameNode报告,我还活着)

(4)集群运行中可以安全加入和退出一些机器(DataNode10-100)

MapReduce(40分)

MapReduce编程模型

1.1MapReduce定义

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架

MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上

1.2MapReduce优缺点

1.2.1优点

1.MapReduce易于编程

它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行

2,良好的拓展性

当你的计算机资源不能得到满足时,你可以通过简单的增加机器来扩展他的计算能力

3.高容错性

MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成

4.适合PB级以上海量数据的离线处理

可以实现上千台服务器集群并发工作,提高数据处理能力

1.2.2缺点

1.不擅长实时计算

MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。

2.不擅长流式计算

流式计算的输入数据是动态的,而MapReduce的输入数据是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的

3.不擅长DAG(有向图)计算

多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常低下

1.3MapReduce进程(172.18.0.2:8088)

一个完整的MapReduce程序在分布式运行时有三类实例进程:

(1)MrAppMaster:负责在整个程序的过程中调度及状态协调。(yarn中的一个)

(2)MapTask:负责Map阶段的整个数据处理流程

(3)ReduceTask:负责Reduce阶段的整个数据处理流程

1.4常用数据序列化类型

Java类型 Hadoop Writable类型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable

1.5MapReduce编程规范

用户编程的程序分为三个部分:Mapper,Reducer和Driver。

1.Mapper阶段

(1)用户自定义的Mapper要继承自己的父类

(2)Mapper的输入数据时KV(键值对)对的形式(kv的类型可自定义) <0,I am somebody>

(3)Mapper中的业务逻辑写在map()方法中

(4)Mapper的输出数据是KV对的形式(KV的类型可以自定义)(<LongWritable,Text,Text,IntWritable>前两个是输入的key和value,后两个是key和value)

  (5)map()方法(MapTask进程)对每一个<k,v>调用一次

2.Reduce阶段

(1)用户自定义的Reduce要继承自己的父类

(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是kv

(3)Reducer的业务逻辑写在reduce()方法中

(4)ReduceTask进程对每一组相同的k的<kv>组调用一次reduce()方法

3.Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce

程序相关运行参数的job对象

Yarn中MapReduce作业运行机制

wordcount案例

统计文本文件单词的个数,输出每个单词出现的总次数

I am somebody

I am smart and kind

I am important

I am starve of education

I have palces to go

I have people to impress

I have world to change

mapper

0,I am somebody

mapper

key1代表偏移量,v1这一行的内容

<0,I am somebody>

0 1 2 3 4 5 6 7 8 9 10 11 12 13
I a m s o m e b o d y 换行符

<14I am smart and kind>

14 15 16 17 18
I a m s m a r t a n d

<29,I am important>

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

//输入的key类型:LongWritable
//输入的value类型:Text
//输入的key类型:Text
//输出的value类型:IntWritable

//输出<0,I am sombody>
//输出:
//<I,1>
//<am,1>
//<somebody,1>
public class wordCountMapper extends Mapper<LongWritable, Text, Text,IntWritable> {
    @Override
    protected void map(LongWritable key, Text value,.Context context) throws IOException, InterruptedException {
        //1.toString()将数据转成String类型
        String line = value.toString();
        //split(" ")切割,以空格切割
        String[] words = line.split(" ");// I am somebody
        //for(小的部分的类型 word你自己给小部分起的名字:代表我们要遍历的对象)
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));//IntWritable(1)表示出现次数初始为1
        }
    }
}

Reducer

mapper输出

//输出<0,I am sombody>
//输出:
//<I,1>
//<am,1>
//<somebody,1>

//<I,1>

//<I,1>

//<I,1>

shuffle

<I,<1,1,1,1,1,1,1>>

Reducer的输入

<I,<1,1,1,1,1,1,1>> key是单词,value是所有的集合

Reducer的输出

<I,7>key是单词,value是个数

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
//reducer的输入key的类型:Text
//reduce的输入value类型:IntWritable
//reduce的输出key类型:Text
//reduce的输出value类型:IntWritable
public class wordCountReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text k3,Iterable<IntWritable> v3,Context context) throws IOException, InterruptedException {
        //累加:初始为0
        int total =0;
        //v3:<1,1,1,1,1,1,1>
        for(IntWritable v:v3){
            total+=v.get();//v.get()=1
        }
        //输出
        context.write(k3,new IntWritable(total));
    }
}

Driver(要记住)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class wordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //1.获取配置信息以及封装任务,获取job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //2.设置jar包加载路径(就是它这个类生成的class文件)
        job.setJarByClass(wordCountDriver.class);

        //3.设置Mapper和Reduce类生成的class文件
        job.setMapperClass(wordCountMapper.class);
        job.setReducerClass(wordCountReduce.class);

        //4.设置map输出的key和value的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5.设置最终输出的key和value的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6.设置输入和输出路径(输入可以有多个文件Paths)
        FileInputFormat.setInputPaths(job, new Path("/root/IdeaProjects/mapReduceTest/src/main/resources/pengwenjing.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/root/IdeaProjects/mapReduceTest/src/main/resources/output"));
        
        //7.提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

作业:

输入数据:

学号,姓名,第一次提交成绩 ,第二次提交成绩,第三次提交成绩

01,xiaoming,45,60,80

02,xiaoli,70,67,80

03,xiaohuang,80,60,80

04,xiaolan,90,89,80

输出:

学号最高成绩

01 80

02 80

03 80

04 90

  

本地测试

输入文件为本地,输出文件为本地

输入文件地址为:/root/IdeaProjects/mapReduceTest/src/main/resources/pengwenjing.txt

输出文件地址为:/root/IdeaProjects/mapReduceTest/src/main/resources/output

执行Driver类 会生成一个output文件

测试使用HDFS系统的文件作为输入输出

1.需要启动hdfs系统

2.修改Driver类的输入输出路径如下

3.结果如下

在集群上测试:

1.启动HDFS系统

2.启动yarn

3.修改代码

driver类的输入输出路径:

修改pom.xml文件,让maven下载依赖(需要注意的点,mainClass必须是你写的Driver类的名字)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>mapReduceTest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>mapReduceTest Maven Webapp</name>
    <url>http://maven.apache.org</url>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
            <exclusions>
                <!-- ’d Log4j 1.x -->
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <!-- ’d SLF4J ’ Log4j 1.x „e¥ -->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <finalName>mapReduceTest</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin </artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>wordCountDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

4.打包

打包之前必须确认整个项目没有报红(打包之前没有target)

双击如下图的package,执行打包操作

生成了target,并且下面有生成两个jar包

5.目前生成的jar是在cg这台电脑里面,我们需要将它上传到master执行

执行命令如下:

scp /root/IdeaProjects/mapReduceTest/target/mapReduceTest-jar-with-dependencies.jar root@172.18.0.2:/usr/local/hadoop

确保master已经上传了jar包

6.执行jar包

hadoop jar mapReduceTest-jar-with-dependencies.jar /wangzherongyao/fayu/ludan.txt /wangzherongyao/fayu/ludanOutput

7.结果如下:

MapReduce核心框架原理

一、InputFormat数据输入

inputFormat作用:切片,为Mapper提供输入数据

Block存放,

1.切片

MapTask个数,取决于有多少个切片

MapTask并行度决定Map阶段的任务处理并发度,进而影响整个JOB处理速度。MapTask不是越多越好

1.MapTask并行度决定机制

      数据块(Block):Block是HDFS物理上把数据分成一块一块的

      数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储

MapReduce并行制度

  (1)一个Job的Map阶段并行度(mapTask有多少个)由客户端在提交Job时的切片数决定

  (2)每一个Split切片 分配一个MapTask并行实例(exist考试)

  (3)默认情况下,切片大小分=BlockSize

  (4)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

输出:data1.txt   300M,data2.txt   1M

设置切片大小为128m

切片1:0-128m 切片2:129-256m 切片3:257-300 切片4:0M-1M

MapTask有4个

切片大小的公式:

Math.max(minSize,Math.min(maxSize,blockSize))    (minSize默认为1,maxSize默认为Long的最大值)

Math.min(maxSize,blockSize)=128  (Long的最大值,128)他们两谁小?  128

Math.max(minSize,128)=128  (比较1和128谁大)

2.FileInputFormat实现类

InputFormat是一个抽象类,定义了一个MapReduce作业必须实现的标准规范

FileInputFormat同样是一个抽象类,它继承自InputFormat

   FileInputFormat常见的接口实现类包括:TextInputFormat、keyValueTextinputFormat、

NLineinputFormat、ConbineTextinputFormat和自定义InputFormat

TextInputFormat是默认的FileInputFormat实现类

MapReduce的默认输入格式是? TextinputFormat

二、输出数据OutputFormat

OutputFormat接口实现类

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口

OutputFormat的实现类有TextOutputFormat,SequenceFileOutputFormat,自定义OutputFromat

默认的输出格式是TextOutputFormat,他把每条记录都写在文本行

三、Shuffle

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle

  3.1分区Partition

(1)问题引入

   要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

(2)默认Partitioner分区

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区

什么是分区?  Mapper任务划分数据的过程称为Partition。负责实现数据的类称为Partitioner,默认的分区是Hash分区(Hash Partition)

Partition作用:将map阶段产生的所有<key,value>对分配给不同的Reducer处理,可以将Reduce阶段的处理负载进行分摊。

这里回答前面的问题,什么决定Reduce任务里面的数量,答案是:Partition的数量决定Reducer的数量

一般Reduce的任务数默认值是1,用户可以通过job.setNumReduceTasks(数字)去设置Reduce的个数。

3.2Combiner

3.3Shuffle完整流程理解

归并排序

在分区前有溢写。

2、3部分属于Shuffle

Yarn

用start-yarn.sh命令启动YARN之后,用jps命令查看YARN的基本组件,包括:ResoureManager和NodeManager

通过8088端口可以查看YARN的Web监控页面,该页面包含了YARN集群的基本信息、所有应用的基本信息等

下图为YARN的架构图,它由Container、ResourceManager、NodeManager、ApplicationMaster几个主要部分组成。

YARN的架构是从主从架构,主机为ResourceManager,从机为NodeManager,其中ResourceManager负责接收客户端的作业请求以及为作业分配相应的NodeManager资源,在NodeManager启动Container资源容器,在资源容器中运行相关作业

(1)Container(容器):YARN中资源包括内存、CPU、磁盘输入输出等等。Container是YARN中资源的抽象,它封装了某个节点上的多维度资源

(2) ResourceManager(资源管理器):
ResourceManager负责整个系统的资源分配和管理,是一个全局的资源管理器。主要由两个组件构成:调度器和应用程序管理器:
 
调度器(Scheduler):
调度器根据资源情况为应用程序分配封装在Container中的资源。
 
应用程序管理器(Application Manager):
应用程序管理器负责管理整个系统中所有应用程序。
 
(3) NodeManager(节点管理器)
NodeManager是每个节点上的资源和任务管理器。
定时向ResourceManager汇报本节点上的资源使用情况和各个Container的运行状态;
接收并处理来自ApplicationManager的Container启动/停止等请求。

(4)ApplicationMaster(主应用)
 
ApplicationMaster是一个详细的框架库,它结合从 ResourceManager 获得的资源和 NodeManager 协同工作来运行和监控任务。
 
用户提交的每一个应用程序均包含一个ApplicationMaster。
 
主要功能包括:
1)、与ResourceManager调度器协商以获取抽象资源(Container);
2)、负责应用的监控,跟踪应用执行状态,重启失败任务等;
3)、并且与NodeManager协同工作完成Task的执行和监控。
 
yarn中应用运行机制

yarn监控

Yarn调度器

YARN调度器分三种:

(1)FIFO Scheduler 先进先出调度器

(2)Capacity Scheduler  容器调度器(可以看作FIFO Scheduler多个队列版本)(yarn默认采用容器调度器)

(3)Fair Scheduler   公平调度器

三种调度器比较

调度器 工作方法
FIFO Scheduler

(1)单队列

(2)先进先出的原则

Capacity Scheduler

(1)多队列

(2)计算能力调度器,选择资源使用量占最小、优先级高的先执行

(3)多用户的情况下,可以最大化集群的吞吐和利用率

Fair Scheduler

(1)多队列

(2)公平i调度,所有的job具有相同的资源

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐