hadoop的核心 MapReduce 1. MapReduce 介绍 MapReduce思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。
Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系 。
Reduce负责“合”,即对map阶段的结果进行全局汇总。
MapReduce运行在yarn集群
ResourceManager
NodeManager
这两个阶段合起来正是MapReduce思想的体现。
还有一个比较形象的语言解释MapReduce:
我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。
现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。
1.1. MapReduce 设计构思 MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。
MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:
Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现.Map和Reduce,MapReduce处理的数据类型是<key,value>键值对。
一个完整的mapreduce程序在分布式运行时有三类实例进程:
MRAppMaster
负责整个程序的过程调度及状态协调
MapTask
负责map阶段的整个数据处理流程
ReduceTask
负责reduce阶段的整个数据处理流程
2. MapReduce 编程规范 MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为 2 个步骤,Shuffle 阶段 4 个步骤,Reduce 阶段分为 2 个步骤
Map 阶段 2 个步骤
设置 InputFormat 类, 将数据切分为 Key-Value**(K1和V1)** 对, 输入到第二步 k1为源数据的每行的偏移量,v1为行数据
自定义 Map 逻辑, 将第一步的结果转换成另外的 Key-Value(K2和V2 ) 对, 输出结果
Shuffle 阶段 4 个步骤
对输出的 Key-Value 对进行分区 自定义类继承Partitioner类 方法 getPartition 类的参数与k2v2同
reduce的默认分区只有一个,k2的哈希值与上int的最大值,模上ReduceTask的个数 指定分区就是相同类型的,共性的数据发送到同一个reduce中处理.
对不同分区的数据按照相同的 Key 排序 自定义类实现WritableComparable 成员变量为不要排序的列
(可选) 对分组过的数据初步规约 , 降低数据的网络拷贝 每个map都会产生大量本地输出 作用: 对map端的输出先做一次合并,减少在map和reduce节点间的数据传输量,提高网络io性能
应用前提: 不能影响最终业务逻辑
自定义类 继承 Reduce类
对数据进行分组 , 相同 Key 的 Value 放入一个集合中
Reduce 阶段 2 个步骤
对多个 Map 任务的结果进行排序以及合并, 编写 Reduce 函数实现自己的逻辑, 对输入的 Key-Value 进行处理, 转为新的 Key-Value(K3和V3 )输出
设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据
3. WordCount
需求: 在一堆给定的文本文件中统计输出每一个单词出现的总次数
Step 1. 数据格式准备
创建一个新的文件
1 2 cd /export/servers vim wordcount.txt
向其中放入以下内容并保存
1 2 3 4 hello,world,hadoop hive,sqoop,flume,hello kitty,tom,jerry,world hadoop
上传到 HDFS
1 2 hdfs dfs -mkdir /wordcount/ hdfs dfs -put wordcount.txt /wordcount/
Step 2. Mapper 1 2 3 4 5 6 7 8 9 10 11 public class WordCountMapper extends Mapper <LongWritable ,Text ,Text ,LongWritable > { @Override public void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split("," ); for (String word : split) { context.write(new Text(word),new LongWritable(1 )); } } }
Step 3. Reducer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class WordCountReducer extends Reducer <Text ,LongWritable ,Text ,LongWritable > { @Override protected void reduce (Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0 ; for (LongWritable value : values) { count += value.get(); } context.write(key,new LongWritable(count)); } }
Step 4. 定义主类, 描述 Job 并提交 Job 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class JobMain extends Configured implements Tool { @Override public int run (String[] args) throws Exception { Job job = Job.getInstance(super .getConf(), JobMain.class.getSimpleName()); job.setJarByClass(JobMain.class); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("hdfs://192.168.52.250:8020/wordcount" )); job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.52.250:8020/wordcount_out" )); boolean b = job.waitForCompletion(true ); return b?0 :1 ; } public static void main (String[] args) throws Exception { Configuration configuration = new Configuration(); Tool tool = new JobMain(); int run = ToolRunner.run(configuration, tool, args); System.exit(run); } }
常见错误 如果遇到如下错误
1 Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/":root:supergroup:drwxr-xr-x
直接将hdfs-site.xml当中的权限关闭即可
1 2 3 4 <property > <name > dfs.permissions</name > <value > false</value > </property >
最后重启一下 HDFS 集群
小细节 本地运行完成之后,就可以打成jar包放到服务器上面去运行了,实际工作当中,都是将代码打成jar包,开发main方法作为程序的入口,然后放到集群上面去运行
4. MapReduce 运行模式 本地运行模式
MapReduce 程序是被提交给 LocalJobRunner 在本地以单进程的形式运行
处理的数据及输出结果可以在本地文件系统, 也可以在hdfs上
怎样实现本地运行? 写一个程序, 不要带集群的配置文件, 本质是程序的 conf
中是否有 mapreduce.framework.name=local
以及 yarn.resourcemanager.hostname=local
参数
本地模式非常便于进行业务逻辑的 Debug
, 只要在 idea 中打断点即可
1 2 3 4 configuration.set("mapreduce.framework.name" ,"local" ); configuration.set(" yarn.resourcemanager.hostname" ,"local" ); TextInputFormat.addInputPath(job,new Path("file:///F:\\wordcount\\input" )); TextOutputFormat.setOutputPath(job,new Path("file:///F:\\wordcount\\output" ));
集群运行模式
将 MapReduce 程序提交给 Yarn 集群, 分发到很多的节点上并发执行
处理的数据和输出结果应该位于 HDFS 文件系统
提交集群的实现步骤: 将程序打成JAR包,然后在集群的任意一个节点上用hadoop命令启动
1 hadoop jar hadoop_hdfs_operate-1.0-SNAPSHOT.jar cn.itcast.hdfs.demo1.JobMain
5. MapReduce 分区 在 MapReduce 中, 通过我们指定分区, 会将同一个分区的数据发送到同一个 Reduce 当中进行处理
例如: 为了数据的统计, 可以把一批类似的数据发送到同一个 Reduce 当中, 在同一个 Reduce 当中统计相同类型的数据, 就可以实现类似的数据分区和统计等
其实就是相同类型的数据, 有共性的数据, 送到一起去处理
Reduce 当中默认的分区只有一个
Step 1. 定义 Mapper
这个 Mapper 程序不做任何逻辑, 也不对 Key-Value 做任何改变, 只是接收数据, 然后往下发送
1 2 3 4 5 6 public class MyMapper extends Mapper <LongWritable ,Text ,Text ,NullWritable > { @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value,NullWritable.get()); } }
Step 2. 定义 Reducer 逻辑 这个 Reducer 也不做任何处理, 将数据原封不动的输出即可
1 2 3 4 5 6 public class MyReducer extends Reducer <Text ,NullWritable ,Text ,NullWritable > { @Override protected void reduce (Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key,NullWritable.get()); } }
Step 3. 自定义 Partitioner 主要的逻辑就在这里, 这也是这个案例的意义, 通过 Partitioner 将数据分发给不同的 Reducer
1 2 3 4 5 6 7 8 9 10 public class PartitonerOwn extends Partitioner <Text ,LongWritable > { @Override public int getPartition (Text text, LongWritable longWritable, int i) { if (text.toString().length() >=5 ){ return 0 ; }else { return 1 ; } } }
Step 4. Main 入口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class PartitionMain extends Configured implements Tool { public static void main (String[] args) throws Exception { int run = ToolRunner.run(new Configuration(), new PartitionMain(), args); System.exit(run); } @Override public int run (String[] args) throws Exception { Job job = Job.getInstance(super .getConf(), PartitionMain.class.getSimpleName()); job.setJarByClass(PartitionMain.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.addInputPath(job,new Path("hdfs://192.168.52.250:8020/partitioner" )); TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.52.250:8020/outpartition" )); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setReducerClass(MyReducer.class); job.setPartitionerClass(MyPartitioner.class); job.setNumReduceTasks(2 ); boolean b = job.waitForCompletion(true ); return b?0 :1 ; } }
6. MapReduce 排序和序列化
序列化 (Serialization) 是指把结构化对象转化为字节流
反序列化 (Deserialization) 是序列化的逆过程. 把字节流转为结构化对象. 当要在进程间传递对象或持久化对象的时候, 就需要序列化对象成字节流, 反之当要将接收到或从磁盘读取的字节流转换为对象, 就要进行反序列化
Java 的序列化 (Serializable) 是一个重量级序列化框架, 一个对象被序列化后, 会附带很多额外的信息 (各种校验信息, header, 继承体系等), 不便于在网络中高效传输. 所以, Hadoop 自己开发了一套序列化机制(Writable), 精简高效. 不用像 Java 对象类一样传输多层的父子关系, 需要哪个属性就传输哪个属性值, 大大的减少网络传输的开销
Writable 是 Hadoop 的序列化格式, Hadoop 定义了这样一个 Writable 接口. 一个类要支持可序列化只需实现这个接口即可
另外 Writable 有一个子接口是 WritableComparable, WritableComparable 是既可实现序列化, 也可以对key进行比较, 我们这里可以通过自定义 Key 实现 WritableComparable 来实现我们的排序功能
数据格式如下
1 2 3 4 5 6 7 a 1 a 9 b 3 a 7 b 8 b 10 a 5
要求:
第一列按照字典顺序进行排列
第一列相同的时候, 第二列按照升序进行排列
解决思路:
将 Map 端输出的 <key,value>
中的 key 和 value 组合成一个新的 key (newKey), value值不变
这里就变成 <(key,value),value>
, 在针对 newKey 排序的时候, 如果 key 相同, 就再对value进行排序
Step 1. 自定义类型和比较器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 public class PairWritable implements WritableComparable <PairWritable > { private String first; private int second; public PairWritable () { } public PairWritable (String first, int second) { this .set(first, second); } public void set (String first, int second) { this .first = first; this .second = second; } @Override public void readFields (DataInput input) throws IOException { this .first = input.readUTF(); this .second = input.readInt(); } @Override public void write (DataOutput output) throws IOException { output.writeUTF(first); output.writeInt(second); } public int compareTo (PairWritable o) { System.out.println(o.toString()); System.out.println(this .toString()); int comp = this .first.compareTo(o.first); if (comp != 0 ) { return comp; } else { return Integer.valueOf(this .second).compareTo( Integer.valueOf(o.getSecond())); } } public int getSecond () { return second; } public void setSecond (int second) { this .second = second; } public String getFirst () { return first; } public void setFirst (String first) { this .first = first; } @Override public String toString () { return "PairWritable{" + "first='" + first + '\'' + ", second=" + second + '}' ; } }
Step 2. Mapper 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class SortMapper extends Mapper <LongWritable ,Text ,PairWritable ,IntWritable > { private PairWritable mapOutKey = new PairWritable(); private IntWritable mapOutValue = new IntWritable(); @Override public void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String lineValue = value.toString(); String[] strs = lineValue.split("\t" ); mapOutKey.set(strs[0 ], Integer.valueOf(strs[1 ])); mapOutValue.set(Integer.valueOf(strs[1 ])); context.write(mapOutKey, mapOutValue); } }
Step 3. Reducer 1 2 3 4 5 6 7 8 9 10 11 12 public class SortReducer extends Reducer <PairWritable ,IntWritable ,Text ,IntWritable > { private Text outPutKey = new Text(); @Override public void reduce (PairWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) { outPutKey.set(key.getFirst()); context.write(outPutKey, value); } } }
Step 4. Main 入口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class JobMain extends Configured implements Tool { @Override public int run (String[] args) throws Exception { Job job = Job.getInstance(super .getConf(), "mapreduce_sort" ); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\sort_input" )); job.setMapperClass(SortMapper.class); job.setMapOutputKeyClass(SortBean.class); job.setMapOutputValueClass(NullWritable.class); job.setReducerClass(SortReducer.class); job.setOutputKeyClass(SortBean.class); job.setOutputValueClass(NullWritable.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\sort_out" )); boolean bl = job.waitForCompletion(true ); return bl?0 :1 ; } public static void main (String[] args) throws Exception { Configuration configuration = new Configuration(); int run = ToolRunner.run(configuration, new JobMain(), args); System.exit(run); } }
MapReduce 中的计数器 计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计。计数器还可辅助诊断系统故障。如果需要将日志信息传输到 map 或 reduce 任务, 更好的方法通常是看能否用一个计数器值来记录某一特定事件的发生。对于大型分布式作业而言,使用计数器更为方便。除了因为获取计数器值比输出日志更方便,还有根据计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。
hadoop内置计数器列表
MapReduce任务计数器
org.apache.hadoop.mapreduce.TaskCounter
文件系统计数器
org.apache.hadoop.mapreduce.FileSystemCounter
FileInputFormat计数器
org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat计数器
org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
作业计数器
org.apache.hadoop.mapreduce.JobCounter
每次mapreduce执行完成之后,我们都会看到一些日志记录出来,其中最重要的一些日志记录如下截图
所有的这些都是MapReduce的计数器的功能,既然MapReduce当中有计数器的功能,我们如何实现自己的计数器???
需求:以以上分区代码为案例,统计map接收到的数据记录条数
第一种方式 第一种方式定义计数器,通过context上下文对象可以获取我们的计数器,进行记录 通过context上下文对象,在map端使用计数器进行统计
1 2 3 4 5 6 7 8 9 public class PartitionMapper extends Mapper <LongWritable ,Text ,Text ,NullWritable > { @Override protected void map (LongWritable key, Text value, Context context) throws Exception { Counter counter = context.getCounter("MR_COUNT" , "MyRecordCounter" ); counter.increment(1L ); context.write(value,NullWritable.get()); } }
运行程序之后就可以看到我们自定义的计数器在map阶段读取了七条数据
第二种方式 通过enum枚举类型来定义计数器 统计reduce端数据的输入的key有多少个
1 2 3 4 5 6 7 8 9 10 public class PartitionerReducer extends Reducer <Text ,NullWritable ,Text ,NullWritable > { public static enum Counter { MY_REDUCE_INPUT_RECORDS,MY_REDUCE_INPUT_BYTES } @Override protected void reduce (Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.getCounter(Counter.MY_REDUCE_INPUT_RECORDS).increment(1L ); context.write(key, NullWritable.get()); } }
规约Combiner 概念 每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一
combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件
combiner 组件的父类就是 Reducer
combiner 和 reducer 的区别在于运行的位置
Combiner 是在每一个 maptask 所在的节点运行
Reducer 是接收全局所有 Mapper 的输出结果
combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量
实现步骤
自定义一个 combiner 继承 Reducer,重写 reduce 方法
在 job 中设置 job.setCombinerClass(CustomCombiner.class)
combiner 能够应用的前提是不能影响最终的业务逻辑,而且,combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来
MapReduce案例-流量统计 需求一: 统计求和 统计每个手机号的上行数据包总和,下行数据包总和,上行总流量之和,下行总流量之和 分析:以手机号码作为key值,上行流量,下行流量,上行总流量,下行总流量四个字段作为value值,然后以这个key,和value作为map阶段的输出,reduce阶段的输入
Step 1: 自定义map的输出value对象FlowBean 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public class FlowBean implements Writable { private Integer upFlow; private Integer downFlow; private Integer upCountFlow; private Integer downCountFlow; @Override public void write (DataOutput out) throws IOException { out.writeInt(upFlow); out.writeInt(downFlow); out.writeInt(upCountFlow); out.writeInt(downCountFlow); } @Override public void readFields (DataInput in) throws IOException { this .upFlow = in.readInt(); this .downFlow = in.readInt(); this .upCountFlow = in.readInt(); this .downCountFlow = in.readInt(); } public FlowBean () { } public FlowBean (Integer upFlow, Integer downFlow, Integer upCountFlow, Integer downCountFlow) { this .upFlow = upFlow; this .downFlow = downFlow; this .upCountFlow = upCountFlow; this .downCountFlow = downCountFlow; } public Integer getUpFlow () { return upFlow; } public void setUpFlow (Integer upFlow) { this .upFlow = upFlow; } public Integer getDownFlow () { return downFlow; } public void setDownFlow (Integer downFlow) { this .downFlow = downFlow; } public Integer getUpCountFlow () { return upCountFlow; } public void setUpCountFlow (Integer upCountFlow) { this .upCountFlow = upCountFlow; } public Integer getDownCountFlow () { return downCountFlow; } public void setDownCountFlow (Integer downCountFlow) { this .downCountFlow = downCountFlow; } @Override public String toString () { return "FlowBean{" + "upFlow=" + upFlow + ", downFlow=" + downFlow + ", upCountFlow=" + upCountFlow + ", downCountFlow=" + downCountFlow + '}' ; } }
Step 2: 定义FlowMapper类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class FlowCountMapper extends Mapper <LongWritable ,Text ,Text ,FlowBean > { @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split("\t" ); String phoneNum = split[1 ]; FlowBean flowBean = new FlowBean(); flowBean.setUpFlow(Integer.parseInt(split[6 ])); flowBean.setDownFlow(Integer.parseInt(split[7 ])); flowBean.setUpCountFlow(Integer.parseInt(split[8 ])); flowBean.setDownCountFlow(Integer.parseInt(split[9 ])); context.write(new Text(phoneNum), flowBean); } }
Step 3: 定义FlowReducer类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class FlowCountReducer extends Reducer <Text ,FlowBean ,Text ,FlowBean > { @Override protected void reduce (Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { FlowBean flowBean = new FlowBean(); Integer upFlow = 0 ; Integer downFlow = 0 ; Integer upCountFlow = 0 ; Integer downCountFlow = 0 ; for (FlowBean value : values) { upFlow += value.getUpFlow(); downFlow += value.getDownFlow(); upCountFlow += value.getUpCountFlow(); downCountFlow += value.getDownCountFlow(); } flowBean.setUpFlow(upFlow); flowBean.setDownFlow(downFlow); flowBean.setUpCountFlow(upCountFlow); flowBean.setDownCountFlow(downCountFlow); context.write(key, flowBean); } }
Step 4: 程序main函数入口FlowMain 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public class JobMain extends Configured implements Tool { @Override public int run (String[] args) throws Exception { Job job = Job.getInstance(super .getConf(), "mapreduce_flowcount" ); job.setJarByClass(JobMain.class); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\flowcount_input" )); job.setMapperClass(FlowCountMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setReducerClass(FlowCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\flowcount_out" )); boolean bl = job.waitForCompletion(true ); return bl ? 0 :1 ; } public static void main (String[] args) throws Exception { Configuration configuration = new Configuration(); int run = ToolRunner.run(configuration, new JobMain(), args); System.exit(run); } }
需求二: 上行流量倒序排序(递减排序) 分析,以需求一的输出数据作为排序的输入数据,自定义FlowBean,以FlowBean为map输出的key,以手机号作为Map输出的value,因为MapReduce程序会对Map阶段输出的key进行排序
Step 1: 定义FlowBean实现WritableComparable实现比较排序 Java 的 compareTo 方法说明:
compareTo 方法用于将当前对象与方法的参数进行比较。
如果指定的数与参数相等返回 0。
如果指定的数小于参数返回 -1。
如果指定的数大于参数返回 1。
例如:o1.compareTo(o2);
返回正数的话,当前对象(调用 compareTo 方法的对象 o1)要排在比较对象(compareTo 传参对象 o2)后面,返回负数的话,放在前面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class FlowBean implements WritableComparable <FlowBean > { private Integer upFlow; private Integer downFlow; private Integer upCountFlow; private Integer downCountFlow; public FlowBean () { } public FlowBean (Integer upFlow, Integer downFlow, Integer upCountFlow, Integer downCountFlow) { this .upFlow = upFlow; this .downFlow = downFlow; this .upCountFlow = upCountFlow; this .downCountFlow = downCountFlow; } @Override public void write (DataOutput out) throws IOException { out.writeInt(upFlow); out.writeInt(downFlow); out.writeInt(upCountFlow); out.writeInt(downCountFlow); } @Override public void readFields (DataInput in) throws IOException { upFlow = in.readInt(); downFlow = in.readInt(); upCountFlow = in.readInt(); downCountFlow = in.readInt(); } public Integer getUpFlow () { return upFlow; } public void setUpFlow (Integer upFlow) { this .upFlow = upFlow; } public Integer getDownFlow () { return downFlow; } public void setDownFlow (Integer downFlow) { this .downFlow = downFlow; } public Integer getUpCountFlow () { return upCountFlow; } public void setUpCountFlow (Integer upCountFlow) { this .upCountFlow = upCountFlow; } public Integer getDownCountFlow () { return downCountFlow; } public void setDownCountFlow (Integer downCountFlow) { this .downCountFlow = downCountFlow; } @Override public String toString () { return upFlow+"\t" +downFlow+"\t" +upCountFlow+"\t" +downCountFlow; } @Override public int compareTo (FlowBean o) { return this .upCountFlow > o.upCountFlow ?-1 :1 ; } }
Step 2: 定义FlowMapper 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class FlowCountSortMapper extends Mapper <LongWritable ,Text ,FlowBean ,Text > { @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { FlowBean flowBean = new FlowBean(); String[] split = value.toString().split("\t" ); String phoneNum = split[0 ]; flowBean.setUpFlow(Integer.parseInt(split[1 ])); flowBean.setDownFlow(Integer.parseInt(split[2 ])); flowBean.setUpCountFlow(Integer.parseInt(split[3 ])); flowBean.setDownCountFlow(Integer.parseInt(split[4 ])); context.write(flowBean, new Text(phoneNum)); } }
Step 3: 定义FlowReducer 1 2 3 4 5 6 7 8 public class FlowCountSortReducer extends Reducer <FlowBean ,Text ,Text ,FlowBean > { @Override protected void reduce (FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { context.write(value, key); } } }
Step 4: 程序main函数入口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class JobMain extends Configured implements Tool { @Override public int run (String[] strings) throws Exception { Job job = Job.getInstance(super .getConf(), "mapreduce_flowcountsort" ); job.setJarByClass(JobMain.class); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/out/flowcount_out" )); job.setMapperClass(FlowCountSortMapper.class); job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(FlowCountSortReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/out/flowcountsort_out" )); boolean b = job.waitForCompletion(true ); return b?0 :1 ; } public static void main (String[] args) throws Exception { Configuration configuration = new Configuration(); int run = ToolRunner.run(configuration, new JobMain(), args); System.exit(run); } }
需求三: 手机号码分区 在需求一的基础上,继续完善,将不同的手机号分到不同的数据文件的当中去,需要自定义分区来实现,这里我们自定义来模拟分区,将以下数字开头的手机号进行分开
1 2 3 4 135 开头数据到一个分区文件 136 开头数据到一个分区文件 137 开头数据到一个分区文件 其他分区
自定义分区 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class FlowPartition extends Partitioner <Text ,FlowBean > { @Override public int getPartition (Text text, FlowBean flowBean, int i) { String line = text.toString(); if (line.startsWith("135" )){ return 0 ; }else if (line.startsWith("136" )){ return 1 ; }else if (line.startsWith("137" )){ return 2 ; }else { return 3 ; } } }
作业运行设置 1 2 job.setPartitionerClass(FlowPartition.class); job.setNumReduceTasks(4 );
修改输入输出路径, 并放入集群运行 1 2 TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/partition_flow/" )); TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/partition_out" ));