3-小白学习大数据之提交第一个任务

编写第一个WordCountJob程序,用来统计文件内每个单词的数量,通过这个小例子,了解一下Hadoop程序开发的过程。

1、配置环境变量

Hadoop版本使用的是3.3.6,其他低版本可自行百度,因为引入的jar包不同。Hadoop3.x版本使用如下Jar包:

hadoop-client-api-3.3.6.jar
hadoop-common-3.3.6.jar
hadoop-hdfs-3.3.6.jar
hadoop-mapreduce-client-core-3.3.6.jar

在环境变量配置文件.bash_profile里面设置classpath,配置完:source .bash_profile

JAVA_HOME和HADOOP_HOME的路径根据自己实际情况自行修改。

export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH
export HADOOP_HOME=/usr/local/hadoop/hadoop-3.3.6
export CLASSPATH=.:${HADOOP_HOME}/share/hadoop/common/hadoop-common-3.3.6.jar:${CLASSPATH}
export CLASSPATH=.:${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-3.3.6.jar:${CLASSPATH}
export CLASSPATH=.:${HADOOP_HOME}/share/hadoop/client/hadoop-client-api-3.3.6.jar:${CLASSPATH}
export CLASSPATH=.:${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:${CLASSPATH}

2、编写第一个程序 WordCountJob.java

程序通过传入一个文本文件,统计文本中每个词语出现的数量。

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

import javax.naming.Context;
@SuppressWarnings("unused")
//自定义mapper类
public class WordCountJob{
    //map要接受的输入是<k1,v1>键值对,k1是当前行的偏移量,v1是内容;
    //输出是k2,v2;k2为单词,v2是数字
    public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        protected void map(LongWritable k1,Text v1,Context context)throws IOException,InterruptedException{
            //对获取到的每一行数据进行切割,将单词切割出来
            String words[]=v1.toString().split(" ");
            for(String word:words){
                //把迭代出来的单词封装成<k2,v2>形式
                Text k2=new Text(word);
                LongWritable v2=new LongWritable(1l);
                //把<k2,v2>写出去
                context.write(k2,v2);
            }
        }
    }

    //reduce接受的输入是map的输出,输出k3,v3,k3是单词,v3是数字
    public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        //针对k2s的数据进行累加求和,并且转换为k3,v3输出
        public void reduce(Text k2,Iterable<LongWritable> v2s,Context context)
                throws IOException,InterruptedException{
            //创建一个sum变量,保存v2s的和
            long sum=0l;
            for(LongWritable v2:v2s){
                sum+=v2.get();//保持类型一致
            }
            //组装k3,v3
            Text k3=k2;
            LongWritable v3=new LongWritable(sum);
            context.write(k3,v3);
        }
    }

    //组装job=map+reduce
    public static void main(String []args) {
        try{
            if(args.length!=2){
                System.exit(100);
            }
            //job需要的配置参数
            Configuration conf = new  Configuration();

            //创建一个job
            Job job = Job.getInstance(conf);


            //这一行必须设置,否则在集群中执行时找不到这个类
            job.setJarByClass(WordCountJob.class);
            //指定输入路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            //指定输出目录
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            //指定map相关的代码
            job.setMapperClass(MyMapper.class);
            //指定k2的类型
            job.setMapOutputKeyClass(Text.class);
            //指定v2类型
            job.setMapOutputValueClass(LongWritable.class);


            //指定reduce相关的代码
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            job.waitForCompletion(true);

        }catch(Exception e){
            e.printStackTrace();
        }
    }

}

2.1、编译Java文件

# 编译.java文件
javac WordCountJob.java

2.2 打包.class文件为jar包

jar cvf wc1.jar WordCountJob*class

wc1.jar:这个是打完包后的jar包名称
WordCountJob*class:这个是编译后的所有.class文件

3、提交任务

通过上述操作之后,会生成一个wc1.jar的jar包。在提交任务之前,我们创建一个测试文件:test.txt, 向文件内写入以下信息:

This is a map
This is a reduce
This is a wordcount example

因为文件在本地磁盘内,需要将文件复制到hdfs内才可以使用。复制命令如下:

hdfs dfs -copyFromLocal ./test.txt /user/root

/user/root这个目录不存在需要自行创建:hdfs dfs -mkdir /user/root

最后一步就是提交任务啦!

hadoop jar wc1.jar WordCountJob test.txt output

我们首次对自己的代码使用Hadoop JAR命令。它有4个参数:
(1) JAR文件名;
(2) JAR文件中的驱动类名;
(3) 输入文件在HDFS的位置(本例中,是对/user/Hadoop home文件夹的相对引用);
(4) 输出文件夹的目标位置(同样也是一个相对路径)。

4、查看任务执行结果

使用如下命令查看任务结果文件:

[root@5b49e23aa62e code]# hdfs dfs -cat /user/root/output/part-r-00000
This    3
a       3
example 1
is      3
map     1
reduce  1
wordcount       1

/user/root/output/part-r-00000: 因为提交任务的时候使用的是output,所以他会以/user/root为根目录创建一个output文件夹,将结果输出到这里,如果要重复执行,输出目录不变的话,需要先将这个文件夹删除(hadoop fs -rmr output),这是Hadoop对与输出结构的保护机制。