Spark

Launching Spark

  • The SparkConf object sets the configuration for the Spark application.
  • The SparkContext is the entry point to core Spark functionality. It lets the application reach the Spark cluster through the resource manager.
  • The SparkSession is the entry point into the Structured APIs (DataFrames and Spark SQL).
from pyspark import SparkContext, SparkConf
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

# local[2] runs Spark locally with 2 cores; change the master to use more cores or a cluster
cnfg = SparkConf().setAppName("CustomerApplication").setMaster("local[2]")
sc = SparkContext(conf=cnfg)
spark = SparkSession(sc)
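
The same session can also be created with the SparkSession builder; this is a minimal equivalent sketch of the setup above (getOrCreate() reuses an existing session if one is already running):

from pyspark.sql import SparkSession

# equivalent setup via the builder API
spark = (SparkSession.builder
            .appName("CustomerApplication")
            .master("local[2]")
            .getOrCreate())
sc = spark.sparkContext  # the underlying SparkContext is still available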

Read Files

# read from a database via JDBC; the MySQL connector JAR must be registered before the SparkContext is created
cnfg = SparkConf().setAppName("CustomerApplication").setMaster("local[2]")
cnfg.set("spark.jars", "D:\\mysql-connector-java-5.1.49.jar")
sc = SparkContext(conf=cnfg)
spark = SparkSession(sc)
df = (spark.read.format("jdbc")
        .options(url="jdbc:mysql://localhost/videoshop",
                 driver="com.mysql.jdbc.Driver",
                 dbtable="SomeTableName",
                 user="venkat",
                 password="P@ssw0rd1")
        .load())

# read CSV, using the first row as the header and inferring column types
df = (spark.read.option("header", "true").option("inferSchema", "true").csv(filepath))
# read JSON (no header option needed; the schema is inferred from the records)
df = spark.read.json(filepath)

We can also define a schema explicitly for the data instead of inferring it.

custschema = StructType([
                StructField("Customerid", IntegerType(), True),
                StructField("CustName", StringType(), True),
                StructField("MemCat", StringType(), True),
                StructField("Age", IntegerType(), True),
                StructField("Gender", StringType(), True),
                StructField("AmtSpent", DoubleType(), True),
                StructField("Address", StringType(), True),
                StructField("City", StringType(), True),
                StructField("CountryID", StringType(), True),
                StructField("Title", StringType(), True),
                StructField("PhoneNo", StringType(), True)
            ])

df = (spark.read
        .schema(schema=custschema)
        .csv(inputFilePath))
df.printSchema()
df.show()

Write Files

Note that Spark writes output to a folder; the file names inside it are system-generated, e.g. "part-00000-2ee2d8b6-169a-4f48-a503-8b6a2fdedab0-c000.json".

df.write.json("D:\\CountryOUT")
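
A few common write options can be chained as well; this is a minimal sketch (the output path is just an example, and mode("overwrite") replaces the folder if it already exists):

# write as CSV with a header row, overwriting any existing output folder
(df.write
    .mode("overwrite")
    .option("header", "true")
    .csv("D:\\CustomerOUT"))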

Basic Summary

# show up to 20 rows; False disables column truncation
df.show(20, False)
# schema
df.printSchema()
# summary statistics (count, mean, stddev, min, max) for selected columns
df.describe("MemberCategory", "Gender", "AmountSpent", "Age").show()

Spark SQL

The Spark SQL / DataFrame API is structured much like SQL. It is used when the data is well structured and can be presented in tabular form. Spark SQL is a higher-level API than the RDD API and is therefore easier to use. Queries can also be written as plain SQL strings against a temporary view, as sketched after the examples below.

# simple query
df2 = df.orderBy("Age").where("Age>20").select(df["CustomerID"], df["CustomerName"], df["Age"])
df2.show(200, False)

# aggregate (single value)
tot = df.agg(sum("AmountSpent")).first()[0]
tot = df.agg(avg("Age")).first()[0]
std = df.agg(stddev_pop("AmountSpent")).first()[0]
skw = df.agg(skewness("AmountSpent")).first()[0]

# group by
df.groupBy("MemberCategory").sum("AmountSpent").show(200,False)
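
The same kind of query can be written as a plain SQL string against a temporary view; a minimal sketch (the view name "customers" is just an example):

# register the DataFrame as a temporary view and query it with SQL
df.createOrReplaceTempView("customers")
df3 = spark.sql("SELECT CustomerID, CustomerName, Age FROM customers WHERE Age > 20 ORDER BY Age")
df3.show(200, False)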

We can also join multiple DataFrames.

customerFilePath = r"D:\workspace\Customer.csv"
countryFilePath = r"D:\workspace\Country.csv"

dfCustomer = ( spark.read
                .option("header", "true")
                .option("inferSchema", "true")
                .csv(customerFilePath) )

dfCountry = ( spark.read
                .option("header", "true")
                .option("inferSchema", "true")
                .csv(countryFilePath) )

joinDF = dfCustomer.join(dfCountry, "CountryCode")

( joinDF.select("CustomerID", "CustomerName", "CountryCode",
                "CountryName", "Currency", "TimeZone")
    .show(300, False) )
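
By default, join performs an inner join on the named column; the join type can be passed explicitly. A minimal sketch of a left outer join with the same DataFrames:

# keep every customer row even when there is no matching country
leftJoinDF = dfCustomer.join(dfCountry, "CountryCode", "left")
leftJoinDF.show(300, False)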

RDD

An RDD (Resilient Distributed Dataset) is Spark's low-level abstraction: an immutable, partitioned collection of records distributed across the cluster.
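
A minimal sketch of the RDD API using the SparkContext created earlier (the data is purely illustrative):

# create an RDD from a local list, transform it, and collect the results
rdd = sc.parallelize([1, 2, 3, 4, 5])
squares = rdd.map(lambda x: x * x)
evens = squares.filter(lambda x: x % 2 == 0)
print(evens.collect())  # actions such as collect() trigger execution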