Hadoop学习04——MapReduce

这是我参与更文挑战的第12天,活动详情查看:更文挑战

MapReduce

概念

一、MapReduce原理

05.jpg

  1. (step1)切片————map的数量由什么决定
    1. 我们知道,文件是以块(block)的形式保存在hdfs文件系统里的。那为什么上图是一个split对应一个map,而不是一个block对应一个map呢?
      • split是对block块的逻辑划分,因为block的大小是在存储文件的时候决定的,无法动态修改,很不灵活;所以可以用split对block进行逻辑划分
      • 比如block太大会影响计算时间,用split划小一点,然后map并行执行,计算时间就大大缩短了
    2. 一个split对应一个map
  2. (step2)map的键值映射
    • 把一堆数据做key-value的映射
  3. (step3)shuffle
    • 把所有相同key的数据放到一起的过程
  4. (step4)reduce的分发策略及数据处理
    • map会把key相同数据作为一组,一个reduce可以处理多组数据,一组数据只能由一个reduce处理,而不能分开来放到多个reduce上处理
  5. 一次完整的mapreducce处理

06.jpg

  1. memory缓冲区
    • 功能一:给数据做排序
      • 比如key有北京,上海,南京,值是房价;首先会在缓冲区里分区,然后按照key把不同的数据放到不同的分区里
      • 然后把数据传给reduce,reduce就能根据分区很快的进行计算(哪个reduce处理哪个分区就很明确了)
    • 问题一:分区是如何确定的?
      • 有几个reduce就有几个分区
      • 比如reduce1处理北京的数据,那这个分区里就只有北京的数据;而reduce2处理上海,南京的数据,那么这第二个分区就包含了上海和南京的所有数据
    • 问题二:map如何知道哪个key属于哪个分区呢?
      • map在对数据做键值映射的时候,会根据key来生成对应的分区(partions)
    • 在内存中第二次给数据做排序
      • 第二次给数据做排序
      • 为什么要第二次排序呢?
        • 如果一个ruduce处理一队键值对,那么就很方便了,直接处理就行
        • 如果一个ruduce要处理多组不同key的键值对,那么这些数据就会混在一个分区里,reduce就会进行大量磁盘交互,所以现在缓冲区里把分区里的数据进行二次排序
    • 功能二:数据压缩
      • 比如要计算所有房价均值,可以把所有数据直接在map上就进行累加,变成一条数据;而不必把这个工作交给reduce来处理,避免了reduce的大量磁盘交互
  2. map如何把数据交给reduce
    • 假设一个block为默认值128M,那么每次缓冲区处理完数据以后就是128M,map并不会把数据直接给reduce,而是序列化成小文件,最后一起发给reduce;原因是reduce可能一下能处理比如1个G的文件
    • 先把一堆小文件排序,最后合成一个大文件(有序),再发给reduce
    • 多个map并行处理时就会形成多个小文件,最后需要对所有小文件进行排序,只要保证reduce拉取数据是有序的就行
    • 总而言之,reduce获取的是有序的文件
    • reduce必须获取所有数据才会开始进行迭代计算
  3. 案例

07.jpg

二、mr v1.0架构

  1. 架构图

08.jpg

  • JobTracker
    • 核心,主,单点
    • 调度所有的作业
    • 监控整个集群的资源负载
  • TaskTracker
    • 从,自身节点资源管理
    • 和JobTracker心跳,汇报资源,获取Task
  • Client
    • 作业为单位
    • 规划作业计算分布
    • 提交作业资源到HDFS
    • 最终提交作业到JobTracker
  1. mr1.0的弊端
    • JobTracker:负载过重,单点故障
    • 资源管理与计算调度强耦合
    • 其他计算框架需要重复实现资源管理
      • 比如mapreduce给一个datanode安排了工作;另一个计算框架无法知道这个datanode现在正在工作,就会再次安排工作给datanode,导致这个框架运行失败
    • 不同框架对资源不能全局管理
  2. mr1.0处理流程

09.jpg

  • 数出refund这个单词出现的次数

10.jpg

三、mr2.0(YARN)架构

  1. 架构图

11.jpg

  • ResourceManager
    • 主,核心
    • 集群节点资源管理
    • 长服务
    • 注意到,resourcemanager是一个单点,所以需要和zookeeper配合搭建HA
  • NodeManager
    • 长服务
    • 运行在节点上,将节点资源状况汇报给resourcemanager
    • 管理container的生存周期
  • container
    • 相当于是一个内存空间,如果reduce task运行内存超过container(默认大小1G)的大小,这个task就会被kill
  • applicationmaster
    • 负责分配任务,但是分配完了以后不会立即执行任务,首先要向resourcemanager申请资源
    • 短服务,来了一个task就开启一个applicationmaster
    • 避免了单点故障
  • client
    • 进行一些任务的设置
  1. YARN:Yet Another Resource Negotiator
    • Hadoop 2.0新引入的资源管理系统,直接从MRv1演化而来的;
      • 核心思想:将MRv1中JobTracker的资源管理和任务调度两个功能分开,分别由ResourceManager和ApplicationMaster进程实现
      • ResourceManager:负责整个集群的资源管理和调度
      • ApplicationMaster:负责应用程序相关的事务,比如任务调度、任务监控和容错等
    • YARN的引入,使得多个计算框架可运行在一个集群中
      • 每个应用程序对应一个ApplicationMaster
      • 目前多个计算框架可以运行在YARN上,比如MapReduce、Spark、Storm等

构建mapreduce

一、各服务节点安装位置

12.png

  • NM(nodemanager):它是管理数据的服务,必须放在datanode节点上
  • RS(resourcemanage):任意放,但是最好和namenode分开放

二、YARN on Single Node

  1. 修改etc/hadoop/mapred-site.xml
    • 打开配置文件目录发现并没有mapred-site.xml,但是有一个mapred-site.xml.template,把它的名字修改为mapred-site.xml
//添加如下
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
复制代码
  1. 修改etc/hadoop/yarn-site.xml
//添加如下
<property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
复制代码
  1. etc/hadoop/yarn-site.xml文件中增加HA的相关配置
<property>
   <name>yarn.resourcemanager.ha.enabled</name>
   <value>true</value>
 </property>
 <property>
 #这个id和之前hdfs的clusterID并没有任何关系,它是resourcemanager的id
   <name>yarn.resourcemanager.cluster-id</name>
   <value>cluster1</value>
 </property>
 #通过这两个逻辑名找到resourcemanager入口
 <property>
   <name>yarn.resourcemanager.ha.rm-ids</name>
   <value>rm1,rm2</value>
 </property>
 #以下是在哪两个主机配置resourcemanager
 <property>
   <name>yarn.resourcemanager.hostname.rm1</name>
   <value>master1</value>
 </property>
 <property>
   <name>yarn.resourcemanager.hostname.rm2</name>
   <value>master2</value>
 </property>
 #zookeeper节点位置
 <property>
   <name>yarn.resourcemanager.zk-address</name>
   <value>node2:2181,node3:2181,node4:2181</value>
 </property>
复制代码
  1. node3,node4之间配置免密钥登录
    • 由于在它们上面配置resourcemanager,且是HA,可能会互相切换,所以要配置ssh免密登录

三、启动

  1. 启动zookeeper
zkServer.sh start
复制代码
  1. 启动hdfs
    • 这里启动后,我发现一个异常,两个namenode都是standby状态,检查发现zkfc进程没有开启
    • 如果没有开启的话使用hadoop-daemon.sh start zkfc命令启动即可
start-dfs.sh
复制代码
  1. 启动yarn
    • 启动后会发现几个datanode上多了nodemanager进程,而resourcemanager进程并没有启动
start-yarn.sh
复制代码
  1. 启动resourcemanager
    • 在你配置resourcemanager的节点上启动这个进程,我这里是node3,node4
    • 配置的节点上都要运行一下这个命令
yarn-daemon.sh start resourcemanager
复制代码
  1. 各节点状态
#node1
[root@node1 ~]# jps
7426 Jps
7111 DFSZKFailoverController
6680 JournalNode
6491 NameNode

#node2
[root@node2 hadoop]# jps
6385 QuorumPeerMain
7269 Jps
6438 NameNode
6502 DataNode
7159 NodeManager
6891 DFSZKFailoverController
6590 JournalNode

#node3
[root@node3 ~]# jps
6401 QuorumPeerMain
6455 DataNode
6841 ResourceManager
6540 JournalNode
6718 NodeManager
6879 Jps

#node4
[root@node4 ~]# jps
6769 ResourceManager
6453 DataNode
6646 NodeManager
7002 Jps
6399 QuorumPeerMain
复制代码
  1. 关闭进程
    1. 关闭resourcemanager进程
      • 注意:两台机器(node3,node4)上要分别执行这个命令
    2. 直接使用stop-all.sh关闭所有进程
    3. 关闭zookeeper

四、使用浏览器访问resourcemanager

  • 由于resourcemanager配置了HA,所以有一个节点是active,一个是standby
//直接连接node4节点
node4:8088
复制代码

五、使用idea搭建一个wordcountDemo

  1. 源码
package com.sju.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class MyWordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyWordCount.class);

        //设置输出输入
        Path inPath = new Path("/user/test01/haha.txt");
        FileInputFormat.addInputPath(job,inPath);
        Path outPath = new Path("/output/wordcount");
        //如果输出路径已经存在,先删除(如果存在会报错)
        if(outPath.getFileSystem(conf).exists(outPath))
            outPath.getFileSystem(conf).delete(outPath,true);
        FileOutputFormat.setOutputPath(job,outPath);

        //mapper做中间集映射的类
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setReducerClass(Myreducer.class);

        job.waitForCompletion(true);
    }
}


--------------------------------
--------------------------------

package com.sju.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/*
* 第一个参数KEYIN,表示每一行的第一个字符的坐标,比如第一行第一个是0后面是1,2,3(类似数组)
* 第二个:VALUEIN,代表一整行数据内容
* 第三个:KEYOUT,一个单词
* 第四个:VALUEOUT,一个数字
* */
public class MyMapper extends Mapper<Object, Text,Text, IntWritable>{
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());//value就是文本里的一个个单词
        while(itr.hasMoreTokens()){
            //每个itr都是string,要把它封装到word里
            //word是TEXT类型
            word.set(itr.nextToken());
            //one的值是1
            //word对应TEXT,one对应IntWritable
            context.write(word,one);
        }
    }
}


--------------------------------
--------------------------------

package com.sju.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
* 泛型的四个参数,就是单词,出现的次数
* */
public class Myreducer extends Reducer<Text, IntWritable,Text,IntWritable> {

    //迭代计算
    private  IntWritable result = new IntWritable();
    //一个key下面有多个value
    //由于map已经排序完毕,所以一个key处理完毕以后,这个reduce就结束了,转而开始处理下一个key
    public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable val : values){
            sum += val.get();
        }
        result.set(sum);
        //这一组的键值对已经处理完毕,进行输出
        context.write(key,result);
    }

}

复制代码
  1. idea如何把文件导出为jar包呢?
    1. 打开 File > Project Structure > Artifacts
    2. 添加一个 Artifact( + > JAR > Empty)
    3. 输入这个jar的name(’wcc’),Name下面有这个jar包生成的路径(Output directory)
    4. 在Output Layout中给这个wc.jar添加Module Output(+ > Module Output 或者在 Available Elements框中选择对应的Module双击)
    5. 选择你的项目源文件的模块
    6. 然后点击ok退出Project Structure面板
    7. 最后生成这个jar(Build > Build Artifacts… > sampleName > Build/Rebuild)
  2. 把这个jar包放到linux的任意 目录下
    • 执行如下命令hadoop jar wc.jar com.sju.mr.wc.MyWordCount
    • 最终按照java代码的配置得出输出文件
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享