Using Azure Blob Storage and Parquet
Objective
This notebook shows how to interact with Parquet on Azure Blob Storage.
Please note that I was not able to write Parquet to Blob Storage using PySpark. I have tried versions 2.2, 2.3 and 2.4, but none of them work (yet): Spark connects and creates the folder, but no data is written (the exact write pattern I tried is sketched at the end of this notebook). Azure support was not able to help me, except for advising me to use HDInsight.
Imports
from azure.storage.blob import BlockBlobService
import pandas as pd
import pyarrow.parquet as pq
from io import BytesIO
from configparser import RawConfigParser
from pyspark import SparkConf, SparkContext, SQLContext
Definitions
BLOB_NAME = "characters.parquet"
Setup Blob
# Read the configuration
config = RawConfigParser()
config.read("blobconfig.ini")
# Create blob_service
blob_service = BlockBlobService(
    account_name=config["blob-store"]["blob_account_name"],
    account_key=config["blob-store"]["blob_account_key"],
)
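The blobconfig.ini file is expected to contain a [blob-store] section with the account name, account key and container name used throughout this notebook. A minimal sketch of that layout, with placeholder values, looks like this:
[blob-store]
blob_account_name = <your-storage-account-name>
blob_account_key = <your-storage-account-key>
blob_container = <your-container-name>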
Setup Spark
In order to connect to Azure Blob Storage with Spark, we need to download two JARs (hadoop-azure-2.7.3.jar and azure-storage-6.1.0.jar) and add them to the Spark configuration. I chose these specific versions since they were the only ones that worked for reading data with Spark 2.4.0. Additionally, fs.azure needs to be set to the Azure NativeAzureFileSystem and fs.azure.account.key.<youraccountname>.blob.core.windows.net should contain the account key.
def setup_spark(config):
    """ Setup Spark to connect to Azure Blob Storage """
    jars = [
        "spark-2.4.0-bin-hadoop2.7/jars/hadoop-azure-2.7.3.jar",
        "spark-2.4.0-bin-hadoop2.7/jars/azure-storage-6.1.0.jar",
    ]
    conf = (
        SparkConf()
        .setAppName("Spark Blob Test")
        .set("spark.driver.extraClassPath", ":".join(jars))
        .set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        .set(
            f"fs.azure.account.key.{config['blob-store']['blob_account_name']}.blob.core.windows.net",
            config["blob-store"]["blob_account_key"],
        )
    )
    sc = SparkContext.getOrCreate(conf=conf)
    return SQLContext(sc)
# Create Spark context
sql_context = setup_spark(config)
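As an alternative sketch (not what I use above, and only an assumption that it behaves equivalently), the same two properties can also be set directly on the Hadoop configuration of the running SparkContext:
# Alternative sketch: set the same properties on the SparkContext's
# Hadoop configuration (relies on the private _jsc gateway, so use with care).
account_name = config["blob-store"]["blob_account_name"]
hadoop_conf = sql_context._sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
hadoop_conf.set(
    f"fs.azure.account.key.{account_name}.blob.core.windows.net",
    config["blob-store"]["blob_account_key"],
)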
# Create dataframe
df = pd.DataFrame.from_dict(
    [("Mario", "Red"), ("Luigi", "Green"), ("Princess", "Pink")]
).rename(columns={0: "name", 1: "color"})
print(df.head())
Using PyArrow with Pandas, it is easy to write a dataframe to Blob Storage: convert the Pandas dataframe to Parquet in an in-memory buffer and write the buffer to a blob.
def write_pandas_dataframe_to_blob(blob_service, df, container_name, blob_name):
    """ Write Pandas dataframe to blob storage """
    buffer = BytesIO()
    df.to_parquet(buffer)
    blob_service.create_blob_from_bytes(
        container_name=container_name, blob_name=blob_name, blob=buffer.getvalue()
    )
# Write to blob using pyarrow
write_pandas_dataframe_to_blob(blob_service, df, config['blob-store']['blob_container'], BLOB_NAME)
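As a quick sanity check, you can confirm that the blob was created; this sketch assumes the same legacy azure-storage SDK that provides BlockBlobService and its exists method:
# Sanity check (assumes the legacy azure-storage SDK's exists method):
print(blob_service.exists(
    container_name=config["blob-store"]["blob_container"], blob_name=BLOB_NAME
))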
Reading the data back from Blob Storage with Pandas follows the same pattern in reverse: the blob is downloaded into an in-memory BytesIO stream and read with PyArrow.
def get_pandas_dataframe_from_parquet_on_blob(blob_service, container_name, blob_name):
    """ Get a dataframe from Parquet file on blob storage """
    byte_stream = BytesIO()
    try:
        blob_service.get_blob_to_stream(
            container_name=container_name, blob_name=blob_name, stream=byte_stream
        )
        df = pq.read_table(source=byte_stream).to_pandas()
    finally:
        byte_stream.close()
    return df
# Read from blob using pyarrow
rdf = get_pandas_dataframe_from_parquet_on_blob(
    blob_service, config['blob-store']['blob_container'], BLOB_NAME
)
print(rdf.head())
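Since this is a round trip through Parquet, the read-back dataframe should match the original one. A minimal sketch of such a check (exact dtype and index behaviour may vary with your Pandas and PyArrow versions) is:
# Round-trip check (sketch): compare the read-back dataframe to the original.
# Dtype/index details may differ slightly between Pandas/PyArrow versions.
pd.testing.assert_frame_equal(df, rdf)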
Reading a single-file Parquet blob with Spark is done using the following function. Note that the path uses the wasbs protocol.
def get_pyspark_dataframe_from_parquet_on_blob(config, sql_context, container_name, blob_name):
    """ Get a dataframe from Parquet file on blob storage using PySpark """
    path = f"wasbs://{container_name}@{config['blob-store']['blob_account_name']}.blob.core.windows.net/{blob_name}"
    return sql_context.read.parquet(path)
# Read from blob using PySpark
sdf = get_pyspark_dataframe_from_parquet_on_blob(
    config, sql_context, config['blob-store']['blob_container'], BLOB_NAME
)
sdf.show()
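For completeness, the write pattern that fails for me (as mentioned in the objective) looks roughly like the following; the output blob name is a placeholder, and with Spark 2.2-2.4 this only created the folder structure on Blob Storage without writing any data:
# The failing write attempt (placeholder output name "characters_out.parquet"):
# the folder is created on Blob Storage, but no Parquet data is written.
output_path = (
    f"wasbs://{config['blob-store']['blob_container']}"
    f"@{config['blob-store']['blob_account_name']}.blob.core.windows.net/characters_out.parquet"
)
sdf.write.mode("overwrite").parquet(output_path)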