让我们假设有一个具有以下架构的大型数据集“ A”:
根:
| — empId:整数
| — sal:整数
| — 名称:字符串
| — 地址:字符串
| — 部门:整数
数据集“ A”需要针对一组员工ID(empIds)“ B”(可以广播给执行者)进行过滤,以获得过滤后的数据集“ A”。福州小程序开发公司 过滤器操作可以表示为:
A` = 甲。过滤器(甲。EMPID 包含 在 'B' )
为了实现这种最常见的过滤方案,您可以在Spark中使用四种类型的转换,每种类型都有其优缺点。这是对使用这四个转换来执行此特定筛选方案的说明,以及有关每个转换的可靠性和效率方面的详细说明。
过滤器: 可以按以下方式使用数据集上的过滤器转换(对布尔条件表达式或布尔返回过滤器函数过滤数据集记录):
1. 数据集< Ť > A` = 甲。过滤器(列 条件)
2. 数据集< Ť > A` = 甲。过滤器(FilterFunction < T > func)
3. 数据集< Ť > A` = 甲。过滤器(String conditionExpr)
对于过滤方案,如前所述,可以对“ A”使用“ Filter”转换,将“ FilterFunction”作为输入。对包含在相应数据集分区中的每个记录调用“ FilterFunction”,并返回“ true”或“ false”。在我们的过滤方案中,将对数据集“ A”的每个记录调用FilterFunction,并检查记录的“ empId”是否存在于广播的empId中,“ B”(“ B”由a支持)对应的HashTable)。
不管数据集“ A”的大小如何,如上所述的过滤器转换的使用都非常简单,可靠和有效。这是因为,转换是逐条记录地调用的。此外,由于广播的empIds集由执行程序上的哈希表支持,因此对每个记录的过滤器功能中的过滤查找仍然有效。
映射: 映射转换(在数据集的每个记录上应用一个函数,以返回空,相同或不同的记录类型)在数据集上的使用方式如下:
数据集< Ù > A` = 甲。map(MapFunction < T,U > func,编码器< U > 编码器)
对于过滤方案,如前所述,可以对“ A”使用“ Map”转换,将“ MapFunction”作为输入。在我们的筛选方案中,将在数据集“ A”的每个记录上调用“ MapFunction”,并检查记录的“ empId”是否存在于广播的empIds“ B”集中(由相应的HashTable支持) )。如果记录存在,则将从MapFunction返回相同的记录。如果该记录不存在,则将返回NULL 。同样,MapFunction的编码器输入将与数据集“ A”的编码器输入相同。
尽管“ MapFunction”的语义与“ FilterFunction”相似,但如上所述的“ Map”转换在过滤场景中的使用与直接的“ Filter”转换方法相比并不那么简单和优美。必须在转换中明确提供其他编码器输入。同样,在调用“ Map”转换之后,需要对输出进行NULL值过滤,因此,“ Map”方法的效率要低于“ Filter”方法。但是,该方法的可靠性类似于“过滤器”方法,因为不管“ A”的大小如何,该方法都不会出现问题。这是因为’Map’转换也逐条记录地被调用。
MapPartitions:Mappartitions转换对每个数据集返回一个空值或迭代器的分区(在我最近出版的书“ Spark Partitioning指南:Spark Partitioning深入介绍”中了解有关Spark分区的更多信息)数据集上相同或不同记录类型的集合)的使用方式如下:
数据集< Ù > A` = 甲。地图(MapPartitionsFunction < T,U > func,编码器< U > 编码器)
对于过滤方案,如前所述,还可以对“ A”使用“ MapPartitions”转换,将“ MapPartitionsFunction”作为输入。在我们的过滤方案中,将在数据集“ A”的每个分区上调用“ MapPartitionsFunction”,在该分区的所有记录上进行迭代,并检查记录中的每个“ empId”是否存在于记录中。广播的empIds集“ B”(由相应的HashTable支持)。如果记录存在,则将其添加到在“ MapPartitionsFunction”中初始化的可返回集合中。最后,从“ MapPartitionsFunction”返回可返回集合的迭代器。
与“ Map”和“ Filter”方法相比,“ MapPartitions”方法通常更有效,因为它是分区明智的,而不是记录明智的。但是,类似于“地图”,必须在转换中显式设置编码器输入。同样,如果数据集“ A”的某些分区的大小超过为执行每个分区计算任务而设置的内存,则“ MapPartitions”方法可能变得非常不可靠。这是因为以下事实:更大的分区会导致潜在的更大的可返回集合,从而导致内存溢出。
内部联接:内部联接转换通过以下方式应用于两个输入数据集A和B :
数据集<行> A` = 甲。join(数据集<?> B,列 joinExprs)
对于过滤方案,如前所述,还可以对“ A”使用“内部联接”转换,该转换在联接条件(A.empId等于B.empId)上联接“ B”的数据集表示形式,并且仅选择每个联接记录中的“ A”。
“内部联接”方法返回通用“行”对象的数据集,因此需要使用编码器将其转换回A记录类型的数据集,以匹配确切的过滤器语义。但是,类似于“过滤器”方法,“内部联接”方法是高效且可靠的。效率来自以下事实:由于“ B”是可广播的,因此Spark将选择最有效的“ Boradcast Hash Join”方法来执行Join。同样,可靠性来自于这样一个事实,即“内部联接”方法将适用于“ A”的大型数据集,就像“过滤器”方法一样。
考虑到所有方法,从可靠性和效率的角度来看,我会选择“筛选”方法作为最安全的选择。另外,请注意,“过滤器”方法还允许我以“内部联接”不允许的效率和鲁棒性执行反搜索。