Building A Development Spark Cluster with Postgres Using Docker And Make

Overview

My goal is to build a decent approximation of a production Apache Spark EMR environment for development and testing.

For development, I need:

  • A Postgres database to persist datasets.
  • A Spark cluster with two worker nodes.
  • A Jupiter notebook server for prototyping.
  • To submit a PySpark job to the Spark master.
  • A Shared filesystem for file processing and code sharing.

The approach I’ve taken is to run everything on Docker to avoid building up an environment from scratch.

The Docker setup has a develop network where all of the containers run. To support development, I’ve exposed the Spark UI, Jupter and Postgres ports. Spark UI is exposed on 8100 and Jupyter is on 9999, I use non-standard ports to avoid conflicts with other stuff on the host machine.

Docker Apache Spark Overview

Building the environment

# Clone the repo
git clone https://github.com/omahoco/spark-postgres.git

# Create the environment
cd spark-postgres
make all

Testing via Jupyter

Navigate to http://localhost:9999. Jupyter will ask you for a token for authentication.

To get the authentication token from the Jupiter server.

cd spark-postgres
make jupyter_token

In Jupyter, you can test connectivity to the spark master using the following code.

import pyspark

conf = pyspark.SparkConf().setAppName('Jupyter').setMaster('spark://spark:7077')
sc = pyspark.SparkContext(conf=conf)
session = pyspark.sql.SparkSession(sc)


rdd = sc.emptyRDD()
print(rdd.isEmpty())

sc.stop()

Next, let’s test connectivity to Postgres by counting the number of Postgres users.

import pyspark

conf = pyspark.SparkConf().setAppName('Postgres').setMaster('spark://spark:7077')
sc = pyspark.SparkContext(conf=conf)
session = pyspark.sql.SparkSession(sc)

jdbc_url = 'jdbc:postgresql://postgres/postgres'
connection_properties = {
'user': 'postgres',
'password': 'postgres',
'driver': 'org.postgresql.Driver',
'stringtype': 'unspecified'}

df = session.read.jdbc(jdbc_url,'pg_catalog.pg_user',properties=connection_properties)

print(df.count())

sc.stop()

Spark Working With Files

The Spark and Jupyter containers mount the hosts /tmp folder to /data. The mount acts as a shared file system accessible to both Jupyter and Spark. In a production environment a network file system, S3 or HDFS would be used.

In the example below, we download the dataset and ask Spark to load it into a Dataframe. Next, we use an aggerate function to calculate the average systolic blood pressure from the sample.

From a Jupyter notebook:

!curl https://raw.githubusercontent.com/omahoco/spark-postgres/master/nhanes_2015_2016.csv -O /data/nhanes_2015_2016.csv
import pyspark

conf = pyspark.SparkConf().setAppName('Jupyter').setMaster('spark://spark:7077')
sc = pyspark.SparkContext(conf=conf)
session = pyspark.sql.SparkSession(sc)

# Load Some NHANYES Data - https://www.cdc.gov/nchs/nhanes/index.htm
nhanyes_df = session.read.format('csv').option("header", "true").load('/data/nhanes_2015_2016.csv')

from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

# BPXSY2- Systolic: Blood pressure (second reading) mm Hg
stats_df = nhanyes_df.select(
    _mean(col('BPXSY2')).alias('BPXSY2_mean'),
    _stddev(col('BPXSY2')).alias('BPXSY2_std')
).collect()

stats_df

The output should be [Row(BPXSY2_mean=124.78301716350497, BPXSY2_std=18.527011720294993)]

Spark Working With Tables

In this example, we will create a Postgres table to track virtual coin tosses. We will use the table and Spark to calculate the frequency we get heads or tails over a million coin tosses.

Creating Random Data

Connect to Postgres and get a terminal to enter interactive queries

 docker exec -it postgres /bin/bash
 psql --user postgres

Inside psql let’s generate 1 million random coin toss events.

CREATE TABLE public.coin_toss (
    event_ts timestamp default now(),
    outcome TEXT NOT NULL
);

INSERT INTO public.coin_toss(outcome)
SELECT CASE WHEN RANDOM() < 0.5 THEN 'heads' ELSE 'tails' END FROM generate_series(1, 1000000);

Calculating the Frequencies With Spark

import pyspark
    
conf = pyspark.SparkConf().setAppName('Postgres').setMaster('spark://spark:7077')
sc = pyspark.SparkContext(conf=conf)
session = pyspark.sql.SparkSession(sc)

jdbc_url = 'jdbc:postgresql://postgres/postgres'
connection_properties = {
            'user': 'postgres',
            'password': 'postgres',
            'driver': 'org.postgresql.Driver',
            'stringtype': 'unspecified'}

df = session.read.jdbc(jdbc_url,'public.coin_toss',properties=connection_properties)

samples = df.count()
stats = df.groupBy('outcome').count()

for row in stats.rdd.collect():
    print("{} {}%".format(row['outcome'], row['count'] / samples * 100))
    
sc.stop()

Running A Spark Job

Instead of driving everything from a Jupyter notebook, we can run our Python code directly on Spark by submitting it as a job.

I’ve packaged the coin toss example as a Python file at pyspark/src/main.py.

We can submit the code as a job to spark by running:

make spark-submit

2022

Remote Development With Codespaces

9 minute read

Are you tired of spending hours setting up your local machine so you can work with a new repository? Is Docker hogging CPU and memory, killing the promise of...

Back to Top ↑

2021

How To Use Spark Partitions With Postgres

3 minute read

In the post, we will explore how we can use partitions to reduce job execution time by increasing parallelism. Specifically, we will focus on building Spark ...

Installing aws-azure-login on Apple M1

1 minute read

Aws-azure-login is a command-line utility for organisations using Azure Active Directory to authenticate users to the AWS console. It lets you use an Azure A...

Back to Top ↑

2020

Back to Top ↑

2016

Back to Top ↑

2013

Back to Top ↑