spark——spark中常说RDD,事实RDD是什么?
今天是spark专题第二篇文章,我们来看spark非常主要的一个观点——RDD。
在上一讲当中我们在内陆安装好了spark,虽然我们只有local一个集群,然则仍然不故障我们举行实验。spark最大的特点就是无论集群的资源若何,举行盘算的代码都是一样的,spark会自动为我们做分布式调剂事情。
RDD观点
先容spark离不开RDD,RDD是其中很主要的一个部门。然则许多初学者往往都不清晰RDD事实是什么,我自己也是一样,我在系统学习spark之前代码写了一堆,然则对于RDD等观点仍然云里雾里。
RDD的英文全名是Resilient Distributed Dataset,我把英文写出来就清晰了许多。纵然第一个单词不认识,至少也可以知道它是一个分布式的数据集。第一个单词是弹性的意思,以是直译就是弹性分布式数据集。虽然我们照样不够清晰,然则已经比只知道RDD这个观点清晰多了,
RDD是一个不可变的分布式工具聚集,每个RDD都被分为多个分区,这些分区运行在集群的差别节点上。
许多资料里只有这么一句粗浅的注释,看起来说了许多,然则我们都get不到。细想有许多疑问,最后我在大神的博客里找到了详细的注释,这位大神翻了spark的源码,找到了其中RDD的界说,一个RDD当中包罗以下内容:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
我们一条一条来看:
它是一组分区,分区是spark中数据集的最小单元。也就是说spark当中数据是以分区为单元存储的,差别的分区被存储在差别的节点上。这也是分布式盘算的基础。
一个应用在各个分区上的盘算义务。在spark当中数据和执行的操作是离开的,而且spark基于懒盘算的机制,也就是在真正触发盘算的行动操作泛起之前,spark会存储起来对哪些数据执行哪些盘算。数据和盘算之间的映射关系就存储在RDD中。
RDD之间的依赖关系,RDD之间存在转化关系,一个RDD可以通过转化操作转化成其他RDD,这些转化操作都会被纪录下来。当部门数据丢失的时刻,spark可以通过纪录的依赖关系重新盘算丢失部门的数据,而不是重新盘算所有数据。
一个分区的方式,也就是盘算分区的函数。spark当中支持基于hash的hash分区方式和基于局限的range分区方式。
,
,APPle developer enterprise account for rent
providing apple enterprise developer accounts for rent, rent your own enterprise account for app sIGning. with high quality, stable performance and affordable price.
一个列表,存储的是存储每个分区的优先存储的位置。
通过以上五点,我们可以看出spark一个主要的理念。即移动数据不如移动盘算,也就是说在spark运行调剂的时刻,会倾向于将盘算分发到节点,而不是将节点的数据搜集起来盘算。RDD正是基于这一理念而生的,它做的也正是这样的事情。
建立RDD
spark中提供了两种方式来建立RDD,一种是读取外部的数据集,另一种是将一个已经存储在内存当中的聚集举行并行化。
我们一个一个来看,最简朴的方式当然是并行化,由于这不需要外部的数据集,可以很轻易地做到。
在此之前,我们先来看一下SparkContext的观点,SparkContext是整个spark的入口,相当于程序的main函数。在我们启动spark的时刻,spark已经为我们建立好了一个SparkContext的实例,命名为sc,我们可以直接访问到。
我们要建立RDD也需要基于sc举行,好比下面我要建立一个有字符串组成的RDD:
texts = sc.parallelize(['now test', 'spark rdd'])
返回的texts就是一个RDD:
除了parallelize之外呢,我们还可以从外部数据天生RDD,好比我想从一个文件读入,可以使用sc当中的textFile方式获取:
text = sc.textFile('/path/path/data.txt')
一般来说,除了内陆调试我们很少会用parallelize举行建立RDD,由于这需要我们先把数据读取在内存。由于内存的限制,使得我们很难将spark的能力发挥出来。
转化操作和行动操作
适才我们在先容RDD的时刻实在提到过,RDD支持两种操作,一种叫做转化操作(transformation)一种叫做行动操作(action)。
顾名思义,执行转化操作的时刻,spark会将一个RDD转化成另一个RDD。RDD中会将我们这次转化的内容纪录下来,然则不会举行运算。以是我们获得的仍然是一个RDD而不是执行的效果。
好比我们建立了texts的RDD之后,我们想要对其中的内容举行过滤,只保留长度跨越8的,我们可以用filter举行转化:
textAfterFilter = texts.filter(lambda x: len(x) > 8)
我们挪用之后获得的也是一个RDD,就像我们适才说的一样,由于filter是一个转化操作,以是spark只会纪录下它的内容,并不会真正执行。
转化操作可以操作随便数目的RDD,好比若是我执行如下操作,会一共获得4个RDD:
inputRDD = sc.textFile('path/path/log.txt')
lengthRDD = inputRDD.filter(lambda x: len(x) > 10)
errorRDD = inputRDD.filter(lambda x: 'error' in x)
unionRDD = errorRDD.union(lengthRDD)
最后的union会将两个RDD的效果组合在一起,若是我们执行完上述代码之后,spark会纪录下这些RDD的依赖信息,我们把这个依赖信息画出来,就成了一张依赖图:
无论我们执行多少次转化操作,spark都不会真正执行其中的操作,只有当我们执行行动操作时,纪录下来的转化操作才会真正投入运算。像是first(),take(),count()等都是行动操作,这时刻spark就会给我们返回盘算效果了。
其中first的用处是返回第一个效果,take需要传入一个参数,指定返回的效果条数,count则是盘算效果的数目。和我们预期的一样,当我们执行了这些操作之后,spark为我们返回了效果。
本文着重讲的是RDD的观点,我们下篇文章还会着重对转化操作和行动操作举行深入解读。感兴趣的同砚不妨期待一下吧~
0
珍藏