Creating a Column.
Filtering Data.
Selecting.
Aggregating.
Grouping.
Joining.
# Create the DataFrame flights
flights = spark.table("flights")
# Show the head
flights.show()
# Add duration_hrs
flights = flights.withColumn("duration_hrs", flights.air_time/60)
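Note: Spark DataFrames are immutable, so .withColumn() returns a new DataFrame rather than modifying flights in place; the reassignment above is what keeps the new column. A minimal sketch of this behavior (the name flights2 and the duration_days column are illustrative, not from the exercise):
# .withColumn() returns a new DataFrame; flights itself is unchanged here
flights2 = flights.withColumn("duration_days", flights.air_time/(60*24))
print("duration_days" in flights.columns)   # False
print("duration_days" in flights2.columns)  # True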
.filter() is the Spark counterpart of SQL's WHERE clause.
# Filter flights by passing a string
print(flights.columns)
long_flights1 = flights.filter("distance > 1000")
# Filter flights by passing a column of boolean values
long_flights2 = flights.filter(flights.distance > 1000)
# Print the data to check they're equal
long_flights1.show()
long_flights2.show()
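Conditions can also be combined; a minimal sketch, assuming the same flights table (for Column expressions, use & / | and wrap each condition in parentheses):
# Combine boolean Column expressions with & (AND); each condition needs its own parentheses
long_sea_flights = flights.filter((flights.distance > 1000) & (flights.origin == "SEA"))
# The string form accepts ordinary SQL WHERE syntax
long_sea_flights2 = flights.filter("distance > 1000 AND origin = 'SEA'")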
.select(): performs column-wise operations.
.alias(): renames a column you are selecting.
flights.select((flights.air_time/60).alias("duration_hrs"))
.selectExpr(): takes a SQL expression as a string.
flights.selectExpr("air_time/60 as duration_hrs")
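Unlike .withColumn(), which keeps every existing column and adds one, .select() returns only the columns you ask for. A minimal sketch:
# The result has exactly two columns: tailnum and duration_hrs
flights.select("tailnum", (flights.air_time/60).alias("duration_hrs")).show()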
Example:
# Define avg_speed
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")
# Select the correct columns
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)
# Create the same table using a SQL expression
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")
.min(), .max(), and .count() are GroupedData methods.
They are created by calling .groupBy() on a DataFrame; with no arguments, .groupBy() treats the whole DataFrame as a single group, so the aggregate runs over every row.
Example 1:
df.groupBy().min("col").show()
Example 2:
# Find the shortest flight from PDX in terms of distance
flights.filter(flights.origin == "PDX").groupBy().min("distance").show()
# Find the longest flight from SEA in terms of air time
flights.filter(flights.origin == "SEA").groupBy().max("air_time").show()
Example 3:
# Average duration of Delta flights
flights.filter(flights.carrier == "DL").filter(flights.origin == "SEA").groupBy().avg("air_time").show()
# Total hours in the air
flights.withColumn("duration_hrs", flights.air_time/60).groupBy().sum("duration_hrs").show()
pyspark.sql.GroupedData: aggregation over groups.
GroupedData objects are created by calling .groupBy(), the equivalent of GROUP BY in SQL.
Example 1:
# Group by tailnum
by_plane = flights.groupBy("tailnum")
# Number of flights each plane made
by_plane.count().show()
# Group by origin
by_origin = flights.groupBy("origin")
# Average duration of flights from PDX and SEA
by_origin.avg("air_time").show()
.agg(): takes an aggregate column expression that uses any of the aggregate functions from the pyspark.sql.functions submodule.
Like the methods above, it is called on GroupedData objects created with .groupBy() (SQL's GROUP BY).
Example 1:
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F
# Group by month and dest
by_month_dest = flights.groupBy("month","dest")
# Average departure delay by month and destination
by_month_dest.avg("dep_delay").show()
# Standard deviation of departure delay
by_month_dest.agg(F.stddev("dep_delay")).show()
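.agg() also accepts several aggregate expressions at once, computing them in a single pass; a minimal sketch using the same grouping:
# Mean and standard deviation of dep_delay, computed together
by_month_dest.agg(F.avg("dep_delay"), F.stddev("dep_delay")).show()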
A join will combine two different tables along a column that they share. This column is called the key.
.join(): takes three arguments.
The first is the other DataFrame you want to join to this one.
on: the name of the key column(s), as a string.
how: the kind of join to perform, e.g. how = "leftouter".
Example 1:
# Examine the data
airports.show()
# Rename the faa column
airports = airports.withColumnRenamed("faa","dest")
# Join the DataFrames
flights_with_airports = flights.join(airports, "dest", "leftouter")
# Examine the new DataFrame
flights_with_airports.show()
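If how is omitted, .join() defaults to an inner join, which keeps only flights whose dest appears in airports; a minimal sketch:
# Inner join: rows without a matching key in both DataFrames are dropped
flights_inner = flights.join(airports, on="dest", how="inner")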
Joining in Pandas: https://www.datacamp.com/courses/joining-data-with-pandas