时间:2021-05-19
详解HDFS多文件Join操作的实例
最近在做HDFS文件处理之时,遇到了多文件Join操作,其中包括:All Join以及常用的Left Join操作,
下面是个简单的例子;采用两个表来做left join其中数据结构如下:
A 文件:
a|1b|2|c
B文件:
a|b|1|2|c
即:A文件中的第一、二列与B文件中的第一、三列对应;类似数据库中Table的主键/外键
代码如下:
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.util.HashMap;import java.util.Map;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.TextInputFormat;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.util.ReflectionUtils;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import cn.eshore.traffic.hadoop.util.CommUtil;import cn.eshore.traffic.hadoop.util.StringUtil;/** * @ClassName: DataJoin * @Description: HDFS JOIN操作 * @author hadoop * @date 2012-12-18 下午5:51:32 */public class InstallJoin extends Configured implements Tool {private String static enSplitCode = "\\|";private String static splitCode = "|";// 自定义Reducerpublic static class ReduceClass extends DataJoinReducerBase {@Overrideprotected TaggedMapOutput combine(Object[] tags, Object[] values) {String joinedStr = "";//该段判断用户生成Left join限制【其中tags表示文件的路径,install表示文件名称前缀】//去掉则为All Joinif (tags.length == 1 && tags[0].toString().contains("install")) {return null;}Map<String, String> map = new HashMap<String, String>();for (int i = 0; i < values.length; i++) {TaggedWritable tw = (TaggedWritable) values[i];String line = ((Text) tw.getData()).toString();String[] tokens = line.split(enSplitCode, 8);String groupValue = tokens[6];String type = tokens[7];map.put(type, groupValue);}joinedStr += StringUtil.getCount(map.get("7"))+"|"+StringUtil.getCount(map.get("30"));TaggedWritable retv = new TaggedWritable(new Text(joinedStr));retv.setTag((Text) tags[0]);return retv;}}// 自定义Mapperpublic static class MapClass extends DataJoinMapperBase {//自定义Key【类似数据库中的主键/外键】@Overrideprotected Text generateGroupKey(TaggedMapOutput aRecord) {String line = ((Text) aRecord.getData()).toString();String[] tokens = line.split(CommUtil.enSplitCode);String key = "";String type = tokens[7];//由于不同文件中的Key所在列有可能不同,所以需要动态生成Key,其中type为不同文件中的数据标识;如:A文件最后一列为a用于表示此数据为A文件数据if ("7".equals(type)) {key = tokens[0]+"|"+tokens[1];}else if ("30".equals(type)) {key = tokens[0]+"|"+tokens[2];}return new Text(key);}@Overrideprotected Text generateInputTag(String inputFile) {return new Text(inputFile);}@Overrideprotected TaggedMapOutput generateTaggedMapOutput(Object value) {TaggedWritable retv = new TaggedWritable((Text) value);retv.setTag(this.inputTag);return retv;}}public static class TaggedWritable extends TaggedMapOutput {private Writable data;// 自定义public TaggedWritable() {this.tag = new Text("");}public TaggedWritable(Writable data) {this.tag = new Text("");this.data = data;}@Overridepublic Writable getData() {return data;}@Overridepublic void write(DataOutput out) throws IOException {this.tag.write(out);out.writeUTF(this.data.getClass().getName());this.data.write(out);}@Overridepublic void readFields(DataInput in) throws IOException {this.tag.readFields(in);String dataClz = in.readUTF();if (this.data == null|| !this.data.getClass().getName().equals(dataClz)) {try {this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);} catch (ClassNotFoundException e) {e.printStackTrace();}}this.data.readFields(in);}}/*** job运行*/@Overridepublic int run(String[] paths) throws Exception {int no = 0;try {Configuration conf = getConf();JobConf job = new JobConf(conf, InstallJoin.class);FileInputFormat.setInputPaths(job, new Path(paths[0]));FileOutputFormat.setOutputPath(job, new Path(paths[1]));job.setJobName("join_data_test");job.setMapperClass(MapClass.class);job.setReducerClass(ReduceClass.class);job.setInputFormat(TextInputFormat.class);job.setOutputFormat(TextOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(TaggedWritable.class);job.set("mapred.textoutputformat.separator", CommUtil.splitCode);JobClient.runJob(job);no = 1;} catch (Exception e) {throw new Exception();}return no;}//测试public static void main(String[] args) {String[] paths = {"hdfs://master...:9000/home/hadoop/traffic/join/newtype","hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" }int res = 0;try {res = ToolRunner.run(new Configuration(), new InstallJoin(), paths);} catch (Exception e) {e.printStackTrace();}System.exit(res);}}如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
hdfs文件操作操作示例,包括上传文件到HDFS上、从HDFS上下载文件和删除HDFS上的文件,大家参考使用吧复制代码代码如下:importorg.apache
Java执行hadoop的基本操作实例代码向HDFS上传本地文件publicstaticvoiduploadInputFile(StringlocalFile)
hdfs工作原理如下: 1、客户端通过调用FileSystem对象的open括号来读取希望打开的文件。对于HDFS来说,这个对象是分布式文件系统的一个实例。
hdfs中block默认保存3份。 HDFS被设计成支持大文件,适用HDFS的是那些需要处理大规模的数据集的应用。 Hadoop分布式文件系统(HDFS)被
对HDFS上的文件进行上传和下载是对集群的基本操作,在《HADOOP权威指南》一书中,对文件的上传和下载都有代码的实例,但是对如何配置HADOOP客户端却是没有