Nitendra Gautam

Introduction to Apache Spark SQL

Spark SQL

Spark SQL, previously known as Shark (SQL on Spark), is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed.

Spark SQL is a Spark library that runs on top of Spark. It provides a higher-level abstraction than the Spark core API for processing structured data. Structured data includes data stored in a database, NoSQL data store, Parquet, ORC, Avro, JSON, CSV, or any other structured format.

Internally, Spark SQL uses this extra information to perform additional optimizations. There are several ways to interact with Spark SQL, including SQL and the Dataset API. When computing a result, the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.

Spark SQL can be used as a library for developing data processing applications in Scala, Java, Python, or R. It supports multiple query languages, including SQL, HiveQL, and language-integrated queries. In addition, it can be used for interactive analytics with just SQL/HiveQL. In both cases, it internally uses the Spark core API to execute queries on a Spark cluster. Early versions of the component were built around a special RDD called SchemaRDD, composed of Row objects plus a schema object defining the data type of each column in a row. It is similar to a table in a relational database and later evolved into today's DataFrame.

Spark SQL Uses

One use case of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation. When running SQL from within another programming language, the results are returned as a Dataset/DataFrame. You can also interact with the SQL interface using the command line or over JDBC/ODBC.

Spark Datasets

A Dataset is a distributed collection of data which can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).

A DataFrame is a Dataset of Row objects (Dataset[Row]).

  • Datasets can wrap a given struct or type (Dataset[Person], Dataset[(String, Double)])
  • A DataFrame's schema is checked only at runtime, whereas a Dataset's types are checked at compile time. Earlier detection of errors enables better optimization
  • RDDs can be converted to Datasets with .toDS()
  • Datasets are more efficient, as they can be serialized very efficiently, even better than with Kryo
  • Optimal execution plans can be determined at compile time
  • Datasets allow for better interoperability. In fact, MLlib and Spark Streaming are moving toward using Datasets instead of RDDs for their primary APIs
  • Datasets simplify development, as SQL operations can be performed on them in a single line
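The points above can be sketched with a small example; the `Person` case class, the file-free local session, and the sample values are illustrative assumptions, not part of the original text:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class used only for illustration
case class Person(name: String, age: Long)

val spark = SparkSession.builder()
  .appName("Dataset Example")
  .master("local[*]") // local mode, for demonstration only
  .getOrCreate()

import spark.implicits._ // provides .toDS() and the required Encoders

// Create a typed Dataset from a local collection
val people = Seq(Person("Ana", 30), Person("Bob", 25)).toDS()

// Typed transformation, checked at compile time
val adults = people.filter(p => p.age >= 18)
adults.show()
```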

Spark DataFrame

A DataFrame is a Dataset organized into named columns. It is Spark SQL’s primary data abstraction which represents a distributed collection of rows organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

DataFrame is a class defined in the Spark SQL library. It provides various methods for processing and analyzing structured data. For example, it provides methods for selecting columns, filtering rows, aggregating columns, joining tables, sampling data, and other common data processing tasks. Unlike RDD, DataFrame is schema aware.

An RDD is a partitioned collection of opaque elements, whereas a DataFrame knows the names and types of the columns in a dataset. As a result, the DataFrame class is able to provide a rich domain-specific-language (DSL) for data processing. The DataFrame API is easier to understand and use than the RDD API. However, if required, a DataFrame can also be operated on as an RDD. An RDD can be easily created from a DataFrame. Thus, the complete RDD interface is also available to process data represented by a DataFrame. A DataFrame can be registered as a temporary table, which can be queried with SQL or HiveQL. A temporary table is available only while the application that registered it is running.
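The interoperability described above can be sketched as follows; the file path and view name are placeholder assumptions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DataFrame Example")
  .master("local[*]") // for illustration
  .getOrCreate()

// Build a DataFrame from a JSON file (path is a placeholder)
val df = spark.read.json("people.json")

// The DataFrame knows the names and types of its columns
df.printSchema()

// Drop down to the RDD interface when needed
val rowRdd = df.rdd

// Register as a temporary view and query it with SQL
df.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 21").show()
```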

Interaction with Spark SQL

There are several ways to interact with Spark SQL including SQL and the Dataset API.

  • Hive table: Spark SQL can read tables from an existing Hive installation. However, since Hive has a large number of dependencies, these are not included in the default Spark distribution. If the Hive dependencies can be found on the classpath, Spark will load them automatically. Note that these dependencies must also be present on all of the worker nodes, as they need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive.

  • Spark session: The entry point to programming Spark with the Dataset and DataFrame APIs. SparkSession provides a single, unified point of entry for interacting with underlying Spark functionality and for manipulating data with DataFrames and Datasets.

Create Spark Session

In Spark 2.0.0 or later, we create a SparkSession object instead of a SparkContext when using Spark SQL and Datasets.

We then use this SparkSession to issue SQL queries on our Datasets; the underlying SparkContext remains accessible via spark.sparkContext if needed.

The entry point to all functionality in Spark SQL is the SparkSession class.

import org.apache.spark.sql.SparkSession

val spark = SparkSession
   .builder()
   .appName("Spark SQL Session")
   .getOrCreate()

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

val df = spark.read.json("file.json") // Create a DataFrame from JSON
df.show() // Display the content

Extract Data from Spark SQL

df.createOrReplaceTempView("people") // Register the DataFrame as a temporary view
val sqlDataFrame = spark.sql("SELECT * FROM people") // Query it with SQL
sqlDataFrame.show() // Display the resulting DataFrame

Saving Data to Persistent Tables in Spark SQL

DataFrames can also be saved as persistent tables into Hive metastore using the saveAsTable command. Unlike the createOrReplaceTempView command, saveAsTable will materialize the contents of the DataFrame and create a pointer to the data in the Hive metastore. Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SparkSession with the name of the table.

For file-based data sources, e.g. text, Parquet, JSON, or CSV, you can specify a custom table path via the path option, e.g. df.write.option("path", "/some/path").saveAsTable("t"). When the table is dropped, the custom table path is not removed and the table data is still there.

If no custom table path is specified, Spark will write data to a default table path under the warehouse directory. When the table is dropped, the default table path will be removed too.
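A minimal sketch of this workflow, assuming an existing SparkSession `spark` and DataFrame `df` (table names are illustrative):

```scala
// Assumes a SparkSession `spark` and a DataFrame `df` already exist

// Materialize the DataFrame as a persistent table in the Hive metastore
df.write.saveAsTable("people_tbl")

// With a custom path, dropping the table later leaves the files in place
df.write.option("path", "/some/path").saveAsTable("people_ext")

// Re-create a DataFrame for a persistent table by name
val people = spark.table("people_tbl")
```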

Partition Discovery in Spark SQL

Table partitioning is an optimization approach in which data is stored in different directories, with partitioning column values encoded in the path of each partition directory. The Parquet data source is able to discover and infer partitioning information automatically.

When writing data to a Hive table, .partitionBy on the DataFrame writer is used to partition the data by the given columns.
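A sketch of partitioned writes and partition discovery; the `country` column, the output path, and the preexisting DataFrame `df` are illustrative assumptions:

```scala
// Assumes a SparkSession `spark` and a DataFrame `df` with a `country` column

// Write the data partitioned by `country`; Spark creates directories
// such as .../country=US/ under the output path
df.write
  .partitionBy("country")
  .parquet("/tmp/people_partitioned")

// On read, Spark discovers the partitioning from the directory layout
val restored = spark.read.parquet("/tmp/people_partitioned")
restored.printSchema() // `country` appears as an inferred partition column
```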

Caching Data In Memory in Spark SQL

A table can be cached in an in-memory columnar format by calling spark.catalog.cacheTable("table-name") or the dataFrame.cache() function.

To remove the table from memory, we use spark.catalog.uncacheTable("table-name").
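For example, assuming a SparkSession `spark` and an already registered view named "people" (both are assumptions here):

```scala
spark.catalog.cacheTable("people") // cache in the in-memory columnar format

spark.sql("SELECT COUNT(*) FROM people").show() // subsequent scans read from memory

spark.catalog.uncacheTable("people") // release the cached data
```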

Save Modes in Spark SQL

Save operations can optionally take a SaveMode, which specifies how to handle existing data if present. It is important to realize that these save modes do not use any locking and are not atomic. Additionally, when performing an Overwrite, the existing data is deleted before the new data is written out.

  • SaveMode.ErrorIfExists (default) / "error" or "errorifexists": when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.
  • SaveMode.Append / "append": when saving a DataFrame to a data source, if data/table already exists, the contents of the DataFrame are expected to be appended to the existing data.
  • SaveMode.Overwrite / "overwrite": when saving a DataFrame to a data source, if data/table already exists, the existing data is expected to be overwritten by the contents of the DataFrame.
  • SaveMode.Ignore / "ignore": when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to CREATE TABLE IF NOT EXISTS in SQL.
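The mode is set on the DataFrame writer; in this sketch the DataFrame `df` and the output path are placeholder assumptions:

```scala
import org.apache.spark.sql.SaveMode

// Assumes a DataFrame `df` already exists

// Append new rows to any existing data at the path
df.write.mode(SaveMode.Append).parquet("/tmp/events")

// Equivalent string form, here replacing existing data
df.write.mode("overwrite").parquet("/tmp/events")
```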

Data Modelling in Spark

Data modeling is a critical step in Spark SQL when we need to extract data from databases like Cassandra or other external databases.
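As one illustration, data can be pulled from a relational database through Spark SQL's built-in JDBC source; the URL, table name, and credentials below are placeholder assumptions:

```scala
// Assumes a SparkSession `spark` and the JDBC driver on the classpath
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/mydb") // placeholder URL
  .option("dbtable", "public.customers")               // placeholder table
  .option("user", "user")
  .option("password", "password")
  .load()

jdbcDF.printSchema() // schema is read from the database table
```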

Catalyst Optimizer in Spark

The Catalyst optimizer primarily leverages functional programming constructs of Scala, such as pattern matching. It offers a general framework for transforming trees, or lineage graphs, in Spark, which is used to perform analysis, optimization, physical planning, and runtime code generation.

Catalyst framework is an optimization framework present in Spark SQL. It allows Spark to automatically transform SQL queries by adding new optimizations to build a faster processing system.

Catalyst optimizer has two primary goals:

  • Make adding new optimization techniques easy

  • Enable external developers to extend the optimizer

Catalyst’s transformation phases in Spark SQL

1. Analyzing a logical plan to resolve references

2. Logical plan optimization

3. Physical planning

4. Code generation to compile the parts of the query to Java bytecode
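The output of these phases can be inspected with explain(true), which prints the parsed, analyzed, optimized, and physical plans for a query; the view name "people" is an illustrative assumption:

```scala
// Assumes a SparkSession `spark` and a registered view "people"
val query = spark.sql("SELECT name FROM people WHERE age > 21")

// Prints the parsed logical, analyzed logical, optimized logical,
// and physical plans produced by Catalyst
query.explain(true)
```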

Spark SQL optimization

**1. spark.sql.codegen**

  • It asks Spark SQL to compile each query to Java bytecode before executing it, which can speed up the execution of large or repeated queries

**2. spark.sql.inMemoryColumnarStorage.batchSize**

  • It controls the number of rows in each batch of the in-memory columnar cache; a larger batch size can improve memory utilization and compression, but risks out-of-memory errors when caching rows that are very large
