4-小白学习大数据之脚本语言提交Hadoop任务

除了使用Java去开发Hadoop任务处理逻辑,还可以使用其他语言进行开发,例如: Python,这一次我们使用Python编写Mapper和Reduce,来处理UFO目击事件的数据。数据源可以到网上下载。

1、环境版本

  • Hadoop Version 3.3.6
  • Python Version 3.6.8

2、代码

本次测试代码主要统计UFO的形状。

2.1、Mapper

#! /usr/bin/env python3
# -*- coding:UTF-8 -*-

import sys

# sys.stdin为读取数据,遍历读入数据的每一行
for line in sys.stdin:     
    #删除开头和结尾的空格
    line = line.strip()   
    # 以默认空格分隔行单词到words列表
    words = line.split('\t')  
    if len(words) == 6 and words[3]:
        print('%s\t%s' %(words[3],1))
	

代码接收文件的每一行数据,使用制表符进行分割,过滤掉数据不全和没有形状为空的垃圾数据(有时候垃圾数据也需要使用,看具体情况)

2.2、Reduce

#!/usr/bin/env python3
# -*- coding:UTF-8 -*-

#from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# line = "a	1"
for line in sys.stdin:
	line = line.strip()

	word,count = line.split('\t', 1)
	# print(count)
	try:
	    count = int(count)
	except ValueError:
		# print(1)
	    continue

	if current_word == word:
	    current_count += count
	else:
	    if current_word:
	        print ('%s\t%s' % (current_word, current_count))
	    current_count = count
	    current_word = word
	        
if current_word == word:
    print ('%s\t%s' % (current_word, current_count))

3、提交任务

Mapper和Reduce代码开发好了之后,就可以调用Hadoop命令提交任务了

hadoop jar /usr/local/hadoop/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar -mapper "python3 /Users/local/code/python/mapper.py" -reducer "python3 /Users/local/code/python/reducer.py" -input /user/root/ufo.tsv -output /output_dir -file /Users/local/code/python/mapper.py -file /Users/local/code/python/reducer.py

hadoop-streaming-3.3.6.jar:这个是使用流的方式提交任务的jar包
-mapper:指向的是刚才编写的Mapper文件
-reducer:指向的是刚才编写的Reduce文件
-input:指向的是待分析的UFO数据文件(放到hdfs内)
-file:两个file分别指向刚才写的Mapper和Reduce文件
-output:输出文件夹(hdfs内的文件夹)

4、简单统计

在开发过程中,为了测试代码或数据规模比较小的情况下,也可以使用如下命令进行统计测试。

cat ufo.tsv | mapper.py | sort | reduce.py