OSS データソースは、OSS からのデータの読み取りと OSS へのデータの書き込みのための双方向チャネルを提供します。このトピックでは、DataWorks が OSS データソースとデータを同期する方法について説明します。
サポートされているフィールドタイプと制限
オフライン読み取り
OSS Reader は、非構造化データを格納するためのサービスである OSS からデータを読み取り、そのデータを Data Integration プロトコルに変換します。OSS Reader は、次の機能をサポートしています。
サポート | 非サポート |
|
|
OSS でデータを準備する際、データが CSV ファイルにある場合、そのファイルは標準の CSV 形式である必要があります。たとえば、列に二重引用符 (") が含まれている場合は、それを 2 つの二重引用符 ("") に置き換える必要があります。そうしないと、ファイルが正しく分割されない可能性があります。ファイルに複数の区切り文字が含まれている場合は、テキストファイルタイプを使用してください。
OSS は非構造化データソースです。データを同期する前に、フィールド構造が期待どおりであることを確認してください。同様に、ソースのデータ構造が変更された場合は、タスク設定のフィールド構造を更新する必要があります。そうしないと、同期中にデータが文字化けする可能性があります。
オフライン書き込み
OSS Writer は、データ同期プロトコルからテキストファイルにデータを変換し、非構造化データを格納するためのサービスである OSS に書き込みます。OSS Writer は、次の機能をサポートしています。
サポート | 非サポート |
|
|
型カテゴリ | Data Integration 列設定タイプ |
整数 | LONG |
文字列 | STRING |
浮動小数点 | DOUBLE |
ブール値 | BOOLEAN |
日付と時刻 | DATE |
リアルタイム書き込み
リアルタイム書き込みがサポートされています。
単一テーブルから Hudi (0.12.x)、Paimon、Iceberg などのデータレイクへのリアルタイム書き込みがサポートされています。
データソースの作成
DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターのインフォチップを表示して、パラメーターの意味を理解できます。
クロスアカウントアクセスのために OSS データソースを作成する場合、関連するアカウントに必要な権限を付与する必要があります。詳細については、「バケットポリシーを使用して OSS へのクロスアカウントアクセスを実装する」をご参照ください。
RAM ロールを使用して OSS データソースを設定するには、「RAM ロールを使用して権限付与のためにデータソースを設定する」をご参照ください。
クロスリージョン接続のために OSS データソースを作成する場合、パブリックエンドポイントを使用する必要があります。詳細については、「エンドポイントとネットワーク接続の概要」をご参照ください。
データ同期タスクの開発
同期タスクの設定のエントリポイントと手順については、以下の設定ガイドをご参照ください。
単一テーブルのオフライン同期タスクの設定
詳細については、「コードレス UI でタスクを設定する」および「コードエディタでタスクを設定する」をご参照ください。
コードエディタのパラメーターとスクリプトデモの完全なリストについては、「付録: スクリプトデモとパラメーターの説明」をご参照ください。
単一テーブルのリアルタイム同期タスクの設定
詳細については、「Data Integration でリアルタイム同期タスクを設定する」および「DataStudio でリアルタイム同期タスクを設定する」をご参照ください。
完全なデータベース同期タスクの設定
詳細については、「完全なデータベースのオフライン同期タスク」および「完全なデータベースのリアルタイム同期タスク」をご参照ください。
よくある質問
付録: スクリプトデモとパラメーターの説明
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいてスクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタでタスクを設定する」をご参照ください。以下の情報は、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明しています。
Reader スクリプトデモ: 一般的な例
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"oss",// プラグイン名。
"parameter":{
"nullFormat":"",// null と解釈できる文字列を定義します。
"compress":"",// テキスト圧縮タイプ。
"datasource":"",// データソース。
"column":[// フィールド。
{
"index":0,// 列インデックス。
"type":"string"// データの型。
},
{
"index":1,
"type":"long"
},
{
"index":2,
"type":"double"
},
{
"index":3,
"type":"boolean"
},
{
"format":"yyyy-MM-dd HH:mm:ss", // 時刻フォーマット。
"index":4,
"type":"date"
}
],
"skipHeader":"",// CSV 形式のファイルでヘッダーをスキップするかどうかを指定します。
"encoding":"",// エンコード形式。
"fieldDelimiter":",",// 列区切り文字。
"fileFormat": "",// テキストタイプ。
"object":[]// オブジェクトプレフィックス。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":""// 許可されるダーティデータレコードの数。
},
"speed":{
"throttle":true,// throttle を false に設定すると、mbps パラメーターは効果がなく、レート制限は課されません。throttle を true に設定すると、レート制限が課されます。
"concurrent":1, // 同時ジョブの数。
"mbps":"12"// レート制限。1 Mbps は 1 MB/s に相当します。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Reader スクリプトデモ: OSS から ORC または Parquet ファイルを読み取る
OSS から ORC または Parquet 形式のファイルを読み取るために、DataWorks は HDFS Reader を再利用します。既存の OSS Reader パラメーターに加えて、Path (ORC) や FileFormat (ORC, Parquet) などの他の設定パラメーターもサポートされています。
次の例は、OSS から ORC ファイルを読み取る方法を示しています。
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61/orc__691b6815_9260_4037_9899_****", "column": [ { "index": 0, "type": "long" }, { "index": "1", "type": "string" }, { "index": "2", "type": "string" } ] } }次の例は、OSS から Parquet ファイルを読み取る方法を示しています。
{ "type":"job", "version":"2.0", "steps":[ { "stepType":"oss", "parameter":{ "nullFormat":"", "compress":"", "fileFormat":"parquet", "path":"/*", "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }", "column":[ { "index":"0", "type":"string" }, { "index":"1", "type":"long" }, { "index":"2", "type":"string" }, { "index":"3", "type":"string" }, { "index":"4", "type":"string" }, { "index":"5", "type":"string" }, { "index":"6", "type":"string" }, { "index":"7", "type":"string" }, { "index":"8", "type":"string" }, { "index":"9", "type":"string" }, { "index":"10", "type":"double" }, { "index":"11", "type":"string" }, { "index":"12", "type":"string" } ], "skipHeader":"false", "encoding":"UTF-8", "fieldDelimiter":",", "fieldDelimiterOrigin":",", "datasource":"wpw_demotest_oss", "envType":0, "object":[ "wpw_demo/userdata1.parquet" ] }, "name":"Reader", "category":"reader" }, { "stepType":"odps", "parameter":{ "partition":"dt=${bizdate}", "truncate":true, "datasource":"0_odps_wpw_demotest", "envType":0, "column":[ "id" ], "emptyAsNull":false, "table":"wpw_0827" }, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"" }, "locale":"zh_CN", "speed":{ "throttle":false, "concurrent":2 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Reader スクリプトパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加されたデータソースの名前と同じでなければなりません。 | はい | なし |
Object | OSS から同期する 1 つ以上のオブジェクトを指定します。明示的なパス、ワイルドカード文字、または動的パラメーターを使用してオブジェクトを指定できます。 1. 設定方法
重要
2. 同時読み取りメカニズムとパフォーマンス 設定方法は、データ抽出の同時パフォーマンスに直接影響します:
| はい | なし |
parquetSchema | このパラメーターは、OSS から Parquet ファイルを読み取る際に設定します。[fileFormat] が [parquet] に設定されている場合にのみ有効です。このパラメーターは、Parquet ファイルに格納されているデータ型を指定します。このパラメーターを指定した後、全体の設定が JSON 構文に準拠していることを確認する必要があります。 parquetSchema パラメーターの形式は次のとおりです:
次の例は、このパラメーターを設定する方法を示しています。 | いいえ | なし |
column | 読み取るフィールドのリスト。`type` はソースデータのデータ型を指定します。`index` はデータを読み取る列を指定します。`index` の値は 0 から始まります。`value` は現在の列が定数であることを指定します。データはソースファイルから読み取られず、このパラメーターの値に基づいて自動的に生成されます。 デフォルトでは、すべてのデータを STRING 型として読み取ることができます。設定は次のとおりです。 列フィールド情報を指定できます。設定は次のとおりです。 説明 列情報を指定する場合、`type` パラメーターを指定する必要があります。`index` または `value` パラメーターのいずれかを指定する必要があります。 | はい | すべてのデータは STRING 型として読み取られます。 |
fileFormat | OSS 内のソースファイルの形式。有効な値: csv および text。両方の形式でカスタム区切り文字がサポートされています。 | はい | csv |
fieldDelimiter | ソースファイル内の列を区切る区切り文字。 説明 OSS Reader がデータを読み取る場合、列区切り文字を指定する必要があります。指定しない場合、デフォルトでコンマ (,) が使用されます。コンマ (,) は UI のデフォルト値でもあります。 区切り文字が表示可能な文字でない場合は、その Unicode 表現を入力します。例: \u001b または \u007c。 | はい | , |
lineDelimiter | ソースファイル内の行を区切る区切り文字。 説明 このパラメーターは、fileFormat が text に設定されている場合にのみ有効です。 | いいえ | なし |
compress | テキストファイルの圧縮形式。デフォルト値は空で、ファイルが圧縮されていないことを示します。有効な値: gzip、bzip2、zip。 | いいえ | 非圧縮 |
encoding | ソースファイルのエンコード形式。 | いいえ | utf-8 |
nullFormat | テキストファイルでは、標準の文字列を使用してヌルポインタを定義することはできません。Data Integration は、null と解釈できる文字列を定義するために nullFormat パラメーターを提供します。例:
| いいえ | なし |
skipHeader | CSV 形式のファイルでヘッダーをスキップするかどうかを指定します。デフォルト値は false です。skipHeader パラメーターは圧縮ファイルではサポートされていません。 | いいえ | false |
csvReaderConfig | CSV ファイルを読み取るためのパラメーター。このパラメーターは Map 型です。CsvReader は CSV ファイルの読み取りに使用されます。複数のパラメーターを設定できます。このパラメーターを設定しない場合、デフォルト値が使用されます。 | いいえ | なし |
Writer スクリプトデモ: 一般的な例
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"oss",// プラグイン名。
"parameter":{
"nullFormat":"",// Data Integration は、null と解釈できる文字列を定義するために nullFormat パラメーターを提供します。
"dateFormat":"",// 日付フォーマット。
"datasource":"",// データソース。
"writeMode":"",// 書き込みモード。
"writeSingleObject":"false", // 同期されたデータを単一の OSS ファイルに書き込むかどうかを指定します。
"encoding":"",// エンコード形式。
"fieldDelimiter":",",// 列区切り文字。
"fileFormat":"",// テキストタイプ。
"object":""// オブジェクトプレフィックス。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// 許可されるダーティデータレコードの数。
},
"speed":{
"throttle":true,// throttle を false に設定すると、mbps パラメーターは効果がなく、レート制限は課されません。throttle を true に設定すると、レート制限が課されます。
"concurrent":1, // 同時ジョブの数。
"mbps":"12"// レート制限。1 Mbps は 1 MB/s に相当します。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Writer スクリプトデモ: ORC または Parquet ファイルを OSS に書き込むスクリプトの設定
DataWorks は HDFS Writer を再利用して、ORC または Parquet ファイルを OSS に書き込みます。既存の OSS Writer パラメーターに加えて、Path や FileFormat などの追加パラメーターもサポートされています。パラメーターの詳細については、「HDFS Writer」をご参照ください。
次の例は、ORC または Parquet ファイルを OSS に書き込む方法を示しています:
次のコードは一例です。列名と型に基づいてパラメーターを変更する必要があります。コードを直接コピーしないでください。
データを ORC 形式で OSS に書き込む
ORC ファイルはコードエディタでのみ書き込むことができます。fileFormat を
orcに設定し、path を宛先ファイルパスに設定し、column を{"name":"your column name","type": "your column type"}形式で設定します。書き込み操作では、次の ORC データ型がサポートされています:
フィールドタイプ
OSS へのオフライン書き込み (ORC 形式)
TINYINT
サポート
SMALLINT
サポート
INT
サポート
BIGINT
サポート
FLOAT
サポート
DOUBLE
サポート
TIMESTAMP
サポート
DATE
サポート
VARCHAR
サポート
STRING
サポート
CHAR
サポート
BOOLEAN
サポート
DECIMAL
サポート
BINARY
サポート
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61", "fileName": "orc", "writeMode": "append", "column": [ { "name": "col1", "type": "BIGINT" }, { "name": "col2", "type": "DOUBLE" }, { "name": "col3", "type": "STRING" } ], "writeMode": "append", "fieldDelimiter": "\t", "compress": "NONE", "encoding": "UTF-8" } }データを Parquet 形式で OSS に書き込む
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "parquet", "path": "/tests/case61", "fileName": "test", "writeMode": "append", "fieldDelimiter": "\t", "compress": "SNAPPY", "encoding": "UTF-8", "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}", "dataxParquetMode": "fields" } }
Writer スクリプトパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加されたデータソースの名前と同じでなければなりません。 | はい | なし |
object | OSS Writer によって書き込まれるファイルの名前。OSS はファイル名を使用してフォルダをシミュレートします。OSS にはオブジェクト名に関する次の制限があります:
サフィックスとしてランダムな UUID を使用したくない場合は、 | はい | なし |
ossBlockSize | OSS パートのサイズ。デフォルトサイズは 16 MB です。このパラメーターは、ファイルが parquet または ORC 形式で書き込まれる場合にのみサポートされます。このパラメーターは object パラメーターと同じレベルで追加できます。 OSS のマルチパートアップロードは最大 10,000 パートをサポートするため、単一ファイルのデフォルトサイズは 160 GB に制限されます。パート数が制限を超えた場合は、パートサイズを大きくして、より大きなファイルのアップロードをサポートできます。 | いいえ | 16 |
writeMode | OSS Writer がデータを書き込む前に使用するデータ処理方法:
| はい | なし |
writeSingleObject | データを OSS の単一ファイルに書き込むかどうかを指定します:
説明
| いいえ | false |
fileFormat | 書き込むファイルの形式。次の形式がサポートされています:
| いいえ | text |
compress | OSS に書き込むデータファイルの圧縮形式。このパラメーターはコードエディタで設定する必要があります。 説明 csv および text ファイルタイプでは圧縮はサポートされていません。Parquet および ORC ファイルは、gzip や snappy などの圧縮形式をサポートしています。 | いいえ | なし |
fieldDelimiter | 宛先ファイル内の列を区切る区切り文字。 | いいえ | , |
encoding | 宛先ファイルのエンコード形式。 | いいえ | utf-8 |
parquetSchema | このパラメーターは、データを Parquet 形式で OSS に書き込む場合に必須です。宛先ファイルの構造を記述するために使用されます。このパラメーターは、[fileFormat] が [parquet] に設定されている場合にのみ有効です。形式は次のとおりです。 設定項目は次のとおりです:
説明 各行の設定は、最後の行を含め、セミコロンで終わる必要があります。 次の例を考えてみましょう。 | いいえ | なし |
nullFormat | テキストファイルでは、標準の文字列を使用して null (ヌルポインタ) を定義することはできません。データ同期システムは、null 値を表す文字列を定義するために nullFormat パラメーターを提供します。たとえば、 | いいえ | なし |
header | OSS に書き込まれるファイルのヘッダー。例: | いいえ | なし |
maxFileSize (高度な設定、コードレス UI ではサポートされていません) | OSS に書き込まれる単一オブジェクトファイルの最大サイズ。デフォルト値は 10,000 × 10 MB です。これは、log4j ログを印刷する際のログファイルのサイズを制御するのと似ています。OSS がマルチパートアップロードを実行する場合、各パートのサイズは 10 MB です。これは、ログファイルのローテーションの最小粒度でもあります。10 MB 未満の maxFileSize 値は 10 MB として扱われます。各 OSS InitiateMultipartUploadRequest は最大 10,000 パートをサポートします。 ローテーションが発生すると、オブジェクト名は、元のオブジェクトプレフィックスに UUID と _1, _2, or _3 などのサフィックスを追加して形成されます。 説明
| いいえ | 100,000 |
suffix (高度な設定、コードレス UI ではサポートされていません) | データが書き込まれるときに生成されるファイル名のサフィックス。たとえば、suffix を .csv に設定すると、最終的なファイル名は fileName****.csv になります。 | いいえ | なし |
付録: Parquet データ型の変換ポリシー
parquetSchema パラメーターを設定しない場合、DataWorks は事前定義されたポリシーに基づいてソースフィールドのデータ型を変換します。次の表に、この変換ポリシーを示します。
変換後のデータ型 | Parquet 型 | Parquet 論理型 |
CHAR / VARCHAR / STRING | BINARY | UTF8 |
BOOLEAN | BOOLEAN | 該当なし |
BINARY / VARBINARY | BINARY | 該当なし |
DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
TINYINT | INT32 | INT_8 |
SMALLINT | INT32 | INT_16 |
INT/INTEGER | INT32 | 該当なし |
BIGINT | INT64 | 該当なし |
FLOAT | FLOAT | 該当なし |
DOUBLE | DOUBLE | 該当なし |
DATE | INT32 | DATE |
TIME | INT32 | TIME_MILLIS |
TIMESTAMP/DATETIME | INT96 | 該当なし |