时间:2021-05-19
一、背景
Hadoop中实现了用于全局排序的InputSampler类和TotalOrderPartitioner类,调用示例是org.apache.hadoop.examples.Sort。
但是当我们以Text文件作为输入时,结果并非按Text中的string列排序,而且输出结果是SequenceFile。
原因:
1) hadoop在处理Text文件时,key是行号LongWritable类型,InputSampler抽样的是key,TotalOrderPartitioner也是用key去查找分区。这样,抽样得到的partition文件是对行号的抽样,结果自然是根据行号来排序。
2)大数据量时,InputSampler抽样速度会非常慢。比如,RandomSampler需要遍历所有数据,IntervalSampler需要遍历文件数与splits数一样。SplitSampler效率比较高,但它只抽取每个文件前面的记录,不适合应用于文件内有序的情况。
二、功能
1. 实现了一种局部抽样方法PartialSampler,适用于输入数据各文件是独立同分布的情况
2. 使RandomSampler、IntervalSampler、SplitSampler支持对文本的抽样
3. 实现了针对Text文件string列的TotalOrderPartitioner
三、实现
1. PartialSampler
PartialSampler从第一份输入数据中随机抽取第一列文本数据。PartialSampler有两个属性:freq(采样频率),numSamples(采样总数)。
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException { InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks()); ArrayList<K> samples = new ArrayList<K>(numSamples); Random r = new Random(); long seed = r.nextLong(); r.setSeed(seed); LOG.debug("seed: " + seed); // 对splits【0】抽样 for (int i = 0; i < 1; i++) { System.out.println("PartialSampler will getSample splits["+i+"]"); RecordReader<K,V> reader = inf.getRecordReader(splits[i], job, Reporter.NULL); K key = reader.createKey(); V value = reader.createValue(); while (reader.next(key, value)) { if (r.nextDouble() <= freq) { if (samples.size() < numSamples) { // 选择value中的第一列抽样 Text value0 = new Text(value.toString().split("\t")[0]); samples.add((K) value0); } else { // When exceeding the maximum number of samples, replace a // random element with this one, then adjust the frequency // to reflect the possibility of existing elements being // pushed out int ind = r.nextInt(numSamples); if (ind != numSamples) { Text value0 = new Text(value.toString().split("\t")[0]); samples.set(ind, (K) value0); } freq *= (numSamples - 1) / (double) numSamples; } key = reader.createKey(); } } reader.close(); } return (K[])samples.toArray(); }首先通过InputFormat的getSplits方法得到所有的输入分区;
然后扫描第一个分区中的记录进行采样。
记录采样的具体过程如下:
从指定分区中取出一条记录,判断得到的随机浮点数是否小于等于采样频率freq
如果大于则放弃这条记录;
如果小于,则判断当前的采样数是否小于最大采样数,
如果小于则这条记录被选中,被放进采样集合中;
否则从【0,numSamples】中选择一个随机数,如果这个随机数不等于最大采样数numSamples,则用这条记录替换掉采样集合随机数对应位置的记录,同时采样频率freq减小变为freq*(numSamples-1)/numSamples。
然后依次遍历分区中的其它记录。
note:
1)PartialSampler只适用于输入数据各文件是独立同分布的情况。
2)自带的三种Sampler通过修改samples.add(key)为samples.add((K) value0); 也可以实现对第一列的抽样。
2. TotalOrderPartitioner
TotalOrderPartitioner主要改进了两点:
1)读partition时指定keyClass为Text.class
因为partition文件中的key类型为Text
在configure函数中,修改:
2)查找分区时,改用value查
public int getPartition(K key, V value, int numPartitions) { Text value0 = new Text(value.toString().split("\t")[0]); return partitions.findPartition((K) value0); }3. Sort
1)设置InputFormat、OutputFormat、OutputKeyClass、OutputValueClass、MapOutputKeyClass
2)初始化InputSampler对象,抽样
3)partitionFile通过CacheFile传给TotalOrderPartitioner,执行MapReduce任务
Class<? extends InputFormat> inputFormatClass = TextInputFormat.class; Class<? extends OutputFormat> outputFormatClass = TextOutputFormat.class; Class<? extends WritableComparable> outputKeyClass = Text.class; Class<? extends Writable> outputValueClass = Text.class; jobConf.setMapOutputKeyClass(LongWritable.class); // Set user-supplied (possibly default) job configs jobConf.setNumReduceTasks(num_reduces); jobConf.setInputFormat(inputFormatClass); jobConf.setOutputFormat(outputFormatClass); jobConf.setOutputKeyClass(outputKeyClass); jobConf.setOutputValueClass(outputValueClass); if (sampler != null) { System.out.println("Sampling input to effect total-order sort..."); jobConf.setPartitionerClass(TotalOrderPartitioner.class); Path inputDir = FileInputFormat.getInputPaths(jobConf)[0]; inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf)); //Path partitionFile = new Path(inputDir, "_sortPartitioning"); TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile); InputSampler.<K,V>writePartitionFile(jobConf, sampler); URI partitionUri = new URI(partitionFile.toString() + "#" + "_sortPartitioning"); DistributedCache.addCacheFile(partitionUri, jobConf); DistributedCache.createSymlink(jobConf); } FileSystem hdfs = FileSystem.get(jobConf); hdfs.delete(outputpath); hdfs.close(); System.out.println("Running on " + cluster.getTaskTrackers() + " nodes to sort from " + FileInputFormat.getInputPaths(jobConf)[0] + " into " + FileOutputFormat.getOutputPath(jobConf) + " with " + num_reduces + " reduces."); Date startTime = new Date(); System.out.println("Job started: " + startTime); jobResult = JobClient.runJob(jobConf);四、执行
usage:
hadoop jar yitengfei.jar com.yitengfei.Sort [-m <maps>] [-r <reduces>]
[-splitRandom <double pcnt> <numSamples> <maxsplits> | // Sample from random splits at random (general)
-splitSample <numSamples> <maxsplits> | // Sample from first records in splits (random data)
-splitInterval <double pcnt> <maxsplits>] // Sample from splits at intervals (sorted data)
-splitPartial <double pcnt> <numSamples> <maxsplits> | // Sample from partial splits at random (general) ]
<input> <output> <partitionfile>
Example:
hadoop jar yitengfei.jar com.yitengfei.Sort -r 10 -splitPartial 0.1 10000 10 /user/rp-rd/yitengfei/sample/input /user/rp-rd/yitengfei/sample/output /user/rp-rd/yitengfei/sample/partition
五、性能
200G输入数据,15亿条url,1000个分区,排序时间只用了6分钟
总结
以上就是本文关于Hadoop对文本文件的快速全局排序实现方法及分析的全部内容,希望对大家有所帮助 ,感兴趣的朋友可以继续参阅本站:hadoop重新格式化HDFS步骤解析、浅谈七种常见的Hadoop和Spark项目案例。如有不足之处,欢迎留言指出,感谢朋友们对本站的支持!
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
使用sort命令可以生成文本文件,sort可针对文本文件的内容,以行为单位来排序。sort命令既可以从特定的文件,也可以从stdin中获取输入。 文本文件是一
sort命令是帮我们依据不同的数据类型进行排序,其语法及常用参数格式:sort[-bcfMnrtk][源文件][-o输出文件]补充说明:sort可针对文本文件的
本文实例讲述了Python基于jieba库进行简单分词及词云功能实现方法。分享给大家供大家参考,具体如下:目标:1.导入一个文本文件2.使用jieba对文本进行
本文实例讲述了Java文本文件操作方法。分享给大家供大家参考。具体分析如下:最初Java是不支持对文本文件的处理的,为了弥补这个缺憾而引入了Reader和Wri
sort对文本文件内容进行排序用法:sort+选项+文件名(可跟多个文件)示例1:cat1.txtsort1.txt#文字,默认按字母a-z排序实示例2:cat