This topic describes how to use PyJindo in Python 3.6 or later to access OSS-HDFS in different ways. You can use the PyJindo package or the fsspec interface to access OSS-HDFS.
Background information
If you use the PyJindo package to access OSS-HDFS, the native interfaces of the package are used. This method depends on the API of PyJindo and requires an in-depth understanding of the specific functions and classes of the PyJindo package.
If you use the fsspec interface to access OSS-HDFS, the JindoOssFileSystem class in the PyJindo package is used. The class conforms to the fsspec protocol. This method is convenient and flexible for users who are familiar with fsspec or want to seamlessly switch between different storage systems.
The two methods differ in interface style and in how they integrate with other tools, but both can be used to access OSS-HDFS. Select a method based on your business requirements.
Prerequisites
A cluster is created, and you have logged on to the cluster. For more information, see Create a cluster and Log on to a cluster.
Method 1: Use the PyJindo package
You can use the native interfaces and classes of PyJindo to perform complex operations on OSS-HDFS. For information about log levels and the API, see Log levels and API description.
Step 1: Install PyJindo
EMR V5.17.X or a later minor version and EMR V3.51.X or a later minor version
PyJindo for Python 3.8 is pre-installed in clusters of EMR V5.17.X or a later minor version and clusters of EMR V3.51.X or a later minor version. You do not need to manually install PyJindo and can skip this step.
A minor version earlier than EMR V5.17.X or EMR V3.51.X
Download the latest version of the JindoSDK tar.gz package.
In this example, version 6.3.2 is downloaded: jindosdk-6.3.2-linux.tar.gz.
Decompress the downloaded tar.gz package and find the desired installation package of PyJindo in the following directory structure.
In this example, Python 3.8 is used, and the installation package of PyJindo is pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl.
    .
    ├── bin
    │   ├── xxx
    ├── conf
    │   ├── xxx
    ├── include
    │   ├── xxx
    ├── lib
    │   ├── xxx
    │   ├── native
    │   │   ├── xxxx
    │   └── site-packages
    │       ├── pyjindo-x.y.z-cp310-abi3-linux_x86_64.whl
    │       ├── pyjindo-x.y.z-cp311-abi3-linux_x86_64.whl
    │       ├── pyjindo-x.y.z-cp312-abi3-linux_x86_64.whl
    │       ├── pyjindo-x.y.z-cp36-abi3-linux_x86_64.whl
    │       ├── pyjindo-x.y.z-cp37-abi3-linux_x86_64.whl
    │       ├── pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl
    │       └── pyjindo-x.y.z-cp39-abi3-linux_x86_64.whl
    ├── plugins
    │   └── xxxx
    ├── tools
    │   ├── xxx
    └── versions
        ├── xxx
Upload the pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl package to a desired server.
In this example, upload the package to the home directory of your EMR cluster. For information about how to log on to the master node of a cluster, see Log on to a cluster.
Optional. Confirm environment variables.
EMR environment: By default, the following environment variables are configured. You do not need to make additional configurations.
Environment other than EMR: Configure environment variables by referring to Deploy JindoSDK in an environment other than EMR. The Hadoop configuration file and HADOOP_CONF_DIR are required only for an environment that is compatible with Hadoop.
    export JINDOSDK_CONF_DIR=/etc/taihao-apps/jindosdk-conf
    export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
Upgrade pip and install the PyJindo installation package.
    python3.8 -m ensurepip
    python3.8 -m pip install pip --upgrade --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/
    python3.8 -m pip install /home/pyjindo-6.3.2-cp38-abi3-linux_x86_64.whl
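The installation steps above can be checked with a few lines of standard-library Python. The following is a minimal sketch, not part of PyJindo: the helper names wheel_name, missing_env_vars, and is_installed are hypothetical. The sketch builds the wheel file name that matches the running interpreter, reports which of the two environment variables are unset, and checks whether a module can be found.

```python
import importlib.util
import os
import sys


def wheel_name(version, py_version=None):
    """Build the PyJindo wheel file name for a given interpreter version."""
    # Default to the interpreter that runs this script, for example (3, 8).
    major, minor = (py_version or sys.version_info)[:2]
    return f"pyjindo-{version}-cp{major}{minor}-abi3-linux_x86_64.whl"


def missing_env_vars(env=None, names=("JINDOSDK_CONF_DIR", "HADOOP_CONF_DIR")):
    """Return the variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


def is_installed(module_name):
    """Report whether a top-level module can be found, without importing it."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    print("Expected wheel:", wheel_name("6.3.2"))
    print("Missing variables:", missing_env_vars())
    print("pyjindo installed:", is_installed("pyjindo"))
```

HADOOP_CONF_DIR is required only in a Hadoop-compatible environment, so a missing value for that variable is not necessarily an error.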
Step 2: Write and execute a test file
Write a test file named fs_test.py.
    from pyjindo import fs

    # The name of an Object Storage Service (OSS) bucket. Replace the name based on your business requirements.
    bucket = "jindosdk-****"
    # The endpoint of OSS-HDFS. Replace the endpoint based on your business requirements.
    endpoint = bucket + ".cn-****.oss-dls.aliyuncs.com"
    root_path = "oss://" + endpoint + "/"
    sub_dir = root_path + "pyjindotest/"
    file_path = root_path + "hello.txt"
    file_path2 = sub_dir + "hello.txt"

    # Read the configurations and connect to OSS-HDFS as the root user.
    # The connection is stored in oss_fs so that it does not shadow the fs module.
    config = fs.read_config()
    oss_fs = fs.connect(root_path, "root", config)

    # Invoke the open() function to open a file in a specific path in binary write mode.
    # If the file does not exist, the system automatically creates a file.
    out_file = oss_fs.open(file_path, "wb")
    # Write data.
    out_file.write(b"hello world, pyjindo")
    out_file.close()

    in_file = oss_fs.open(file_path, "rb")
    # Read all data of the file and save the data in the variable data.
    data = in_file.read()
    print("The written data is %s." % (data))
    in_file.close()

    # List all files.
    ls_file = oss_fs.listdir(root_path)
    print("The files in the directory are %s." % (ls_file))
    # Create a directory.
    oss_fs.mkdir(sub_dir)
    # Move and rename the file.
    oss_fs.rename(file_path, file_path2)
    # List all files.
    mv_file = oss_fs.listdir(sub_dir)
    print("After the file is moved, the new file is %s." % (mv_file))
    # Delete the test file and relist files.
    oss_fs.remove(file_path2)
    de_file = oss_fs.listdir(sub_dir)
    print("After the test file is deleted, the files in the pyjindotest directory are %s." % (de_file))
Execute the test file.
    python3.8 fs_test.py
Results:
    The written data is b'hello world, pyjindo'.
    The files in the directory are [<FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/.sysinfo/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/apps/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/flume/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hbase/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hello.txt': type=File, size=20>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyarrowtest/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/spark-history/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/tmp/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/user/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/yarn/': type=Directory>].
    After the file is moved, the new file is [<FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyjindotest/hello.txt': type=File, size=20>].
    After the test file is deleted, the files in the pyjindotest directory are [].
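The test file closes each stream by hand, so a stream stays open if write() or read() raises an error. The following is a minimal sketch of one way to avoid that, assuming only that the object returned by open() has write(), read(), and close() as used in the test file. The helper name write_then_read is hypothetical. contextlib.closing is used because it relies only on close() and does not assume that the stream is itself a context manager.

```python
from contextlib import closing


def write_then_read(filesystem, path, payload):
    """Write payload (bytes) to path, then read it back.

    `filesystem` is any object with open(path, mode), for example the
    object returned by connect() in the test file.
    """
    # closing() calls close() when the block exits, even if write() raises.
    with closing(filesystem.open(path, "wb")) as out_file:
        out_file.write(payload)
    with closing(filesystem.open(path, "rb")) as in_file:
        return in_file.read()
```

With the connection from the test file, a call such as write_then_read(oss_fs, file_path, b"hello world, pyjindo") would replace the explicit open, write, read, and close calls.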
Method 2: Use the fsspec interface
You can use the JindoOssFileSystem class, which implements the fsspec interface, to interact with Alibaba Cloud OSS-HDFS in a convenient manner. For information about log levels and the API, see Log levels and API description. For more information about the fsspec interfaces, see fsspec.
Step 1: Install PyJindo
EMR V5.17.X or a later minor version and EMR V3.51.X or a later minor version
PyJindo for Python 3.8 is pre-installed in clusters of EMR V5.17.X or a later minor version and clusters of EMR V3.51.X or a later minor version. You do not need to manually install PyJindo and can skip this step.
A minor version earlier than EMR V5.17.X or EMR V3.51.X
Download the latest version of the JindoSDK tar.gz package.
In this example, version 6.3.2 is downloaded: jindosdk-6.3.2-linux.tar.gz.
Decompress the downloaded tar.gz package and find the desired installation package of PyJindo in the following directory structure.
In this example, Python 3.8 is used, and the installation package of PyJindo is pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl.
    .
    ├── bin
    │   ├── xxx
    ├── conf
    │   ├── xxx
    ├── include
    │   ├── xxx
    ├── lib
    │   ├── xxx
    │   ├── native
    │   │   ├── xxxx
    │   └── site-packages
    │       ├── pyjindo-x.y.z-cp310-abi3-linux_x86_64.whl
    │       ├── pyjindo-x.y.z-cp311-abi3-linux_x86_64.whl
    │       ├── pyjindo-x.y.z-cp312-abi3-linux_x86_64.whl
    │       ├── pyjindo-x.y.z-cp36-abi3-linux_x86_64.whl
    │       ├── pyjindo-x.y.z-cp37-abi3-linux_x86_64.whl
    │       ├── pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl
    │       └── pyjindo-x.y.z-cp39-abi3-linux_x86_64.whl
    ├── plugins
    │   └── xxxx
    ├── tools
    │   ├── xxx
    └── versions
        ├── xxx
Upload the pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl package to a desired server.
In this example, upload the package to the home directory of your EMR cluster. For information about how to log on to the master node of a cluster, see Log on to a cluster.
Optional. Confirm environment variables.
EMR environment: By default, the following environment variables are configured. You do not need to make additional configurations.
Environment other than EMR: Configure environment variables by referring to Deploy JindoSDK in an environment other than EMR. The Hadoop configuration file and HADOOP_CONF_DIR are required only for an environment that is compatible with Hadoop.
    export JINDOSDK_CONF_DIR=/etc/taihao-apps/jindosdk-conf
    export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
Upgrade pip and install the PyJindo installation package.
    python3.8 -m ensurepip
    python3.8 -m pip install pip --upgrade --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/
    python3.8 -m pip install /home/pyjindo-6.3.2-cp38-abi3-linux_x86_64.whl
Step 2: Install fsspec
In this example, fsspec is installed for Python 3.8. Run the following command to install fsspec:
    python3.8 -m pip install fsspec --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/
Step 3: Write and execute a test file
Write a test file named ossfs_test.py.
    from pyjindo.ossfs import JindoOssFileSystem

    # The name of an OSS bucket. Replace the name based on your business requirements.
    bucket = "jindosdk-****"
    # The endpoint of OSS-HDFS. Replace the endpoint based on your business requirements.
    endpoint = bucket + ".cn-****.oss-dls.aliyuncs.com"
    root_path = "oss://" + endpoint + "/"
    sub_dir = root_path + "pyjindotest/"
    file_path = root_path + "hello.txt"
    file_path2 = sub_dir + "hello.txt"

    fs = JindoOssFileSystem(root_path)

    # Invoke the fs.open() function to open a file in a specific path in binary write mode.
    # If the file does not exist, the system automatically creates a file.
    out_file = fs.open(file_path, "wb")
    # Write data.
    out_file.write(b"hello world, pyjindo")
    out_file.close()

    in_file = fs.open(file_path, "rb")
    # Read all data of the file and save the data in the variable data.
    data = in_file.read()
    print("The written data is %s." % (data))
    in_file.close()

    # List all files.
    ls_file = fs.ls(root_path, detail=False)
    print("The files in the directory are %s." % (ls_file))
    assert file_path in fs.glob(root_path + "*")
    # Create a directory.
    fs.mkdir(sub_dir)
    # Move and rename the file.
    fs.rename(file_path, file_path2)
    # List all files.
    mv_file = fs.listdir(sub_dir, detail=False)
    print("After the file is moved, the new file is %s." % (mv_file))
    # Delete the test file and relist files.
    fs.rm(file_path2)
    de_file = fs.ls(sub_dir)
    print("After the test file is deleted, the files in the pyjindotest directory are %s." % (de_file))
Execute the test file.
    python3.8 ossfs_test.py
Results:
    The written data is b'hello world, pyjindo'.
    The files in the directory are ['oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/.sysinfo/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/apps/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/flume/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hbase/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hello.txt', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyarrowtest/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyjindotest/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/spark-history/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/test/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/tmp/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/user/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/yarn/'].
    After the file is moved, the new file is ['oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyjindotest/hello.txt'].
    After the test file is deleted, the files in the pyjindotest directory are [].
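Because the fsspec method exposes the same calls on every storage system, code written against it does not need to know which system it talks to. The following is a minimal sketch of that idea: a hypothetical helper, copy_between, that streams a file from one fsspec-style file system to another in chunks. It assumes that the objects returned by open() support the with statement and read(n), as fsspec file objects generally do. Whether JindoOssFileSystem behaves the same way in every respect is an assumption that you should verify in your environment.

```python
def copy_between(src_fs, src_path, dst_fs, dst_path, chunk_size=1024 * 1024):
    """Stream a file from one fsspec-style file system to another.

    Returns the number of bytes copied. Both arguments only need an
    open(path, mode) method that returns a file-like object.
    """
    copied = 0
    with src_fs.open(src_path, "rb") as src, dst_fs.open(dst_path, "wb") as dst:
        while True:
            chunk = src.read(chunk_size)
            if not chunk:  # an empty bytes object marks the end of the file
                break
            dst.write(chunk)
            copied += len(chunk)
    return copied
```

For example, with fsspec installed, fsspec.filesystem("file") returns the local file system, so copy_between(fsspec.filesystem("file"), "/tmp/hello.txt", fs, file_path) would upload a local file by using the fs object from the test file. The local path here is illustrative.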
Log levels
In an EMR environment, the jindosdk.cfg file is stored in the directory specified by JINDOSDK_CONF_DIR, which is /etc/taihao-apps/jindosdk-conf. To adjust log settings, modify the following configuration items in that file.
[common]
logger.dir = /var/log/emr/jindosdk
logger.level = 2
logger.verbose = 0
logger.sync = false
logger.jnilogger = true
logger.consolelogger = false
logger.cleaner.enable = true
Configuration item | Description |
logger.dir | The directory in which logs are stored. |
logger.level | The log level. We recommend that you set the value to 2, which indicates that only logs of the INFO level or a higher level are recorded. If you specify a value that is less than or equal to 1, logs of the WARN level or a higher level are recorded. |
logger.verbose | The level of detailed logs. A larger value indicates more detailed information in logs. Valid values: 0 to 99. |
logger.sync | Specifies whether to print logs in synchronous mode. We recommend that you set the value to false, which indicates that the logs are not printed in synchronous mode. |
logger.jnilogger | Log settings related to Java Native Interface (JNI). This parameter is unrelated to PyJindo. |
logger.consolelogger | Specifies whether to print logs on the terminal. In most cases, this parameter is used for debugging and is unrelated to PyJindo. |
logger.cleaner.enable | Specifies whether to enable the automatic log cleanup feature. If you enable the feature, historical log files are periodically cleared. This prevents log files from occupying a large volume of disk space. We recommend that you set the value to true. |
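To inspect these settings from a script, the [common] section can be read with Python's standard configparser module. The following is a minimal sketch under the assumption that jindosdk.cfg is plain INI syntax, as the sample above suggests. A real file may contain constructs that configparser rejects. The function name read_logger_settings and the keys of the returned dictionary are hypothetical.

```python
import configparser

# The sample configuration shown above.
SAMPLE_CFG = """\
[common]
logger.dir = /var/log/emr/jindosdk
logger.level = 2
logger.verbose = 0
logger.sync = false
logger.jnilogger = true
logger.consolelogger = false
logger.cleaner.enable = true
"""


def read_logger_settings(cfg_text):
    """Parse the logger items of the [common] section into typed values."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    common = parser["common"]
    return {
        "dir": common.get("logger.dir"),
        "level": common.getint("logger.level"),
        "verbose": common.getint("logger.verbose"),
        "sync": common.getboolean("logger.sync"),
        "cleaner_enabled": common.getboolean("logger.cleaner.enable"),
    }


if __name__ == "__main__":
    print(read_logger_settings(SAMPLE_CFG))
```

To read the file on a cluster, pass the content of the jindosdk.cfg file in the JINDOSDK_CONF_DIR directory instead of SAMPLE_CFG.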
API description
Config class
Member function | Return value type | Description |
set(key, val) | N/A | Sets a configuration item of the string type. The values specified by key and val are strings. |
get(key, default='') | str | Obtains the value of a string-type configuration item. |
contains(key) | bool | Checks whether a specific key exists in configurations. |
FileType enumeration
Enumeration type | Enumeration value | Description |
Unknown | 0 | Files of an unknown type or unrecognized files. |
Directory | 1 | Directories. |
File | 2 | Files. |
Symlink | 3 | Symbolic links. |
FileInfo class
Member property | Return value type | Description |
type | FileType | The file type. |
is_file | bool | Indicates whether the object is a file. |
is_dir | bool | Indicates whether the object is a directory. |
is_symlink | bool | Indicates whether the object is a symbolic link. |
path | str | The full path of a file. |
user | str | The name of the file owner. |
group | str | The name of the user group to which a file belongs. |
size | int | The file size. |
perm | int | The permission bits of a file. |
atime | datetime | The time when a file was last accessed. |
mtime | datetime | The time when a file was last modified. |
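The properties in the preceding table are enough to summarize a directory listing. The following is a minimal sketch that assumes the members are plain attributes, as the "Member property" column suggests, and that perm holds the permission bits as an integer. The helper names summarize and format_entry are hypothetical.

```python
def summarize(entries):
    """Summarize a list of FileInfo-like objects, such as a listdir() result."""
    files = [entry for entry in entries if entry.is_file]
    dirs = [entry for entry in entries if entry.is_dir]
    return {
        "file_count": len(files),
        "dir_count": len(dirs),
        "total_bytes": sum(entry.size for entry in files),
        # The most recent modification time among files, or None if there are none.
        "newest": max((entry.mtime for entry in files), default=None),
    }


def format_entry(info):
    """Format one entry in a compact, ls-like form with octal permissions."""
    kind = "d" if info.is_dir else "-"
    return f"{kind} {info.perm:o} {info.user}:{info.group} {info.size} {info.path}"
```

For example, summarize(oss_fs.listdir(root_path)) would apply the summary to a listing obtained through a FileSystem connection named oss_fs.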
FileStream class
Member function | Return value type | Description |
readable() | bool | Indicates whether you can read data from a file stream. |
writable() | bool | Indicates whether you can write data to a file stream. |
seekable() | bool | Indicates whether a file stream is seekable. If the file stream is seekable, you can invoke the seek() function to change the current position at which data is read or written in a file. |
closed() | bool | Indicates whether a file stream is closed. |
close() | N/A | Closes a file stream. If the operation fails, IOError is thrown. |
size() | int | Obtains the file size. This function is available only if a file is readable. If the operation fails, IOError is thrown. |
tell() | int | Obtains the current position in a file stream. If the operation fails, IOError is thrown. |
flush() | N/A | Forcefully flushes data in the buffer to a file. If the operation fails, IOError is thrown. |
write(data) | N/A | Receives data of the bytes type and writes the data to a file. If the operation fails, IOError is thrown. |
read(nbytes) | bytes | Reads data of a size specified by nbytes from a file and returns an object of the bytes type. If the operation fails, IOError is thrown. |
pread(nbytes, offset) | bytes | Reads data of a size specified by nbytes starting at a specific offset from a file. If the operation fails, IOError is thrown. |
readall() | bytes | Reads all data of a file. If the operation fails, IOError is thrown. |
download(stream_or_path, buffer_size) | N/A | Downloads data from a file stream and writes the data to an on-premises path or a desired stream. The stream_or_path parameter specifies an on-premises path or another file stream. If the operation fails, IOError is thrown. |
upload(stream, buffer_size) | N/A | Reads data from a specified stream and writes the data to a file. If the operation fails, IOError is thrown. |
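For large files, read(nbytes) can be called in a loop so that the whole file is never held in memory. The following is a minimal sketch that computes a SHA-256 checksum in this way. It assumes that read(nbytes) returns an empty bytes object at the end of the file, which the table above does not state. The function name sha256_of_stream and the default chunk size are hypothetical choices.

```python
import hashlib


def sha256_of_stream(stream, chunk_size=4 * 1024 * 1024):
    """Compute the SHA-256 hex digest of a readable stream, chunk by chunk."""
    digest = hashlib.sha256()
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:  # assumption: empty bytes signals the end of the file
            break
        digest.update(chunk)
    return digest.hexdigest()
```

The stream can be any object with read(nbytes), such as a FileStream opened in "rb" mode. Close the stream after the function returns.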
FileSystem class
Member function | Return value type | Description |
mkdir(path, recursive) | bool | Creates a directory in the specified path. If the recursive parameter is set to True, all parent directories that do not exist are recursively created. If the operation fails, IOError is thrown. |
rename(src, dest) | bool | Moves a file or directory from the source path to the destination path, which also renames it. If the operation fails, IOError is thrown. |
get_file_info(path) | FileInfo | Obtains the details of a file in the specified path. If the operation fails, IOError is thrown. |
exists(path) | bool | Checks whether a file or directory in the specified path exists. If the operation fails, IOError is thrown. |
listdir(path, recursive) | FileInfo list | Lists files or subdirectories in the specified path. If the recursive parameter is set to True, files in all subdirectories are recursively listed. If the operation fails, IOError is thrown. |
chmod(path, perm) | bool | Changes the permissions on a file or directory in the specified path. This function is similar to setPermission. The perm parameter specifies an octal permission code, such as 0o777. If the operation fails, IOError is thrown. |
chown(path, owner, group) | bool | Changes the owner and group of a file or directory in the specified path. This function is similar to setOwner. The owner and group parameters specify the new username and user group name. If the operation fails, IOError is thrown. |
open(path, mode, buffer_size=None) | FileStream | Opens a file in the specified path. You can set the mode parameter to |
download(path, stream_or_path, buffer_size=None) | N/A | Downloads a file in a specified path from a remote file system to your on-premises machine. The stream_or_path parameter specifies an on-premises file path or a file stream. By default, the buffer_size parameter is set to 64 KB. If you configure the buffer_size and fs.oss.read.buffer.size parameters at the same time, the value of the fs.oss.read.buffer.size parameter takes effect. If the operation fails, IOError is thrown. |
upload(path, stream, buffer_size=None) | N/A | Uploads the specified stream to the specified path in the remote file system. By default, the buffer_size parameter is set to 64 KB. If you configure the buffer_size and fs.oss.write.buffer.size parameters at the same time, the value of the fs.oss.write.buffer.size parameter takes effect. If the operation fails, IOError is thrown. |
copy_file(src, dest, buffer_size=None) | N/A | Copies a file in the path specified by the src parameter to the path specified by the dest parameter. By default, the buffer_size parameter is set to 64 KB. If you configure the buffer_size and fs.oss.read.buffer.size parameters at the same time, the value of the fs.oss.read.buffer.size parameter takes effect. If the operation fails, IOError is thrown. |
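The functions in the preceding table can be combined into small helpers. The following is a minimal sketch that uses only exists(), mkdir(path, recursive), and listdir(path, recursive) with the positional arguments shown in the table. The helper names ensure_dir and find_files are hypothetical, and the sketch assumes that listdir() returns objects that have the is_file and path properties of the FileInfo class.

```python
def ensure_dir(filesystem, path):
    """Create path and any missing parent directories unless path exists.

    Returns True if the directory was created and False if it already existed.
    """
    if filesystem.exists(path):
        return False
    filesystem.mkdir(path, True)  # second argument: recursive
    return True


def find_files(filesystem, root, suffix):
    """Return the paths of all files under root whose names end with suffix."""
    entries = filesystem.listdir(root, True)  # second argument: recursive
    return [entry.path for entry in entries
            if entry.is_file and entry.path.endswith(suffix)]
```

The check in ensure_dir is not atomic: another client can create the directory between the exists() and mkdir() calls, so callers may still need to handle IOError.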
fs module
Global function | Return value type | Description |
read_config() | Config | Reads configurations. After you invoke the function, the system checks whether the |
connect(uri, user, config) | FileSystem | Initializes the FileSystem. If the operation fails, IOError is thrown. |
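The two global functions are typically used together, as in the fs_test.py file. The following is a minimal sketch that wraps them in a helper. The names oss_hdfs_root and connect_as are hypothetical, and pyjindo is imported inside the function so that the URI helper can be used and tested on a machine on which PyJindo is not installed.

```python
def oss_hdfs_root(bucket, endpoint_suffix):
    """Build the root URI of an OSS-HDFS bucket from its name and endpoint suffix."""
    return f"oss://{bucket}.{endpoint_suffix}/"


def connect_as(user, bucket, endpoint_suffix):
    """Read the configurations and return a FileSystem for the bucket.

    Raises IOError if the connection fails, as described for connect().
    """
    # Imported lazily: this line requires the PyJindo package to be installed.
    from pyjindo import fs as jindo_fs

    config = jindo_fs.read_config()
    return jindo_fs.connect(oss_hdfs_root(bucket, endpoint_suffix), user, config)
```

Importing the module under the alias jindo_fs keeps the name fs free for the returned FileSystem object.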