5-小白学习大数据之Java实现UFO数据统计
上一个记录了使用Python实现UFO数据的统计案例,下面使用Java来开发一下UFO数据的统计。
1、分析
在使用Python开发MapReduce的时候,我们对数据进行了过滤,不满足6条的数据忽略,那么当我们需要使用同一个结果集进行多个维度统计的时候,是否可以将上面过滤数据的部分抽出来共用呢。
答案是肯定的,我们可以使用org.apache.hadoop.mapred.lib.ChainMapper
来解决这个问题。ChainMapper类可以顺序执行多个mapper,而且最后的mapper输出会传递给reducer。 ChainMapper不仅适用于这种数据清理,而且在分析特定作业时,先通过它执行多个map型任务 再应用reducer的做法也很常见。
这种做法需要编写一个校验mapper,它可用于将来所有的字段分析作业。校验mapper会丢弃 错误记录行,只将有效的行传入实际的业务逻辑mapper。这样的话,目前的业务逻辑mapper就可 以专注于分析数据而不用担心粗粒度的校验。
2、使用ChainMapper进行字段验证、分析
- 在
UFORecordValidationMapper.java
文件中创建如下类:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
public class UFORecordValidationMapper extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
@Override
public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
if (validate(line))
output.collect(key, value);
}
private boolean validate(String str) {
String[] parts = str.split("\t");
if (parts.length != 6)
return false;
return true;
}
}
- 在
UFOLocation.java
文件中创建如下类:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class UFOLocation {
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
private final static LongWritable one = new LongWritable(1);
private static Pattern locationPattern = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$");
public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
String[] fields = line.split("\t");
String location = fields[2].trim();
if (location.length() >= 2) {
Matcher matcher = locationPattern.matcher(location);
if (matcher.find()) {
int start = matcher.start();
String state = location.substring(start, start + 2);
output.collect(new Text(state.toUpperCase()), one);
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
JobConf conf = new JobConf(config, UFOLocation.class);
conf.setJobName("UFOLocation");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
JobConf mapconf1 = new JobConf(false);
ChainMapper.addMapper(conf, UFORecordValidationMapper.class, LongWritable.class, Text.class, LongWritable.class, Text.class, true, mapconf1);
JobConf mapconf2 = new JobConf(false);
ChainMapper.addMapper(conf, MapClass.class, LongWritable.class, Text.class, Text.class, LongWritable.class, true, mapconf2);
conf.setMapperClass(ChainMapper.class);
conf.setCombinerClass(LongSumReducer.class);
conf.setReducerClass(LongSumReducer.class);
FileInputFormat.setInputPaths(conf, args[0]);
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
- 编译上面两个文件
> javac UFORecordValidationMapper.java UFOLocation.java
- 将上述两个类文件打包为ufo1.jar文件并提交作业至Hadoop
> jar cvf ufo1.jar *.class
- 将输出文件拷贝至本地文件系统并检查它
> hadoop jar ./ufo1.jar UFOLocation /user/root/ufo.tsv output
> hdfs dfs -cat /user/root/output/part-00000
AB 286
AD 6
AE 7
AI 6
AK 234
AL 548
AM 22
...
本作业的驱动程序发生的变化最大。之前的驱动配置中仅包含一个map类,而本作业的驱动配置要多次调用ChainMapper类。
在驱动中多次调用ChainMapper类的一般模式是,为每个mapper新建一个配置对象,然后将mapper添加到ChainMapper类,同时指定输入和输出位置,并引用整个作业的配置对象。
请注意,上述两个mapper的参数略有不同。它们都输入LongWritable类型的键及Text类 型的值。区别在于输出的数据类型:UFORecordValidationMapper输出LongWritable类型的 键及Text类型的值,而UFOLocationMapper则相反,输出Text类型的键及LongWritable类型 的值。
重要的是,要保证mapper链条末端的UFOLocationMapper的输入与reduce类(LongSum- Reducer)的输入类型相匹配。在使用ChainMapper类时,只要符合下列条件,链条中的mapper可以有不同的输入和输出:
- 除最后一个mapper外,链条中每个map的输出与下一个mapper的输入相匹配;
- 对最后一个mapper而言,其输出与reducer输入相匹配。
3、改进输出地点
3.1、不足
上面的地点取首字母来表示地点并不是完全可行,在人工分析源文件之后,许多数据问题浮出水面。
- 州名的大写缩写不一致。
- 大量的目击事件并非发生在美国,尽管它们可能遵守类似(城市,地区)的格式,但是它们的缩写并不在我们预计的50个地区缩写之内。
- 有些字段根本不遵守(城市,地区)这样的规则,但仍会被正则表达式采集到。
我们需要对结果进行过滤,最好是将美国记录标准化为正确的州名输出,并将其余数据划分 为一个范围更宽泛的大类。
为了执行这个任务,需要在mapper中添加一些内容,让它明白什么是有效的美国州名缩写。 当然,我们可以将其硬编码到mapper中,但这似乎不是正确的做法。虽然现在我们计划将所有非 美国的目击事件视为一类,但我们今后还可能对该类进行扩展,比如按国家进行划分。如果将州 名缩写硬编码到mapper,那就需要每次重新编译我们的mapper。
3.2、使用 Distributed Cache 改进地点输出
我们将整理好的洲名和首字母对应的关系文件保存到HDFS上。
hdfs dfs -put states.txt(本地) states.txt(hdfs)
文件使用:首字母+制表符+州名的格式,格式如下:
AL Alabama
AK Alaska
AZ Arizona
AR Arkansas
CA California
CO Colorado
CT Connecticut
...
修改UFOLocation.java
文件代码如下:
//package ufo2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapreduce.Job;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @author jiaoyang
* @title: UFOLocation
* @projectName java
* @description: TODO
* @date 2023/8/2215:50
*/
public class UFOLocation {
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
private final static LongWritable one = new LongWritable(1);
private static Pattern locationPattern = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$");
private Map<String, String> stateNames;
@Override
public void configure(JobConf job) {
try {
Path[] cacheFiles = JobContextImpl.getLocalCacheFiles(job);
String statesTxt = cacheFiles[0].toString().replace("file:", "").trim();
System.out.println(statesTxt);
setupStateMap(statesTxt);
} catch (IOException e) {
System.err.println("Error reading state file.");
e.printStackTrace();
// System.err.println(Arrays.toString(e.getStackTrace()));
System.exit(1);
}
}
private void setupStateMap(String filename) throws IOException {
Map<String, String> states = new HashMap<String, String>();
try (BufferedReader reader = new BufferedReader(new FileReader(filename))) {
String line = reader.readLine();
while (line != null) {
String[] split = line.split("\t");
states.put(split[0], split[1]);
line = reader.readLine();
}
}
stateNames = states;
}
public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
String[] fields = line.split("\t");
String location = fields[2].trim();
if (location.length() >= 2) {
Matcher matcher = locationPattern.matcher(location);
if (matcher.find()) {
int start = matcher.start();
String state = location.substring(start, start + 2);
// output.collect(new Text(state.toUpperCase()), one);
output.collect(new Text(lookupState(state.toUpperCase())), one);
}
}
}
private String lookupState( String state)
{
String fullName = stateNames.get(state) ;
return fullName == null? "Other": fullName ;
}
}
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
JobConf conf = new JobConf(config, UFOLocation.class);
conf.setJobName("UFOLocation");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
// 这个方法,在2.2版本已弃用DistributedCache.addCacheFile(new URI("/user/root/states.txt"), conf) ;
Job.addCacheFile(new URI("/user/root/states.txt"), conf);
JobConf mapconf1 = new JobConf(false);
ChainMapper.addMapper(conf, UFORecordValidationMapper.class, LongWritable.class, Text.class, LongWritable.class, Text.class, true, mapconf1);
JobConf mapconf2 = new JobConf(false);
ChainMapper.addMapper(conf, MapClass.class, LongWritable.class, Text.class, Text.class, LongWritable.class, true, mapconf2);
conf.setMapperClass(ChainMapper.class);
conf.setCombinerClass(LongSumReducer.class);
conf.setReducerClass(LongSumReducer.class);
FileInputFormat.setInputPaths(conf, args[0]);
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
上面的代码在main
方法中添加了:
Job.addCacheFile(new URI("/user/root/states.txt"), conf);
用于将文件分发到各个节点。
在MapClass
内部类中添加了重写了MapReduceBase
类的configure
方法,该方法使用map方法将州名缩写与全称关联起来。configure
方法在任务启动时被调用,其默认实现不执行任何操作。
写上面代码的时候,发现了一个问题,cacheFiles[0].toString() 返回的文件路径前面多了一个:“file:”,导致后面读取文件的时候报错,这里我直接操作字符串给替换掉了,如果有人有好的办法请多指教。
将修改好的代码编译打包,执行后获得一下结果:
[root@5b49e23aa62e ufo2]# hdfs dfs -cat output_ufo2/part-00000
Alabama 548
Alaska 234
Arizona 2097
Arkansas 534
California 7679
Colorado 1457
Connecticut 608
Delaware 127
...
4、实现计数器,统计被忽略的数据数量
上面的代码,统计了每个州的数据汇总信息,但是我们在汇总的时候,过滤掉了部分不满足统计条件的数据,这部分数据量不知道是多少,下面我们使用自定义的计数器,来统计一下这部分数据数量。
修改UFORecordValidationMapper.java
文件,改造如下:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
public class UFORecordValidationMapper extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
public enum LineCounters {
BAD_LINES,
TOO_MANY_TABS,
TOO_FEW_TABS
}
@Override
public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
if (validate(line, reporter))
output.collect(key, value);
}
private boolean validate(String str, Reporter reporter) {
String[] parts = str.split("\t");
if (parts.length != 6) {
if (parts.length < 6) {
reporter.incrCounter(LineCounters.TOO_FEW_TABS, 1);
} else {
reporter.incrCounter(LineCounters.TOO_MANY_TABS, 1);
}
reporter.incrCounter(LineCounters.BAD_LINES, 1);
if ((reporter.getCounter(LineCounters.BAD_LINES).getCounter() % 10) == 0) {
reporter.setStatus("Got 10 bad lines.");
System.err.println("Read another 10 bad lines.");
}
return false;
}
return true;
}
}
编译运行:
> hadoop jar ./ufo2.jar UFOLocation /user/root/ufo.tsv output_ufo3
...
UFORecordValidationMapper$LineCounters
BAD_LINES=326
TOO_FEW_TABS=2
TOO_MANY_TABS=324
...
假如你用的是Hadoop context对象API,就要用Context.getCounter(). increment()方法访问计数器。