Using Spark to read from S3
Goal¶
We want to read data from S3 with Spark. Ideally, we want to be able to read Parquet files from S3 into a Spark DataFrame.
Preparation¶
On my Kubernetes cluster I am using the PySpark notebook image. In the home folder of the container I downloaded and extracted Spark 2.4.0. After extracting it, I set the SPARK_HOME
environment variable.
import os

# Point Spark tooling at the extracted Spark 2.4.0 installation
os.environ["SPARK_HOME"] = "/home/jovyan/spark-2.4.0-bin-hadoop2.7"
We need to download the libraries that let Spark communicate with AWS and use S3 as a file system. Download the following two jars into the jars
folder of the Spark installation.
!wget http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -P $SPARK_HOME/jars/
!wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar -P $SPARK_HOME/jars/
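Optionally, a quick sanity check that both jars ended up where Spark will pick them up:
import os
from glob import glob

# List the AWS-related jars in the Spark jars folder
jars_dir = os.path.join(os.environ["SPARK_HOME"], "jars")
print(glob(os.path.join(jars_dir, "aws-java-sdk-*.jar")))
print(glob(os.path.join(jars_dir, "hadoop-aws-*.jar")))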
In order to read from AWS S3, we need to set some parameters in the Spark configuration file. This is normally located at $SPARK_HOME/conf/spark-defaults.conf
. Enter the following three key-value pairs, replacing the access key and secret key with your own values:
# spark-defaults.conf
spark.hadoop.fs.s3a.access.key=MY_ACCESS_KEY
spark.hadoop.fs.s3a.secret.key=MY_SECRET_KEY
spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
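If you prefer not to store credentials in spark-defaults.conf, the same three settings can also be passed to Spark at runtime through SparkConf (Spark forwards spark.hadoop.* properties to the Hadoop configuration). A minimal sketch, assuming the credentials are exported as the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY:
import os
from pyspark import SparkConf

# Equivalent to the spark-defaults.conf entries above, set at runtime.
# Assumes AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are set in the environment.
conf = (SparkConf()
        .set("spark.hadoop.fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
        .set("spark.hadoop.fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"))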
Script¶
Set the Spark configuration and create the Spark context and the SQL context.
from pyspark import SparkConf, SparkContext, SQLContext

conf = (SparkConf()
        .setAppName("S3 Configuration Test")
        .set("spark.executor.instances", "1")
        .set("spark.executor.cores", "1")
        .set("spark.executor.memory", "2g"))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
The S3 path we will write to and read from:
path = "s3a://user-jwaterschoot/mario-colors/"
Let's create a simple RDD, convert it to a DataFrame, and write it to the S3 path in Parquet format:
rdd = sc.parallelize([('Mario', 'Red'), ('Luigi', 'Green'), ('Princess', 'Pink')])
rdd.toDF(['name', 'color']).write.parquet(path)
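Note that the Parquet writer fails by default if the path already exists, so rerunning this cell raises an error. A small variation, if you want the write to be repeatable, is to pass an explicit save mode:
# Overwrite any existing data at the path so the write can be rerun safely
rdd.toDF(['name', 'color']).write.mode("overwrite").parquet(path)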
Read the data back from the S3 path:
df = sqlContext.read.parquet(path)
df.show(5)
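As a final check, the DataFrame read back from S3 behaves like any other; filtering on one of the colors written above should return exactly one row:
# The round trip through S3 preserved the three rows written earlier
df.filter(df.color == 'Red').count()  # expected: 1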