

Spark SQL

Uses the same SQL flavor and built-in functions as Hive.


Temporary directories


Data partitioning

  • coalesce moves data into an equal or smaller number of partitions; it can never increase the count. "Combines existing partitions to avoid a full shuffle".
  • repartition divides data among the specified number of partitions (or spark.sql.shuffle.partitions by default). "The repartition algorithm does a full shuffle of the data and creates equal sized partitions of data". Note that even if you specify a column to be used for repartitioning, you will still end up with spark.sql.shuffle.partitions partitions, with some partitions potentially being empty and some holding multiple values of the given column.
  • spark.sql.shuffle.partitions - The rule of thumb is that each partition should process around 128 MB. If the value is close to 2000, bump it up to 2001: above 2000 partitions Spark switches to a more compact shuffle-status representation (HighlyCompressedMapStatus).

NB: If you want to write one Parquet file per partition, you should repartition the dataset by the same column before writing.
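The 128 MB rule of thumb above can be turned into a quick back-of-the-envelope helper. This is plain Python, not a Spark API; the function name and the 1900-2000 "skip window" are my own illustration of the 2001 trick:

```python
import math

def suggest_shuffle_partitions(shuffle_bytes, target_mb=128):
    """Back-of-the-envelope partition count from the ~128 MB rule above."""
    partitions = math.ceil(shuffle_bytes / (target_mb * 1024 * 1024))
    # Spark switches to a more compact shuffle-status format above 2000
    # partitions, so jump past the threshold rather than sit just under it.
    if 1900 <= partitions <= 2000:
        partitions = 2001
    return max(partitions, 1)

# e.g. a 512 GB shuffle -> 4096 partitions
print(suggest_shuffle_partitions(512 * 1024**3))
```

The result is only a starting point; skew and per-row cost still have to be checked against the actual job.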


Parquet options

  • sc.hadoopConfiguration.setInt("parquet.block.size", {some_value}) - Sets the maximum size of each Parquet row group (block), in bytes. For example, set 1024 * 1024 * 16 for 16 MB.
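From PySpark, the same Hadoop configuration is reached through the underlying JavaSparkContext. A minimal sketch (the DataFrame and output path are illustrative):

```python
# Sketch: write Parquet with ~16 MB row groups (output path is illustrative)
sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 1024 * 1024 * 16)
df.write.parquet("/tmp/out_16mb_blocks")
```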

Memory management

  • spark.memory.fraction - The fraction of Spark memory that is reserved for Spark (as opposed to user) objects.
  • spark.memory.storageFraction - The fraction of Spark "working memory" that is reserved for caches, etc. The rest can be used for execution (like SQL queries; this is generally all we do).
  • Note that you also need to leave some memory for the operating system buffers, cache, libraries, etc. Using ((vmem - 1024) * 0.8) / num_executors seems to work for my use-cases.

NB: 64GB is a rough guess at a good upper limit for a single executor [1].
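As a sketch, the sizing rule above in plain Python (hypothetical helper; vmem is the machine's memory in MB, and the 64 GB cap is the rough per-executor limit from the note above):

```python
def executor_memory_mb(vmem_mb, num_executors):
    """Per-executor heap from the rule of thumb above: reserve ~1 GB
    for the OS, use 80% of the rest, split it across executors."""
    usable = (vmem_mb - 1024) * 0.8
    per_executor = usable / num_executors
    # ~64 GB is a rough upper limit for a single executor's heap
    return min(per_executor, 64 * 1024)

# e.g. a 256 GB box running 4 executors -> 52224 MB (~51 GB) each
print(executor_memory_mb(256 * 1024, 4))
```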


Garbage Collector

Use the G1 garbage collector

spark_conf = SparkConf()
spark_conf.set("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
spark_conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")



Increase Spark memory fraction and new-to-old ratio

spark_conf = SparkConf()
spark_conf.set("spark.memory.fraction", "0.75")
spark_conf.set("spark.memory.storageFraction", "0.1")
spark_conf.set("spark.executor.extraJavaOptions", "-XX:NewRatio=3")

NB: It's ok to specify spark.driver.extraJavaOptions from PySpark.


Error java.lang.OutOfMemoryError: GC overhead limit exceeded

The program is spending almost all of its time in GC cycles while reclaiming very little memory.
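Common mitigations, sketched as illustrative (untuned) settings: give executors more heap, or reduce per-task heap pressure by shuffling into more, smaller partitions:

```python
from pyspark import SparkConf

spark_conf = SparkConf()
# More heap per executor means collections survive longer between GCs
# (the "8g" value is illustrative, not tuned)
spark_conf.set("spark.executor.memory", "8g")
# Smaller tasks keep less live data on the heap at any one time
spark_conf.set("spark.sql.shuffle.partitions", "400")
```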

Set up JDBC server

  1. Start Spark master:

    ./sbin/start-master.sh --host <master_host>

  2. Start one (or more) Spark workers:

    ./sbin/start-slave.sh spark://<master_host>:7077 -c 28 -m 128g

  3. Start thriftserver for serving JDBC clients:

    ./sbin/start-thriftserver.sh \
      --master spark://<master_host>:7077 \
      --hiveconf hive.server2.thrift.port=10001

  4. Connect using beeline or another JDBC client:

    ./bin/beeline -u 'jdbc:hive2://<thriftserver_host>:10001'

    Note: Make sure that you specify all jars from the ./spark/jars/ folder for whichever JDBC client you choose to use (tested with DataGrip).

Use custom JARs in Jupyter

Using a local JAR file:

import os
# Must be set before the SparkContext is created
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/spark-streaming-kafka-assembly_2.10-1.6.1.jar pyspark-shell'

import pyspark
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = pyspark.SparkContext()
ssc = StreamingContext(sc, 1)  # 1-second batch interval
broker = "<my_broker_ip>"
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test1"], {"metadata.broker.list": broker})

Using a package resolved from Maven coordinates:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.3.0 pyspark-shell'
df = sqlContext.load(source="com.databricks.spark.csv", header='true', inferSchema='true', path='cars.csv')

Python integration