このトピックでは、Alibaba Cloud EMR クラスターを例として、Alibaba Cloud Jindo DistCp を使用して Hadoop 分散ファイルシステム (HDFS) から Object Storage Service (OSS) にデータを移行する方法について説明します。
背景情報
従来のビッグデータ環境では、HDFS は大規模データの基盤ストレージとしてよく使用されます。データ移行とコピーのための最も一般的なツールは、Hadoop に含まれる DistCp ツールです。しかし、このツールは OSS の特徴を十分に活用できず、効率が低く、データ整合性を保証できません。さらに、このツールは機能的なオプションが少なく、ユーザーの要件を満たすことができません。
Alibaba Cloud Jindo DistCp は、大規模クラスター内またはクラスター間でファイルをコピーする分散ファイルコピーツールです。Jindo DistCp は、ファイルの配布、エラー処理、および回復に MapReduce を使用します。ファイルとディレクトリのリストを MapReduce タスクの入力として使用します。各タスクは、ソースリストからファイルの一部をコピーします。HDFS クラスター間、HDFS と OSS 間、および OSS バケット間のデータコピーシナリオを完全にサポートします。また、さまざまなカスタムコピーパラメーターと戦略も提供します。
Hadoop DistCp と比較して、Alibaba Cloud Jindo DistCp を使用して HDFS から OSS にデータを移行すると、次の利点があります。
高効率。テストシナリオでは最大 1.59 倍の速度を達成できます。
豊富な基本機能。複数のコピーメソッドとシナリオベースの最適化戦略を提供します。
OSS との緊密な統合。データアーカイブやファイル圧縮などの操作を提供します。
No-Rename コピー。これにより、データ整合性が保証されます。
包括的なシナリオ。Hadoop DistCp を完全に置き換えることができ、現在 Hadoop 2.7+ および Hadoop 3.x をサポートしています。
前提条件
EMR V3.28.0 以降のバージョンを実行する EMR クラスターを作成済みであること。詳細については、「クラスターの作成」をご参照ください。
EMR V3.28.0 以降のバージョンを実行する EMR クラスターでは、Shell コマンドを介して Jindo DistCp を使用できます。詳細については、「Jindo DistCp 使用ガイド」をご参照ください。EMR V3.28.0 より前のバージョンを実行する EMR クラスターでは、互換性の問題が発生する可能性があります。チケットを送信することで解決できます。
セルフマネージド ECS クラスターを使用する場合、Hadoop 2.7+ または Hadoop 3.x 環境と、MapReduce ジョブを実行する機能が必要です。
ステップ 1: JAR パッケージのダウンロード
EMR クラスターにログインします。
EMR on ECS コンソールにログインします。
作成した EMR クラスターをクリックします。
[ノード管理] タブをクリックし、ノードグループの左側にある
をクリックします。ECS ID をクリックします。ECS インスタンスページで、インスタンス ID の右側にある [リモート接続] をクリックします。
キーペアまたはパスワードを使用して Secure Shell (SSH) プロトコルで Windows または Linux 環境からクラスターにログインする場合は、「クラスターへのログイン」をご参照ください。
最新バージョンの
Jindosdk-${version}.tar.gzパッケージをダウンロードして解凍します。ダウンロードリンクについては、「JindoData のダウンロード」をご参照ください。
ステップ 2: OSS AccessKey の構成
OSS AccessKey は、次のいずれかの方法で構成できます。
サンプルコマンドで AccessKey を構成する
hadoop jar jindo-distcp-tool-${version}.jar --src /tmp/ --dest oss://examplebucket/ --hadoopConf fs.oss.accessKeyId=LTAI**************** --hadoopConf fs.oss.accessKeySecret=yourAccessKeySecret --hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --parallelism 10構成ファイルで AccessKey を事前構成する
Hadoop 構成ファイル core-site.xml があるディレクトリに移動します。
cd /etc/emr/hadoop-conf/EMR の一般的なファイルパスの詳細については、「一般的なファイルパス」をご参照ください。
core-site.xml ファイルを開きます。
vim core-site.xml次の構成を core-site.xml ファイルに追加します。
<configuration> <property> <name>fs.oss.accessKeyId</name> <value>LTAI****************</value> </property> <property> <name>fs.oss.accessKeySecret</name> <value>yourAccessKeySecret</value> </property> <property> <name>fs.oss.endpoint</name> <!-- Alibaba Cloud ECS 環境では、oss-cn-xxx-internal.aliyuncs.com などの内部 OSS エンドポイントを使用することをお勧めします --> <value>oss-cn-xxx.aliyuncs.com</value> </property> </configuration>
パスワードなしのログインを構成する
パスワードなしのログインを構成して、AccessKey をプレーンテキストで保存することを回避し、セキュリティを向上させることができます。詳細については、「JindoFS SDK のパスワードなし機能を使用する」をご参照ください。
ステップ 3: データの移行またはコピー
次のコマンドを実行して、HDFS 内のデータを表示します。
hdfs dfs -ls /次の結果が返されます。
Found 8 items drwxrwxrwx - admin supergroup 0 2023-10-26 10:55 /.sysinfo drwxrwxrwx - hadoop supergroup 0 2023-10-26 10:55 /apps drwxrwxrwx - root supergroup 0 2022-08-03 15:54 /data -rw-r----- 1 root supergroup 13 2022-08-25 11:45 /examplefile.txt drwxrwxrwx - spark supergroup 0 2023-10-26 14:49 /spark-history drwx-wx-wx - hive supergroup 0 2023-10-26 13:35 /tmp drwxrwxrwx - hive supergroup 0 2023-10-26 14:48 /user drwxrwxrwx - hadoop supergroup 0 2023-10-26 14:48 /yarnjindo-distjob-tool-${version}.jar パッケージがあるディレクトリに切り替えます。
cd /opt/apps/JINDOSDK/jindosdk-current/toolsHDFS から OSS にデータを移行します。
完全なデータ移行またはコピー
指定された HDFS ディレクトリ /tmp から宛先の OSS パス oss://examplebucket にデータの完全な移行またはコピーを実行するには、次のサンプルコマンドを実行します。
hadoop jar jindo-distcp-tool-${version}.jar --src /tmp/ --dest oss://examplebucket/ --hadoopConf fs.oss.accessKeyId=LTAI**************** --hadoopConf fs.oss.accessKeySecret=yourAccessKeySecret --hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --parallelism 10次の表に、サンプルコマンドのパラメーターとオプションを示します。
パラメーターとオプション
説明
例
--src
移行またはコピーする HDFS データのソースパス。
/tmp/
--dest
移行またはコピーしたデータを格納する OSS の宛先パス。
oss://examplebucket/
--hadoopConf
OSS にアクセスする権限を持つ AccessKey ID、AccessKey Secret、およびエンドポイントを指定します。
AccessKey ID と AccessKey Secret の取得方法の詳細については、「AccessKey の作成」をご参照ください。
OSS がサポートするリージョンと対応するエンドポイントのリストについては、「エンドポイントとデータセンター」をご参照ください。
重要ECS 環境では、oss-cn-xxx-internal.aliyuncs.com などの内部 OSS エンドポイントを使用することをお勧めします。
--hadoopConf fs.oss.accessKeyId=LTAI**************** --hadoopConf fs.oss.accessKeySecret=yourAccessKeySecret --hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com
--parallelism
クラスターリソースに基づいてタスクの同時実行性を調整します。
10
増分データ移行またはコピー
最後の完全な移行またはコピーの後にソースパスの新しいデータのみをコピーする場合は、--update オプションを使用して増分データ移行またはコピーを実行できます。
次のサンプルコマンドは、指定された HDFS ディレクトリ /tmp から宛先の OSS パス oss://examplebucket/ にデータの増分移行またはコピーを実行します。
hadoop jar jindo-distcp-tool-${version}.jar --src /tmp/ --dest oss://examplebucket/ --hadoopConf fs.oss.accessKeyId=LTAI**************** --hadoopConf fs.oss.accessKeySecret=yourAccessKeySecret --hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --update --parallelism 10--update オプションを使用すると、チェックサムがデフォルトで有効になります。チェックサムを有効にすると、DistCp はソースパスと宛先パスにあるファイルのファイル名、ファイルサイズ、およびチェックサムを比較します。ソースパスにあるファイルのファイル名、ファイルサイズ、またはチェックサムが宛先パスにあるファイルと異なる場合、増分移行またはコピータスクが自動的にトリガーされます。
ソースパスと宛先パスにあるファイルのチェックサムを比較する必要がない場合は、--disableChecksum オプションを追加してチェックサム検証を無効にできます。以下はサンプルコマンドです。
hadoop jar jindo-distcp-tool-${version}.jar --src /tmp/ --dest oss://examplebucket/ --hadoopConf fs.oss.accessKeyId=LTAI**************** --hadoopConf fs.oss.accessKeySecret=yourAccessKeySecret --hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --update --disableChecksum --parallelism 10
付録 1: Jindo DistCp でサポートされているパラメーターとオプション
Jindo DistCp は一連のパラメーターとオプションを提供します。次のコマンドを実行して、各パラメーターとオプションの具体的な使用方法を取得できます。
hadoop jar jindo-distcp-tool-${version}.jar --help次の表に、パラメーター、オプション、およびそのサンプル値を示します。
パラメーターとオプション | 説明 | サンプル値 |
--src | コピーのソースパスを指定します。 | --src oss://examplebucket/sourcedir |
--dest | コピーの宛先パスを指定します。 | --dest oss://examplebucket/destdir |
--bandWidth | DistCp タスクで使用される単一マシンの帯域幅を指定します。単位: MB。 | --bandWidth 6 |
--codec | ファイルの圧縮メソッドを指定します。現在のバージョンは、gzip、gz、lzo、lzop、および snappy コーデック、およびキーワード none と keep をサポートしています。キーワードは次のように説明されます。
説明 オープンソースの Hadoop クラスター環境で lzo 圧縮メソッドを使用するには、gplcompression ネイティブライブラリと hadoop-lzo パッケージがインストールされていることを確認してください。必要な環境が利用できない場合は、他の圧縮メソッドを使用することをお勧めします。 | --codec gz |
--policy | OSS にコピーされた後のファイルのストレージタイプを指定します。有効な値:
| --policy coldArchive |
--filters | ファイルパスを指定します。このファイルの各行は、DistCp タスクでコピーまたは比較する必要がないファイルに対応する正規表現を構成します。 | --filters test.txt |
--srcPrefixesFile | コピーするファイルのリストを指定します。リスト内のファイルには、src パスのプレフィックスが付きます。 | --srcPrefixesFile prefixes.txt |
--parallelism | MapReduce タスクの mapreduce.job.reduces パラメーターを指定します。クラスターリソースに基づいて parallelism の値をカスタマイズして、distcp タスクの同時実行性を制御できます。 説明 EMR 環境でのこのパラメーターのデフォルト値は 7 です。 | --parallelism 20 |
--tmp | DistCp ツールを使用するときに一時ファイルを格納するディレクトリを指定します。 | --tmp /tmp |
--hadoopConf | OSS にアクセスする権限を持つ AccessKey とエンドポイントを指定します。 | --hadoopConf fs.oss.accessKeyId=LTAI**************** --hadoopConf fs.oss.accessKeySecret=yourAccessKeySecret --hadoopConf fs.oss.endpoint=oss-cn-xxx.aliyuncs.com |
--disableChecksum | チェックサム検証を無効にします。 | --disableChecksum |
--deleteOnSuccess | コピータスクが完了した後、ソースパス内のファイルが削除されることを指定します。 | --deleteOnSuccess |
--enableTransaction | デフォルトでは、JindoDistCp はタスクレベルの整合性を使用します。ジョブレベルの整合性とジョブ間のトランザクションサポートを確保するには、--enableTransaction パラメーターを使用できます。 | --enableTransaction |
--ignore | データ移行中に発生する例外を無視します。関連するエラーはタスクを中断せず、最終的に DistCp カウンターとして公開されます。--enableCMS が使用されている場合、通知も指定された方法で送信されます。 | -ignore |
--diff | 現在のコピータスクですべてのファイルがコピーされたかどうかを確認し、コピーされなかったファイルのリストを生成します。 | --diff |
--update | 増分同期機能を使用します。これは、最後の完全な移行またはコピーの後でソースパスに追加された新しいデータのみが宛先パスに同期されることを意味します。 | --update |
--preserveMeta | Owner、Group、Permission、Atime、Mtime、Replication、BlockSize、XAttrs、ACL などのメタデータをデータとともに移行します。 | --preserveMeta |
--jobBatch | 各 distcp タスクによって処理されるファイル数を指定します。デフォルト値は 1000 です。 | --jobBatch 1000 |
--taskBatch | 各 distcp サブタスクによって処理されるファイル数を指定します。デフォルト値は 10 です。 | --taskBatch 10 |
付録 2: シナリオ例
シナリオ 1: JindoDistCp を使用してデータが正常に転送された後、データ整合性をどのように検証しますか?
JindoDistCp は、データ整合性を検証するために次の 2 つのメソッドを提供します。
メソッド 1: DistCp カウンター
DistCp カウンター情報の BYTES_EXPECTED や FILES_EXPECTED などのパラメーターを使用して、データ整合性を検証できます。
例 JindoDistcpCounter BYTES_COPIED=10000 BYTES_EXPECTED=10000 FILES_COPIED=11 FILES_EXPECTED=11 ... Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0次の表に、この例に含まれる可能性のあるカウンターパラメーターを示します。
パラメーター
説明
BYTES_COPIED
正常にコピーされたバイト数。
BYTES_EXPECTED
コピーが期待されるバイト数。
FILES_COPIED
正常にコピーされたファイル数。
FILES_EXPECTED
コピーが期待されるファイル数。
FILES_SKIPPED
増分コピー中にスキップされたファイル数。
BYTES_SKIPPED
増分コピー中にスキップされたバイト数。
COPY_FAILED
コピーに失敗したファイル数。値が 0 でない場合、アラートがトリガーされます。
BYTES_FAILED
コピーに失敗したバイト数。
DIFF_FILES
ソースパスと宛先パスの間で異なるファイル数。値が 0 でない場合、アラートがトリガーされます。
DIFF_FAILED
比較操作が異常だったファイル数。これは DIFF_FILES の数に含まれます。
SRC_MISS
ソースパスに存在しないファイル数。これは DIFF_FILES の数に含まれます。
DST_MISS
宛先パスに存在しないファイル数。これは DIFF_FILES の数に含まれます。
LENGTH_DIFF
ソースと宛先でサイズが一致しないファイル数。これは DIFF_FILES の数に含まれます。
CHECKSUM_DIFF
チェックサム検証に失敗したファイル数。これは COPY_FAILED の数に含まれます。
SAME_FILES
ソースパスと宛先パスで同一のファイル数。
メソッド 2: --diff オプションを使用する
サンプルコマンドで --diff オプションを使用して、ソースパスと宛先パスのファイル名とサイズを比較できます。
hadoop jar jindo-distcp-tool-${version}.jar --src /tmp/ --dest oss://examplebucket/ --hadoopConf fs.oss.accessKeyId=LTAI**************** --hadoopConf fs.oss.accessKeySecret=yourAccessKeySecret --hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --diff
シナリオ 2: OSS に書き込まれたファイルを低頻度アクセス、アーカイブ、またはコールドアーカイブとして保存するには、どのパラメーターを使用すればよいですか?
サンプルコマンドに --policy オプションを追加して、OSS に書き込まれるファイルのストレージタイプを指定できます。次の例は、低頻度アクセスストレージタイプを指定する方法を示しています。
hadoop jar jindo-distcp-tool-${version}.jar --src /tmp/ --dest oss://examplebucket/ --hadoopConf fs.oss.accessKeyId=LTAI**************** --hadoopConf fs.oss.accessKeySecret=yourAccessKeySecret --hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --policy ia --parallelism 10アーカイブストレージタイプを指定するには、--policy ia を --policy archive に置き換えます。コールドアーカイブストレージタイプを指定するには、--policy ia を --policy coldArchive に置き換えます。さらに、コールドアーカイブは一部のリージョンでのみサポートされています。詳細については、「コールドアーカイブ」をご参照ください。
シナリオ 3: 移行またはコピータスクが完了した後、宛先パスのデータのみを保持し、ソースパスから指定されたデータを削除するには、どのパラメーターを使用すればよいですか?
--deleteOnSuccess オプションを使用すると、移行またはコピータスクが完了した後に、宛先の OSS パスのデータのみを保持し、ソースの HDFS パスから指定されたデータを削除できます。
hadoop jar jindo-distcp-tool-${version}.jar --src /tmp/ --dest oss://examplebucket/ --hadoopConf fs.oss.accessKeyId=LTAI**************** --hadoopConf fs.oss.accessKeySecret=yourAccessKeySecret --hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --deleteOnSuccess --parallelism 10