3-小白学习大数据之提交第一个任务
编写第一个WordCountJob程序,用来统计文件内每个单词的数量,通过这个小例子,了解一下Hadoop程序开发的过程。
1、配置环境变量
Hadoop版本使用的是3.3.6,其他低版本可自行百度,因为引入的jar包不同。Hadoop3.x版本使用如下Jar包:
hadoop-client-api-3.3.6.jar
hadoop-common-3.3.6.jar
hadoop-hdfs-3.3.6.jar
hadoop-mapreduce-client-core-3.3.6.jar
在环境变量配置文件.bash_profile
里面设置classpath,配置完:source .bash_profile
JAVA_HOME和HADOOP_HOME的路径根据自己实际情况自行修改。
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH
export HADOOP_HOME=/usr/local/hadoop/hadoop-3.3.6
export CLASSPATH=.:${HADOOP_HOME}/share/hadoop/common/hadoop-common-3.3.6.jar:${CLASSPATH}
export CLASSPATH=.:${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-3.3.6.jar:${CLASSPATH}
export CLASSPATH=.:${HADOOP_HOME}/share/hadoop/client/hadoop-client-api-3.3.6.jar:${CLASSPATH}
export CLASSPATH=.:${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:${CLASSPATH}
2、编写第一个程序 WordCountJob.java
程序通过传入一个文本文件,统计文本中每个词语出现的数量。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
import javax.naming.Context;
@SuppressWarnings("unused")
//自定义mapper类
public class WordCountJob{
//map要接受的输入是<k1,v1>键值对,k1是当前行的偏移量,v1是内容;
//输出是k2,v2;k2为单词,v2是数字
public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
protected void map(LongWritable k1,Text v1,Context context)throws IOException,InterruptedException{
//对获取到的每一行数据进行切割,将单词切割出来
String words[]=v1.toString().split(" ");
for(String word:words){
//把迭代出来的单词封装成<k2,v2>形式
Text k2=new Text(word);
LongWritable v2=new LongWritable(1l);
//把<k2,v2>写出去
context.write(k2,v2);
}
}
}
//reduce接受的输入是map的输出,输出k3,v3,k3是单词,v3是数字
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
//针对k2s的数据进行累加求和,并且转换为k3,v3输出
public void reduce(Text k2,Iterable<LongWritable> v2s,Context context)
throws IOException,InterruptedException{
//创建一个sum变量,保存v2s的和
long sum=0l;
for(LongWritable v2:v2s){
sum+=v2.get();//保持类型一致
}
//组装k3,v3
Text k3=k2;
LongWritable v3=new LongWritable(sum);
context.write(k3,v3);
}
}
//组装job=map+reduce
public static void main(String []args) {
try{
if(args.length!=2){
System.exit(100);
}
//job需要的配置参数
Configuration conf = new Configuration();
//创建一个job
Job job = Job.getInstance(conf);
//这一行必须设置,否则在集群中执行时找不到这个类
job.setJarByClass(WordCountJob.class);
//指定输入路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定输出目录
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//指定map相关的代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2类型
job.setMapOutputValueClass(LongWritable.class);
//指定reduce相关的代码
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.waitForCompletion(true);
}catch(Exception e){
e.printStackTrace();
}
}
}
2.1、编译Java文件
# 编译.java文件
javac WordCountJob.java
2.2 打包.class文件为jar包
jar cvf wc1.jar WordCountJob*class
wc1.jar:这个是打完包后的jar包名称
WordCountJob*class:这个是编译后的所有.class文件
3、提交任务
通过上述操作之后,会生成一个wc1.jar的jar包。在提交任务之前,我们创建一个测试文件:test.txt, 向文件内写入以下信息:
This is a map
This is a reduce
This is a wordcount example
因为文件在本地磁盘内,需要将文件复制到hdfs内才可以使用。复制命令如下:
hdfs dfs -copyFromLocal ./test.txt /user/root
/user/root
这个目录不存在需要自行创建:hdfs dfs -mkdir /user/root
最后一步就是提交任务啦!
hadoop jar wc1.jar WordCountJob test.txt output
我们首次对自己的代码使用Hadoop JAR命令。它有4个参数:
(1) JAR文件名;
(2) JAR文件中的驱动类名;
(3) 输入文件在HDFS的位置(本例中,是对/user/Hadoop home文件夹的相对引用);
(4) 输出文件夹的目标位置(同样也是一个相对路径)。
4、查看任务执行结果
使用如下命令查看任务结果文件:
[root@5b49e23aa62e code]# hdfs dfs -cat /user/root/output/part-r-00000
This 3
a 3
example 1
is 3
map 1
reduce 1
wordcount 1
/user/root/output/part-r-00000: 因为提交任务的时候使用的是output,所以他会以/user/root为根目录创建一个output文件夹,将结果输出到这里,如果要重复执行,输出目录不变的话,需要先将这个文件夹删除(
hadoop fs -rmr output
),这是Hadoop对与输出结构的保护机制。