针对数据收集过滤大型Spark数据集的四种方法

针对数据收集过滤大型Spark数据集的四种方法

时间:2020-11-15 作者:gykj

让我们假设有一个具有以下架构的大型数据集“ A”:

爪哇

 

1个
2
|   empId整数
3
|   sal整数
4
|   名称字符串
5
|   地址字符串
6
|   部门整数

数据集“ A”需要针对一组员工ID(empIds)“ B”(可以广播给执行者)进行过滤,以获得过滤后的数据集“ A”。福州小程序开发公司 过滤器操作可以表示为:

爪哇

 


1个
A`  = 过滤器EMPID 包含  'B' 

为了实现这种最常见的过滤方案,您可以在Spark中使用四种类型的转换,每种类型都有其优缺点。这是对使用这四个转换来执行此特定筛选方案的说明,以及有关每个转换的可靠性和效率方面的详细说明。

过滤器: 可以按以下方式使用数据集上的过滤器转换(对布尔条件表达式或布尔返回过滤器函数过滤数据集记录):

爪哇

 


1个
1. 数据集< Ť >  A`  = 过滤器 条件
2
2. 数据集< Ť >  A`  = 过滤器FilterFunction < T >  func
3
3. 数据集< Ť >  A`  = 过滤器String  conditionExpr

对于过滤方案,如前所述,可以对“ A”使用“ Filter”转换,将“ FilterFunction”作为输入。对包含在相应数据集分区中的每个记录调用“ FilterFunction”,并返回“ true”或“ false”。在我们的过滤方案中,将对数据集“ A”的每个记录调用FilterFunction,并检查记录的“ empId”是否存在于广播的empId中,“ B”(“ B”由a支持)对应的HashTable)。

不管数据集“ A”的大小如何,如上所述的过滤器转换的使用都非常简单,可靠和有效。这是因为,转换是逐条记录地调用的。此外,由于广播的empIds集由执行程序上的哈希表支持,因此对每个记录的过滤器功能中的过滤查找仍然有效。

映射: 映射转换(在数据集的每个记录上应用一个函数,以返回空,相同或不同的记录类型)在数据集上的使用方式如下

爪哇

 


1个
数据集< Ù >  A`  = mapMapFunction < TU >  func编码器< U > 编码器

对于过滤方案,如前所述,可以对“ A”使用“ Map”转换,将“ MapFunction”作为输入。在我们的筛选方案中,将在数据集“ A”的每个记录上调用“ MapFunction”,并检查记录的“ empId”是否存在于广播的empIds“ B”集中(由相应的HashTable支持) )。如果记录存在,则将从MapFunction返回相同的记录。如果该记录不存在,则将返回NULL 。同样,MapFunction的编码器输入将与数据集“ A”的编码器输入相同。

尽管“ MapFunction”的语义与“ FilterFunction”相似,但如上所述的“ Map”转换在过滤场景中的使用与直接的“ Filter”转换方法相比并不那么简单和优美。必须在转换中明确提供其他编码器输入。同样,在调用“ Map”转换之后,需要对输出进行NULL值过滤,因此,“ Map”方法的效率要低于“ Filter”方法。但是,该方法的可靠性类似于“过滤器”方法,因为不管“ A”的大小如何,该方法都不会出现问题。这是因为’Map’转换也逐条记录地被调用。

MapPartitions:Mappartitions转换对每个数据集返回一个空值或迭代器的分区(在我最近出版的书“ Spark Partitioning指南:Spark Partitioning深入介绍”中了解有关Spark分区的更多信息)数据集上相同或不同记录类型的集合)的使用方式如下

爪哇

 


1个
数据集< Ù >  A`  = 地图MapPartitionsFunction < TU >  func编码器< U > 编码器

对于过滤方案,如前所述,还可以对“ A”使用“ MapPartitions”转换,将“ MapPartitionsFunction”作为输入。在我们的过滤方案中,将在数据集“ A”的每个分区上调用“ MapPartitionsFunction”,在该分区的所有记录上进行迭代,并检查记录中的每个“ empId”是否存在于记录中。广播的empIds集“ B”(由相应的HashTable支持)。如果记录存在,则将其添加到在“ MapPartitionsFunction”中初始化的可返回集合中。最后,从“ MapPartitionsFunction”返回可返回集合的迭代器。

与“ Map”和“ Filter”方法相比,“ MapPartitions”方法通常更有效,因为它是分区明智的,而不是记录明智的。但是,类似于“地图”,必须在转换中显式设置编码器输入。同样,如果数据集“ A”的某些分区的大小超过为执行每个分区计算任务而设置的内存,则“ MapPartitions”方法可能变得非常不可靠。这是因为以下事实:更大的分区会导致潜在的更大的可返回集合,从而导致内存溢出。

内部联接:内部联接转换通过以下方式应用于两个输入数据集A和B :

爪哇

 


1个
数据集<>  A`  = join数据集<?>  B joinExprs

对于过滤方案,如前所述,还可以对“ A”使用“内部联接”转换,该转换在联接条件(A.empId等于B.empId)上联接“ B”的数据集表示形式,并且仅选择每个联接记录中的“ A”。

“内部联接”方法返回通用“行”对象的数据集,因此需要使用编码器将其转换回A记录类型的数据集,以匹配确切的过滤器语义。但是,类似于“过滤器”方法,“内部联接”方法是高效且可靠的。效率来自以下事实:由于“ B”是可广播的,因此Spark将选择最有效的“ Boradcast Hash Join”方法来执行Join。同样,可靠性来自于这样一个事实,即“内部联接”方法将适用于“ A”的大型数据集,就像“过滤器”方法一样。

考虑到所有方法,从可靠性和效率的角度来看,我会选择“筛选”方法作为最安全的选择。另外,请注意,“过滤器”方法还允许我以“内部联接”不允许的效率和鲁棒性执行反搜索。

版权所有:https://www.eraycloud.com 转载请注明出处