时间:2021-05-19
Hadoop streaming
Hadoop为MapReduce提供了不同的API,可以方便我们使用不同的编程语言来使用MapReduce框架,而不是只局限于Java。这里要介绍的就是Hadoop streaming API。Hadoop streaming 使用Unix的standard streams作为我们mapreduce程序和MapReduce框架之间的接口。所以你可以用任何语言来编写MapReduce程序,只要该语言可以往standard input/output上进行读写。
streamming是天然适用于文字处理的(text processing),当然,也仅适用纯文本的处理,对于需要对象和序列化的场景,hadoop streaming无能为力。它力图使我们能够快捷的通过各种脚本语言,快速的处理大量的文本文件。以下是steaming的一些特点:
常用的Streaming编程语言:
Ruby
下面是一个Ruby编写的MapReduce程序的示例:
map
max_temperature_map.rb:
reduce
max_temperature_reduce.rb:
运行
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \-input input/ncdc/sample.txt \-output output \-mapper ch02/src/main/ruby/max_temperature_map.rb \-reducer ch02/src/main/ruby/max_temperature_reduce.rbPython
Map
#!/usr/bin/env pythonimport reimport sysfor line in sys.stdin:val = line.strip()(year, temp, q) = (val[15:19], val[87:92], val[92:93])if (temp != "+9999" and re.match("[01459]", q)):print "%s\t%s" % (year, temp)Reduce
#!/usr/bin/env pythonimport sys(last_key, max_val) = (None, -sys.maxint)for line in sys.stdin:(key, val) = line.strip().split("\t")if last_key and last_key != key:print "%s\t%s" % (last_key, max_val)(last_key, max_val) = (key, int(val))else:(last_key, max_val) = (key, max(max_val, int(val)))if last_key:print "%s\t%s" % (last_key, max_val)运行
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \-input input/ncdc/sample.txt \-output output \-mapper ch02/src/main/ruby/max_temperature_map.py\-reducer ch02/src/main/ruby/max_temperature_reduce.pyBash shell
Map
#!/usr/bin/env bash# NLineInputFormat gives a single line: key is offset, value is S3 URIread offset s3file# Retrieve file from S3 to local diskecho "reporter:status:Retrieving $s3file" >&2$HADOOP_INSTALL/bin/hadoop fs -get $s3file .# Un-bzip and un-tar the local filetarget=`basename $s3file .tar.bz2`mkdir -p $targetecho "reporter:status:Un-tarring $s3file to $target" >&2tar jxf `basename $s3file` -C $target# Un-gzip each station file and concat into one fileecho "reporter:status:Un-gzipping $target" >&2for file in $target/*/*dogunzip -c $file >> $target.allecho "reporter:status:Processed $file" >&2done# Put gzipped version into HDFSecho "reporter:status:Gzipping $target and putting in HDFS" >&2gzip -c $target.all | $HADOOP_INSTALL/bin/hadoop fs -put - gz/$target.gz运行
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \-D mapred.reduce.tasks=0 \-D mapred.map.tasks.speculative.execution=false \-D mapred.task.timeout=12000000 \-input ncdc_files.txt \-inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \-output output \-mapper load_ncdc_map.sh \-file load_ncdc_map.shCombiner
在streaming模式下,仍然可以运行Combiner,两种方法:
这里具体解释第二种方法:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \-input input/ncdc/all \-output output \-mapper "ch02/src/main/ruby/max_temperature_map.rb | sort |ch02/src/main/ruby/max_temperature_reduce.rb" \-reducer ch02/src/main/ruby/max_temperature_reduce.rb \-file ch02/src/main/ruby/max_temperature_map.rb \-file ch02/src/main/ruby/max_temperature_reduce.rb注意看-mapper这一行,通关管道的方式,把mapper的临时输出文件(intermediate file,Map完成后的临时文件)作为输入,送到sort进行排序,然后送到reduce脚本,来完成类似于combiner的工作。这时候的输出才真正的作为shuffle的输入,被分组并在网络上发送到Reduce
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
1.HADOOP背景介绍1.1什么是HADOOP1.HADOOP是apache旗下的一套开源软件平台2.HADOOP提供的功能:利用服务器集群,根据用户的自定义
环境:CentOs6.4Hadoop版本:hadoop-0.20.2命令介绍:复制代码代码如下:./hadoopfs-putxxxxhdfs://cui:900
假设Hadoop的安装目录HADOOP_HOME为/home/admin/hadoop。启动与关闭启动Hadoop1.进入HADOOP_HOME目录。2.执行s
问题导读1.Hadoop3.x通过什么方式来容错?2.Hadoop3.x存储开销减少了多少?3.Hadoop3.xMRAPI是否兼容hadoop1.x?一、目的
Linux环境:CentOs6.4Hadoop版本:hadoop-0.20.2验证Hadoop是否安装成功主要通过以下两个网址。http://localhost