今天是spark专题第二篇文章,我们来看spark非常主要的一个观点——RDD。

在上一讲当中我们在内陆安装好了spark,虽然我们只有local一个集群,然则仍然不故障我们举行实验。spark更大的特点就是无论集群的资源若何,举行盘算的代码都是一样的,spark会自动为我们做分布式调剂事情


RDD观点


先容spark离不开RDD,RDD是其中很主要的一个部门。然则许多初学者往往都不清晰RDD事实是什么,我自己也是一样,我在系统学习spark之前代码写了一堆,然则对于RDD等观点仍然云里雾里。

RDD的英文全名是Resilient Distributed Dataset,我把英文写出来就清晰了许多。纵然第一个单词不认识,至少也可以知道它是一个分布式的数据集。第一个单词是弹性的意思,以是直译就是弹性分布式数据集。虽然我们照样不够清晰,然则已经比只知道RDD这个观点清晰多了,

RDD是一个不可变的分布式工具聚集,每个RDD都被分为多个分区,这些分区运行在集群的差别节点上。

许多资料里只有这么一句粗浅的注释,看起来说了许多,然则我们都get不到。细想有许多疑问,最后我在大神的博客里找到了详细的注释,这位大神翻了spark的源码,找到了其中RDD的界说,一个RDD当中包罗以下内容:

  • A list of partitions

  • A function for computing each split

  • A list of dependencies on other RDDs

  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

我们一条一条来看:

  1. 它是一组分区,分区是spark中数据集的最小单元。也就是说spark当中数据是以分区为单元存储的,差别的分区被存储在差别的节点上。这也是分布式盘算的基础。

  2. 一个应用在各个分区上的盘算义务。在spark当中数据和执行的操作是离开的,而且spark基于懒盘算的机制,也就是在真正触发盘算的行动操作泛起之前,spark会存储起来对哪些数据执行哪些盘算。数据和盘算之间的映射关系就存储在RDD中。

  3. RDD之间的依赖关系,RDD之间存在转化关系,一个RDD可以通过转化操作转化成其他RDD,这些转化操作都会被纪录下来。当部门数据丢失的时刻,spark可以通过纪录的依赖关系重新盘算丢失部门的数据,而不是重新盘算所有数据。

  4. 一个分区的方式,也就是盘算分区的函数。spark当中支持基于hash的hash分区方式和基于局限的range分区方式。

    ,

    apple developer enterprise account for rent

    providing apple enterprise developer accounts for rent, rent your own enterprise account for app signing. with high quality, stable performance and affordable price.

    ,
  5. 一个列表,存储的是存储每个分区的优先存储的位置。

通过以上五点,我们可以看出spark一个主要的理念。即移动数据不如移动盘算,也就是说在spark运行调剂的时刻,会倾向于将盘算分发到节点,而不是将节点的数据搜集起来盘算。RDD正是基于这一理念而生的,它做的也正是这样的事情。


建立RDD


spark中提供了两种方式来建立RDD,一种是读取外部的数据集,另一种是将一个已经存储在内存当中的聚集举行并行化

我们一个一个来看,最简朴的方式当然是并行化,由于这不需要外部的数据集,可以很轻易地做到。

在此之前,我们先来看一下SparkContext的观点,SparkContext是整个spark的入口,相当于程序的main函数。在我们启动spark的时刻,spark已经为我们建立好了一个SparkContext的实例,命名为sc,我们可以直接访问到。

我们要建立RDD也需要基于sc举行,好比下面我要建立一个有字符串组成的RDD:

texts = sc.parallelize(['now test', 'spark rdd'])

返回的texts就是一个RDD:

除了parallelize之外呢,我们还可以从外部数据天生RDD,好比我想从一个文件读入,可以使用sc当中的textFile方式获取:

text = sc.textFile('/path/path/data.txt')

一般来说,除了内陆调试我们很少会用parallelize举行建立RDD,由于这需要我们先把数据读取在内存。由于内存的限制,使得我们很难将spark的能力发挥出来。


转化操作和行动操作


适才我们在先容RDD的时刻实在提到过,RDD支持两种操作,一种叫做转化操作(transformation)一种叫做行动操作(action)。

顾名思义,执行转化操作的时刻,spark会将一个RDD转化成另一个RDD。RDD中会将我们这次转化的内容纪录下来,然则不会举行运算。以是我们获得的仍然是一个RDD而不是执行的效果。

好比我们建立了texts的RDD之后,我们想要对其中的内容举行过滤,只保留长度跨越8的,我们可以用filter举行转化:

textAfterFilter = texts.filter(lambda x: len(x) > 8)

我们挪用之后获得的也是一个RDD,就像我们适才说的一样,由于filter是一个转化操作,以是spark只会纪录下它的内容,并不会真正执行。

转化操作可以操作随便数目的RDD,好比若是我执行如下操作,会一共获得4个RDD:


  

inputRDD = sc.textFile('path/path/log.txt')
lengthRDD = inputRDD.filter(lambda x: len(x) > 10)
errorRDD = inputRDD.filter(lambda x: 'error' in x)
unionRDD = errorRDD.union(lengthRDD)

最后的union会将两个RDD的效果组合在一起,若是我们执行完上述代码之后,spark会纪录下这些RDD的依赖信息,我们把这个依赖信息画出来,就成了一张依赖图:

无论我们执行多少次转化操作,spark都不会真正执行其中的操作,只有当我们执行行动操作时,纪录下来的转化操作才会真正投入运算。像是first(),take(),count()等都是行动操作,这时刻spark就会给我们返回盘算效果了。

其中first的用处是返回第一个效果,take需要传入一个参数,指定返回的效果条数,count则是盘算效果的数目。和我们预期的一样,当我们执行了这些操作之后,spark为我们返回了效果。

本文着重讲的是RDD的观点,我们下篇文章还会着重对转化操作和行动操作举行深入解读。感兴趣的同砚不妨期待一下吧~