OSS データソースは、Object Storage Service (OSS) からのデータの読み取りと OSS へのデータの書き込みのための双方向チャネルを提供します。このトピックでは、DataWorks における OSS データソースのデータ同期機能について説明します。
サポートされているフィールドタイプと制限
オフライン読み取り
OSS Reader は OSS からデータを読み取り、Data Integration プロトコルに変換します。OSS は非構造化データ向けのストレージサービスです。Data Integration のために、OSS Reader は以下の機能をサポートしています。
サポート | 非サポート |
|
|
OSS に CSV ファイルとしてデータを準備する場合、ファイルは標準的な CSV フォーマットである必要があります。たとえば、カラム内に二重引用符(")が含まれる場合は、それを二つの二重引用符("")に置き換える必要があります。そうしないと、ファイルが誤って分割されます。ファイルに複数のデリミタが含まれる場合は、テキストファイルタイプを使用することを推奨します。
OSS はファイル型データを格納する非構造化データソースです。データを同期する前に、フィールド構造が期待通りであることを確認してください。同様に、非構造化データソース内のデータ構造が変更された場合は、タスク設定で再度フィールド構造を確認する必要があります。確認を行わないと、同期中にデータが文字化けする可能性があります。
オフライン書き込み
OSS Writer は、データ同期プロトコルから OSS 内のテキストファイルにデータを変換します。OSS は非構造化データ向けのストレージサービスです。OSS Writer は以下の機能をサポートしています。
サポート | 非サポート |
|
|
型分類 | Data Integration カラム設定タイプ |
整数型 | LONG |
文字列型 | STRING |
浮動小数点型 | DOUBLE |
ブール値型 | BOOLEAN |
日付・時刻型 | DATE |
リアルタイム書き込み
リアルタイム書き込みをサポートしています。
Hudi (0.12.x)、Paimon、Iceberg などのデータレイクへの単一テーブルからのリアルタイム書き込みをサポートしています。
データソースの作成
DataWorks で同期タスクを開発する前に、データソース管理の手順に従って、必要なデータソースを DataWorks に追加する必要があります。DataWorks コンソールでパラメーターの説明を確認し、データソースを追加する際の各パラメーターの意味を理解してください。
クロスアカウント OSS データソースを作成する場合、対応するアカウントに権限を付与する必要があります。詳細については、「バケットポリシーを使用して OSS へのクロスアカウントアクセスを許可する」をご参照ください。
OSS データソースの権限付与に RAM ロールを使用する場合は、「RAM ロール権限付与モードを使用したデータソースの設定」をご参照ください。
クロスリージョンの OSS データソースを作成する場合は、パブリックエンドポイントを使用してください。詳細については、「エンドポイントとネットワーク接続の概要」をご参照ください。
データ同期タスクの開発
同期タスクの設定エントリポイントおよび手順については、以下の設定ガイドをご参照ください。
単一テーブルのオフライン同期タスクの設定ガイド
設定手順の詳細については、「コードレス UI 設定」および「コードエディタ設定」をご参照ください。
コードエディタのすべてのパラメーターおよびスクリプトデモについては、「付録:スクリプトデモとパラメーター説明」をご参照ください。
単一テーブルのリアルタイム同期タスクの設定ガイド
構成プロセスの詳細については、Data Integration でリアルタイム同期タスクを設定するおよびDataStudio でリアルタイム同期タスクを設定するをご参照ください。
データベース全体の同期の設定ガイド
構成プロセスの詳細については、「データベース全体のオフライン同期タスク」および「データベース全体のリアルタイム同期タスク」をご参照ください。
よくある質問
付録:スクリプトデモとパラメーター説明
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合は、統一されたスクリプト形式の要件に基づいて、スクリプト内で関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスク設定」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際にデータソース用に設定する必要のあるパラメーターについて説明します。
Reader スクリプトデモ:一般的な例
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"oss",// プラグイン名。
"parameter":{
"nullFormat":"",// null を表す文字列を定義します。
"compress":"",// テキスト圧縮タイプ。
"datasource":"",// データソース。
"column":[// フィールド。
{
"index":0,// カラムインデックス。
"type":"string"// データの型。
},
{
"index":1,
"type":"long"
},
{
"index":2,
"type":"double"
},
{
"index":3,
"type":"boolean"
},
{
"format":"yyyy-MM-dd HH:mm:ss", // 時間フォーマット。
"index":4,
"type":"date"
}
],
"skipHeader":"",// CSV 形式のファイルにヘッダー行がある場合にスキップします。
"encoding":"",// エンコード形式。
"fieldDelimiter":",",// 列区切り文字。
"fileFormat": "",// テキストファイル形式。
"object":[]// オブジェクトプレフィックス。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":""// エラーレコード数。
},
"speed":{
"throttle":true,// throttle が false の場合、mbps パラメーターは無効になり、レートは制限されません。throttle が true の場合、レートは制限されます。
"concurrent":1, // 同時実行ジョブ数。
"mbps":"12"// レート制限。1 mbps は 1 MB/s に相当します。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Reader スクリプトデモ:OSS から ORC または Parquet ファイルを読み取る
OSS から ORC または Parquet 形式のファイルを読み取るには、HDFS Reader を再利用できます。既存の OSS Reader パラメーターに加えて、Path(ORC 用)およびFileFormat(ORC および Parquet 用)などの拡張設定パラメーターも使用されます。
以下は、OSS から ORC ファイルを読み取る例です。
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61/orc__691b6815_9260_4037_9899_****", "column": [ { "index": 0, "type": "long" }, { "index": "1", "type": "string" }, { "index": "2", "type": "string" } ] } }以下は、OSS から Parquet ファイルを読み取る例です。
{ "type":"job", "version":"2.0", "steps":[ { "stepType":"oss", "parameter":{ "nullFormat":"", "compress":"", "fileFormat":"parquet", "path":"/*", "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }", "column":[ { "index":"0", "type":"string" }, { "index":"1", "type":"long" }, { "index":"2", "type":"string" }, { "index":"3", "type":"string" }, { "index":"4", "type":"string" }, { "index":"5", "type":"string" }, { "index":"6", "type":"string" }, { "index":"7", "type":"string" }, { "index":"8", "type":"string" }, { "index":"9", "type":"string" }, { "index":"10", "type":"double" }, { "index":"11", "type":"string" }, { "index":"12", "type":"string" } ], "skipHeader":"false", "encoding":"UTF-8", "fieldDelimiter":",", "fieldDelimiterOrigin":",", "datasource":"wpw_demotest_oss", "envType":0, "object":[ "wpw_demo/userdata1.parquet" ] }, "name":"Reader", "category":"reader" }, { "stepType":"odps", "parameter":{ "partition":"dt=${bizdate}", "truncate":true, "datasource":"0_odps_wpw_demotest", "envType":0, "column":[ "id" ], "emptyAsNull":false, "table":"wpw_0827" }, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"" }, "locale":"zh_CN", "speed":{ "throttle":false, "concurrent":2 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Reader スクリプトパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。このパラメーターの値は、コードエディタで追加したデータソースの名前と同じである必要があります。 | はい | なし |
Object | OSS から同期する 1つまたは複数のオブジェクトを指定します。完全なパス、ワイルドカード文字を含むパス、または動的パラメーターを含むパスを使用してオブジェクトを指定できます。 1. 設定方法
重要
2. 同時読み取りメカニズムとパフォーマンス パスの設定方法によって、データ抽出の並列性とパフォーマンスが決まります。
| はい | なし |
parquetSchema | このパラメーターは、OSS から Parquet ファイルを読み取る場合にのみ必要です。fileFormat が parquet に設定されている場合にのみ有効です。このパラメーターは Parquet ファイル内のデータ型を記述します。設定が有効な JSON 形式であることを確認してください。 `parquetSchema` の形式は以下のとおりです。
以下は設定例です。 | いいえ | なし |
column | 読み取るフィールドのリスト。`type` はソースデータのデータ型を指定します。`index` はテキストファイル内のカラム番号(0 から開始)を指定します。`value` は現在の型が定数であることを指定します。このカラムのデータはソースファイルから読み取られず、`value` に基づいて自動的に生成されます。 デフォルトでは、すべてのデータを String 型として読み取ることができます。設定は以下のとおりです。 カラムフィールド情報を指定することもできます。設定は以下のとおりです。 説明 指定するカラム情報において、`type` は必須です。`index` または `value` のいずれかを指定する必要があります。 | はい | すべてのデータが STRING 型として読み取られます。 |
fileFormat | OSS 内のソースオブジェクトのファイル形式。有効な値は `csv` および `text` です。どちらの形式もカスタムデリミタをサポートしています。 | はい | csv |
fieldDelimiter | ファイルを読み取る際に使用される列区切り文字。 説明 OSS Reader がデータを読み取る際には、列区切り文字を指定する必要があります。指定しない場合、デフォルトはカンマ(,)になります。これは設定ページのデフォルト値でもあります。 デリミタが可視文字でない場合は、その Unicode エンコーディングを入力してください。例:\u001b または \u007c。 | はい | , |
lineDelimiter | 行区切り文字。 説明 このパラメーターは `fileFormat` が `text` に設定されている場合にのみ有効です。 | いいえ | なし |
compress | テキストファイルの圧縮形式。デフォルト値は空で、圧縮なしを意味します。サポートされている形式は gzip、bzip2、zip です。 | いいえ | 圧縮なし |
encoding | ソースファイルのエンコード形式。 | いいえ | utf-8 |
nullFormat | テキストファイルでは、ヌルポインタを標準的な文字列で定義できません。`nullFormat` を使用して、どの文字列が null を表すかを定義します。例:
| いいえ | なし |
skipHeader | CSV 形式のファイルのヘッダー行をスキップします。デフォルト値は false です。skipHeader パラメーターは圧縮ファイルではサポートされていません。 | いいえ | false |
csvReaderConfig | CSV ファイルを読み取るためのパラメーター。これはマップです。CsvReader は CSV ファイルを読み取るために使用されます。これらのパラメーターを設定しない場合、デフォルト値が使用されます。 | いいえ | なし |
Writer スクリプトデモ:一般的な例
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"oss",// プラグイン名。
"parameter":{
"nullFormat":"",// null を表す文字列を定義します。
"dateFormat":"",// 日付フォーマット。
"datasource":"",// データソース。
"writeMode":"",// 書き込みモード。
"writeSingleObject":"false", // 同期されたデータを単一の OSS ファイルに書き込むかどうかを指定します。
"encoding":"",// エンコード形式。
"fieldDelimiter":",",// 列区切り文字。
"fileFormat":"",// テキストファイル形式。
"object":""// オブジェクトプレフィックス。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// エラーレコード数。
},
"speed":{
"throttle":true,// throttle が false の場合、mbps パラメーターは無効になり、レートは制限されません。throttle が true の場合、レートは制限されます。
"concurrent":1, // 同時実行ジョブ数。
"mbps":"12"// レート制限。1 mbps は 1 MB/s に相当します。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Writer スクリプトデモ:OSS への ORC または Parquet ファイルの書き込み
OSS への ORC または Parquet ファイルの書き込みには、HDFS Writer を再利用できます。既存の OSS Writer パラメーターに加えて、Path および FileFormat などの拡張パラメーターを使用できます。これらのパラメーターの詳細については、「HDFS Writer」をご参照ください。
以下は、OSS への ORC または Parquet ファイルの書き込み例です。
以下のコードは参考用です。カラム名およびデータ型に応じてパラメーターを修正してください。コードをそのままコピーしないでください。
OSS への ORC ファイルの書き込み
ORC ファイルを書き込むには、コードエディタを使用する必要があります。fileFormat を
orcに設定し、path を書き込むファイルのパスに設定し、column を{"name": "your column name", "type": "your column type"}の形式で設定します。書き込みをサポートしている ORC 型は以下のとおりです。
フィールド型
OSS へのオフライン書き込み(ORC 形式)
TINYINT
サポート
SMALLINT
サポート
INT
サポート
BIGINT
サポート
FLOAT
サポート
DOUBLE
サポート
TIMESTAMP
サポート
DATE
サポート
VARCHAR
サポート
STRING
サポート
CHAR
サポート
BOOLEAN
サポート
DECIMAL
サポート
BINARY
サポート
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61", "fileName": "orc", "writeMode": "append", "column": [ { "name": "col1", "type": "BIGINT" }, { "name": "col2", "type": "DOUBLE" }, { "name": "col3", "type": "STRING" } ], "writeMode": "append", "fieldDelimiter": "\t", "compress": "NONE", "encoding": "UTF-8" } }OSS への Parquet ファイルの書き込み
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "parquet", "path": "/tests/case61", "fileName": "test", "writeMode": "append", "fieldDelimiter": "\t", "compress": "SNAPPY", "encoding": "UTF-8", "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}", "dataxParquetMode": "fields" } }
Writer スクリプトパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。このパラメーターの値は、コードエディタで追加したデータソースの名前と同じである必要があります。 | はい | なし |
object | OSS に書き込むファイルの名前。OSS はファイル名を使用してディレクトリ構造をシミュレートします。OSS におけるオブジェクト名の制限は以下のとおりです。
ランダムな UUID の付加を避けたい場合は、 | はい | なし |
ossBlockSize | 各データブロックのサイズ(MB 単位)。デフォルト値は 16 です。このパラメーターは `fileFormat` が parquet または ORC の場合にのみサポートされています。object パラメーターと同じレベルでこのパラメーターを設定できます。 OSS のフラグメントアップロードは最大 10,000 ブロックをサポートしているため、デフォルトの単一ファイルサイズは 160 GB に制限されています。ブロック数が制限を超える場合は、ブロックサイズを増やすことでより大きなファイルのアップロードをサポートできます。 | いいえ | 16 |
writeMode | 書き込み前に既存データをどのように処理するかを指定します。
| はい | なし |
writeSingleObject | データを単一ファイルに書き込むかどうかを指定します。
説明
| いいえ | false |
fileFormat | オブジェクトファイルの形式。以下の形式がサポートされています。
| いいえ | text |
compress | OSS に書き込まれるオブジェクトファイルの圧縮形式。このパラメーターはコードエディタで設定する必要があります。 重要 CSV および TEXT ファイルタイプは圧縮をサポートしていません。Parquet および ORC ファイルは SNAPPY 圧縮のみをサポートしています。 | いいえ | なし |
fieldDelimiter | 列区切り文字。 | いいえ | , |
encoding | ファイルエンコーディングを設定します。 | いいえ | utf-8 |
parquetSchema | このパラメーターは、OSS に Parquet ファイルを書き込む場合に必要です。オブジェクトファイルの構造を記述します。このパラメーターは fileFormat が parquet に設定されている場合にのみ有効です。形式は以下のとおりです。 設定項目は以下のとおりです。
説明 各行の設定は、最後の行を含めてセミコロンで終了する必要があります。 以下は例です。 | いいえ | なし |
nullFormat | テキストファイルでは、ヌルポインタを標準的な文字列で定義できません。nullFormat を使用して null を表す文字列を定義します。例: | いいえ | なし |
header | オブジェクトファイルのヘッダー。例: | いいえ | なし |
maxFileSize (高度な設定。コードレス UI ではサポートされていません。) | 単一オブジェクトファイルの最大サイズ(MB 単位)。デフォルト値は 10,000 × 10 MB です。これは log4j でのログファイルサイズの制御に似ています。OSS のフラグメントアップロードを使用する場合、各ブロックは 10 MB です(これはログファイルローテーションの最小粒度でもあり、`maxFileSize` が 10 MB 未満の場合、10 MB として扱われます)。各 OSS InitiateMultipartUploadRequest は最大 10,000 ブロックをサポートしています。 ローテーションが発生すると、オブジェクト名は元のオブジェクトプレフィックスに _1, _2, _3 などのサフィックスとランダムな UUID を付加して形成されます。 説明
| いいえ | 100,000 |
suffix (高度な設定。コードレス UI ではサポートされていません。) | 生成されるファイル名のサフィックス。例: suffix を .csv に設定した場合、最終的なファイル名は fileName****.csv になります。 | いいえ | なし |
付録:Parquet データ型の変換ポリシー
`parquetSchema` パラメーターを設定しない場合、DataWorks は以下のポリシーに従ってソースフィールドのデータ型を変換します。
変換後のデータ型 | Parquet 型 | Parquet 論理型 |
CHAR / VARCHAR / STRING | BINARY | UTF8 |
BOOLEAN | BOOLEAN | 該当なし |
BINARY / VARBINARY | BINARY | 該当なし |
DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
TINYINT | INT32 | INT_8 |
SMALLINT | INT32 | INT_16 |
INT/INTEGER | INT32 | 該当なし |
BIGINT | INT64 | 該当なし |
FLOAT | FLOAT | 該当なし |
DOUBLE | DOUBLE | 該当なし |
DATE | INT32 | DATE |
TIME | INT32 | TIME_MILLIS |
TIMESTAMP/DATETIME | INT96 | 該当なし |