4-小白学习大数据之脚本语言提交Hadoop任务
除了使用Java去开发Hadoop任务处理逻辑,还可以使用其他语言进行开发,例如: Python,这一次我们使用Python编写Mapper和Reduce,来处理UFO目击事件的数据。数据源可以到网上下载。
1、环境版本
- Hadoop Version 3.3.6
- Python Version 3.6.8
2、代码
本次测试代码主要统计UFO的形状。
2.1、Mapper
#! /usr/bin/env python3
# -*- coding:UTF-8 -*-
import sys
# sys.stdin为读取数据,遍历读入数据的每一行
for line in sys.stdin:
#删除开头和结尾的空格
line = line.strip()
# 以默认空格分隔行单词到words列表
words = line.split('\t')
if len(words) == 6 and words[3]:
print('%s\t%s' %(words[3],1))
代码接收文件的每一行数据,使用制表符进行分割,过滤掉数据不全和没有形状为空的垃圾数据(有时候垃圾数据也需要使用,看具体情况)
2.2、Reduce
#!/usr/bin/env python3
# -*- coding:UTF-8 -*-
#from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# line = "a 1"
for line in sys.stdin:
line = line.strip()
word,count = line.split('\t', 1)
# print(count)
try:
count = int(count)
except ValueError:
# print(1)
continue
if current_word == word:
current_count += count
else:
if current_word:
print ('%s\t%s' % (current_word, current_count))
current_count = count
current_word = word
if current_word == word:
print ('%s\t%s' % (current_word, current_count))
3、提交任务
Mapper和Reduce代码开发好了之后,就可以调用Hadoop命令提交任务了
hadoop jar /usr/local/hadoop/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar -mapper "python3 /Users/local/code/python/mapper.py" -reducer "python3 /Users/local/code/python/reducer.py" -input /user/root/ufo.tsv -output /output_dir -file /Users/local/code/python/mapper.py -file /Users/local/code/python/reducer.py
hadoop-streaming-3.3.6.jar:这个是使用流的方式提交任务的jar包
-mapper:指向的是刚才编写的Mapper文件
-reducer:指向的是刚才编写的Reduce文件
-input:指向的是待分析的UFO数据文件(放到hdfs内)
-file:两个file分别指向刚才写的Mapper和Reduce文件
-output:输出文件夹(hdfs内的文件夹)
4、简单统计
在开发过程中,为了测试代码或数据规模比较小的情况下,也可以使用如下命令进行统计测试。
cat ufo.tsv | mapper.py | sort | reduce.py