当前位置: 首页 > news >正文

实验5 MapReduce初级编程实践

实验步骤

(一)编程实现文件合并和去重操作

对于两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。下面是输入文件和输出文件的一个样例供参考。

输入文件A的样例如下:

 

20170101     x

20170102     y

20170103     x

20170104     y

20170105     z

20170106     x

 

输入文件B的样例如下:

20170101      y

20170102      y

20170103      x

20170104      z

20170105      y

 

根据输入文件A和B合并得到的输出文件C的样例如下:

20170101      x

20170101      y

20170102      y

20170103      x

20170104      y

20170104      z

20170105      y

20170105      z

20170106      x

 

(二)编写程序实现对输入文件的排序

现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。下面是输入文件和输出文件的一个样例供参考。

输入文件1的样例如下:

33

37

12

40

 

输入文件2的样例如下:

4

16

39

5

 

输入文件3的样例如下:

1

45

25

 

根据输入文件1、2和3得到的输出文件如下:

1 1

2 4

3 5

4 12

5 16

6 25

7 33

8 37

9 39

10 40

11 45

       

(三)对给定的表格进行信息挖掘

下面给出一个child-parent的表格,要求挖掘其中的父子辈关系,给出祖孙辈关系的表格。

输入文件内容如下:

child          parent

Steven        Lucy

Steven        Jack

Jone         Lucy

Jone         Jack

Lucy         Mary

Lucy         Frank

Jack         Alice

Jack         Jesse

David       Alice

David       Jesse

Philip       David

Philip       Alma

 Mark       David

Mark       Alma

 

输出文件内容如下:

grandchild       grandparent

Steven          Alice

Steven          Jesse

Jone            Alice

Jone            Jesse

Steven          Mary

Steven          Frank

Jone            Mary

Jone            Frank

Philip           Alice

Philip           Jesse

Mark           Alice

Mark           Jesse

具体代码

导入依赖

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>step1</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.4.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.4</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
</project>

任务一

MergeDeduplicateMapper
package com.example;import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class MergeDeduplicateMapper extends Mapper<LongWritable, Text, Text, Text> {private final Text outputKey = new Text();private final Text outputValue = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString().trim();// 跳过空行if (line.isEmpty()) {return;}// 按制表符或空格分割String[] parts = line.split("\\s+");if (parts.length >= 2) {// 以日期和值的组合作为key,这样可以去除重复的记录String date = parts[0];String val = parts[1];// 创建复合键:日期+值String compositeKey = date + "\t" + val;outputKey.set(compositeKey);outputValue.set("");  // 值为空,因为我们只需要keycontext.write(outputKey, outputValue);}}
}
MergeDeduplicateReducer
package com.example;import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class MergeDeduplicateReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {// 由于在mapper阶段已经用复合键去重,这里每个key只会出现一次// 直接输出键(包含日期和值)String compositeKey = key.toString();String[] parts = compositeKey.split("\t");if (parts.length == 2) {Text outputKey = new Text(parts[0]);Text outputValue = new Text(parts[1]);context.write(outputKey, outputValue);}}
}
MergeDeduplicateDriver
package com.example;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class MergeDeduplicateDriver {public static void main(String[] args) throws Exception {if (args.length != 3) {System.err.println("Usage: MergeDeduplicate <input path A> <input path B> <output path>");System.exit(-1);}Configuration conf = new Configuration();Job job = Job.getInstance(conf, "Merge and Deduplicate Files");// 设置Jar类job.setJarByClass(MergeDeduplicateDriver.class);// 设置Mapper和Reducer类job.setMapperClass(MergeDeduplicateMapper.class);job.setReducerClass(MergeDeduplicateReducer.class);// 设置输出键值类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 设置输入输出格式job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);// 设置输入和输出路径FileInputFormat.addInputPath(job, new Path(args[0]));  // 文件AFileInputFormat.addInputPath(job, new Path(args[1]));  // 文件BFileOutputFormat.setOutputPath(job, new Path(args[2])); // 输出文件C// 设置Reducer任务数量job.setNumReduceTasks(1);// 等待作业完成System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

任务二

SortMapper
package com.example;import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class SortMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {private final IntWritable number = new IntWritable();private static final IntWritable one = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString().trim();// 跳过空行if (line.isEmpty()) {return;}try {// 将字符串转换为整数int num = Integer.parseInt(line);number.set(num);// 输出 (数字, 1),其中1是占位符context.write(number, one);} catch (NumberFormatException e) {// 如果遇到非数字内容,跳过System.err.println("跳过非数字内容: " + line);}}
}
SortReducer
package com.example;import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;public class SortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {private final IntWritable rank = new IntWritable(1);@Overrideprotected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {// 由于MapReduce已经按key(数字)排序,我们只需要按顺序输出排名for (IntWritable value : values) {context.write(rank, key);// 排名递增rank.set(rank.get() + 1);}}
}
SortDriver
package com.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SortDriver {public static void main(String[] args) throws Exception {if (args.length != 2) {System.err.println("Usage: SortDriver <input path> <output path>");System.exit(-1);}Configuration conf = new Configuration();Job job = Job.getInstance(conf, "Number Sort with Rank");// 设置Jar类job.setJarByClass(SortDriver.class);// 设置Mapper和Reducer类job.setMapperClass(SortMapper.class);job.setReducerClass(SortReducer.class);// 设置Mapper输出键值类型job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(IntWritable.class);// 设置最终输出键值类型job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(IntWritable.class);// 设置输入输出格式job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);// 设置输入和输出路径FileInputFormat.addInputPath(job, new Path(args[0]));  // 输入目录(包含所有输入文件)FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出目录// 设置Reducer任务数量为1,确保全局排序job.setNumReduceTasks(1);// 等待作业完成System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

任务三

GrandParentMapper
package com.example;import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class GrandParentMapper extends Mapper<LongWritable, Text, Text, Text> {private final Text outputKey = new Text();private final Text outputValue = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString().trim();// 跳过空行和标题行if (line.isEmpty() || line.startsWith("child") || line.startsWith("grandchild")) {return;}// 按制表符或空格分割String[] parts = line.split("\\s+");if (parts.length >= 2) {String child = parts[0];String parent = parts[1];// 输出两种关系:// 1. 作为子代关系:key=子, value="1:"+父 (1表示这是子代关系)// 2. 作为父代关系:key=父, value="2:"+子 (2表示这是父代关系)outputKey.set(child);outputValue.set("1:" + parent);context.write(outputKey, outputValue);outputKey.set(parent);outputValue.set("2:" + child);context.write(outputKey, outputValue);}}
}
GrandParentReducer
package com.example;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class GrandParentReducer extends Reducer<Text, Text, Text, Text> {private final Text outputKey = new Text();private final Text outputValue = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {// 存储该人的子女列表和父母列表List<String> children = new ArrayList<>();List<String> parents = new ArrayList<>();for (Text value : values) {String valStr = value.toString();if (valStr.startsWith("1:")) {// 这是父母关系:1:父母名parents.add(valStr.substring(2));} else if (valStr.startsWith("2:")) {// 这是子女关系:2:子女名children.add(valStr.substring(2));}}// 生成祖孙关系:该人的父母(祖父母) × 该人的子女(孙子)for (String parent : parents) {for (String child : children) {outputKey.set(child);        // 孙子outputValue.set(parent);     // 祖父母context.write(outputKey, outputValue);}}}
}
GrandParentDriver
package com.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class GrandParentDriver {public static void main(String[] args) throws Exception {if (args.length != 2) {System.err.println("Usage: GrandParentDriver <input path> <output path>");System.exit(-1);}Configuration conf = new Configuration();Job job = Job.getInstance(conf, "Find Grandparent Relationships");// 设置Jar类job.setJarByClass(GrandParentDriver.class);// 设置Mapper和Reducer类job.setMapperClass(GrandParentMapper.class);job.setReducerClass(GrandParentReducer.class);// 设置输出键值类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 设置输入输出格式job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);// 设置输入和输出路径FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 设置Reducer任务数量job.setNumReduceTasks(1);// 等待作业完成System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
http://www.gsyq.cn/news/63046.html

相关文章:

  • 2025 年液化气泵厂家最新推荐榜,聚焦技术创新与质量保障的优质品牌深度解析无密封/磁力/倒罐/双端面机械密封/屏蔽/增压液化气泵公司推荐
  • 中药品牌十强排名彰显实力,好医生以完整产业链布局未来
  • 【51单片机】【protues仿真】基于51单片机自动浇花强大的系统
  • 2025年定期排污扩容器生产商权威推荐榜单:电厂疏水扩容器/定连排疏水扩容器/定期排污疏水扩容器源头厂家精选
  • 不只是制药!中药品牌排行榜10强好医生,用石榴谱写产业富民传奇
  • 微波烘干设备哪家好?国内优质企业及业务解析
  • CF1985G-D-Function
  • 2025 年义乌商务礼品厂家最新推荐榜,全链条能力与定制服务双维度深度解析商务伴手礼/商务礼品网/定制商务礼品/商务福利礼品/商务实用礼品公司推荐
  • 2025企业智能BI与知识库本地化部署实力厂商全景透视:从BI私有化、AI知识库到DeepSeek专有方案,方案,谁在定义数据新基座?
  • 排名榜单重磅来袭,关注优质十大留学机构
  • 国际物流公司优选指南:国际物流主流企业综合对比分析
  • 2025年11月优质代运营公司TOP5推荐:Facebook、LinkedIn、TikTok、Google、INS等全平台覆盖
  • 综合评估结果公布,揭晓十大留学机构排名榜单
  • 深入解析:Flink 并行度与最大并行度从 0 到弹性扩缩容
  • 留学机构排行榜TOP10:2025申请季弯道超车就靠它!
  • 2025申请季“决胜关键”:十大留学中介深度解码
  • 剖析十大留学中介:从服务细节到成功案例综合指南
  • SpecKit 规范驱动开发
  • NOIP2025邮寄
  • 2025申请季十大留学机构争霸:文书决胜申请头筹
  • Windows服务器如何重新注册.Net4.0?
  • 牛客刷题-Day24
  • 实测有效!有抗衰效果的口服产品,30+内调抗衰宝藏清单
  • 2025 美国货代公司排行榜:权威测评与中美专线优选指南
  • 成都恒利泰HT-LFCG-900+ , Pin-to-Pin 替代Mini-Circuits
  • Mac使用【访达】的【显示】设置显示完整路径。
  • 2025高性能隔热条品牌权威推荐榜:麓特丹领跑行业技术革新
  • 2025年V型螺旋输送机订制厂家权威推荐榜单:V型螺旋输送机/U型螺旋输送机/Z型斗式提升机源头厂家精选
  • 不是单点突破,而是立体重构:湖南天硕打造的国产高可靠存储生态
  • 2025年下半年消防检测/房屋结构/承载力/房屋鉴定公司前五推荐