Integrating PySpark with Redshift
In my article on how to connect to S3 from PySpark I showed how to set up Spark with the right libraries to read from and write to AWS S3. In this article I show a quick example of how I connect to Redshift and use that S3 setup to write a table out to S3.
First of all I need the PostgreSQL JDBC driver so that Spark can connect to Redshift.
$ wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.6/postgresql-42.2.6.jar -P /opt/notebooks/
I have saved my configuration in the following dictionary for testing purposes. Of course it would be wiser to keep the details in environment variables or in a proper configuration file; a sketch of the environment-variable approach follows the dictionary below.
config = {
    'aws_access_key': 'aaaaaa',
    'aws_secret_key': 'bbbbb',
    'aws_region': 'eu-west-2',
    'aws_bucket': 'my-bucket',
    'redshift_user': 'user',
    'redshift_pass': 'pass',
    'redshift_port': 1234,
    'redshift_db': 'mydatabase',
    'redshift_host': 'myhost',
}
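For completeness, here is a minimal sketch of the environment-variable approach; the upper-case variable names are my own choice, not a convention.
import os

# A sketch: read the same settings from environment variables instead of
# hard-coding them. The variable names are assumptions, not a standard.
config = {
    'aws_access_key': os.environ['AWS_ACCESS_KEY'],
    'aws_secret_key': os.environ['AWS_SECRET_KEY'],
    'aws_region': os.environ['AWS_REGION'],
    'aws_bucket': os.environ['AWS_BUCKET'],
    'redshift_user': os.environ['REDSHIFT_USER'],
    'redshift_pass': os.environ['REDSHIFT_PASS'],
    'redshift_port': int(os.environ.get('REDSHIFT_PORT', '5439')),  # 5439 is Redshift's default port
    'redshift_db': os.environ['REDSHIFT_DB'],
    'redshift_host': os.environ['REDSHIFT_HOST'],
}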
Setting up the Spark context is straightforward. Make sure the Postgres library is available by adding it to extraClassPath, or by copying it into the jars folder of the Spark installation (SPARK_HOME).
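An alternative, sketched below, is the spark.jars property, which puts the jar on both the driver and executor classpaths without touching the Spark installation.
from pyspark import SparkConf

# Sketch of the spark.jars alternative: the listed jars are added to both
# the driver and executor classpaths.
alt_conf = SparkConf().set("spark.jars", "/opt/notebooks/postgresql-42.2.6.jar")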
from pyspark import SparkContext, SparkConf, SQLContext

# the Postgres JDBC driver must be on the driver classpath
jars = [
    "/opt/notebooks/postgresql-42.2.6.jar"
]

conf = (
    SparkConf()
    .setAppName("S3 with Redshift")
    .set("spark.driver.extraClassPath", ":".join(jars))
    # S3A credentials, endpoint and V4 signing for the configured region
    .set("spark.hadoop.fs.s3a.access.key", config.get('aws_access_key'))
    .set("spark.hadoop.fs.s3a.secret.key", config.get('aws_secret_key'))
    .set("spark.hadoop.fs.s3a.path.style.access", "true")
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("com.amazonaws.services.s3.enableV4", "true")
    .set("spark.hadoop.fs.s3a.endpoint", f"s3-{config.get('aws_region')}.amazonaws.com")
    .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
)

sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
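For reference, SQLContext has been superseded by SparkSession in Spark 2.x; the same configuration can be reused through the newer entry point. A sketch:
from pyspark.sql import SparkSession

# A sketch of the SparkSession entry point (Spark 2.x+), reusing the SparkConf
# built above; spark.read can then be used in place of sqlContext.read.
spark = SparkSession.builder.config(conf=conf).getOrCreate()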
Now that the Spark context is set up, I specify the schema and the table that I want to read from Redshift and write to S3.
schema = 'custom'
table = 'postcodes'
The reading is done using the jdbc format and specifying the Redshift details:
df = sqlContext.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{config.get('redshift_host')}.redshift.amazonaws.com:{config.get('redshift_port')}/{config.get('redshift_db')}") \
    .option("dbtable", f"{schema}.{table}") \
    .option("user", config.get('redshift_user')) \
    .option("password", config.get('redshift_pass')) \
    .load()
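For larger tables the JDBC source can split the read over several partitions. This is only a sketch, assuming the table has a numeric column (a hypothetical id here) with roughly known bounds:
# Sketch of a parallel JDBC read: Spark issues one query per partition over
# the range of the (hypothetical) numeric "id" column.
df = sqlContext.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{config.get('redshift_host')}.redshift.amazonaws.com:{config.get('redshift_port')}/{config.get('redshift_db')}") \
    .option("dbtable", f"{schema}.{table}") \
    .option("user", config.get('redshift_user')) \
    .option("password", config.get('redshift_pass')) \
    .option("partitionColumn", "id") \
    .option("lowerBound", 1) \
    .option("upperBound", 1000000) \
    .option("numPartitions", 8) \
    .option("fetchsize", 10000) \
    .load()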
Writing is easy since I specified the S3 details in the Spark configuration.
df.write.mode('overwrite').parquet(f"s3a://{config.get('aws_bucket')}/raw/{schema}/{table}")
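As a quick sanity check, the Parquet output can be read straight back from S3 with the same context; a sketch:
# Sketch: read the freshly written Parquet files back and compare row counts.
path = f"s3a://{config.get('aws_bucket')}/raw/{schema}/{table}"
written = sqlContext.read.parquet(path)
print(written.count(), df.count())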