1.大数据技术发展


大数据技术要面对的基本问题,也是最核心的问题:

就是是海量数据如何可靠存储和高效计算

②易观OLAP(比赛,可不记)

2.Goole的“三驾马车”


一台计算机变成多台计算机

①GFS:The Google File System(GFS思想生成了HDFS)

MapReduce:Simplified Data Processing on Large Clusters 大型集群上的简单数据处理

Bigtable :A Distributed Storage System for Structured Data 一个分布式的结构化数据存储系统

Chunk Server是存数据

Master是管理

GFS架构:
 
(1)GFS Master节点管理所有的文件系统元数据,包括命名空间、访问控制权、文件和块的映射信息以及当前块的位置信息。
 
(2)GFS存储的文件都被分割成固定大小的块,每个块都会被复制到多个块服务器上(可靠性)。块的冗余度默认为3

(3)GFS Master还管理着系统范围内的活动,比如块服务器之间的数据迁移等

(4)GFS Master 与每个块服务器通信(发送心跳包),发送指令。获取状态

副本的位置选择的策略要满足两个目标:最大化数据的可靠性和可用性。

③MapReduce的思想

MapReduce采用“分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个子节点共同完成,然后整合各个子节点的中间结果,得到最终的计算结果。MapReduce就是“分散任务,汇总结果”。

Map的输出是Reduce的输入

④BigTable

3.Hadoop的概述
①hadoop是什么?什么是分布式计算基础框架:不是一台就能搞定的


(1)hadoop是一个由Apache基金会所开发的分布式计算基础框架

(2)主要解决,海量数据的存储和海量数据的分析计算问题。所以适合大数据

(3)广义上来讲,Hadoop通常是指一个更广泛的概念--Hadoop生态圈。

②Hadoop根据是Google的三篇论文实现
HDFS → GFS

MapReduce → MapReduce

HBase → BigTable

(1)HDFS:Hadoop Distributed File System,是Hadoop项目的核心子项目,是分布式计算中数据存储管理的基础

4.Hadoop的发展


5.Hadoop的优势


- 高可靠性:Hadoop底层维护多个数据副本,所以即使Hadoop某个计算元素或存储出现故障,也不会导致数据的丢失。

- 高扩展性:在集群分配任务数据,可方便的扩展数以千计的节点。

- 高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务处理速度。

- 高容错性:能够自动将失败的任务从新分配。

6.Hadoop生态圈

组件 功能
HDFS 分布式文件系统
YARN  资源管理和调度器
MapReduce 分布式并行编程模型
HBase Hadoop上的非关系型的分布式数据库
Hive Hadoop上的数据仓库
Pig   一个基于Hadoop的大规模数据分析平台,提供类似SQL的查询语言Pig Latin
Flume   一个高可用的,高可靠的,分布式的海量日志采集,聚合和传输的系统
Sqoop  用于在Hadoop与传统数据库之间进行数据传递
Zookeeper 提供分布式协调一致性服务
Spark 类似于Hadoop MapReduce的通用并行框架


NameNode:存储文件的元数据,如文件名,文件目录结构,文件属性(生成时间,副本数,文件权限),以及每个文件的块列表和块所爱的DataNode等

DataNode:在本地文件系统存储文件块数据,以及块数据的校验和。

Secondary NameNode:每隔一段时间对NameNode元数据备份((判断题)高可用:保持)

7.Yam的框架启动yarn会多一个R,多个N
ResourceManager:整个集群资源(内存,cpu等)的老大:(1个)

NodeManager单个节点服务器资源老大:(多个)

ApplicationMaster:单个任务运行的老大。

ApplicationMaster的作用:

(1)负责数据切分(2)为应用程序申请资源并分配内部任务(3)任务的监控与容错

Container:容器,相当于一台独立的服务器,里面封锁了任务运行所需要的资源,如内存,cpu,磁盘,网络等

1.4.3 Mapreduce架构
Mapreduce将计算过程分为两个阶段:Map和Redce

Map和Reudce中间有一个sshuffle过程

Map阶段并行处理输入数据

Reduce阶段对Map结果进行汇总
 

2.配置分布式模式

 1.配置分布式模式

HDFS:NameNode(1个)      DataNode(多个)   SecondaryNameNode(1个)

NameNode和SecondaryNameNode尽量不要安装在同一台服务器

因为有一个坏了就都坏了

YARN:ResourceManager(1个)  NodeManager(多个)

ResourceManager很消耗内存,尽量不要和NameNode,SecondaryNameNode放在同一个机器上

hadoop1 hadoop2 hadoop3 hadoop4

NameNode

DataNode DataNode

DataNode

SecondaryNameNode

NodeManager

NodeManager

ResourceManager

NodeManager NodeManager

2.ping ip地址:ICMP协议,测试两台计算机之间的连通性(OSI第三层)

ping master

ssh

Secure Shell(安全外壳协议)

一台电脑 ——> 另外一台电脑(有密码才可以)

客户端——>服务端

场景:陈长湦 寄一个箱子给高姗姗,这个箱子比较私密,中途不能让人打开,只能高姗姗打开

1.高姗姗打造一把锁(公钥)和一把钥匙(私钥)。

2.高姗姗把这把锁(公钥)陈长湦,但是钥匙没有给,高姗姗自己藏好了(把公钥放到服务器)

3.陈长湦把箱子用高姗姗给他的那把锁锁起来了。(用公钥加密)

4.箱子寄出去,中途不会有人打开

5.只有高姗姗可以用钥匙打开

高姗姗:客户端

陈长湦:服务端

1.

生成公钥和私钥

ssh-keygen -t rsa

.ssh文件夹下的文件功能解释:

known_hosts 记录ssh访问过计算机的公钥(public key)
id_rsa 生成的私钥
id_rsa.pub 生成的公钥
authorized_keys

存放授权过的无密码登录服务器公钥

2.

将公钥文件追加到另一个文件authorized_keys中

cat ./id_rsa.pub >> /authorized_keys

3.scp从一台电脑的~/.ssh/id_rsa.pub的文件,复制到slave1这台电脑上,用户是root放到了slave1里面的/root这个路径下面

scp ~/.ssh/id_rsa.pub root@slave1:/root

master登录slave1,slave2,slave3(免密)

任务:slave1登录到master(没有密码)

实现场景:在slave1里面执行命令ssh master,就可以不输入密码master

1.slave1生成私钥和公钥

2.slave1的公钥给到master

scp

3.在master上,公钥给放到~/.ssh/authorized_keys

4.去slave1里面测试ssh master

3.环境测试

启动hdfs       start-dfs.sh

namenode(1个)      secondary namenode(1个)   datanode(slave1,slave2,slave3,master)

启动yarn      start-yarn.sh

resoucemanager(1个)          nodemanager(master,slave1,2,3)

启动顺序:start-dfs.sh   -》 start-yarn.sh

关闭的顺序:stop-yarn.sh -》stop-dfs.sh

1.常用端口号

hadoop3.X

172.18.0.2:9870     172.18.0.2(namenode所在机器的ip)      9870是默认的web端访问hdfs的端口

172.18.0.2:8088      172.18.0.2(resouce manager所在电脑的ip地址)   8088是默认的web端访问yam的端口号

测试hdfs上传文件

1.创建一个本地文件penglian.txt

2.创建一个hdfs的文件夹

hadoop fs -mkdir文件夹的路径和名字

3.将本地文件上传到hdfs系统里面的文件夹里

hadoop fs -put 本地文件的路径和名字 要上传到hdfs这个系统的具体路径

4.查看文件是否上传成功

引入:QQ邮箱  123456@qq.com,逻辑地址(并不是真正意义上存放文件的地址)

物理地址:真实存放文件的地址

hdfs系统

逻辑地址:/pinput/penglian.txt

物理地址:hdfs里面配置了/usr/local/hadoop/tmp/dfs/data

代码:file:/usr/local/hadoop/tmp/dfs/data

/usr/local/hadoop/tmp/dfs/data/current/BP-169714833-172.18.0.2-1774841890238/current/finalized/subdir0/subdir0

问题:hdfs存放数据,datanode真正存放数据的节点。hdfs默认存三份

master,slave1,slave2,slave3存在那几台机器上了?如下图,存放位置在master,slave1,slave3上面

测试大文件上传

1.本地有一个大文件

cp被复制文件的地址和文件名 粘贴到哪儿的地址

2.在HDFS系统里面创建一个新的文件夹

3.本地大文件上传到新的文件夹

4.查看,反思

hdfs系统存放文件是按照块存储。如果是大文件,分成多个块,分别存储。

hdfs:/bigFiles/jdk-8u171-linux-x64.tar.gz

物理地址(实际存放的位置)/usr/local/hadoop/tmp/dfs/data         hdfs-site.xml

找相同的机器上操作

cat bik_1073741826 >> tmp.tar.gz    将blk_1073741826追加tmp.tar.gz

tar -zxvf tmp.tar.gz

<property>
  <name>yarn.app.mapreduce.am.env</name>
  <value>HADOOP_MAPRED_HOME=/usr/local/hadoop</value>
</property>
<property>
  <name>mapreduce.map.env</name>
  <value>HADOOP_MAPRED_HOME=/usr/local/hadoop</value>
</property>
<property>
  <name>mapreduce.reduce.env</name>
  <value>HADOOP_MAPRED_HOME=/usr/local/hadoop</value>
</property>

wordcount测试

副线任务:

修改配置文件mapred-site.xml

为什么要配这那个属性?

主线任务:

上一节课上传了小文件,上传到HDFS(/pinput/penglian.txt),我们统计这个文件里面每个单词出现的频率

1.确保你的HDFS系统里面/pinput/penglian.txt这个文件是存在的

2.测试wordcount是否执行结果

cd /usr/local/hadoop/share/hadoop/mapreduce/

hadoop jar hadoop-mapreduce-examples-3.4.0.jar wordcount /pInput/penglian.txt /pOutput
hadoop jar  jar包的名字  调用方法的名字(wordcount)    输入路径(/pInput/penglian.txt 这个输入文件必须存在)     输出路径(必须不存在,如果已经存在会报错)

3.解读wordcount运行过程和结果展示

mapreduce:map+reduce

是否成功:job completed successfully

多了一个pOutput,说明有输出

真正的输出结果在part-r-00000里面

查看结果

hadoop fs -cat文件路径和文件名

4.面试重点

1.常用端口号

hadoop3.X

HDFS NameNode内部通常端口 8020/9000/9820

HDFS NameNode对用户的查询端口

9870

Yarn查看任务运行情况的 8088
历史服务器 19888

hadoop2.X

HDFS NameNode内部通常端口 8020/9000
HDFS NameNode对用户的查询端口 50070
Yarn查看任务运行情况的

8088

历史服务器 19888

2.常用的配置文件

3.x core-site.xml hdfs-site.xml yarn-site.xml mapred-site.xml workers  (/hadoop/etc/hadoop)

2.x core-site.xml hdfs-site.xml yarn-site.xml mapred-site.xml slaves

HDFS

一.HDFS概述:

1.1 HDFS产生背景:

随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。

HDFS只是分布式文件系统管理中的一种

1.2 HDFS定义:

HDFS(Hadoop Distributed File System)它是一个文件系统,用于存储文件,通过目录树来定位文件;其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色

HDFS的使用场景:适合一次写入,多次读出的场景,且不支持文件的修改,适合用来做数据分析,并不适合来做网盘应用。

1.3 HDFS优缺点

优点:

1.高容错性:

—数据自动保存多个副本。它通过增加副本的形式,提高容错性

—某一个副本丢失以后,它可以自动恢复。

2.适合处理大数据(不适合处理大量小文件)

—数据规模:能够处理数据规模达到GB、TB、甚至PB级别的数据;

—文件规模:能够处理百万规模以上的文件数量,数量相当之大

3.可构建在廉价机器上,通过多副本机制,提高可靠性

缺点:

1.不适合低延时数据访问,比如毫秒级的存储数据,是做不到的。

2.无法高效的对大量小文件进行存储

①存储大量小文件的话,他会占用NameNode大量的内存来存储文件目录和块信息,这样是不可取的,因为NameNode的内存总是有限的;(理解  书包含目录和内容)

②小文件存储的寻址时间会超过读取时间,他违反了HDFS的设计目标

3.不支持并发写入,文件随机修改

一个文件只能有一个写,不允许多个线程同时写

仅支持数据append(追加),不支持文件的随机修改

1.4 HDFS组成架构

NameNode(nn):就是Master(老板),它是一个主管、管理者。

  1. 管理HDFS的名称空间;
  2. 配置副本策略;
  3. 管理数据块的映射信息;
  4. 处理客户端的读写请求;

DataNode:就是Slave. NameNode下达命令,DataNode执行实际的操作

  1. 存储实际的数据块
  2. 执行数据块的读/写操作

Client:就是客户端

  1. 文件切分,文件上传HDFS的时候,Client将文件切分成一个一个Block(默认大小:128兆),然后进行上传
  2. 与NameNode交互,获取文件的位置信息
  3. 与DataNode交互,读取或者写入数据
  4. Client提供一些命令来管理HDFS,比如NameNode格式化
  5. Client可以通过一些命令来访问HDFS,比如对HDFS增删改查操作

Secondary NameNode:并非NameNode的热备(并非时时保持工作),当NameNode挂掉的时候,它不能马上替换NameNode并提供服务(没有解释,作业:Fsimage和Edits是干什么的,原理是什么)

  1. 辅助NameNode,分担其工作量,比如定期合并Fsimage和Edits,并推送给NameNode;
  2. 在紧急情况下,可辅助恢复NameNode

作业Fsimage和Edits:

内存(快,断电容易丢失) edits:编辑文件

硬盘 Fsimage :为了防止断电数据丢失,会落盘为fsimage

以上内容全部存为fsimage1

edits:2026年4月14日10点30-10点40 新的东西

edits:2026年4月14日10点40-10点50 新的东西

Secondary NameNode:fsimage2=fsimage1+edits1+edits2

同步传给namenode,会存fsimage2

secondary namenode

只不过namenode会多一个edits inprogress

1.namenode fsimage edits 会跟secondarynamenode同步 

2.secondarynamenode会把fsimage edits 加起来得到一个新的fsimage,传回给namenode

namenode(物理地址)

/usr/local/hadoop/tmp/dfs

1.5 Hdfs文件块大小(面试重点)

HDFS中的文件在物理上是分块存储(Block),块的大小可以通过参数配置(dfs.blocksize)来规定,默认大小在Hadoop2.X版本是128M,老版本是64M

block1    block2  block3   block4    .........block65   block66

1.如果寻址时间约为10ms,即找到目标block的时间为10ms(计算机组成原理)

2.寻址时间为传输时间的1%时,则为最佳状态  传输时间=10ms/0.01=1000ms=1s

3.目前磁盘的传输速率普遍为100MB/S    块大小:100MB/s*1s=100M       128M

普通机械硬盘  80M/s-90M/s    块大小:80-90M     128M,64m

固定硬盘  200M/s-300M/s       块大小:  200M-300M    256M

思考:为什么块的大小不能设置太小,也不能设置太大?

(1)HDFS的块设置太小,会增加寻址时间,程序一直在找块的开始位置(找目标块)

(2)如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置的时间(寻址时间)。导致程序在处理这块数据时,会非常慢

总结:HDFS块的大小设置主要取决于磁盘的传输速率

1.6 HDFS的shell操作(10分)非常重要,考试题

1.基本的语法

bin/hadoop fs 具体命令

bin/hdfs dfs 具体命令

这两个具体命令完全相同,随便找一个用就行

2.常用命令

(0)启动Hadoop集群

sbin/start-dfs.sh(在sbin目录下启动hdfs系统)

sbin/start-yarn.sh(在sbin目录下启动yarn)

(1)-help:输出这个命令参数

hadoop fs -help rm:输出rm这个命令的相关解释以及参数说明

下图是错误:解决办法是重启(stop-dfs.sh)

(2)-ls:显示目录信息的

(3)-mkdir:在HDFS上创造文件夹

hadoop fs -mkdir -p /rangzherongyao/zhonglu

(4)-moveFromLocal:从本地剪切粘贴到HDFS

1.本地有一个文件

touch buzhihuowu.txt  创建buzhihuowu这个空文件

2.将本地的文件剪切,粘贴到HDFS系统里面

hadoop fs -moveFromLocal 本地文件 HDFS的路径

(5)-appendToFile:追加一个文件到已经存在的文件末尾(第一组:上单的英雄,第二组:中单的英雄,第三组:发育的英雄,第四组:辅助的英雄,第五组:打野的英雄,第六组:中单)

1.本地文件(一般是有内容的)

2.把本地文件的内容放到(HDFS系统里面已经存在的)文件末尾

hadoop fs -appendToFile 本地文件 HDFS的文件:把本地文件的内容放到HDFS文件的末尾

(6)-cat:显示文件内容

(7)-chgrp,chmod,chown:Linux文件系统中的用法一样,修改文件所属权限

(ch:change,grp:group分组,mod:mode权限,own:owner拥有者)

chgrp [R] 组名 文件或者目录名:更改文件/目录所属的用户组

hadoop fs -chgrp  新改的组名 文件或者文件夹

chown:修改文件/目录所有者

hadoop fs -chown 更改为所有者:组名 文件或者文件夹

chmod:[用户类别][操作符][权限] 文件/目录 :修改文件/目录的权限(不考)

1.用户类别

u.所有者(user)

g:所属者组(group)

o:其他用户(others)

a:所有用户(all,默认值)

2.操作符

+:添加权限

-:移除权限

=:直接设置权限(覆盖原有的权限)

3.权限

r:读

w:写

x:执行

原有的权限 命令 现有的权限
user原本rw- chmod u+x script.txt user的权限rwx
group的权限rw- chmod g-w data.txt group的权限r--
others的权限rwx chmod o=r file.txt others的权限r--

数字模式:

chmod [数字组合] 文件/目录

1.权限对应的数字

r读=4

w写=2

x执行=1

-(无权限)=0

2.数字组合规则

将三类用户(所有者,组,其他用户)的权限,得到三位数字

第一位:所有者权限

第二位:组权限

第三位:其他用户权限

例1:rwxr-xr--

所有者:rwx  4+2+1=7   给了读,写,执行的权限

组:r-x  4+0+1=5   给了读和写的权限

其他用户:r--  4+0+0=4  只给了读的权限

chmod 754 file.txt

例2:rw-r--r--

所有者:rw-  4+2+0=6

组:r-- 4+0+0=4

其他用户:r--  4+0+0=4

chmod  644 file.txt

(8)-copyFromLocal:从本地文件系统中拷贝文件HDFS路径去(和shell中的put一样

(9)-copyToLocal:从HDFS拷贝到本地(和get一样)

(10)-cp:从HDFS的一个路径拷贝到HDFS的另一个路径

1.创建一个其他分路的文件夹(游走)

2.我们把王昭君从中路复制到一个到游走

(11)-mv:在HDFS目录中移动文件

1.先在HDFS创建一个打野的文件夹

2.把不知火舞从中路移到打野的位置(不是复制)

(12) -get:等同于copyToLocal(即从HDFS下载文件到本地),都是从HDFS下载文件到本地

hadoop fs -get hdfs 系统里面的文件  下载到本地的存放路径

(13) -put:等同于copyFromLocal:上传本地文件到HDFS系统里面

hadoop fs -put 本地文件 HDFS系统的存放路径

(14) -tail:显示一个文件的末尾

(15)-rm:删除文件或文件夹

-r代表遍历文件夹

hadoop fs -rm -r hdfs文件夹:删除hdfs系统里面的这个文件夹以及文件及下面的所有文件

(16)-du 统计文件夹大小信息

-s -h 88 264 34 27代表是什么

(17) -setrep:设置HDFS文件的副本数量

hadoop fs -setrep 副本数量

(18)-touchz:在hdfs系统里创建一个空的文件(必考)(具体用法自己搜)

在hdfs系统里面创建一个空文件two.txt的空文件

hadoop fs -touchz two.txt

1.7 HDFS客户端操作(开发重点)

master(shell),slave1,slave2,slave3

hdfs客户端代码(集群外的一台电脑)  可以控制整个集群(master,slave1,slave2,slave3)

1.pom.xml

<!--dependencies代表这个项目所有的依赖 -->
<dependencies>
    <!--junit单元测试类 -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <!-- log4j 打印日志,日志等级-->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>

    <!--hadoop的客户端 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>

2.log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- 类型名为Console,名称为必须属性 -->
        <Appender type="Console" name="STDOUT">
            <!-- 布局为PatternLayout的方式,
            输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
            <Layout type="PatternLayout"
                    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
        </Appender>
 
    </Appenders>
 
    <Loggers>
        <!-- 可加性为false -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>
 
        <!-- root loggerConfig设置 -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>
 
</Configuration>

3.HdfsClient类

package org.penglian;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;

public class HdfsClient {
    @Test
    public void testMkdir() throws IOException, InterruptedException {
        //1.创建HDFS的客户端对象fileSystem,(发送了uri网址,配置对象,用户),请求连接集群
        FileSystem fileSystem=FileSystem.get(URI.create("hdfs://172.18.0.2:9000"),new Configuration(),"root");
        //2.登录HDFS成功,可以对HDFS做操作,所以我在HDFS系统里创建了一个aiqnggongyu的文件夹
        fileSystem.mkdirs(new Path("/aiqinggongyu"));
        //3.退出登录,关闭资源
        fileSystem.close();
    }
}
package org.penglian;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;

public class HdfsClient {
    @Test
    public void testMkdir() throws IOException, InterruptedException {
        //1.创建HDFS的客户端对象fileSystem,(发送了uri网址,配置对象,用户),请求连接集群
        URI uri=new URI("Hdfs://172.18.0.2:9000");
        Configuration conf=new Configuration();
        String user="root";
        // FileSystem不一定有三个参数,也有可能是1个或者2个
        FileSystem fileSystem=FileSystem.get(uri,conf,user1);
        //2.登录HDFS成功,可以对HDFS做操作,所以我在HDFS系统里创建了一个aiqnggongyu的文件夹
        fileSystem.mkdirs(new Path("/aiqinggongyu"));
        //3.退出登录,关闭资源
        fileSystem.close();
    }
}

只要考hdfs,第1和3步一定存在

上传

@Test
    public void testCopyFromLocalFile() throws URISyntaxException, IOException, InterruptedException {
        //1.
        连接集群的namenode地址,HDFS的链接地址
        URI uri=new URI("hdfs://172.18.0.2:9000");
        //创造一个配置文件
        Configuration configuration=new Configuration();
        //登录用户
        String user="root";
        //获取到HDFS客户端对象
        FileSystem fileSystem=FileSystem.get(uri,configuration,user);

        //2.操作HDFS系统
        //copyFromLocalFile 从本地上传文件到HDFS系统
        //参数1:表示是否删除本地的原数据  ,参数2:是否覆盖HDFS里面已经存在的文件
        //参数3:需要上传的本地文件的路径地址,参数4:HDFS的目标地址(要上传到hdfs的那个位置的地址)
        fileSystem.copyFromLocalFile(false,true,new Path("/root/IdeaProjects/HdfsDemo/src/main/resources/zengxiaoxian.txt"),new Path("/aiqinggongyu"));

        //3.关闭资源
        fileSystem.close();
    }

测试参数优先级

 (1)客户端代码中设置的值  > (2)ClassPath下用户自定义配置文件 > (3)服务器的自定义配置(hadoop/etc/hadoop/xxx-site.xml) >  (4)服务器默认配置(xxx-default.xml)

自定义的配置高于默认配置

1.一开始默认三份    服务器默认配置 hdfs-default.xml

2.从3份变为2份   代码里的配置优先级高于服务器默认配置,所以直接把副本数改成了 2

3.从2份变为1份   用项目配置文件(优先级次高)→ 1 份(覆盖了默认配置,且代码配置已移除)

(红色字体不确定,不确定2,3顺序对不对,再ai看一下)

将 HDFS 上的文件,复制(下载)到本地文件系统中:

1.具体代码

public void testCopyToLocalFile() throws IOException {
        //copyToLocalFile从HDFS系统里面下载文件到本地
        //参数1 boolean delsrc:是否删除HDFS上的源文件  ,参数2 Path src:hdfs上要被下载的文件路径
        //参数3 path dst:要将文件下载到本地的路径
        //参数4 boolean useRawLocalFileSystem:是否开启文件验证(false下载完会有两个条件,crc校验文件;但如果是true,就不会有crc校验文件)
        fileSystem.copyToLocalFile(false,
                new Path("/wangzherongyao/youzou/wangzhaojun.txt"),
                new Path("/root/IdeaProjects/HdfsDemo/src/main/resources/"),
                false);
    }

2.整体代码

3.运行结果

删除

作业:测试上面两个boolean类型变成true会有什么效果

在HdfsClientV1

1.删除文件

删除了/aiqinggongyu/zengxiaoxian.txt

public void testDelete() throws IOException {
        File file=new File(“/aiqinggongyu/zengxiaoxian.txt”);
        //exists()判断文件是否存在
        if (file.exists()){
               System.out.print("存在");
        }


        //delete删除HDFS里面的文件或文件夹
        //参数1:要删除的文件或者是文件夹,参数2:是否遍历
        fileSystem.delete(new Path("/aiqinggongyu/zengxiaoxian.txt"),false);
    }

2.删除文件夹,例如删除/aiqinggongyu/文件夹(该文件夹里有文件用true,没有可以用false)true就是允许遍历

public void testDelete() throws IOException {
        //删除文件夹时,如果该文件夹里有文件就要用true(表示允许遍历),没有文件可以用false。否则会报错
        fileSystem.delete(new Path("/aiqinggongyu/"),true);
    }

HDFS文件名更改/移动

更改

1.代码

public void testRename() throws IOException {
        //rename()这个方法是用来修改文件名字或者移动文件的
        //参数1:HDFS系统原本文件的路径和名字;参数2:hdfs系统里被修改后的名字和路径
        fileSystem.rename(new Path("/wangzherongyao/zhonglu/anqila.txt"),new Path("/wangzherongyao/zhonglu/shuangmawei.txt"));
    }

2.初始图

3.结果图

移动

1.代码

public void testRename() throws IOException {
        fileSystem.rename(new Path("/wangzherongyao/zhonglu/shuangmawei.txt"),
                new Path("/anqila.txt"));
    }

2.结果图:把anqila移到了根目录下

HDFS文件详情查看

查看文件名称、权限、长度、块信息

public void testListFiles() throws IOException {
        //listFiles()这个方法是返回根目录(/)下所有的子文件和子目录的详细信息,包括文件的长度、块大小、备份数、修改时间、所有者、权限
        //参数1:要遍历的目标起始路径    参数2:boolean recursive:是否递归遍历子目录。如果设置为true时,会返回路径下所有的子目录中的文件;false时仅返回当前目录下的直接文件
        //左边:右边listFiles()这个方法返回来的所有信息都存储在左边listFiles里面
        RemoteIterator<LocatedFileStatus> listFiles=fileSystem.listFiles(new Path("/"),true);
        //遍历
        while (listFiles.hasNext()){
            //status是 存储的listFiles里面的一条信息 的对象
            LocatedFileStatus status=listFiles.next();
            //文件名称
            System.out.println(status.getPath().getName());
            //文件长度
            System.out.println(status.getLen());
            //文件权限
            System.out.println(status.getPermission());
            //文件的分组
            System.out.println(status.getGroup());
            System.out.println("------");

            //获取一个文件存储的所有块信息,块信息存放在blockLocaltions(大文件会有多个块)
            BlockLocation[] blockLocations=status.getBlockLocations();
            //遍历一个文件的每个块
            for (BlockLocation blockLocation:blockLocations){
                //获取存储这个块的hosts(他会包含多个节点,这取决于备份数和datanode有多少节点)

                String[] hosts=blockLocation.getHosts();
                //遍历所有节点的host
                for (String host:hosts){
                    System.out.print(host+"|");
                }
                System.out.println();
            }
            System.out.println("++++yitiaoxinxidejieshu+++++++++++++++++++++");

        }
    }

最后显示的结果(截图太杂,我写在代码里了)

//根目录下的第一个文件
anqila.txt
//该文件大小
34
//文件权限
rw-rw-rw-
//所属用户组
supergroup
//分界线
------
//只有一个数据块,该数据块存储了两个数据节点,即slave3和master
slave3|master|
//证明第一个循环结束
++++yitiaoxinxidejieshu+++++++++++++++++++++
//根目录下第二个文件
jdk-8u171-linux-x64.tar.gz
190890122
rw-r--r--
supergroup
------
master|slave1|slave2|
slave3|master|slave2|
++++yitiaoxinxidejieshu+++++++++++++++++++++
penglian.txt
81
rw-r--r--
supergroup
------
slave3|master|slave1|
++++yitiaoxinxidejieshu+++++++++++++++++++++
job_1775531247434_0001-1775531788014-root-word+count-1775531809802-1-1-SUCCEEDED-root.default-1775531795272.jhist
22748
rwxrwx---
supergroup
------
slave3|slave1|slave2|
++++yitiaoxinxidejieshu+++++++++++++++++++++
job_1775531247434_0001.summary
439
rwxrwx---
supergroup
------
master|slave1|slave2|
++++yitiaoxinxidejieshu+++++++++++++++++++++
job_1775531247434_0001_conf.xml
305591
rwxrwx---
supergroup
------
slave3|slave1|slave2|
++++yitiaoxinxidejieshu+++++++++++++++++++++
wangzhaojun.txt
27
rw-r--r--
supergroup
------
slave3|master|slave2|
++++yitiaoxinxidejieshu+++++++++++++++++++++
wangzhaojun.txt
27
rw-r--r--
supergroup
------
master|slave1|slave2|
++++yitiaoxinxidejieshu+++++++++++++++++++++

Process finished with exit code 0

Hdfs判断是文件还是文件夹

 public void testListStatus() throws IOException {
        //获取在hdfs系统里面/根目录下,所有文件以及文件夹的状态
        FileStatus[] listStatus=fileSystem.listStatus(new Path("/"));
        //遍历listStatus
        for(FileStatus fileStatus:listStatus){
            //isFile()方法判断是不是文件
            if(fileStatus.isFile()){
                System.out.println("wenjian:"+fileStatus.getPath().getName());
            }else{
                System.out.println("wenjian:"+fileStatus.getPath().getName());
            }
        }
    }

public void testListStatus() throws IOException {
        FileStatus[] listStatus=fileSystem.listStatus(new Path("/"));
        for (FileStatus fileStatus:listStatus){
            //isDirectory是不是文件夹
            if (fileStatus.isDirectory()){
                System.out.println("wenjianjia"+fileStatus.getPath().getName());
            }else {
                System.out.println("wenjian"+fileStatus.getPath().getName());
            }
        }

    }

hdfs写数据流程(面试重点)(会考)

(1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。(Name Node检查权限,检查目录结构)

(2)NameNode返回是否可以上传

(3) 客户端请求第一个Block上传到哪几个DataNode服务器上

(4)NameNode返回3个DataNode节点,分别是dn1(DataNode1),dn2,dn3(3个这个数量和我们副本存储节点选择有关)

(5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成

(6)dn1,dn2,dn3逐级应答客户

下面第七步才真正开始存数据

(7)客户端开始往dn1上传第一个Block(从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个Packet会放入一个应答队列等待应答

(8)当一个Block传输完成后,客户端再次请求Name Node上传第二个Block的服务器。(重复执行3-7)

网络拓扑-节点距离计算

在HDFS写数据的过程中,NameNode会选择距离待上传数据最近距离的DataNode接收数据。那么这个最近距离怎么计算呢?

节点距离:两个节点达到最近的共同祖先的距离总和

Distance(d1/r1/n0,d1/r1/n0)=0

Distance(d1/r1/n1,d1/r1/n2)=1+1=2

Distance(d1/r2/n1,d1/r3/n2)=2+2=4

Distance(d1/r2/n0,d2/r4/n1)=3+3=6

机架感知(副本存储节点选择)

机架感知(replication副本存储节点选择):数据可靠性,传递速度快

第一个副本在Client所处的节点上。如果客户端在集群外,随机选一个。(为了传输速度快)

第二个副本在另一个机架的随机一个节点。(为了数据的可靠性)

第三个副本在第二个副本所在机架的随机节点。(为了速度)

hdfs读取数据流程

(1)客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址

(2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据

(3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)

(4)客户端以Packet为单位接收,先在本地缓存,然后写入目标文件

namenode(nn)和secondarynamenode(2nn)(3遍)

DataNode中元数据存储在哪儿?(内存,磁盘)

   内存<------开机加载---------fsimage存储元数据,edits追加的信息--------关机合并到fsimage

NameNode工作机制

第一阶段:NameNode启动

(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存

(2)客户端对元数据进行增删改的请求

(3)NameNode记录操作日志,更新滚动日志

(4)NameNode在内存中对元数据进行增删改

第二节阶段:Secondary NameNode工作

(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果

(2)Secondary NameNode请求执行CheckPoint

(3)NameNode滚动正在写的Edits日志。

(4)将滚动前的编辑日志和镜像文件拷贝到Secondar NameNode

(5)Secondary NameNode 加载编辑日志和镜像文件到内存,并合并。

(6)生成新的镜像文件fsimage.chkpoint

(7)拷贝fsimage.chkpoint到NameNode

(8)NameNode将fsimage.chkpoint重新命名为fsimage。

DataNode工作机制(3遍)

(1)一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度、快数据的校验和、以及时间戳。

(2)DataNode启动后会向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息(跟老板汇报工作)

(3)心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另外一台机器,或删除某个数据块(保持副本数)如果超过10分钟没有收到某个DataNode的心跳,

则认为该节点不可用

(4)集群运行中可以安全加入和退出一些机器(DataNode  10---->100  )

MapReduce

1.1 MapReduce定义

         MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架

        MapReduce核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上

1.2 优缺点

优点

1.MapReduce易于编程

它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的,就是因为这个特点使得MapReduce编程变得很流行

2.良好的扩展性

当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展他的计算能力。

3.高容错性

MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就是要求他具有很高的容错性

比如其中有台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成

4.适合PB级以上海量数据的离线处理(是离线处理,不是实时的)

可以实现上千台服务器集群并发工作,提高数据处理能力

缺点

1.不擅长实时计算

MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果

2.不擅长流失计算

流式计算的输入数据时动态的,而MapReduce的输入数据集时静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的

3.不擅长DGA(有向图)计算

多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的地下

1.3 MapReduce进程

一个完整的MapReduce程序在分布式运行时有三类实例进程:172.0.18.2:9870

(1)MrAppMaster:负责整个程序的过程调度及状态协调

(2)MapTask:负责Map阶段的整处理流程

(3)ReduceTask:负责Reduce阶段的整个数据处理流程

1.4 常用数据序列化类型

Java类型 Hadoop Writable类型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable

1.5 MapReduce编程规范

用户编程的程序分为三类部分:Mapper,Reducer和Driver

1.Mapper阶段

(1)用户自定义的Mapper要继承自己的父类

(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)

(3)Mapper中的业务逻辑写在map()方法里面

(4)Mapper的输出数据是KV对的形式(KV的类型可自定义)

(5)map()方法(MapTask进程)对每一个<K,V>调用一次

2.Reducer阶段

(1)用户自定义的Reducer要继承自己的父类

(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV

(3)Reducer的业务逻辑写在reduce()方法中

(4)ReduceTask进程对每一组相同的<K,V>组调用一次reduce()方法

3.Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象

Mapper

k1代表偏移量,v1这一行的内容

I a m s o m e b o d y 换行符
0 1 2 3 4 5 6 7 8 9 10 11 12 13

<0,I am somebody>
<14,I am smart and kind>
<29,I am important>
<40,I am starve of education>
<51,I have places to go>
<63,I have people to impress>
<77,I have world to change>


Hadoop Mapper 阶段知识点
 
1. 用户自定义的 Mapper 需要继承父类 Mapper ( extends Mapper )

2. Mapper的输入数据是KV对的形式(KV的类型可自定义)

3. Mapper中的业务逻辑写在 map() 方法里面

4. Mapper的输出数据是KV对的形式(KV的类型可自定义)

5.  map() 方法(由 MapTask 进程执行)对每一个 <K,V> 输入调用一次(我们有7行文件,所以有七对输出,所以我们会执行map())

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

//mapper输入key的类型:LongWritable
//mapper输入value的类型:Text
//mapper输出key的类型:Text
//mapper输出value的类型:IntWritable
public class wordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //将输入的value数据转换成string类型
        String line= value.toString();
        //将line这个字符串切割成单词
        //line "I am somebody"
        String[] words=line.split(" ");
        //遍历words
        for(String word:words){
            //输出单词,还有单词的个数1(一定要注意输出的类型必须保持一致)
            context.write(new Text(word),new IntWritable(1));
        }
        //输出
        //<I,1>
        //<am,1>
        //<somebody,1>
    }
}

Reduce 

1.mapper的输入

<0,I am somebody>会调用一次map()
<14,I am smart and kind>会调用一次map()
<29,I am important>会调用一次map()
<40,I am starve of education>会调用一次map()
<51,I have places to go>会调用一次map()
<63,I have people to impress>会调用一次map()
<77,I have world to change>会调用一次map()

2.mapper的输出

map()执行完后的结果<I,1>,<am,1>,<somebody,1>

map()执行完后的结果<I,1>,<am,1>,<smart,1>,<and,1>,<kind,1>

map()执行完后的结果<I,1>,<am,1>,<important,1>

shuffle过程

<l,<1,1,1,1,1,1,1>>

<am,<1,1,1>>

<somebody,1>

Reduce的输入

<l,<1,1,1,1,1,1,1>>

<am,<1,1,1>>

<somebody,1>

Reduce的输出

<1,7>

<am,3>

<somebody,1>

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

//reduce输入key的类型:Text
//reduce输入value的类型:IntWritable
//reduce输出key的类型:Text
//reduce输出value的类型:IntWritable
public class wordCountReducer extends Reducer<Text, IntWritable,Text, IntWritable> {
    @Override
    protected void reduce(Text k3,Iterable<IntWritable> v3,Context context) throws IOException, InterruptedException {
        //累加,初始单词个数是0
        int total=0;
        //遍历reduce输入的value
        for(IntWritable v:v3){
            total+=v.get();//total=total+v.get();累加单词出现的次数
            
        }
        //reduce的输出<单词,单词的个数>
        context.write(k3,new IntWritable(total));
    }
    
}

Driver

代码要背下来(手敲)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class wordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //1.获取配置信息以及封装任务,获得一个job对象
        Configuration configuration=new Configuration();
        Job job=Job.getInstance(configuration);

        //2.设置jar加载路径(就是自己的本类,生成的class文件)
        job.setJarByClass(wordCountDriver.class);
       
        //3.设置Mapper和Reducer类
        job.setMapperClass(wordCountMapper.class);
        job.setReducerClass(wordCountReducer.class);

        //4.设置map输出的key和value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5.设置最终输出的key和value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6.设置输入和输出路径
        FileInputFormat.setInputPaths(job,new Path("/root/IdeaProjects/mapReduceTest/src/main/resources/penglian.txt"));
        FileOutputFormat.setOutputPath(job,new Path("/root/IdeaProjects/mapReduceTest/src/main/resources/output"));

        //7.提交
        boolean result=job.waitForCompletion(true);
        System.exit(result?0:1);

    }
}

本地测试

输入文件和输出文件都在本地

执行Driver类后,结果如下:

使用HDFS系统作为文件的输入输出测试

1.启动HDFS系统

2.修改Driver类代码的路径

3.执行Driver类,可以看到生产了anqilaOutput文件夹,具体结果如下:

在集群上测试

1.启动HDFS系统和yarn

2.修改pom.xml文件(注意:mainClass必须跟的是整个项目的启动类,即我们写的Driver类)

<plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin </artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>wordCountDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>

3.修改代码

将输入输出的路径设为可以输入的参数

4.打包(双击package,会生成一个target文件夹,文件夹下面会有两个jar包)

5.上传jar包到集群

目前我们所生成的jar包所在地址为cg这台电脑上面的

/root/IdeaProjects/mapReduceTest/target/mapReduceTest-jar-with-dependencies.jar

现在需要上传到resourceManager所在电脑上(master)

scp /root/IdeaProjects/mapReduceTest/target/mapReduceTest-jar-with-dependencies.jar root@172.18.0.2:/usr/local/hadoop

查看是否上传成功,在master机器上执行

6.执行jar包

在master使用如下命令

hadoop jar mapReduce Test-jar-with-denpendencies.jar /anqila.txt/yarnOutput

可以通过yarn的客户端查看运行的各个状态

7.最后结果

getInstance要考

实践报告要写,结果要有截图

作业:1.认真看一下driver的各行代码和注释

2.创建一个新的工程

输入:

学号,姓名,第一次作业成绩,第二次作业成绩,第三次作业成绩
创建一个txt文件,文件内容如下:

01,xiaohua,85,82,96
02,xiaoming,45,60,47
03,lily,85,49,90
04,rose,75,65,70

输出:

学号,最好的一次成绩

01 96

02 60

03 90

04 75

Mapper类:

import javafx.scene.text.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

//map shuru key:LongWritable
//map shuru value:Text   01,xiaohua,85,82,96
//map shuchu key:Text   01
//map shuchu value:IntWritable
public class zuihaochengjiMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    @Override
    protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
        //1.01,xiaohua,85,82,96
        //1.Text类型转换string类型
        String line=value.toString();

        //2.切割string类型,split(“,”)双引号里面是什么就按照什么切割,切割万是string的数组:parts{“01”,“xiaohua”,“85”,“82”,“96”}
        String[] parts=line.split(",");

        //3.xuehao=parts[0]       第一行就是01
        String xuehao=parts[0];
          //4.遍历三次循环
        for(int i=2;i<parts.length;i++){
            //string类型转换成int类型
            int score=Integer.parseInt(parts[i]);
            //输出,输出的两个参数,第一个是输出的key,第二次数输出的value
            context.write(new Text(xuehao),new IntWritable(score));
        }

        //shuchu
        //01,85
        //01,82
        //01,96

    }
}

Reducer类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

//reduce 输入 key:Text
//reduce 输入 value:IntWritable
//reduce 输出 key:Text
//reduce 输出 value:IntWriable

public class zuihaochengjiReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    @Override
    //输入 :01,<85,82,96>
    //输出 :01,96
    protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
        int max=0;
        //遍历所有的成绩,找到最高的成绩
        for(IntWritable value:values){
            if(value.get()>max){ 
                //注意这里获取具体的值用get()方法
                max=value.get();

            }
        }
        //输出write方法
        context.write(key,new IntWritable(max));
    }
}

Driver类(全文背诵,必考)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class zuihaochengjiDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //1.获取配置信息以及封装任务
        Configuration configuration=new Configuration();
        Job job=Job.getInstance(configuration);

        //2.设置jar加载路径(程序入口,自己的driver类+.class)
        job.setJarByClass(zuihaochengjiDriver.class);

        //3.设置Mapper和Reducer类(你自己的Mapper和Reducer的类名)
        job.setMapperClass(zuihaochengjiMapper.class);
        job.setReducerClass(zuihaochengjiReducer.class);

        //4.设置Map的输出key和value的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5.设置最终输出key和value的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6.设置输入(有s)和输出路径(无s)
        FileInputFormat.setInputPaths(job,new Path("/root/IdeaProjects/mapReduceTest/src/main/resources/penglian.txt"));
        FileOutputFormat.setOutputPath(job,new Path("/root/IdeaProjects/mapReduceTest/src/main/resources/output"));
        
        //7.提交
        boolean result=job.waitForCompletion(true);
        System.exit(result?0:1);

    }
}

MapReduce核心框架原理

一、InputFormat数据输入

InputFormat功能:数据切片、为Mapper提供输入数据

1.切片

(1)问题引出

Map Task的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。(Map Task不是越多越好)

(2)Map Task并行度决定机制

数据块:Block是HDFS物理上把数据分成一块一块(默认:Block的大小是128M)(必考)

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

Map Task个数,决定job并行度,Map Task的个数取决于数据切片的个数(必考)

Map Task并行度决定机制

(1)一个Job的Map阶段并行度由客户端在提交Job时的切片数决定(必考)

(2)每一个Split切片分配一个Map Task并行实例处理         s.exists()方法用来查看是否存在

(3)默认情况下,切分大小=BlockSize=128M

(4)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片(不管你有几个文件,一个文件一个文件处理,不会让两个文件合并在一起了再切)

例子:

输入:data1.txt  300M         data2.txt   1M

切片大小设置为128M

切片:切片1:0M-128M      切片2:128M-256M     切片3:256M-300M        切片4: 0M-1M

切片大小计算公式:Math.max(minSize,Math.min(maxSize,blockSize))

Math.max(1,Math.min(最大值,blockSize))=Math.max(1blockSize)=blockSize

Math.max(1,6)=6

Math.min(1,6)=1

问题1(必考):一个文件400MB,设置最小分片(minSize)为256MB,设置最大分片(maxSize)就是默认值,BlockSize=128MB,最终切片是多少?有多少个MapTask任务?

答:最终切片为256

切片1:0-256M   切片2:257-400M,所以任务为2个

2.FileInputFormat实现类

InputFormat是一个抽象类,定义了一个MapReduce作业必须实现的标准规范

FileInputFormat同样是一个抽象类,它继承自InputFormat

FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat。

TextInputFormat是默认的FileInputFormat实现类

MapReduce的默认输入格式是?   TextInputFormat(必考)

二、输出数据OutputFormat

OutputFormat接口实现类

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口

OutputFormat的实现类有TextOutputFormat,SequenceFileOutputFormat,自定义OutputFormat

默认的输出格式是TextOutputFormat,他把每条记录都写为文本行(必考)

三、shuffle(必考)

Map方法之后,Reduce方法之前的数据处理过程,称之为Shuffle

combiner不一定在shuffle里面

3.1 分区partition

(1)问题引入

            要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照号码归属地不同省份输出到不同文件中(分区)。

(2)默认partition分区

        默认分区是根据Key的hashCode对ReduceTasks个数取模(取余)得到的。用户没法控制哪个Key存储到哪个分区

什么是分区?Mapper任务划分数据的过程称作Partition。负责实现数据的类称作Partitioner,默认的分区时Hash分区(Hash Partition)

Partition的作用:将map阶段产生的所有<key,value>对分配给不同的Reducer处理,可以将Reduce阶段的处理负载进行分摊

这里回答前面的问题,什么决定Reduce任务的数量,答案是:Partition的数量决定Reducer的数量

什么决定map的数量:切片的数量(必考)

一般Reduce的任务数默认值是1,用户可以通过job.setNumReduceTasks(数字)去设置Reduce的个数

在Driver类里面有这样的代码(放到Driver里面) job.setNumReduceTasks(5);代表有5个Reduce

3.2 合并Combiner

(1)需求

         统计过程中对每一个Map Task的输出进行局部汇总,以减少网络传输量即Combiner功能

Combiner是一种特殊的Reducer,在Mapper端,先执行一次Reducer

作用:减少Mapper输出到Reduce的数据量,缓解网路传输瓶颈,提高reducer的执行效率

需要注意的问题:一定要谨慎使用Combiner

有些情况不能使用Combiner --->  如:求平均值

保证引入Combiner以后,不能改变原来的逻辑

3.3 shuffle完整流程理解

分区前面还有一个溢出的步骤。

实例背景

假设有2个Map任务处理以下输入数据:

-Map1输出:

(“apple”,1),(“banana”,1),(“apple”,1)

-Map2输出:

(“banana”,1),(“apple”,1),(“cherry”,1)

最终需统计单词出现次数(WordCount),由2个Reduce任务处理(按首字母分区):

-Reduce1:处理a-c(如apple,banana,cherry)

-Reduce2:处理d-z(本例无此类数据)

第一部分:Map任务处理(无任何排序)

1.Map任务1输入:(apple,1),(banana,1),(apple,1)

2.Map任务2输入:(banana,1),(apple,1),(cherry,1)

3.Map输出:保持原始键值对

第二部分:Shuffle过程(属于Map端)(核心)

1.分区(partitioning):

       1.所有数据分配到partition 0

        2.数据标记:(apple,P0),(banana,P0),(cherry,P0)等

2.Map端排序(Sorting):

        1.Map1排序后:(apple,1),(apple,1),(banana,1)

        2.Map2排序后:(apple,1),(banana,1),(cherry,1)

3.Combiner(可选本地聚合)

       1.Map1聚合后:(apple,2),(banana,1)

        2.Map2保持原样:(apple,1),(banana,1),(cherry,1)

第三部分:Reduce端处理(shuffle)

1.数据拉取(Fetch):

          1.来自Map1:(apple,2),(banana,1)

           2.来自Map2:(apple,1),(banana,1),(cherry,1)

2.归并排序(Merge Sort)

           1.全局排序结果:(apple,2),(apple,1),(banana,1),(banana,1),(sherry,1)

3.分组(Grouping)

            1.apple:[2,1]

            2.banana:[1,1]

            3.cherry:[1]

第四部分:Reduce任务处理

1.Reduce输入:分组后的数据

2.处理过程:对值求和sum(values)

3.最终输出:

  1. (apple,3)
  2. (banana,2)
  3. (cherry,1)
阶段 输入 输出 阶段
分区(Partitioning) (Key,Value) (Partition,Key,Value)

Map端

排序(Sorting) (Partition,Key,Value) 分区内按键排序的数据 Map端
Combiner(可选) 排序后的(Key,Value) 合并后的(Key,CombinedValue) Map端
数据拉取(Fetch) Map输出的磁盘文件 属于同一分区的未排序数据 Reduce端
归并排序(Merge) 来自多个Map的同一分区数据 全局按键排序的数据 Reduce端
分组(Grouping) 排序后的(Key,Value)序列 (Key,Iterable<Value>) Reduce端

Yarn

下图是YARN的架构图,它由Container,ResourceManager,NodeManager,ApplicationMaster几个主要部分组成

内存,CPU,磁盘都是资源

YARN的架构是主从架构,主机为ResourceManager,从机为NodeManager,其中ResourceManager负责接收客户端的作用请求以及为作业分配相应的NodeManager资源,在NodeManager启动Container资源容器,在资源容器中运行相关作业

(1)Container(容器):YARN中资源包括内存、CPU、磁盘输入输出等等。Container是YARN中资源的抽象,它封装了某个节点上的多维度资源

(2)ResourceManager(资源管理器)

ResourceManager负责整个系统的资源分配和管理,是一个全局的资源管理器。主要由两个组件构成:调度器和应用程序管理器:

调度器(Scheduler):

       调度器根据资源情况为应用程序分配封装在Container中的资源

应用程序管理器(Application Manager):

       应用程序管理器负责管理整个系统中所有的应用程序

(3)NodeManager(节点管理器)

NodeManage是每一个节点上的资源和任务管理器

定时向ResourceManager汇报本节点上的资源使用情况和各个Container的运行状态;

接受并处理来自ApplicationManager的Container启动/停止等请求

(4)ApplicationMaster(主应用)

ApplicationMaster是一个详细的框架库,它结合从ResourceMaster获得的资源和NodeManager协同工作来运行和监控任务

用户提交的每一个应用程序均包含一个ApplicationMaster

主要功能包括:

  1. 与ResourceManager调度器协商以获取抽象资源(Container)
  2. 负责应用的监控、跟踪应用执行状态,重启失败任务等
  3. 并且与NodeManager协同工作完成Task的执行和监控

yarn中应用运行机制

yarn监控

yarn调度器(要考)


  类似一个工厂,接到很多订单,比如一个订单是1W件,一个是100件等等,工人是有限的,采用什么样的策略来安排达到资源利用率最高?这就是调度器所要考虑的。
 
YARN调度器分三种:
(1) FIFO Scheduler   → 先进先出调度器


(2) Capacity Scheduler   → 容器调度器

-分成多个队列,每个队列占用一定资源可以看作是FIFO Scheduler的多队列版本

-YARN默认采用Capacity Scheduler


(3) Fair Scheduler   → 公平调度器

三种调度器比较(必考)

调度器 工作方法
FIFO Scheduler(先进先出)

(1)单队列

(2)先进先出的原则

Capacity Scheduler(容器)

(1)多队列

(2)计算能力调度器,选择资源使用量占用最小、优先级高的先执行

(3)多用户的情况下,可以最大化集群的吞吐和利用率

Fair Scheduler(公平)

(1)多队列

(2)公平调度,所有的job具有相同的资源

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐