Parquet files and data sets on a remote file system with Python's pyarrow library

As I mentioned in my previous blog post, while we continue working with Oracle and PL/SQL, we are migrating some processes to Python using parquet files. A requirement related to Python and parquet files came up a short while ago, and I thought it could be interesting to share.

So, in medias res: we want to be able to read and write single parquet files and partitioned parquet data sets on a remote server. In other words, the processing server is not the same server where the data is stored.

If you are not familiar with parquet files or how to read and write them with Python, a perfect start is to have a look at this and this.

Reading or writing a parquet file or partitioned data set on a local file system is relatively easy: we can simply use the methods provided by the pyarrow library. The good news is that it’s also quite simple to do so on a remote file system. However, it took me a while to find a “better” solution.
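
To set the baseline, here is a minimal sketch of the local case using only pyarrow’s standard helpers; the dataframe contents and the /tmp paths are purely illustrative:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# A small dataframe just for illustration
df = pd.DataFrame({'COUNTRY_ID': ['PT', 'HU'], 'VALUE': [1, 2]})
table = pa.Table.from_pandas(df)

# Single parquet file on the local file system
pq.write_table(table, '/tmp/local_example.parquet')
local_table = pq.read_table('/tmp/local_example.parquet')

# Partitioned parquet data set on the local file system
pq.write_to_dataset(table, root_path='/tmp/local_dataset',
                    partition_cols=['COUNTRY_ID'])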

First Glance, First Solution

When I started googling, I couldn’t really find an exact solution for this problem. Even the official documentation of the pyarrow library mentions only the Hadoop file system as an example. Having seen this, I started reviewing the filesystem.py module of the pyarrow library. The file system classes there didn’t really meet my needs, so I thought fine, I’d create my own class inheriting from one of the existing ones. The choice is not wide, as there is only the local file system class, HDFS and S3FS (Amazon S3-compatible storage); S3FSWrapper seemed to be the closest to my needs based on its existing methods. So, after some tests and debugging I came up with a solution. As shown below, by overriding some of the inherited methods I created a file system class that worked. Depending on the version of the pyarrow library, we may or may not need this new class to read a parquet data set: for instance, for pyarrow==0.17.1 we need it, while for the currently latest pyarrow==1.0.1 we don’t. However, the class is always needed to write a parquet data set. The example below uses the latest version (1.0.1) of pyarrow.

from pyarrow.filesystem import S3FSWrapper
import pyarrow.parquet as pq
import fsspec


class RemoteParquetFilesystem(S3FSWrapper):
    """The init parameter of this class is a file system object,
     for example an fsspec.implementations.sftp.SFTPFileSystem object.
     """

    def _isfilestore(self):
        """Has to return True so that pyarrow creates the folder if
        it does not exist.
        """
        return True

    def open(self, path, mode='rb', block_size=4096):
        """Adding block_size to speed up performance.
        Best number for block size may differ.
        """
        return self.fs.open(path, mode=mode, block_size=block_size)

    def walk(self, path):
        """
        Directory tree generator, like os.walk.
        Overwritten, because the original S3FSWrapper version is for S3FS.
        """
        return self.fs.walk(path)


# For reading we only need an fsspec.implementations.sftp.SFTPFileSystem object
fs_sftp = fsspec.filesystem('sftp', host='host',
                            port=22, username='username', password='password')
# Reading the COUNTRY_ID=PT partition of the parquet data set
pq_table = pq.read_table('/Path/of/parquet_dataset',
                         filters=[('COUNTRY_ID', '=', 'PT')], columns=None,
                         filesystem=fs_sftp)

# Creating an object of our custom file system class
fs = RemoteParquetFilesystem(fs_sftp)
# Writing the data into the parquet data set
pq.write_to_dataset(pq_table,
                    root_path='/Path/of/parquet_dataset',
                    partition_filename_cb=lambda x: '_'.join(x)+'.parquet',
                    partition_cols=['COUNTRY_ID'], filesystem=fs)

The partition_filename_cb parameter that I use in the example to write the parquet data set makes pyarrow always create the parquet file with the same name: the partition values joined by an underscore. In our case that is PT.parquet; as we have only one partition level, there is nothing to join. Without this parameter we would generate a new parquet file with a unique UUID-based name in the partition folder on every write.
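
To make the naming behaviour concrete, here is a tiny sketch of what the callback itself returns; the second call only illustrates what would happen with two hypothetical partition levels:

# The callback receives the partition values for the file being written
filename_cb = lambda x: '_'.join(x) + '.parquet'

print(filename_cb(['PT']))          # PT.parquet
print(filename_cb(['PT', '2020']))  # PT_2020.parquet (two partition levels)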

Final Solution

I was about to apply this solution to the project, but I felt there had to be a simpler one. If I need to provide an SFTPFileSystem object to my new class anyway, why couldn’t I find a way to use the SFTPFileSystem object on its own? It turned out I could, pretty easily, and I don’t even have to create a custom class inheriting from SFTPFileSystem.

The example below shows how to read and write a single parquet file on a remote file system.

import pyarrow as pa  
import pyarrow.parquet as pq  
import pandas as pd  
from fsspec.implementations.sftp import SFTPFileSystem  
  
  
# Creating a pandas dataframe that we will convert into a parquet table  
df = pd.DataFrame([{'title': 'David Bowie', 'year': 1969},  
                   {'title': 'The Man Who Sold the World', 'year': 1970},  
                   {'title': 'Hunky Dory', 'year': 1971}])  
# Converting the dataframe into a parquet table so that  
# we have something to write into a file  
pq_table = pa.Table.from_pandas(df)  
  
# Creating a remote file system object  
fs = SFTPFileSystem(host='hostname', port=22,  
                    username='username', password='password')  
  
# Writing the parquet file on remote  
pq.write_table(pq_table, '/tmp/new_parquet.parquet', filesystem=fs)  
  
# Reading the parquet file from remote  
df = pq.read_table('/tmp/new_parquet.parquet', filesystem=fs).to_pandas()  
  
# Checking whether it was read correctly  
print(df.head())

(Reading and writing a parquet file on a local file system would be exactly the same, except we don’t need the filesystem parameter.)

Reading and writing parquet data sets on a remote file system is also pretty easy with the SFTPFileSystem class; please check the example below.

import pyarrow.parquet as pq
from fsspec.implementations.sftp import SFTPFileSystem


# Creating an SFTPFileSystem object
fs = SFTPFileSystem(host='hostname', port=22,
                    username='username', password='password')

# Reading the COUNTRY_ID=PT partition of the parquet data set
pq_table = pq.read_table('/Path/of/parquet_dataset',
                         filters=[('COUNTRY_ID', '=', 'PT')], columns=None,
                         filesystem=fs)

# Writing the data into the parquet data set
pq.write_to_dataset(pq_table,
                    root_path='/Path/of/parquet_dataset',
                    partition_filename_cb=lambda x: '_'.join(x)+'.parquet',
                    partition_cols=['COUNTRY_ID'], filesystem=fs)

For the sake of simplicity I ignored proper exception handling throughout my examples.
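
If you do need it, a minimal sketch of basic error handling around the remote read from the previous example could look like the following; the exception types and messages are only illustrative and depend on how fsspec surfaces SFTP errors in your setup:

try:
    pq_table = pq.read_table('/Path/of/parquet_dataset',
                             filters=[('COUNTRY_ID', '=', 'PT')],
                             filesystem=fs)
except FileNotFoundError:
    # The remote path does not exist or is not readable as a data set
    print('Parquet data set not found on the remote server.')
except Exception as exc:
    # Connection problems, permission errors, etc.
    print(f'Failed to read the parquet data set: {exc}')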

My case shows how many ways there usually are to approach a “problem”, and oftentimes we don’t start with the best one. With a bit of extra effort we can hopefully find a better solution, which can pay off in the long term.

One thing is for sure; the sooner we stop, the sooner we can go and party hard.
