OSS データソースは、OSS への読み取りおよび書き込みアクセスを提供します。このトピックでは、DataWorks が OSS のデータ同期をどのようにサポートするかについて説明します。
サポートされるフィールドタイプと制限事項
バッチデータの読み取り
OSS Reader は OSS からデータを読み取り、Data Integration が処理できる形式に変換します。OSS は非構造化データストレージサービスであるため、Reader は以下の特徴をサポートします。
サポート対象 | サポート対象外 |
|
|
OSS でデータを準備する際、CSV ファイルは標準の CSV 形式に準拠する必要があります。例えば、列内の二重引用符 (") は、二重引用符二つ ("") に置き換えてエスケープする必要があります。そうしないと、解析エラーが発生する可能性があります。ファイルに複数のデリミタが含まれている場合は、TXT ファイルタイプを使用することを推奨します。
OSS は、データをファイルとして保存する非構造化データソースです。同期タスクを実行する前に、フィールド構造が正しく設定されていることを確認してください。ソースデータの構造が変更された場合は、データ化けを防ぐためにタスク設定を更新する必要があります。
バッチデータの書き込み
OSS Writer は Data Integration 準拠のデータを変換し、テキストファイルとして OSS に書き込みます。OSS は非構造化データストレージサービスであるため、Writer は以下の特徴をサポートします。
サポート対象 | サポート対象外 |
|
|
カテゴリ | 列の型 |
整数 | LONG |
文字列 | STRING |
浮動小数点 | DOUBLE |
ブール値 | BOOLEAN |
日時 | DATE |
リアルタイムデータの書き込み
リアルタイムデータ書き込みをサポートします。
単一テーブルから、Hudi (0.12.x)、Paimon、Iceberg などのデータレイク形式へのリアルタイム書き込みを実行できます。
データソースの追加
DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの説明を表示して、各パラメーターの意味を理解できます。
アカウントをまたいで OSS データソースを追加する場合、必要な権限を付与する必要があります。詳細については、「バケットポリシーを使用した OSS へのクロスアカウントアクセスの許可」をご参照ください。
RAM ロールベースの権限付与を使用して OSS データソースを設定する場合、「RAM ロール権限付与を使用したデータソースの設定」をご参照ください。
リージョンをまたいで OSS データソースを追加する場合、パブリックエンドポイントを使用することを推奨します。詳細については、「エンドポイントとネットワーク接続の概要」をご参照ください。
データ同期タスクの設定
同期タスクの設定のエントリポイントと手順については、以下の設定ガイドをご参照ください。
単一テーブルのバッチ同期
「コードレス UI での設定」および「スクリプトモードでの設定」をご参照ください。
スクリプトモードのパラメーターとコードサンプルについては、「付録:コードサンプルとパラメーターの説明」をご参照ください。
単一テーブルのリアルタイム同期
「単一テーブルのリアルタイム同期タスクの設定」をご参照ください。
データベース全体の同期
「データベース全体のバッチ同期タスクの設定」および「データベース全体のリアルタイム同期タスクの設定」をご参照ください。
よくある質問
付録:スクリプトのサンプルとパラメーター
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。
一般的な Reader スクリプトの例
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"oss",// プラグイン名。
"parameter":{
"nullFormat":"",// null 値を表す文字列。
"compress":"",// 圧縮タイプ。
"datasource":"",// データソース。
"column":[// 列の定義。
{
"index":0,// 列インデックス。0 から始まります。
"type":"string"// データの型。
},
{
"index":1,
"type":"long"
},
{
"index":2,
"type":"double"
},
{
"index":3,
"type":"boolean"
},
{
"format":"yyyy-MM-dd HH:mm:ss", // 時刻フォーマット。「type」が「date」の場合に必須です。
"index":4,
"type":"date"
}
],
"skipHeader":"",// CSV 形式のファイルでヘッダーをスキップするかどうか。
"encoding":"",// エンコーディング。
"fieldDelimiter":",",// フィールドデリミタ。
"fileFormat": "",// ファイル形式。
"object":[]// 読み取るファイルのオブジェクトプレフィックス。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":""// 許容されるエラーレコードの最大数。
},
"speed":{
"throttle":true,// true の場合、速度制限を有効にします。false の場合、「mbps」パラメーターは無視されます。
"concurrent":1, // ジョブの同時実行数。
"mbps":"12"// 速度制限の上限 (MB/s)。(1 mbps = 1 MB/s)。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}OSS からの ORC または Parquet ファイルの読み取り
HDFS Reader を再利用することで、OSS から ORC または Parquet ファイルを読み取ることができます。そのためには、標準の OSS Reader パラメーターに加えて、path (ORC の場合) および fileFormat (ORC および Parquet の場合) パラメーターを指定します。
この例は、OSS から ORC 形式のデータを読み取る方法を示しています。
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61/orc__691b6815_9260_4037_9899_****", "column": [ { "index": 0, "type": "long" }, { "index": "1", "type": "string" }, { "index": "2", "type": "string" } ] } }この例は、OSS から Parquet 形式のデータを読み取る方法を示しています。
{ "type":"job", "version":"2.0", "steps":[ { "stepType":"oss", "parameter":{ "nullFormat":"", "compress":"", "fileFormat":"parquet", "path":"/*", "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }", "column":[ { "index":"0", "type":"string" }, { "index":"1", "type":"long" }, { "index":"2", "type":"string" }, { "index":"3", "type":"string" }, { "index":"4", "type":"string" }, { "index":"5", "type":"string" }, { "index":"6", "type":"string" }, { "index":"7", "type":"string" }, { "index":"8", "type":"string" }, { "index":"9", "type":"string" }, { "index":"10", "type":"double" }, { "index":"11", "type":"string" }, { "index":"12", "type":"string" } ], "skipHeader":"false", "encoding":"UTF-8", "fieldDelimiter":",", "fieldDelimiterOrigin":",", "datasource":"wpw_demotest_oss", "envType":0, "object":[ "wpw_demo/userdata1.parquet" ] }, "name":"Reader", "category":"reader" }, { "stepType":"odps", "parameter":{ "partition":"dt=${bizdate}", "truncate":true, "datasource":"0_odps_wpw_demotest", "envType":0, "column":[ "id" ], "emptyAsNull":false, "table":"wpw_0827" }, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"" }, "locale":"zh_CN", "speed":{ "throttle":false, "concurrent":2 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Reader スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前を指定します。スクリプトモードでは、この値は設定済みのデータソースの名前と一致する必要があります。 | はい | なし |
object | OSS から同期する 1 つ以上のオブジェクトを指定します。オブジェクトは、明示的なパス、ワイルドカード、または動的パラメーターを使用して指定できます。 1. 設定方法
重要
2. 同時実行数とパフォーマンス 設定方法は、データ抽出の同時実行数を直接決定します:
| はい | なし |
parquetSchema | このパラメーターを使用して、Parquet ファイル形式のスキーマを定義します。fileFormat が parquet に設定されている場合にのみ適用されます。parquetSchema を指定した後、全体の設定が有効な JSON であることを確認してください。 parquetSchema の形式は次のとおりです:
以下は設定例です: | いいえ | なし |
column | 読み取る列のリストを指定します。「type」はソース内のデータ型を指定します。「index」はテキストファイル内の列の 0 から始まるインデックスを指定します。「value」は列の定数値を指定し、システムにソースファイルから読み取る代わりに値を生成するよう指示します。 デフォルトでは、次の例に示すように、すべての列を STRING データ型として読み取ることができます。 次の例に示すように、各列の詳細を指定することもできます。 説明 指定する各列について、「type」は必須であり、「index」または「value」のいずれかを指定する必要があります。 | はい | すべての列が STRING データ型として読み取られます。 |
fileFormat | OSS 内のソースオブジェクトのファイル形式。有効な値は「csv」と「text」です。両方の形式でカスタムデリミタがサポートされています。 | はい | csv |
fieldDelimiter | フィールドデリミタ。 説明 フィールドデリミタは必須です。指定しない場合、パラメーターはデフォルトでカンマ (,) になり、これは UI のデフォルトでもあります。 非表示文字は Unicode 表現で指定します (例:\u001b)。表示可能な文字にもこの形式を使用できます (例:パイプ記号の場合は \u007c)。 | はい | , |
lineDelimiter | 行デリミタ。 説明 このパラメーターは、「fileFormat」が「text」に設定されている場合にのみ適用されます。 | いいえ | なし |
compress | ソースファイルの圧縮タイプ。このパラメーターが設定されていない場合、ファイルは非圧縮と見なされます。サポートされているタイプ:gzip、bzip2、zip。 | いいえ | 圧縮なし |
encoding | ソースファイルの文字エンコーディング。 | いいえ | utf-8 |
nullFormat | ソースデータ内で null 値を表す文字列を指定します。これは、テキストファイルには null 値の標準的な表現がないために必要です。例:
| いいえ | なし |
skipHeader | CSV 形式のファイルには、タイトルとして機能するヘッダー行が含まれている場合があり、これをスキップする必要があります。デフォルトでは、この行はスキップされません。skipHeader パラメーターは、圧縮ファイルモードではサポートされていません。 | いいえ | false |
csvReaderConfig | CSV ファイルを解析するために使用される CsvReader の追加設定のマップ。このパラメーターが省略された場合、CsvReader はデフォルト設定を使用します。 | いいえ | なし |
Writer スクリプトの例
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"oss",// プラグイン名。
"parameter":{
"nullFormat":"",// どの文字列を null 値として解釈するかを定義します。
"dateFormat":"",// 日付フォーマット。
"datasource":"",// データソース。
"writeMode":"",// 書き込みモード。
"writeSingleObject":"false", // true の場合、すべてのデータを単一の OSS ファイルに書き込みます。
"encoding":"",// ファイルエンコーディング。
"fieldDelimiter":",",// フィールドデリミタ。
"fileFormat":"",// ファイル形式。
"object":""// オブジェクトプレフィックス。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// エラー上限。
},
"speed":{
"throttle":true,// 速度制限を有効にします。false の場合、「mbps」パラメーターは無視されます。
"concurrent":1, // ジョブの同時実行数。
"mbps":"12"// 速度制限の上限 (MB/s)。(1 mbps = 1 MB/s)。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}スクリプトサンプル:OSS への ORC または Parquet ファイルの書き込み
HDFS Writer を使用して、ORC または Parquet ファイルを OSS に書き込むことができます。このメソッドは、既存の OSS Writer パラメーターに加えて、path や fileFormat などの拡張設定パラメーターを追加します。これらのパラメーターの説明については、「HDFS Writer」をご参照ください。
以下の例は、ORC または Parquet ファイルを OSS に書き込む方法を示しています:
以下のコードはデモンストレーションのみを目的としています。特定の列名と列の型に合わせてパラメーターを変更する必要があります。本番環境でこのコードをコピーして使用しないでください。
ORC ファイルを OSS に書き込む
現在、ORC ファイルはコードエディタでのみ書き込み可能です。タスクを設定するには、コードエディタに切り替える必要があります。fileFormat パラメーターを
orcに設定し、ファイルの path を指定し、各 column を{"name":"your column name","type": "your column type"}の形式で定義します。オフライン書き込みでサポートされている ORC の列の型は以下の通りです:
型
ステータス
TINYINT
サポート対象
SMALLINT
サポート対象
INT
サポート対象
BIGINT
サポート対象
FLOAT
サポート対象
DOUBLE
サポート対象
TIMESTAMP
サポート対象
DATE
サポート対象
VARCHAR
サポート対象
STRING
サポート対象
CHAR
サポート対象
BOOLEAN
サポート対象
DECIMAL
サポート対象
BINARY
サポート対象
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61", "fileName": "orc", "writeMode": "append", "column": [ { "name": "col1", "type": "BIGINT" }, { "name": "col2", "type": "DOUBLE" }, { "name": "col3", "type": "STRING" } ], "fieldDelimiter": "\t", "compress": "NONE", "encoding": "UTF-8" } }Parquet ファイルを OSS に書き込む
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "parquet", "path": "/tests/case61", "fileName": "test", "writeMode": "append", "fieldDelimiter": "\t", "compress": "SNAPPY", "encoding": "UTF-8", "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}", "dataxParquetMode": "fields" } }
Writer スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。コードエディタでは、この値は設定済みのデータソースの名前と一致する必要があります。 | はい | なし |
object | OSS 内の出力オブジェクトの名前。OSS はオブジェクト名を使用してディレクトリ構造をシミュレートします。オブジェクト名には以下の規則があります:
ランダムな UUID サフィックスを付けたくない場合は、 | はい | なし |
ossBlockSize | OSS パートのサイズ。デフォルト値は 16 MB です。ファイルの出力形式が parquet または ORC の場合、このパラメーターを object パラメーターと同じレベルで設定できます。 OSS のマルチパートアップロードは 10,000 パートに制限されているため、デフォルトのパートサイズでは最大ファイルサイズが 160 GB に制限されます。より大きなファイルをアップロードするには、パートサイズを増やす必要があります。 | いいえ | 16 |
writeMode | OSS Writer が書き込み前に既存のデータをどのように処理するかを指定します:
| はい | なし |
writeSingleObject | すべてのデータを単一のオブジェクトに書き込むかどうかを決定します。
説明
| いいえ | false |
fileFormat | 出力オブジェクトのファイル形式。以下の形式がサポートされています:
| いいえ | text |
compress | OSS に書き込まれるデータファイルの圧縮形式を指定します。このパラメーターはコードエディタで設定する必要があります。 重要 このパラメーターは Parquet および ORC ファイル形式にのみ適用され、SNAPPY 圧縮のみがサポートされています。csv および text 形式は圧縮をサポートしていません。 | いいえ | なし |
fieldDelimiter | 出力データのフィールドデリミタ。 | いいえ | , |
encoding | 出力ファイルの文字エンコーディングを指定します。 | いいえ | utf-8 |
parquetSchema | fileFormat が parquet に設定されている場合に必須です。このパラメーターは、出力 Parquet ファイルのスキーマを定義します。次の形式を使用します: 設定項目は次のように説明されます:
説明 各列定義は、最後のものを含め、セミコロンで終わる必要があります。 例: | いいえ | なし |
nullFormat | テキストファイルでは、標準の文字列を使用して null 値を定義することはできません。データ同期システムは、null 値を表す文字列を指定するために nullFormat パラメーターを提供します。例えば、 | いいえ | なし |
header | 出力ファイルに書き込むヘッダーを指定します。値は、 | いいえ | なし |
maxFileSize (高度な設定、コードレス UI ではサポートされていません) | ログファイルのローテーションと同様に、ファイルがローテーションされる前の単一出力オブジェクトの最大サイズを制御します。マルチパートアップロードの場合、パートサイズは 10 MB であり、これが最小のローテーション粒度としても機能します。 ファイルがローテーションされると、新しいオブジェクト名は、プレフィックスとランダムな UUID を既に含むベースオブジェクト名にシーケンス番号 (例:_1, _2, _3) を追加して作成されます。 説明
| いいえ | 100,000 |
suffix (高度な設定、コードレス UI ではサポートされていません) | 出力オブジェクト名に追加するサフィックスを指定します。例えば、suffix を .csv に設定すると、最終的なオブジェクト名は object-prefix_random-uuid.csv のようになります。 | いいえ | なし |
付録:Parquet データ型の変換
parquetSchema を設定しない場合、DataWorks は以下に示すようにソースデータ型を自動的に変換します。
ソースの型 | Parquet の型 | Parquet の論理型 |
CHAR / VARCHAR / STRING | BINARY | UTF8 |
BOOLEAN | BOOLEAN | N/A |
BINARY / VARBINARY | BINARY | N/A |
DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
TINYINT | INT32 | INT_8 |
SMALLINT | INT32 | INT_16 |
INT/INTEGER | INT32 | N/A |
BIGINT | INT64 | N/A |
FLOAT | FLOAT | N/A |
DOUBLE | DOUBLE | N/A |
DATE | INT32 | DATE |
TIME | INT32 | TIME_MILLIS |
TIMESTAMP/DATETIME | INT96 | N/A |