Abstracting Data with RDDs
Basic RDD Transformations and Actions
Pair RDDs in PySpark
Advanced RDD Actions
RDD: Resilient Distributed Dataset.
Three ways to create RDDs:
Parallelizing an existing collection of objects.
External dataset:
Files in HDFS.
Objects in an Amazon S3 bucket.
Lines in a text file.
From existing RDDs.
parallelize() for creating RDDs from Python lists.
numRDD = sc.parallelize([1,2,3,4])
helloRDD = sc.parallelize("Hello World")
textFile() for creating RDDs from external datasets.
fileRDD = sc.textFile("readme.md")
A Partition is a logical division of a large distributed dataset.
numRDD = sc.parallelize(range(10), numSlices=6)
fileRDD = sc.textFile("readme.md", minPartitions=6)
The number of partitions in an RDD can be found by using getNumPartitions().
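A quick check on the RDDs created above (a minimal sketch; the exact counts depend on the cluster configuration and the file size):
numRDD.getNumPartitions()
# 6
fileRDD.getNumPartitions()
# 6 or more, depending on how the file is split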
PySpark Operations = Transformations + Actions.
Transformations create new RDDs.
Actions perform computation on the RDDs.
Transformations follow lazy evaluation: Spark only records the transformation in the RDD's lineage and executes it when an action needs the result.
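A minimal sketch of lazy evaluation: the map() below is only recorded; nothing runs on the cluster until collect() is called.
lazyRDD = sc.parallelize([1, 2, 3, 4]).map(lambda x: x * 10)  # nothing computed yet
lazyRDD.collect()  # the action triggers the computation
# [10, 20, 30, 40]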
Basic RDD Transformations: map(), filter(), flatMap(), union().
map(): applies a function to all elements in the RDD.
RDD = sc.parallelize([1,2,3,4])
RDD_map = RDD.map(lambda x: x*x)
filter(): returns new RDD with only the elements that pass the condition.
RDD = sc.parallelize([1,2,3,4])
RDD_filter = RDD.filter(lambda x: x>2)
flatMap(): like map(), but can return multiple values for each element in the original RDD.
RDD = sc.parallelize(["hello world", "how are you"])
RDD_flatMap = RDD.flatMap(lambda x: x.split(" "))
union(): returns a new RDD containing the elements of both input RDDs.
inputRDD = sc.textFile("log.txt")
errorRDD = inputRDD.filter(lambda x: "error" in x.split())
warningRDD = inputRDD.filter(lambda x: "warning" in x.split())
combineRDD = errorRDD.union(warningRDD)
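Assuming the log.txt from the example above exists, a typical follow-up is to count the matched lines; note that union() does not deduplicate, so a line containing both words is counted twice.
combineRDD.count()
# total number of matched lines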
RDD Actions: operations that return a value after running a computation on the RDD.
Basic RDD Actions: collect(), take(N), first(), count().
collect(): returns all the elements of the dataset as an array.
RDD_map.collect()
# [1, 4, 9, 16]
take(N): returns an array with the first N elements of the dataset.
RDD_map.take(2)
# [1, 4]
first(): returns the first element of the RDD.
RDD_map.first()
# 1
count(): returns the number of elements in the RDD.
RDD_flatMap.count()
# 5
Real-life datasets are usually key/value pairs.
Each row is a key and maps to one or more values.
Pair RDD: Key - identifier and Value - data.
Common ways to create pair RDDs:
From a list of key-value tuples.
my_tuple = [('Sam', 23), ('Mary', 25)]
pairRDD_tuple = sc.parallelize(my_tuple)
From a regular RDD.
my_list = ['Sam 23', 'Mary 24']
regularRDD = sc.parallelize(my_list)
pairRDD_RDD = regularRDD.map(lambda s: (s.split(' ')[0], s.split(' ')[1]))
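Collecting the result confirms the pairs; note that split() leaves the ages as strings unless they are cast explicitly.
pairRDD_RDD.collect()
# [('Sam', '23'), ('Mary', '24')]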
Have to pass functions that operate on key-value pairs rather than on individual elements.
Paired RDD transformations:
reduceByKey(func): Combines values with the same key.
regularRDD = sc.parallelize([('Sam', 23), ('Mary', 25), ('Sam', 27)])
pairRDD_reducebykey = regularRDD.reduceByKey(lambda x, y: x + y)
pairRDD_reducebykey.collect()
# [('Sam', 50), ('Mary', 25)]
groupByKey(): Group values with the same key.
regularRDD = sc.parallelize([('Sam', 23), ('Mary', 25), ('Sam', 27)])
pairRDD_groupbykey = regularRDD.groupByKey().collect()
for name, age in pairRDD_groupbykey:
    print(name, list(age))
# Sam [23, 27]
# Mary [25]
sortByKey(): Return an RDD sorted by the key (ascending or descending order).
pairRDD_reducebykey_rev = pairRDD_reducebykey.map(lambda x: (x[1], x[0]))
pairRDD_reducebykey_rev.sortByKey(ascending=False).collect()
# [(50, 'Sam'), (25, 'Mary')]
join(): Joins two pair RDDs based on their key.
RDD1 = sc.parallelize([('Phu', 23), ('Kha', 25)])
RDD2 = sc.parallelize([('Phu', 30), ('Kha', 29), ('Happy', 1)])
RDD1.join(RDD2).collect()
# [('Phu', (23, 30)), ('Kha', (25, 29))]
# 'Happy' is dropped because its key has no match in RDD1.
reduce(func): Aggregates the elements of a regular RDD; the function must be commutative and associative.
x = [1,3,4,6]
RDD = sc.parallelize(x)
RDD.reduce(lambda x,y:x+y)
# 14
saveAsTextFile(): saves an RDD into a text file inside a directory, with each partition written as a separate file.
RDD.saveAsTextFile("tempFile")
coalesce(): reduces the number of partitions; coalesce(1) followed by saveAsTextFile() saves the RDD as a single text file.
RDD.coalesce(1).saveAsTextFile("tempFile")
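A quick sanity check (assuming the same RDD as above) that coalesce(1) really leaves one partition before writing:
RDD.coalesce(1).getNumPartitions()
# 1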
countByKey(): counts the number of elements for each key (only available on pair RDDs of type (K, V)).
rdd = sc.parallelize([("a",1),("b",1),("a",1)])
for key, val in rdd.countByKey().items():
    print(key, val)
# a 2
# b 1
collectAsMap(): returns the key-value pairs in the RDD as a dictionary.
sc.parallelize([(1,2),(3,4)]).collectAsMap()
# {1: 2, 3: 4}