
Spark

Spark SQL

Spark SQL uses the same SQL dialect and built-in functions as Hive.

  • https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inFunctions
  • https://spark.apache.org/docs/latest/sql-programming-guide.html#supported-hive-features
  • https://docs.databricks.com/spark/latest/spark-sql/index.html#sql-language-manual
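
For example, a minimal sketch (assumes a local pyspark installation; concat_ws is one of the Hive built-ins listed in the first link):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hive-builtins-demo").getOrCreate()

# concat_ws comes from the Hive built-in function set exposed by Spark SQL
spark.sql("SELECT concat_ws('-', 'spark', 'sql') AS joined").show()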

Temporary directories

  • https://docs.datastax.com/en/dse/5.1/dse-dev/datastax_enterprise/spark/sparkConfiguration.html
  • https://www.ibm.com/support/knowledgecenter/en/SSCTFE_1.1.0/com.ibm.azk.v1r1.azka100/topics/azkic_t_createworkdirs.htm
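
A minimal sketch, assuming the goal is to point Spark's scratch space (shuffle spill, temporary files) at a specific directory; note that on YARN the node manager's local directories override spark.local.dir:

from pyspark import SparkConf, SparkContext

spark_conf = SparkConf()
# /mnt/scratch/spark-tmp is a hypothetical path; use a disk with enough free space.
spark_conf.set("spark.local.dir", "/mnt/scratch/spark-tmp")
sc = SparkContext(conf=spark_conf)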

Data partitioning

  • coalesce moves data to an equal or smaller number of partitions. "Combines existing partitions to avoid a full shuffle".
  • repartition divides data among the specified number of partitions (or spark.sql.shuffle.partitions of them by default). "The repartition algorithm does a full shuffle of the data and creates equal sized partitions of data". Note that even if you specify a column to be used for repartitioning, you will still end up with spark.sql.shuffle.partitions partitions, some of them potentially empty and some holding multiple values of that column. See https://stackoverflow.com/a/42780452/2063031 and the sketch below.
  • spark.sql.shuffle.partitions - A rule of thumb is that each partition should process around 128 MB. If the value is close to 2000, bump it up to 2001 to make Spark switch to a more compact shuffle bookkeeping format (see https://stackoverflow.com/a/36459198/2063031).
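
A minimal sketch of the behaviour described above (assumes an existing SparkSession named spark; the bucket column is made up for illustration):

df = spark.range(0, 1000000)

df.coalesce(4).rdd.getNumPartitions()        # 4, no full shuffle
df.repartition(16).rdd.getNumPartitions()    # 16, full shuffle

# Repartitioning by a column still yields spark.sql.shuffle.partitions partitions:
spark.conf.set("spark.sql.shuffle.partitions", 200)
df2 = df.withColumn("bucket", df["id"] % 10)
df2.repartition("bucket").rdd.getNumPartitions()   # 200, most of them empty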

NB: If you want to write one Parquet file per partition, you should repartition the dataset by the same column before writing.
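
A minimal sketch (assumes an existing DataFrame df with a date column; names and the output path are illustrative):

(df
 .repartition("date")      # all rows for a given date end up in the same in-memory partition
 .write
 .partitionBy("date")      # one output directory per distinct date
 .parquet("/tmp/output_by_date"))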

Parquet options

  • sc.hadoopConfiguration.setInt("parquet.block.size", {some_value}) - Set the maximum size of each Parquet row group (block), in bytes. For example, set 1024 * 1024 * 16 for 16 MB.
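
The line above is the Scala API; from PySpark the same Hadoop configuration can be reached through the JavaSparkContext handle. A minimal sketch (_jsc is technically a private attribute, so this is a common workaround rather than a documented API):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
# 16 MB row groups
sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 1024 * 1024 * 16)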

Memory management

https://spark.apache.org/docs/latest/configuration.html#memory-management

  • spark.memory.fraction - The fraction of the JVM heap (minus a ~300 MB reserve) used for Spark's own execution and storage memory; the rest is left for user objects and internal metadata.
  • spark.memory.storageFraction - The fraction of that Spark "working memory" that is protected for storage (caches, etc.). The rest can be used for execution (e.g. SQL queries; this is generally all we do).
  • Note that you also need to leave some memory for the operating system buffers, cache, libraries, etc. Using ((vmem - 1024) * 0.8) / num_executors seems to work for my use cases; see the sketch below and https://stackoverflow.com/a/46801844/2063031.

NB: 64 GB is a rough guess at a good upper limit for a single executor [1].
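
A minimal sketch of the rule of thumb from the last bullet (all numbers are illustrative):

vmem_mb = 128 * 1024    # total memory of the machine, in MB
num_executors = 4       # executors running on this machine

executor_memory_mb = int(((vmem_mb - 1024) * 0.8) / num_executors)
print("--executor-memory %dm" % executor_memory_mb)   # pass this to spark-submit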

Garbage Collector

Use the G1 garbage collector

from pyspark import SparkConf

spark_conf = SparkConf()
spark_conf.set("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
spark_conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")

Sources:

  • https://stackoverflow.com/a/34590161/2063031
  • https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html

Increase Spark memory fraction and new-to-old ratio

from pyspark import SparkConf

spark_conf = SparkConf()
spark_conf.set("spark.memory.fraction", "0.75")
spark_conf.set("spark.memory.storageFraction", "0.1")
spark_conf.set("spark.executor.extraJavaOptions", "-XX:NewRatio=3")

NB: It's ok to specify spark.driver.extraJavaOptions from PySpark.

Errors

Error java.lang.OutOfMemoryError: GC overhead limit exceeded

The JVM is spending almost all of its time in garbage collection while reclaiming very little memory, so the program is effectively stuck in GC cycles.

https://stackoverflow.com/a/1393503/2063031
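
A minimal sketch of the usual first-line mitigations, i.e. giving the executors more heap and switching to G1 (the values are illustrative, not a recommendation):

from pyspark import SparkConf

spark_conf = SparkConf()
spark_conf.set("spark.executor.memory", "8g")                       # more heap per executor
spark_conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")   # see the G1 section above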

Set up JDBC server

  1. Start Spark master:

    ./sbin/start-master.sh --host 192.168.6.210

  2. Start one (or more) Spark workers:

    ./sbin/start-slave.sh spark://192.168.6.210:7077 -c 28 -m 128g

  3. Start thriftserver for serving JDBC clients:

    ./sbin/start-thriftserver.sh \
      --master spark://192.168.6.210:7077 \
      --hiveconf hive.server2.thrift.port=10001 \
      --hiveconf hive.server2.thrift.bind.host=192.168.6.210

  4. Connect using beeline or another JDBC client:

    ./bin/beeline -u 'jdbc:hive2://192.168.6.210:10001'

    Note: Make sure that you add all JARs from the ./spark/jars/ folder to whichever JDBC client you choose to use (tested with DataGrip).

Use custom JARs in Jupyter

Using a local JAR file:

import os

# PYSPARK_SUBMIT_ARGS must be set before the SparkContext is created (it is read when the JVM is launched).
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/spark-streaming-kafka-assembly_2.10-1.6.1.jar pyspark-shell'

import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

sc = pyspark.SparkContext()
ssc = StreamingContext(sc, 1)  # 1-second batch interval

broker = "<my_broker_ip>"
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test1"], {"metadata.broker.list": broker})
directKafkaStream.pprint()  # print a few records of each batch to the driver log
ssc.start()

Using a package from http://spark-packages.org:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.3.0 pyspark-shell'
# sqlContext is an existing pyspark.sql.SQLContext, created after the environment variable is set
df = sqlContext.load(source="com.databricks.spark.csv", header='true', inferSchema='true', path='cars.csv')

Python integration