Spark Programming Model
Writing a Spark program consists of following steps:
Spark Shell:
Launching spark shell depends on the environment where you want to run spark jobs.
$ spark-shell --master local[N]
Some useful commands for first time users,
SparkContext:
A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one. This limitation may eventually be removed.
In the Spark shell, the SparkContext is already created for you as variable sc. SC is an object and there are many methods associated to it. to see them do - scala> sc.[\t]
Other programs must use constructor to instantiate new SparkContext.
Resilient Distributed Datasets (RDDs):
An RDD is Spark's fundamental abstraction for representing collection of objects that can be distributed across machines in cluster. RDD is laid out across cluster of machines as a collection of partitions, each including a subset of data. Partitions define the unit of parallelism in Spark. The framework processes the objects within a partition in sequence, and processes multiple partitions in parallel. One easy way to create RDD is to use parallelize method of SparkContext with a local collection of objects.
val rdd = sc.parallelize(Array(1,2,3,4),4)
To create a RDD from text file residing in your machine or on HDFS use textFile method if SparkContext.
val rddText = sc.textFile('/path/to/file')
By specifying the directory name in place of file path, Spark creates RDD for all the files in the directory. Note that no actual data has been read by spark or loaded in memory yet. when time comes to compute objects within a partition Spark reads the section of the input file and applies subsequent transformations.
Actions:
The act of creating an RDD doesn't cause any distributed computing to take place on the cluster. Rather, RDDs define logical data that are intermediate steps in the computation. Distributed computation occurs upon invoking an action on RDD. Some of the examples for action are :
Data returned by Scala REPL can be somewhat hard to read, to make the display more elegant use .foreach(println) on the returned object.
Caching:
Although the contents of RDD are transient by default, Spark allows us to persist the data in RDD.
rdd.persist(storagelevel.MEMORY_SER) - this will store serialized RDD as array of bytes in memory
rdd.persist(storagelevel.MEMORY_AND_DISK) - This will store the unserialized RDD content in memory, when spark estimates that a partition will not fit in memory it will then spill to disk.
rdd.persist(storagelevel.MEMORY_AND_DISK_SER) - Similar to MEMORY_AND_DISK but this wills tore serialized RDD content.
Broadcast variable:
Broadcast variables let programmer keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
For example, to give every node a copy of a large input dataset efficiently.
Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
sc.broadcast(Array(1, 2, 3))
Accumulators variable:
Accumulators are variables that can only be “added” to through an associative operation.
Used to implement counters and sums, efficiently in parallel.
Spark natively supports accumulators of numeric value types and standard mutable collections, and programmers can extend for new types.
Only the driver program can read an accumulator’s value, not the tasks
val acc = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => acc += x)
acc.value // { this can be called only on driver side }
Spark SQL:
Spark SQL allows to intermix SQL commands to query external data, along with complex analytics in single application.
Entry point to Spark SQL is sqlContext or its children (example: HiveContext that provides additional functionality like writing queries using HiveQL parser.). To create a sqlContext we need to have sparkContext.
val sqlContext = new org.apache.spark.sql..SQLContext(sc)
import sqlContext._
Spark SQL supports automatically converting an RDD containing case classes to a DataFrame.
The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns.
case class Employee(name: String, age: Int)
val employee = sc.textFile("path to file/employee.txt").map(_.split("\t")).map(e => Employee(e(0), e(1).trim.toInt)).toDF()
DataFrame can be registered as a table
employee.registerTempTable("employee")
This tempTable can be used for running SQL queries.
The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
The columns of a row in the result can be accessed by ordinal.
val allEmp = sqlContext.sql("SELECT * FROM employee")
allEmp.show()
Spark SQL also supports read/write operations on parquet files while retaining the schema. Parquet is a columnar format, supported by many different Big Data frameworks.
Storing RDD as parquet file:
employee.saveAsParquetFile("employee.parquet")
Loading parquet file:
val parquetFile = sqlContext.parquetFile("employee.parquet")
Converting parquet file to tempTable:
parquetFile.registerTempTable("parquetTable")
val content = sql("SELECT * FROM parquetTable")
content.collect().foreach(println)
RDD allows running SQL queries by representing column labels with tick (') followed by label.
employee.where('name = "alpha").select('name)
Spark Streaming:
- Defining a set of transformations on the input set
- invoking the actions that output the transformed datasets to persistent storage or return results to driver local memory.
- Running local computations that operated on the results computed in a distributed fashion. These can help you decide the transformations and actions to take next.
Spark Shell:
Launching spark shell depends on the environment where you want to run spark jobs.
- If you have a Hadoop cluster that runs a version of Hadoop that supports YARN, then you can launch spark jobs on that cluster by using the value of yarn-client for spark master $ spark-shell --master yarn-client
- If you want to run the computations on local environment then you can launch a local spark cluster by specifying local[N], where N signifies the number of threads to run or * to match number of cores available in the machine.
$ spark-shell --master local[N]
Some useful commands for first time users,
- :help - lists available commands in the shell
- :history - lists the names of variables and functions defined in the current session
SparkContext:
A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one. This limitation may eventually be removed.
In the Spark shell, the SparkContext is already created for you as variable sc. SC is an object and there are many methods associated to it. to see them do - scala> sc.[\t]
Other programs must use constructor to instantiate new SparkContext.
Resilient Distributed Datasets (RDDs):
An RDD is Spark's fundamental abstraction for representing collection of objects that can be distributed across machines in cluster. RDD is laid out across cluster of machines as a collection of partitions, each including a subset of data. Partitions define the unit of parallelism in Spark. The framework processes the objects within a partition in sequence, and processes multiple partitions in parallel. One easy way to create RDD is to use parallelize method of SparkContext with a local collection of objects.
val rdd = sc.parallelize(Array(1,2,3,4),4)
- First parameter represents the collection of objects to be parallelize.
- Second parameter defines the number of partitions.
To create a RDD from text file residing in your machine or on HDFS use textFile method if SparkContext.
val rddText = sc.textFile('/path/to/file')
By specifying the directory name in place of file path, Spark creates RDD for all the files in the directory. Note that no actual data has been read by spark or loaded in memory yet. when time comes to compute objects within a partition Spark reads the section of the input file and applies subsequent transformations.
Actions:
The act of creating an RDD doesn't cause any distributed computing to take place on the cluster. Rather, RDDs define logical data that are intermediate steps in the computation. Distributed computation occurs upon invoking an action on RDD. Some of the examples for action are :
- rdd.count() - returns number of objects in RDD
- rdd.collect() - returns all the objects in RDD
- rdd.first() - returns the first object in RDD
- rdd.take(n) - returns first n objects in RDD
- rdd.saveAsTextFile("/path/to/save/file") - saves the content of rdd to the persistent storage
Data returned by Scala REPL can be somewhat hard to read, to make the display more elegant use .foreach(println) on the returned object.
Caching:
Although the contents of RDD are transient by default, Spark allows us to persist the data in RDD.
- After the first action performed the content in RDD gets stored in memory or disk as specified.
- Next time, when any action depends on this RDD, it need not to be recomputed from its dependencies rather can be directly accessed from memory or disk.
- The cache is fault-tolerant: if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
rdd.persist(storagelevel.MEMORY_SER) - this will store serialized RDD as array of bytes in memory
rdd.persist(storagelevel.MEMORY_AND_DISK) - This will store the unserialized RDD content in memory, when spark estimates that a partition will not fit in memory it will then spill to disk.
rdd.persist(storagelevel.MEMORY_AND_DISK_SER) - Similar to MEMORY_AND_DISK but this wills tore serialized RDD content.
Broadcast variable:
Broadcast variables let programmer keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
For example, to give every node a copy of a large input dataset efficiently.
Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
sc.broadcast(Array(1, 2, 3))
Accumulators variable:
Accumulators are variables that can only be “added” to through an associative operation.
Used to implement counters and sums, efficiently in parallel.
Spark natively supports accumulators of numeric value types and standard mutable collections, and programmers can extend for new types.
Only the driver program can read an accumulator’s value, not the tasks
val acc = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => acc += x)
acc.value // { this can be called only on driver side }
Spark SQL:
Spark SQL allows to intermix SQL commands to query external data, along with complex analytics in single application.
Entry point to Spark SQL is sqlContext or its children (example: HiveContext that provides additional functionality like writing queries using HiveQL parser.). To create a sqlContext we need to have sparkContext.
val sqlContext = new org.apache.spark.sql..SQLContext(sc)
import sqlContext._
Spark SQL supports automatically converting an RDD containing case classes to a DataFrame.
The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns.
case class Employee(name: String, age: Int)
val employee = sc.textFile("path to file/employee.txt").map(_.split("\t")).map(e => Employee(e(0), e(1).trim.toInt)).toDF()
DataFrame can be registered as a table
employee.registerTempTable("employee")
This tempTable can be used for running SQL queries.
The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
The columns of a row in the result can be accessed by ordinal.
val allEmp = sqlContext.sql("SELECT * FROM employee")
allEmp.show()
Spark SQL also supports read/write operations on parquet files while retaining the schema. Parquet is a columnar format, supported by many different Big Data frameworks.
Storing RDD as parquet file:
employee.saveAsParquetFile("employee.parquet")
Loading parquet file:
val parquetFile = sqlContext.parquetFile("employee.parquet")
Converting parquet file to tempTable:
parquetFile.registerTempTable("parquetTable")
val content = sql("SELECT * FROM parquetTable")
content.collect().foreach(println)
RDD allows running SQL queries by representing column labels with tick (') followed by label.
employee.where('name = "alpha").select('name)
Spark Streaming: