DataWorksは、Hadoop Distributed File System ( HDFS ) データソースとのデータの読み取りと書き込みを行うためのHDFS ReaderとHDFS Writerを提供しています。このトピックでは、HDFSデータソースとのデータ同期機能について説明します。
サポートされているHDFSバージョン
Alibaba Cloud Apsara File Storage for HDFSはサポートされていません。
制限事項
バッチデータ読み取り
HDFS Readerを使用する場合は、以下の点に注意してください。
共有リソースグループとHDFS間で複雑なネットワーク接続が必要になります。そのため、同期タスクを実行するには、Data Integration専用リソースグループを使用することをお勧めします。 Data Integration専用リソースグループがHDFSのNameNodeノードとDataNodeノードにアクセスできることを確認してください。
デフォルトでは、HDFSはデータセキュリティを確保するためにネットワークホワイトリストを使用します。この場合、HDFS Readerを使用する同期タスクを実行するには、Data Integration専用リソースグループを使用することをお勧めします。
コードエディタを使用してHDFS Readerを使用する同期タスクを設定する場合、使用するHDFSデータソースのネットワーク接続テストは不要です。システムが接続テストでエラーを報告した場合、エラーは無視できます。
データ同期タスクを開始するには、管理者アカウントを使用する必要があります。管理者アカウントに、関連するHDFSファイルのデータを読み取りおよび書き込みする権限があることを確認してください。管理者アカウントに権限がない場合は、コードエディタを使用してデータ同期タスクを設定し、タスクのコードに設定
"hdfsUsername": "Authorized account" // 認証済みアカウントを追加できます。
HDFS Readerは、次の機能をサポートしています。
text、ORC、RC、Sequence、CSV、およびParquetファイル形式をサポートしています。これらの形式のファイルに格納されているデータは、論理的な2次元テーブルとして編成する必要があります。
さまざまな型のデータを文字列として読み取り、定数とカラムプルーニングをサポートします。
アスタリスク(
*)と疑問符(?)を含む再帰的な読み取りと正規表現をサポートします。ORCファイルをSnappyまたはZLIB形式で圧縮します。
SequenceファイルをLZO形式で圧縮します。
複数のファイルからデータを並列スレッドで読み取ります。
CSV ファイルを GZIP、BZ2、ZIP、LZO、LZO_DEFLATE、または Snappy 形式で圧縮します。
JDK 1.6で動作するHive 1.1.1およびHadoop 2.7.1をサポートしています。HDFS Readerは、テスト中にHive 1.2.0およびHadoop 2.5.0またはHadoop 2.6.0で正常に実行できます。
HDFS Readerは、内部のシャーディング方法のため、並列スレッドを使用して単一のファイルを読み取ることはできません。
バッチデータ書き込み
HDFS Writerを使用する場合は、以下の点に注意してください。
HDFS Writerは、論理的な2次元テーブルを格納するtext、ORC、およびParquetファイルのみをHDFSに書き込むことができます。
HDFSは分散ファイルシステムであり、スキーマがありません。そのため、ファイルの一部の列のデータのみをHDFSに書き込むことはできません。
DECIMAL、BINARY、ARRAYS、MAPS、STRUCTS、UNIONなどのHiveデータ型はサポートされていません。
HDFS Writerは、一度にパーティション化されたHiveテーブルの1つのパーティションにのみデータを書き込むことができます。
テキストファイルをHDFSに書き込むには、ファイル内の区切り文字が、関連付けるHiveテーブル内の区切り文字と同じであることを確認してください。このようにして、HDFSに書き込まれたファイルの列をHiveテーブルの列に関連付けることができます。
Hive 1.1.1およびHadoop 2.7.1 ( JDKバージョン: 1.7 ) がインストールされている環境でHDFS Writerを使用できます。JDKはJava Development Kitの略です。HDFS Writerは、Hive 1.2.0およびHadoop 2.5.0またはHadoop 2.6.0がインストールされているテスト環境でHDFSにファイルを書き込むことができます。
HDFS Writerは、Data Integration専用リソースグループのみをサポートしています。
仕組み
HDFS Writerは、次の方法でHDFSにファイルを書き込みます。
指定したパス パラメータに基づいて、HDFSに存在しない一時ディレクトリを作成します。
一時ディレクトリは、path_ランダムなサフィックスの形式で指定されます。
リーダーから取得したファイルを一時ディレクトリに書き込みます。
すべてのファイルが書き込まれた後、ファイルを一時ディレクトリから指定されたディレクトリに移動します。HDFSに書き込むファイルの名前は、HDFSに既存のファイルの名前と異なる必要があります。
一時ディレクトリを削除します。ネットワークの中断が原因でHDFS WriterがHDFSに接続できない場合は、一時ディレクトリと一時ディレクトリ内のすべてのファイルを 手動で 削除する必要があります。
データを同期するには、特定のファイルに対する読み取りおよび書き込み権限を持つ管理者アカウントを使用する必要があります。
データ型マッピング
バッチデータ読み取り
Hiveはファイルのメタデータを保持し、MySQLデータベースなどの独自のメタデータベースにメタデータを格納します。HDFS Readerは、Hiveのメタデータベース内のメタデータにアクセスしたり、クエリを実行したりすることはできません。そのため、変換するデータ型を指定する必要があります。
次の表に、HDFS ReaderがHiveのRC、Parquet、ORC、text、およびSequenceファイルのデータ型を変換するデータ型マッピングを示します。
カテゴリ | Data Integrationデータ型 | Hiveデータ型 |
整数 | long | TINYINT、SMALLINT、INT、およびBIGINT |
浮動小数点 | double | FLOATおよびDOUBLE |
文字列 | string | STRING、CHAR、VARCHAR、STRUCT、MAP、ARRAY、UNION、およびBINARY |
日付と時刻 | date | DATEおよびTIMESTAMP |
BOOLEAN | boolean | boolean |
LONG: HDFSファイルの整数型のデータ。例: 123456789。
DOUBLE: HDFSファイルの浮動小数点型のデータ。例: 3.1415。
BOOLEAN: HDFSファイルのブール型のデータ。例: trueまたはfalse。データは大文字と小文字を区別しません。
DATE: HDFSファイルの日付と時刻型のデータ。例: 2014-12-31 00:00:00。
HiveでサポートされているTIMESTAMPデータ型は、ナノ秒まで正確にすることができます。そのため、textファイルとORCファイルに格納されているTIMESTAMP型のデータは、2015-08-21 22:40:47.397898389のようになります。データがData IntegrationのDATE型に変換されると、データのナノ秒部分が失われます。そのため、変換されたデータの型をSTRINGに指定して、変換後もデータのナノ秒部分が保持されるようにする必要があります。
バッチデータ書き込み
HDFS Writerは、text、ORC、またはParquetファイルをHDFSの指定されたディレクトリに書き込むことができます。ファイルの列をHiveテーブルの列に関連付けることができます。HDFS Writerは、ほとんどのHiveデータ型をサポートしています。システムのデータ型がサポートされていることを確認してください。
次の表に、HDFS Writerがデータ型を変換するデータ型マッピングを示します。
ファイル内の指定された列のデータ型は、Hiveテーブルの列のデータ型と同じである必要があります。
カテゴリ | Hiveデータ型 |
整数 | TINYINT、SMALLINT、INT、およびBIGINT |
浮動小数点 | FLOATおよびDOUBLE |
文字列 | CHAR、VARCHAR、およびSTRING |
BOOLEAN | BOOLEAN |
日付と時刻 | DATEおよびTIMESTAMP |
データ同期タスクの開発
データ同期タスクのエントリ ポイントと設定手順については、以下のセクションを参照してください。パラメータ設定については、タスクの設定タブにある各パラメータの情報ヒントを表示してください。
データソースの追加
特定のデータソースとのデータ同期タスクを設定する前に、データソースをDataWorksに追加する必要があります。詳細については、「データソースの追加と管理」をご参照ください。
単一テーブルのデータを同期するためのバッチ同期タスクの設定
設定手順の詳細については、「コードレスUIを使用したバッチ同期タスクの設定」および「コードエディタを使用したバッチ同期タスクの設定」をご参照ください。
コードエディタを使用してバッチ同期タスクを設定する場合に設定されるすべてのパラメータと実行されるコードについては、「付録: コードとパラメータ」をご参照ください。
付録: コードとパラメータ
付録: コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合は、コードエディタの形式要件に基づいて、関連データソースのリーダーとライターのパラメータを設定する必要があります。形式要件の詳細については、「コードエディタを使用したバッチ同期タスクの設定」をご参照ください。以下の情報は、コードエディタのリーダーとライターのパラメータの設定詳細について説明しています。
HDFS Readerのコード
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "hdfs",// プラグイン名。
"parameter": {
"path": "",// データを読み取るファイルのパス。
"datasource": "",// データソースの名前。
"hadoopConfig":{
"dfs.data.transfer.protection": "integrity",
"dfs.datanode.use.datanode.hostname" :"true",
"dfs.client.use.datanode.hostname":"true"
},
"column": [
{
"index": 0,// ソースファイルの列のインデックス。インデックスは 0 から始まり、HDFS Reader がソースファイルの最初の列からデータを読み取ることを示します。
"type": "string"// フィールドタイプ。
},
{
"index": 1,
"type": "long"
},
{
"index": 2,
"type": "double"
},
{
"index": 3,
"type": "boolean"
},
{
"format": "yyyy-MM-dd HH:mm:ss",// 時間形式。
"index": 4,
"type": "date"
}
],
"fieldDelimiter": ",",// 列の区切り文字。
"encoding": "UTF-8",// エンコード形式。
"fileType": ""// ファイル形式。
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""// 許容されるダーティデータレコードの最大数。
},
"speed": {
"concurrent": 3,// 並列スレッドの最大数。
"throttle": true, // スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。 mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
"mbps":"12"// 最大伝送速度。単位: MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}次の例は、parquetSchemaパラメータを使用したHDFS Readerの設定を示しています。
fileTypeパラメータは、parquetに設定する必要があります。
HDFS ReaderにParquetファイルから特定の列を読み取らせる場合は、parquetSchemaパラメータに完全なスキーマを指定し、columnパラメータのindexフィールドを使用して読み取る列を指定する必要があります。
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/user/hive/warehouse/addata.db/dw_ads_rtb_monitor_minute/thedate=20170103/hour_id=22/*",
"defaultFS": "h10s010.07100.149:8020",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "long"
},
{
"index": 2,
"type": "double"
}
],
"fileType": "parquet",
"encoding": "UTF-8",
"parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"
}
}HDFS Readerのコードのパラメータ
パラメータ | 説明 | 必須 | デフォルト値 |
path | データを読み取るファイルのパス。複数のファイルからデータを読み取る場合は、
pathパラメータを設定する場合は、以下の点に注意してください。
| はい | デフォルト値なし |
defaultFS | HDFSのNameNodeノードのエンドポイント。共有リソースグループは、高可用性に関連する高度なHadoopパラメータをサポートしていません。 | はい | デフォルト値なし |
fileType | データを読み取るファイルの形式。HDFS Readerはファイル形式を自動的に識別し、関連する読み取りポリシーを使用します。HDFS Readerはデータを読み取る前に、指定されたパスのすべてのファイルがfileTypeパラメータで指定された形式と一致するかどうかを確認します。ファイルの形式がfileTypeパラメータで指定された形式と一致しない場合、データ同期タスクは失敗します。 fileTypeパラメータの有効な値:
HDFS Readerは、text形式とORC形式のファイルを異なる方法で解析します。データがHiveの複合データ型からData IntegrationでサポートされているSTRING型に変換される場合、変換結果はtext形式とORC形式で異なります。複合データ型には、MAP、ARRAY、STRUCT、およびUNIONが含まれます。次の例は、MAP型からSTRING型への変換の結果を示しています。
変換結果から、データは変更されないままですが、形式がわずかに異なることがわかります。そのため、同期する列でHiveの複合データ型を使用する場合は、統一されたファイル形式を使用することをお勧めします。 推奨されるベストプラクティス:
| はい | デフォルト値なし |
column | データを読み取る列の名前。typeフィールドはデータ型を指定します。indexフィールドは列のIDを指定します。 0 から始まります。valueフィールドは定数を指定します。valueフィールドを指定すると、HDFS Readerはこのフィールドの値を読み取ります。デフォルトでは、HDFS Readerはすべてのデータを文字列として読み取ります。この場合、このパラメータを columnパラメータでは、typeフィールドとindexフィールドとvalueフィールドのいずれか1つを設定する必要があります。例: 説明
| はい | デフォルト値なし |
fieldDelimiter | データを読み取る列の区切り文字。ソースファイルがtextファイルの場合は、列の区切り文字を指定する必要があります。列の区切り文字を指定しない場合、HDFS Readerはデフォルトでカンマ(,)を列の区切り文字として使用します。ソースファイルがORCファイルの場合は、列の区切り文字を指定する必要はありません。HDFS ReaderはHiveのデフォルトの区切り文字である\u0001を使用します。 説明
| いいえ | , |
encoding | データを読み取るファイルのエンコード形式。 | いいえ | utf-8 |
nullFormat | ヌルポインタを表す文字列。textファイルでは、ヌルポインタを表す標準文字列はありません。このパラメータを使用して、ヌルポインタを表す文字列を定義できます。 たとえば、このパラメータを 説明 文字列NULLはヌルポインタとは異なります。両者の違いに注意してください。 | いいえ | デフォルト値なし |
compress | fileTypeパラメータがCSVに設定されている場合の圧縮形式。サポートされている圧縮形式は、GZIP、BZ2、ZIP、LZO、LZO_DEFLATE、Hadoop-Snappy、およびFraming-Snappyです。 説明
| いいえ | デフォルト値なし |
parquetSchema | Parquetファイルのデータのスキーマの説明。fileTypeパラメータをparquetに設定した場合は、parquetSchemaパラメータを設定する必要があります。parquetSchemaパラメータの値がJSON構文に準拠していることを確認してください。 parquetSchemaパラメータには、次のフィールドが含まれています。
設定例: | いいえ | デフォルト値なし |
csvReaderConfig | CSVファイルを読み取るために必要な構成です。パラメーター値は、MAPタイプと一致する必要があります。CSVファイルリーダーを使用して、CSVファイルからデータを読み取ることができます。CSVファイルリーダーは、複数の構成をサポートしています。 次の例は、一般的な構成を示しています。 以下の設定は、すべてのフィールドとそのデフォルト値を示しています。MAP タイプの csvReaderConfig パラメーターを設定する場合は、以下の設定で提供されているフィールド名を使用する必要があります。 | いいえ | デフォルト値なし |
hadoopConfig | 高可用性に関連するパラメーターなど、Hadoop の詳細パラメーターの設定。共有リソースグループは、高可用性に関連する高度な Hadoop パラメーターをサポートしていません。 説明 上記の設定は、HDFS Reader で Kerberos 認証を設定するために使用されます。HDFS データソースに Kerberos 認証を設定する場合、HDFS Reader で設定する必要はありません。HDFS データソースを追加する方法の詳細については、「HDFSデータソース」をご参照ください。 | いいえ | デフォルト値なし |
haveKerberos | Kerberos 認証を有効にするかどうかを指定します。デフォルト値: false。このパラメーターを true に設定する場合は、kerberosKeytabFilePath パラメーターと kerberosPrincipal パラメーターも構成する必要があります。 | いいえ | false |
kerberosKeytabFilePath | Kerberos 認証用のキータブファイルの絶対パス。 haveKerberos パラメーターが true に設定されている場合、このパラメーターは必須です。 | いいえ | デフォルト値なし |
kerberosPrincipal | Kerberos プリンシパル (例: ****/hadoopclient@**.***)。 haveKerberos パラメーターが true に設定されている場合、このパラメーターは必須です。 説明 Kerberos 認証には、keytab ファイルの絶対パスが必要です。そのため、Data Integration 専用のリソースグループに対して Kerberos 認証を設定する必要があります。次のコードは、設定例を示しています: | いいえ | デフォルト値なし |
HDFS Writer のコード
{
"type": "job",
"version": "2.0",// バージョン番号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "hdfs",// プラグイン名。
"parameter": {
"path": "",// ファイルを書き込む HDFS のディレクトリ。
"fileName": "",// HDFS に書き込むファイルの名前のプレフィックス。
"compress": "",// HDFS に書き込むファイルの圧縮形式。
"datasource": "",// データソースの名前。
"column": [
{
"name": "col1",// 列の名前。
"type": "string"// 列のデータ型。
},
{
"name": "col2",
"type": "int"
},
{
"name": "col3",
"type": "double"
},
{
"name": "col4",
"type": "boolean"
},
{
"name": "col5",
"type": "date"
}
],
"writeMode": "",// 書き込みモード。
"fieldDelimiter": ",",// 列の区切り文字。
"encoding": "",// エンコード形式。
"fileType": "text"// HDFS に書き込むファイルの形式。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""// 許容されるダーティデータレコードの最大数。
},
"speed": {
"concurrent": 3, // 並列スレッドの最大数。
"throttle": false, // スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}HDFS Writer のコードのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
defaultFS | HDFS の NameNode ノードのアドレス (例: | はい | デフォルト値なし |
fileType | HDFS に書き込むファイルの形式。有効な値:
| はい | デフォルト値なし |
path | ファイルを書き込む HDFS のディレクトリ。HDFS Writer は、並列スレッドの構成に基づいて、複数のファイルをディレクトリに書き込みます。 ファイルの列を Hive テーブルの列に関連付けるには、path パラメーターを HDFS の Hive テーブルのストレージパスに設定します。たとえば、Hive データウェアハウスに指定されているストレージパスは | はい | デフォルト値なし |
fileName | HDFS に書き込むファイルの名前のプレフィックス。指定されたプレフィックスにランダムなサフィックスが付加され、各スレッドで使用される実際のファイル名が形成されます。 | はい | デフォルト値なし |
column | データを書き込む列の名前。Hive テーブルの一部の列にのみデータを書き込むことはできません。 ファイルの列を Hive テーブルの列に関連付けるには、各列の name パラメーターと type パラメーターを設定します。name パラメーターは列の名前を指定し、type パラメーターは列のデータ型を指定します。 column パラメーターは、次の形式で設定できます。 | fileType パラメーターが text または orc に設定されている場合は必須 | デフォルト値なし |
writeMode | 書き込みモード。有効な値:
説明 Parquet ファイルは append モードをサポートしていません。Parquet ファイルを書き込むには、writeMode パラメーターを nonConflict に設定する必要があります。 | はい | デフォルト値なし |
fieldDelimiter | HDFS に書き込むファイルで使用される列区切り文字。Hive テーブルと同じ区切り文字を使用していることを確認してください。そうでない場合、Hive テーブルのデータをクエリできません。 説明 1 文字の区切り文字のみがサポートされています。複数文字の区切り文字を指定すると、エラーが報告されます。 | fileType パラメーターが text または orc に設定されている場合は必須 | デフォルト値なし |
compress | HDFS に書き込むファイルの圧縮形式。デフォルトでは、このパラメーターは空のままです。これは、ファイルが圧縮されていないことを示します。 テキストファイルの場合、GZIP および BZIP2 圧縮形式がサポートされています。 | いいえ | デフォルト値なし |
encoding | HDFS に書き込むファイルのエンコード形式。 | いいえ | デフォルト値なし |
parquetSchema | HDFS に書き込む Parquet ファイルのスキーマ。fileType パラメーターが parquet に設定されている場合にのみ、このパラメーターを使用できます。形式: フィールド:
説明 最後の行を含む各行は、セミコロン (;) で終わる必要があります。 例: | いいえ | デフォルト値なし |
hadoopConfig | 高可用性に関連するパラメーターなど、高度な Hadoop パラメーターの設定。Data Integration の共有リソースグループを使用している場合、高可用性に関連する高度な Hadoop パラメーターを設定することはできません。これらのパラメーターを設定する場合は、Data Integration のカスタムリソースグループを使用する必要があります。詳細については、「Data Integration のカスタムリソースグループの作成と使用」をご参照ください。 | いいえ | デフォルト値なし |
dataxParquetMode | Parquet ファイルの同期モード。有効な値: fields および columns。このパラメーターを fields に設定すると、HDFS Writer は ARRAY、MAP、STRUCT などの複合データ型のデータを書き込むことができます。 このパラメーターを fields に設定すると、HDFS Writer は HDFS over Object Storage Service (OSS) をサポートします。この場合、HDFS は OSS をストレージサービスとして使用し、HDFS Writer は Parquet ファイルを OSS に書き込みます。hadoopConfig パラメーターに次の OSS 関連パラメーターを追加できます。
次のサンプルコードは、OSS に接続する方法の例を示しています。 | いいえ | columns |
haveKerberos | Kerberos 認証が必要かどうかを指定します。 このパラメーターを true に設定すると、kerberosKeytabFilePath パラメーターと kerberosPrincipal パラメーターが必要になります。 | いいえ | false |
kerberosKeytabFilePath | Kerberos 認証用の keytab ファイルの絶対パス。 | haveKerberos パラメーターが true に設定されている場合は必須 | デフォルト値なし |
kerberosPrincipal | Kerberos プリンシパル (例: ****/hadoopclient@**.***)。haveKerberos パラメーターが true に設定されている場合は、このパラメーターが必要です。 Kerberos 認証には、keytab ファイルの絶対パスが必要です。Kerberos 認証を使用するには、カスタムリソースグループで Kerberos 認証を設定する必要があります。次のコードは、構成例を示しています。 | いいえ | デフォルト値なし |