5-小白学习大数据之Java实现UFO数据统计

上一个记录了使用Python实现UFO数据的统计案例,下面使用Java来开发一下UFO数据的统计。

1、分析

在使用Python开发MapReduce的时候,我们对数据进行了过滤,不满足6条的数据忽略,那么当我们需要使用同一个结果集进行多个维度统计的时候,是否可以将上面过滤数据的部分抽出来共用呢。
答案是肯定的,我们可以使用org.apache.hadoop.mapred.lib.ChainMapper来解决这个问题。ChainMapper类可以顺序执行多个mapper,而且最后的mapper输出会传递给reducer。 ChainMapper不仅适用于这种数据清理,而且在分析特定作业时,先通过它执行多个map型任务 再应用reducer的做法也很常见。
这种做法需要编写一个校验mapper,它可用于将来所有的字段分析作业。校验mapper会丢弃 错误记录行,只将有效的行传入实际的业务逻辑mapper。这样的话,目前的业务逻辑mapper就可 以专注于分析数据而不用担心粗粒度的校验。

2、使用ChainMapper进行字段验证、分析

  1. UFORecordValidationMapper.java文件中创建如下类:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;

public class UFORecordValidationMapper extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
        String line = value.toString();
        if (validate(line))
            output.collect(key, value);
    }

    private boolean validate(String str) {
        String[] parts = str.split("\t");
        if (parts.length != 6)
            return false;
        return true;
    }
}
  1. UFOLocation.java文件中创建如下类:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.LongSumReducer;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class UFOLocation {
    public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
        private final static LongWritable one = new LongWritable(1);
        private static Pattern locationPattern = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$");

        public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            String[] fields = line.split("\t");
            String location = fields[2].trim();
            if (location.length() >= 2) {
                Matcher matcher = locationPattern.matcher(location);
                if (matcher.find()) {
                    int start = matcher.start();
                    String state = location.substring(start, start + 2);
                    output.collect(new Text(state.toUpperCase()), one);
                }

            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        JobConf conf = new JobConf(config, UFOLocation.class);
        conf.setJobName("UFOLocation");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);

        JobConf mapconf1 = new JobConf(false);
        ChainMapper.addMapper(conf, UFORecordValidationMapper.class, LongWritable.class, Text.class, LongWritable.class, Text.class, true, mapconf1);

        JobConf mapconf2 = new JobConf(false);
        ChainMapper.addMapper(conf, MapClass.class, LongWritable.class, Text.class, Text.class, LongWritable.class, true, mapconf2);
        conf.setMapperClass(ChainMapper.class);

        conf.setCombinerClass(LongSumReducer.class);
        conf.setReducerClass(LongSumReducer.class);

        FileInputFormat.setInputPaths(conf, args[0]);
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

  1. 编译上面两个文件
> javac UFORecordValidationMapper.java UFOLocation.java 
  1. 将上述两个类文件打包为ufo1.jar文件并提交作业至Hadoop
> jar cvf ufo1.jar *.class
  1. 将输出文件拷贝至本地文件系统并检查它
> hadoop jar ./ufo1.jar UFOLocation /user/root/ufo.tsv output
> hdfs dfs -cat /user/root/output/part-00000
  AB      286
  AD      6
  AE      7
  AI      6
  AK      234
  AL      548
  AM      22
  ...

本作业的驱动程序发生的变化最大。之前的驱动配置中仅包含一个map类,而本作业的驱动配置要多次调用ChainMapper类。
在驱动中多次调用ChainMapper类的一般模式是,为每个mapper新建一个配置对象,然后将mapper添加到ChainMapper类,同时指定输入和输出位置,并引用整个作业的配置对象。
请注意,上述两个mapper的参数略有不同。它们都输入LongWritable类型的键及Text类 型的值。区别在于输出的数据类型:UFORecordValidationMapper输出LongWritable类型的 键及Text类型的值,而UFOLocationMapper则相反,输出Text类型的键及LongWritable类型 的值。
重要的是,要保证mapper链条末端的UFOLocationMapper的输入与reduce类(LongSum- Reducer)的输入类型相匹配。在使用ChainMapper类时,只要符合下列条件,链条中的mapper可以有不同的输入和输出:

  • 除最后一个mapper外,链条中每个map的输出与下一个mapper的输入相匹配;
  • 对最后一个mapper而言,其输出与reducer输入相匹配。

3、改进输出地点

3.1、不足

上面的地点取首字母来表示地点并不是完全可行,在人工分析源文件之后,许多数据问题浮出水面。

  • 州名的大写缩写不一致。
  • 大量的目击事件并非发生在美国,尽管它们可能遵守类似(城市,地区)的格式,但是它们的缩写并不在我们预计的50个地区缩写之内。
  • 有些字段根本不遵守(城市,地区)这样的规则,但仍会被正则表达式采集到。

我们需要对结果进行过滤,最好是将美国记录标准化为正确的州名输出,并将其余数据划分 为一个范围更宽泛的大类。
为了执行这个任务,需要在mapper中添加一些内容,让它明白什么是有效的美国州名缩写。 当然,我们可以将其硬编码到mapper中,但这似乎不是正确的做法。虽然现在我们计划将所有非 美国的目击事件视为一类,但我们今后还可能对该类进行扩展,比如按国家进行划分。如果将州 名缩写硬编码到mapper,那就需要每次重新编译我们的mapper。

3.2、使用 Distributed Cache 改进地点输出

我们将整理好的洲名和首字母对应的关系文件保存到HDFS上。

hdfs dfs -put states.txt(本地) states.txt(hdfs)

文件使用:首字母+制表符+州名的格式,格式如下:

AL	Alabama
AK	Alaska
AZ	Arizona
AR	Arkansas
CA	California
CO	Colorado
CT	Connecticut 
...

修改UFOLocation.java文件代码如下:

//package ufo2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapreduce.Job;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * @author jiaoyang
 * @title: UFOLocation
 * @projectName java
 * @description: TODO
 * @date 2023/8/2215:50
 */
public class UFOLocation {
    public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
        private final static LongWritable one = new LongWritable(1);
        private static Pattern locationPattern = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$");
        private Map<String, String> stateNames;

        @Override
        public void configure(JobConf job) {
            try {
                Path[] cacheFiles = JobContextImpl.getLocalCacheFiles(job);
                String statesTxt = cacheFiles[0].toString().replace("file:", "").trim();
                System.out.println(statesTxt);
                setupStateMap(statesTxt);
            } catch (IOException e) {
                System.err.println("Error reading state file.");
                e.printStackTrace();
//                System.err.println(Arrays.toString(e.getStackTrace()));
                System.exit(1);
            }
        }

        private void setupStateMap(String filename) throws IOException {
            Map<String, String> states = new HashMap<String, String>();
            try (BufferedReader reader = new BufferedReader(new FileReader(filename))) {
                String line = reader.readLine();
                while (line != null) {
                    String[] split = line.split("\t");
                    states.put(split[0], split[1]);
                    line = reader.readLine();
                }
            }
            stateNames = states;
        }

        public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            String[] fields = line.split("\t");
            String location = fields[2].trim();
            if (location.length() >= 2) {
                Matcher matcher = locationPattern.matcher(location);
                if (matcher.find()) {
                    int start = matcher.start();
                    String state = location.substring(start, start + 2);
//                    output.collect(new Text(state.toUpperCase()), one);
                    output.collect(new Text(lookupState(state.toUpperCase())), one);
                }

            }
        }

        private String lookupState( String state)
        {
            String fullName = stateNames.get(state) ;
            return fullName == null? "Other": fullName ;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        JobConf conf = new JobConf(config, UFOLocation.class);

        conf.setJobName("UFOLocation");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);

        // 这个方法,在2.2版本已弃用DistributedCache.addCacheFile(new URI("/user/root/states.txt"), conf) ;
        Job.addCacheFile(new URI("/user/root/states.txt"), conf);

        JobConf mapconf1 = new JobConf(false);
        ChainMapper.addMapper(conf, UFORecordValidationMapper.class, LongWritable.class, Text.class, LongWritable.class, Text.class, true, mapconf1);

        JobConf mapconf2 = new JobConf(false);
        ChainMapper.addMapper(conf, MapClass.class, LongWritable.class, Text.class, Text.class, LongWritable.class, true, mapconf2);
        conf.setMapperClass(ChainMapper.class);

        conf.setCombinerClass(LongSumReducer.class);
        conf.setReducerClass(LongSumReducer.class);

        FileInputFormat.setInputPaths(conf, args[0]);
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

上面的代码在main方法中添加了:

Job.addCacheFile(new URI("/user/root/states.txt"), conf);

用于将文件分发到各个节点。
MapClass内部类中添加了重写了MapReduceBase类的configure方法,该方法使用map方法将州名缩写与全称关联起来。configure方法在任务启动时被调用,其默认实现不执行任何操作。

写上面代码的时候,发现了一个问题,cacheFiles[0].toString() 返回的文件路径前面多了一个:“file:”,导致后面读取文件的时候报错,这里我直接操作字符串给替换掉了,如果有人有好的办法请多指教。

将修改好的代码编译打包,执行后获得一下结果:

[root@5b49e23aa62e ufo2]# hdfs dfs -cat output_ufo2/part-00000
Alabama 548
Alaska  234
Arizona 2097
Arkansas        534
California      7679
Colorado        1457
Connecticut     608
Delaware        127
...

4、实现计数器,统计被忽略的数据数量

上面的代码,统计了每个州的数据汇总信息,但是我们在汇总的时候,过滤掉了部分不满足统计条件的数据,这部分数据量不知道是多少,下面我们使用自定义的计数器,来统计一下这部分数据数量。 修改UFORecordValidationMapper.java文件,改造如下:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;

public class UFORecordValidationMapper extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
    public enum LineCounters {
        BAD_LINES,
        TOO_MANY_TABS,
        TOO_FEW_TABS
    }

    @Override
    public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
        String line = value.toString();
        if (validate(line, reporter))
            output.collect(key, value);
    }

    private boolean validate(String str, Reporter reporter) {
        String[] parts = str.split("\t");
        if (parts.length != 6) {
            if (parts.length < 6) {
                reporter.incrCounter(LineCounters.TOO_FEW_TABS, 1);
            } else {
                reporter.incrCounter(LineCounters.TOO_MANY_TABS, 1);
            }
            reporter.incrCounter(LineCounters.BAD_LINES, 1);
            if ((reporter.getCounter(LineCounters.BAD_LINES).getCounter() % 10) == 0) {
                reporter.setStatus("Got 10 bad lines.");
                System.err.println("Read another 10 bad lines.");
            }
            return false;
        }

        return true;
    }
}

编译运行:

> hadoop jar ./ufo2.jar UFOLocation /user/root/ufo.tsv output_ufo3
...
UFORecordValidationMapper$LineCounters
                BAD_LINES=326
                TOO_FEW_TABS=2
                TOO_MANY_TABS=324
...

假如你用的是Hadoop context对象API,就要用Context.getCounter(). increment()方法访问计数器。