File Management¶
This section demonstrates various methods to work with files on BDS’ HDFS. See the individual framework’s documentation for details.
A Kerberos ticket is needed to connect to the BDS cluster. This authentication ticket can be obtained with the refresh_ticket() method or with the use of the Vault and a BDSSecretKeeper object. This section demonstrates the use of the BDSSecretKeeper object because it is more secure and is the preferred method.
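A minimal sketch of the refresh_ticket() approach is shown below, assuming the keytab file is already available in the notebook session; the principal and keytab path are placeholders.
from ads.bds.auth import refresh_ticket

# Generate a Kerberos ticket from a locally available keytab file.
# The principal and keytab path are placeholders for your own values.
refresh_ticket(principal="<your_principal>", keytab_path="<path_to_keytab_file>")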
FSSpec¶
The fsspec, or Filesystem Spec, library is an interface that provides access to local, remote, and embedded file systems. You use it to access data stored in BDS’ HDFS, and the connection is made with the WebHDFS protocol.
The fsspec library must be able to access BDS, so a Kerberos ticket must be generated. The secure and recommended way to do this is to use BDSSecretKeeper, which stores the BDS credentials in the vault rather than in the notebook session.
This section outlines some common file operations. See the fsspec API Reference for complete details on the features demonstrated here and on additional functionality.
Pandas and PyArrow can also use fsspec to perform file operations.
Connect¶
Credentials and configuration information are stored in the vault. This information is used to obtain a Kerberos ticket and to define the hdfs_config dictionary. This configuration dictionary is passed to the fsspec.filesystem() method to make a connection to BDS’ underlying HDFS storage.
import ads
import fsspec
from ads.secrets.big_data_service import BDSSecretKeeper
from ads.bds.auth import has_kerberos_ticket, krbcontext
ads.set_auth("resource_principal")
with BDSSecretKeeper.load_secret("<secret_id>") as cred:
    with krbcontext(principal=cred["principal"], keytab_path=cred["keytab_path"]):
        hdfs_config = {
            "protocol": "webhdfs",
            "host": cred["hdfs_host"],
            "port": cred["hdfs_port"],
            "kerberos": "True"
        }

fs = fsspec.filesystem(**hdfs_config)
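Because has_kerberos_ticket() is imported above, you can optionally confirm that a valid ticket is in place before issuing file operations:
# Optional check: returns True while a valid Kerberos ticket is present.
assert has_kerberos_ticket()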
Delete¶
Delete files from HDFS using the .rm() method. It accepts the path of the files to delete.
fs.rm("/data/biketrips/2020??-tripdata.csv", recursive=True)
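If you prefer to guard against missing paths, fsspec also provides an .exists() check; the file name below is only illustrative.
# Delete a single file only if it is present on HDFS (path is illustrative).
path = "/data/biketrips/202001-tripdata.csv"
if fs.exists(path):
    fs.rm(path)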
Download¶
Download files from HDFS to a local storage device using the .get() method. It takes the HDFS path of the files to download, and the local path to store the files.
fs.get("/data/biketrips/20190[123456]-tripdata.csv", local_path="./first_half/", overwrite=True)
List¶
The .ls() method lists files. It returns the matching file names as a list.
fs.ls("/data/biketrips/2019??-tripdata.csv")
['201901-tripdata.csv',
'201902-tripdata.csv',
'201903-tripdata.csv',
'201904-tripdata.csv',
'201905-tripdata.csv',
'201906-tripdata.csv',
'201907-tripdata.csv',
'201908-tripdata.csv',
'201909-tripdata.csv',
'201910-tripdata.csv',
'201911-tripdata.csv',
'201912-tripdata.csv']
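To inspect a single entry rather than list a directory, the .info() method returns metadata such as the file size and type; the path below is illustrative.
# Retrieve metadata (size, type, and so on) for a single file.
fs.info("/data/biketrips/201901-tripdata.csv")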
Upload¶
The .put() method is used to upload files from local storage to HDFS. The first parameter is the local path of the files to upload. The second parameter is the HDFS path where the files are to be stored.
.upload() is an alias of .put().
fs.put(
    lpath="./first_half/20200[456]-tripdata.csv",
    rpath="/data/biketrips/second_quarter/"
)
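As a quick check after the upload completes, you can list the destination directory:
# Confirm the uploaded files are visible in the destination directory.
fs.ls("/data/biketrips/second_quarter/")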
Ibis¶
Ibis is an open-source library by Cloudera that provides a Python framework to access data and perform analytical computations from different sources. Ibis also provides access to data stored in HDFS. You use the ibis.impala.hdfs_connect() method to make a connection to HDFS, and it returns a handler. This handler has methods such as .ls() to list, .get() to download, .put() to upload, and .rm() to delete files. These operations support globbing. Ibis’ HDFS connector supports a variety of additional operations.
Connect¶
After obtaining a Kerberos ticket, the hdfs_connect() method provides access to HDFS. It is a thin wrapper around an fsspec file system. Depending on your system configuration, you may need to define the ibis.options.impala.temp_db and ibis.options.impala.temp_hdfs_path options, as sketched after the connection example below.
import ibis
with BDSSecretKeeper.load_secret("<secret_id>") as cred:
    with krbcontext(principal=cred["principal"], keytab_path=cred["keytab_path"]):
        hdfs = ibis.impala.hdfs_connect(host=cred["hdfs_host"], port=cred["hdfs_port"],
                                        use_https=False, verify=False,
                                        auth_mechanism="GSSAPI", protocol="webhdfs")
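If your cluster configuration requires them, the options mentioned above can be set before you work with the handler; the values below are placeholders.
# Only needed on some cluster configurations; the values are placeholders.
ibis.options.impala.temp_db = "<temporary_database_name>"
ibis.options.impala.temp_hdfs_path = "<temporary_hdfs_path>"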
Delete¶
Delete files from HDFS using the .rm() method. It accepts the path of the files to delete.
hdfs.rm("/data/biketrips/2020??-tripdata.csv", recursive=True)
Download¶
Download files from HDFS to a local storage device using the .get() method. It takes the HDFS path of the files to download, and the local path to store the files.
hdfs.get("/data/biketrips/20190[123456]-tripdata.csv", local_path="./first_half/", overwrite=True)
List¶
The .ls() method lists files. It returns the matching file names as a list.
hdfs.ls("/data/biketrips/2019??-tripdata.csv")
['201901-tripdata.csv',
'201902-tripdata.csv',
'201903-tripdata.csv',
'201904-tripdata.csv',
'201905-tripdata.csv',
'201906-tripdata.csv',
'201907-tripdata.csv',
'201908-tripdata.csv',
'201909-tripdata.csv',
'201910-tripdata.csv',
'201911-tripdata.csv',
'201912-tripdata.csv']
Upload¶
Use the .put() method to upload files from local storage to HDFS. The first parameter is the HDFS path where the files are to be stored. The second parameter is the local path of the files to upload.
hdfs.put(rpath="/data/biketrips/second_quarter/",
         lpath="./first_half/20200[456]-tripdata.csv",
         overwrite=True, recursive=True)
Pandas¶
Pandas allows access to BDS’ HDFS system through FSSpec. This section demonstrates some common operations.
Connect¶
import ads
import fsspec
from ads.secrets.big_data_service import BDSSecretKeeper
from ads.bds.auth import has_kerberos_ticket, krbcontext
ads.set_auth("resource_principal")
with BDSSecretKeeper.load_secret("<secret_id>") as cred:
    with krbcontext(principal=cred["principal"], keytab_path=cred["keytab_path"]):
        hdfs_config = {
            "protocol": "webhdfs",
            "host": cred["hdfs_host"],
            "port": cred["hdfs_port"],
            "kerberos": "True"
        }

fs = fsspec.filesystem(**hdfs_config)
File Handle¶
You can use the fsspec .open() method to open a data file. It returns a file handle. That file handle, f, can be passed to any Pandas method that supports file handles. In this example, a file on BDS’ HDFS is read into a Pandas dataframe.
import pandas as pd

with fs.open("/data/biketrips/201901-tripdata.csv", "r") as f:
    df = pd.read_csv(f)
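The same pattern works in the other direction: open a handle in write mode and pass it to a Pandas writer. The output path below is only an example.
# Write the dataframe back to HDFS through an fsspec file handle.
# The output path is an example; adjust it to your own location.
with fs.open("/data/biketrips/201901-tripdata-copy.csv", "w") as f:
    df.to_csv(f, index=False)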
URL¶
Pandas supports fsspec, so you can perform file operations by specifying a protocol string. The WebHDFS protocol is used to access files on BDS’ HDFS system. The protocol string has this format:
webhdfs://host:port/path/to/data
The host and port parameters can be passed in the protocol string as follows:
df = pd.read_csv(f"webhdfs://{hdfs_config['host']}:{hdfs_config['port']}/data/biketrips/201901-tripdata.csv",
storage_options={'kerberos': 'True'})
You can also pass the host and port parameters in the dictionary used by the storage_options parameter. The sample code for hdfs_config defines the host and port with the keys host and port, respectively.
hdfs_config = {
    "protocol": "webhdfs",
    "host": cred["hdfs_host"],
    "port": cred["hdfs_port"],
    "kerberos": "True"
}
In this case, Pandas uses the following syntax to read a file on BDS’ HDFS cluster:
df = pd.read_csv(f"webhdfs:///data/biketrips/201901-tripdata.csv",
storage_options=hdfs_config)
PyArrow¶
PyArrow is a Python interface to Apache Arrow, an in-memory columnar analytical tool that is designed to process data at scale. PyArrow accepts the file system object returned by fsspec.filesystem() through the filesystem parameter in many of its data operation methods.
Connect¶
Make a connection to BDS’ HDFS using fsspec:
import ads
import fsspec
from ads.secrets.big_data_service import BDSSecretKeeper
from ads.bds.auth import has_kerberos_ticket, krbcontext
ads.set_auth("resource_principal")
with BDSSecretKeeper.load_secret("<secret_id>") as cred:
    with krbcontext(principal=cred["principal"], keytab_path=cred["keytab_path"]):
        hdfs_config = {
            "protocol": "webhdfs",
            "host": cred["hdfs_host"],
            "port": cred["hdfs_port"],
            "kerberos": "True"
        }

fs = fsspec.filesystem(**hdfs_config)
Filesystem¶
The following sample code shows several different PyArrow methods for working with BDS’ HDFS using the filesystem parameter:
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
# Load a CSV file from HDFS as a PyArrow dataset.
csv_ds = ds.dataset("/path/on/BDS/HDFS/data.csv", format="csv", filesystem=fs)
# Convert the dataset to a table and write it back to HDFS as a Parquet file.
pq.write_table(csv_ds.to_table(), "/path/on/BDS/HDFS/data.parquet", filesystem=fs)
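# The Parquet file written above can be read back with the same filesystem object
# by passing filesystem=fs to pq.read_table().
parquet_table = pq.read_table("/path/on/BDS/HDFS/data.parquet", filesystem=fs)
parquet_df = parquet_table.to_pandas()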
import pandas as pd
import numpy as np
idx = pd.date_range('2022-01-01 12:00:00.000', '2022-03-01 12:00:00.000', freq='T')
df = pd.DataFrame(
    {
        'numeric_col': np.random.rand(len(idx)),
        'string_col': pd._testing.rands_array(8, len(idx)),
    },
    index=idx,
)
df["dt"] = df.index
df["dt"] = df["dt"].dt.date
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, root_path="/path/on/BDS/HDFS", partition_cols=["dt"],
flavor="spark", filesystem=fs)