Using SFTP with Spark
Prerequisites¶
- Set up a simple SFTP server on your (Ubuntu) machine (the SFTP subsystem that ships with openssh-server is enough).
- Add a file with the following content:
$ cat /home/ftpuser/data/sample.psv
title|name|age
mr|john doe|34
mrs|jane doe|30
- Zip the PSV file:
$ zip /home/ftpuser/data/testpsv.zip /home/ftpuser/data/sample.psv
- Have Spark installed.
- Have the spark-sftp JAR from https://github.com/springml/spark-sftp (and its sftp.client dependency) available in your Spark JAR directory, or point Spark at the JARs explicitly, as sketched below.
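If you would rather not copy the JARs into $SPARK_HOME/jars, a minimal sketch of passing them via the spark.jars setting instead (the /opt/jars paths are assumptions; use wherever you downloaded the JARs in the next section):

from pyspark import SparkConf

# Hypothetical locations for the two JARs downloaded below.
conf = SparkConf().set(
    "spark.jars",
    "/opt/jars/spark-sftp_2.10-1.0.2.jar,/opt/jars/sftp.client-1.0.3.jar",
)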
Set up the context¶
In [2]:
# Download the sftp.client dependency JAR (uncomment to run):
# !wget https://repo1.maven.org/maven2/com/springml/sftp.client/1.0.3/sftp.client-1.0.3.jar
In [3]:
# Download the spark-sftp connector JAR (uncomment to run):
# !wget https://repo1.maven.org/maven2/com/springml/spark-sftp_2.10/1.0.2/spark-sftp_2.10-1.0.2.jar
In [4]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import os

# Point PySpark at the local Hadoop and Java installations.
os.environ['HADOOP_HOME'] = '/opt/hadoop/'
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3'
os.environ['PYSPARK_PYTHON'] = 'python3'
os.environ['LD_LIBRARY_PATH'] = '/opt/hadoop/lib/native'
os.environ['SPARK_DIST_CLASSPATH'] = "/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/yarn:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*"
os.environ['SPARK_HOME'] = '/opt/spark/'

conf = (
    SparkConf()
    .setAppName("Spark SFTP Test")
    # Register Hadoop's SFTP filesystem so sftp:// URLs work with binaryFiles().
    .set("spark.hadoop.fs.sftp.impl", "org.apache.hadoop.fs.sftp.SFTPFileSystem")
)
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
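As an optional sanity check (not in the original notebook), you can confirm the context is up and the SFTP filesystem setting took effect:

print(sc.version)
print(sc.getConf().get("spark.hadoop.fs.sftp.impl"))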
Read a file directly into a dataframe¶
In [5]:
df = sqlContext\
    .read\
    .format("com.springml.spark.sftp")\
    .option("host", os.environ.get('FTP_HOST'))\
    .option("username", os.environ.get('FTP_USER'))\
    .option("password", os.environ.get('FTP_PASS'))\
    .option("fileType", "csv")\
    .option("delimiter", "|")\
    .option("quote", "\"")\
    .option("escape", "\\")\
    .option("multiLine", "true")\
    .option("inferSchema", "false")\
    .option("header", "true")\
    .load("/data/sample.psv")
df.show()
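With these options (header row used for column names, no schema inference), the output for the sample file should look something like:

+-----+--------+---+
|title|    name|age|
+-----+--------+---+
|   mr|john doe| 34|
|  mrs|jane doe| 30|
+-----+--------+---+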
Read a file as a binary file and transform it into a dataframe¶
In [6]:
from pyspark.sql.types import StructType, StructField, StringType

# Schema matching the columns in sample.psv (title|name|age).
schema = StructType([
    StructField("title", StringType()),
    StructField("name", StringType()),
    StructField("age", StringType())
])

file_path = 'data/sample.psv'
sftp_url = f"sftp://{os.environ.get('FTP_USER')}:{os.environ.get('FTP_PASS')}@{os.environ.get('FTP_HOST')}/{file_path}"

# binaryFiles() yields (path, bytes) pairs: decode the bytes, split into
# lines, drop the header line, then split each line on the delimiter.
psv_df = sc\
    .binaryFiles(sftp_url)\
    .map(lambda row: row[1].decode('utf-8').strip())\
    .flatMap(lambda row: row.split('\n'))\
    .filter(lambda row: row != 'title|name|age')\
    .map(lambda row: row.split('|'))\
    .toDF(schema=schema)
psv_df.show()
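Note that .toDF() is only attached to RDDs once a SQLContext (or SparkSession) exists, which is why the context setup above matters. With the header line filtered out, the result should match the direct spark-sftp read above.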
Extract a ZIP archive and parse the content into a dataframe¶
The following snippet extracts a ZIP archive in memory and returns the content of the first file in the archive.
In [7]:
import io
import zipfile

def zip_extract(row):
    # row is a (path, bytes) pair from binaryFiles(): unzip the bytes
    # in memory and return the decoded text of the first archive entry.
    file_path, content = row
    zfile = zipfile.ZipFile(io.BytesIO(content), "r")
    files = zfile.namelist()
    return zfile.open(files[0]).read().decode("utf-8", errors='ignore')

file_path = 'data/testpsv.zip'
sftp_url = f"sftp://{os.environ.get('FTP_USER')}:{os.environ.get('FTP_PASS')}@{os.environ.get('FTP_HOST')}/{file_path}"

psv_df = sc\
    .binaryFiles(sftp_url)\
    .map(zip_extract)\
    .map(lambda row: row.strip())\
    .flatMap(lambda row: row.split('\n'))\
    .filter(lambda row: row != 'title|name|age')\
    .map(lambda row: row.split('|'))\
    .toDF(schema=schema)
psv_df.show()
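zip_extract only returns the first entry, which is fine here because the archive holds a single file. If an archive could contain several files, a small variation (a sketch, not part of the original notebook; the name zip_extract_all is made up) concatenates every entry instead:

def zip_extract_all(row):
    # Hypothetical variant of zip_extract: join the decoded text of
    # every entry in the archive rather than just the first one.
    _, content = row
    zfile = zipfile.ZipFile(io.BytesIO(content), "r")
    return "\n".join(
        zfile.open(name).read().decode("utf-8", errors="ignore")
        for name in zfile.namelist()
    )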