时间:2021-05-22
错误思想
举个列子,当我们想要比较 一个 类型为 RDD[(Long, (String, Int))] 的RDD,让它先按Long分组,然后按int的值进行倒序排序,最容易想到的思维就是先分组,然后把Iterable 转换为 list,然后sortby,但是这样却有一个致命的缺点,就是Iterable 在内存中是一个指针,不占内存,而list是一个容器,占用内存,如果Iterable 含有元素过多,那么极易引起OOM
val cidAndSidCountGrouped: RDD[(Long, Iterable[(String, Int)])] = cidAndSidCount.groupByKey() // 4. 排序, 取top10 val result: RDD[(Long, List[(String, Int)])] = cidAndSidCountGrouped.map { case (cid, sidCountIt) => // sidCountIt 排序, 取前10 // Iterable转成容器式集合的时候, 如果数据量过大, 极有可能导致oom (cid, sidCountIt.toList.sortBy(-_._2).take(5)) }首先,我们要知道,RDD 的排序需要 shuffle, 是采用了内存+磁盘来完成的排序.这样能有效避免OOM的风险,但是RDD是全部排序,所以需要针对性的过滤Key值来进行排序
方法一 利用RDD排序特点
//把long(即key值)提取出来 val cids: List[Long] = categoryCountList.map(_.cid.toLong) val buffer: ListBuffer[(Long, List[(String, Int)])] = ListBuffer[(Long, List[(String, Int)])]() //根据每个key来过滤RDD for (cid <- cids) { val arr: Array[(String, Int)] = cidAndSidCount.filter(cid == _._1) .sortBy(-_._2._2) .take(5) .map(_._2) buffer += ((cid, arr.toList)) } buffer.foreach(println)这样做也有缺点:即有多少个key,就有多少个Job,占用资源
方法二 利用TreeSet自动排序特性
def statCategoryTop10Session_3(sc: SparkContext, categoryCountList: List[CategroyCount], userVisitActionRDD: RDD[UserVisitAction]) = { // 1. 过滤出来 top10品类的所有点击记录 // 1.1 先map出来top10的品类id val cids = categoryCountList.map(_.cid.toLong) val topCategoryActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id)) // 2. 计算每个品类 下的每个session 的点击量 rdd ((cid, sid) ,1) val cidAndSidCount: RDD[(Long, (String, Int))] = topCategoryActionRDD .map(action => ((action.click_category_id, action.session_id), 1)) // 使用自定义分区器 重点理解分区器的原理 .reduceByKey(new CategoryPartitioner(cids), _ + _) .map { case ((cid, sid), count) => (cid, (sid, count)) } // 3. 排序取top10//因为已经按key分好了区,所以用Mappartitions ,在每个分区中新建一个TreeSet即可 val result: RDD[(Long, List[SessionInfo])] = cidAndSidCount.mapPartitions((it: Iterator[(Long, (String, Int))]) => {//new 一个TreeSet,并同时指定排序规则 var treeSet: mutable.TreeSet[CategorySession] = new mutable.TreeSet[CategorySession]()(new Ordering[CategorySession] { override def compare(x: CategorySession, y: CategorySession): Int = { if (x.clickCount >= y.clickCount) -1 else 1 } }) var id = 0l iter.foreach({ case (l, session) => { id = l treeSet.add(session) if (treeSet.size > 10) treeSet = treeSet.take(10) } }) Iterator(id, treeSet) }) result.collect.foreach(println) Thread.sleep(1000000) }}class CategoryPartitioner(cids: List[Long]) extends Partitioner { // 用cid索引, 作为将来他的分区索引. private val cidWithIndex: Map[Long, Int] = cids.zipWithIndex.toMap // 返回集合的长度 override def numPartitions: Int = cids.length // 根据key返回分区的索引 override def getPartition(key: Any): Int = { key match { // 根据品类id返回分区的索引! 0-9 case (cid: Long, _) => cidWithIndex(cid) } }}以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
Spark介绍按照官方的定义,Spark是一个通用,快速,适用于大规模数据的处理引擎。通用性:我们可以使用SparkSQL来执行常规分析,SparkStream
如果用户查询时,使用OrderBY排序语句指定按员工编号来排序,那么排序后产生的所有记录就是临时数据。对于这些临时数据,Oracle数据库是如何处理的呢? 通
dataframe是在spark1.3.0中推出的新的api,这让spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,据说计算性能
orderby排序之null值处理方法在对业务数据排序时候,发现有些字段的记录是null值,这时排序便出现了有违我们使用习惯的数据大小顺序问题。在Oracle中
本文演示以Spark作为分析引擎,Cassandra作为数据存储,而使用SpringBoot来开发驱动程序的示例。1.前置条件安装Spark(本文使用Spark