このページでは、MaxCompute Writer でサポートされるデータ型とパラメーター、およびコードレスユーザーインターフェイス (UI) とコードエディターを使用してデータ型とパラメーターを設定する方法について説明します。
MaxCompute Writer は、開発者が MaxCompute にデータを挿入または更新するために設計されたものです。 MaxCompute Writer は、GB または TB レベルのデータを MaxCompute にインポートするのに適しています。
MaxCompute Writer は、ソースプロジェクト、テーブル、パーティション、フィールドなどの指定された情報に基づいて、トンネルを使用して MaxCompute にデータを書き込みます。 一般的なトンネルコマンドの詳細については、「トンネルコマンド」 をご参照ください 。
Data Integration は、MySQL や MaxCompute などのソースデータベースの厳密なスキーマを持つテーブルからデータをメモリに読み取り、取得したデータを宛先データストアがサポートする形式に変換してから、宛先データストアにデータを書き込みます。
パラメーター
パラメーター | 説明 | 必須事項 | デフォルト値 |
---|---|---|---|
datasource | 接続名。 追加された接続の名前と同じである必要があります。 コードエディターで接続を追加できます。 | 必須 | なし |
table | 宛先テーブルの名前。 名前は大文字と小文字が区別されません。 宛先テーブルとして指定できるテーブルは 1 つだけです。 | 必須 | なし |
partition | データが書き込まれるパーティション。 最終レベルのパーティションを指定する必要があります。 たとえば、3 レベルのパーティション分割テーブルにデータを書き込む場合は、partition
パラメーターを、次のような最終レベルのパーティション情報を含む値に設定します。 pt=20150101, type=1, biz=2 。
|
パーティション分割テーブルにデータを書き込む場合にのみ必要 | なし |
column | データが書き込まれる宛先テーブルの列。 データを宛先テーブルのすべての列に書き込む場合は、値をアスタリスク (*) に設定します。 つまり、column パラメーターを次のように設定します。
"column": ["*"] 。 データを宛先テーブルの一部の列にのみ書き込む場合は、値を指定された列に設定します。 複数入力する場合は、カンマ (,) で区切ります。 例: "column": ["id","name"] 。
|
必須 | なし |
truncate | 書き込み操作のべき等性を保証するには、値を true に設定します。 つまり、truncate パラメーターを次のように設定します。 "truncate": "true" 。 書き込みエラーが原因で失敗した同期ノードが再実行されると、MaxCompute Writer は、ソースデータを再度インポートする前に、書き込まれたデータを削除します。
これにより、再実行ごとに同じデータが書き込まれることが保証されます。
MaxCompute Writer は、MaxCompute SQL を使用してデータを削除します。 MaxCompute SQL は原子性を保証できません。 したがって、切り捨て操作はアトミック操作ではありません。 同時実行ノードが同じテーブルまたはパーティションからデータを削除する場合、競合が発生する可能性があります。 この問題を回避するには、同じパーティションにデータを書き込むために、DDL ノードを同時に実行しないことを推奨します。 同時に実行する必要があるノードに対して、異なるパーティションを作成できます。 |
必須 | なし |
コードレス UI を使用して MaxCompute Writer を設定する
- 接続を設定します。
同期ノードのソース接続と宛先接続を設定します。
パラメーター 説明 接続 前述のパラメーターの説明の datasource パラメーター。 接続タイプを選択し、DataWorks で設定されている接続の名前を入力します。 テーブル 前述のパラメーターの説明のtable パラメーター。 パーティションキー列 宛先テーブルのすべての列にデータを書き込むには、次のように入力します。 "column": ["*"] 。 partition パラメーターはワイルドカードをサポートし、1 つ以上のパーティションを含みます。 "partition":"pt=20140501/ds=*"
pt=20140501 ですべての ds パーティションにデータが書き込まれることを指定します。"partition":"pt=top?"
データが pt=top および pt=to でパーティションに書き込まれることを指定します。
データが書き込まれるパーティションキー列を指定できます。 MaxCompute テーブルのパーティションキー列が pt=${bdp.system.bizdate} である場合、 データが書き込まれる列を pt に設定できます。 列が未識別としてマークされている場合は無視してください。- すべてのパーティションにデータを書き込むには、次のように入力します。 pt=* 。
- 一部のパーティションにデータを書き込むには、対応する日付を指定します。
書き込みルール - 元のデータを削除して書き込む (上書きを挿入) :テーブルまたはパーティション内のすべてのデータは、データのインポート前に削除されます。 このルールは、
INSERT OVERWRITE
文と同じです。 - 元のデータを保持して書き込む (挿入) :データのインポート前にデータは削除されません。 新しいデータが常に追加され実行されます。 このルールは、
INSERT INTO
文と同じです。
注- MaxCompute Reader は、トンネルを使用してデータを読み取ります。 同期ノードはデータフィルタリングをサポートしていません。 代わりに、特定のテーブルまたはパーティションのすべてのデータを読み取る必要があります。
- MaxCompute Writer は、INSERT INTO 文を使用する代わりに、トンネルを使用してデータを書き込みます。 同期ノードが実行された後にのみ、宛先テーブルの完全なデータを表示できます。 ノードの依存関係に注意してください。
空の文字列を Null に変換 デフォルト値は Yes です。 - フィールドのマッピングを設定します (前述のパラメーターの説明のcolumn パラメーター)。
左側のソーステーブルのフィールドには、右側の宛先テーブルのフィールドとの 1 対 1 のマッピングがあります。
- チャンネル制御ポリシーを設定します。
コードエディターを使用して MaxCompute Writer を設定する
次のコードでは、ノードは MaxCompute テーブルにデータを書き込むように設定されています。 パラメーターの詳細については、前述のパラメーターの説明をご参照ください。
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"odps",// ライターのタイプ。
"parameter":{
"partition":"",// パーティション情報。
"truncate":true,// 書き込みルール。
"compress":false,// 圧縮を有効化するかどうかを指定します。
"datasource":"odps_first",// 接続名。
"column": [// データが書き込まれる列。
"id",
"name",
"age",
"sex",
"salary",
"interest"
],
"emptyAsNull":false,// 空の文字列を null に変換するかどうかを指定します。
"table":""// ターゲットテーブルの名前。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// 許容されるダーティデータレコード数の上限。
},
"speed":{
"throttle":false,// 帯域幅スロットリングを使用するかどうかを指定。 値が false の場合は、帯域幅が調整されていないことを示します。 値が trueの場合は、帯域幅が調整されていることを示します。 最大伝送速度は、このパラメーターを true に設定した場合にのみ有効になります。
"concurrent":1,// 同時実行スレッド数の上限。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
MaxCompute のトンネルエンドポイントを指定する場合、 コードエディターで接続を設定できます。 接続を設定するには、上記のコードの "datasource":"",
を接続の詳細なパラメーターに置き換えます。 例:
"accessId":"***********",
"accessKey":"*********",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"**********",
補足情報
- 列フィルター
MaxCompute Writer を設定することで、列のフィルタリング、列の並べ替え、空のフィールドの null への設定など、MaxCompute がサポートしていない操作を実行できます。 宛先テーブルのすべての列にデータを書き込むには、次のように column パラメーターを設定します。
"column": ["*"]
。たとえば、MaxCompute テーブルに a、b、c の 3 つの列があり、列 c と列 b にのみデータを書き込む場合は、次のように column パラメーターを設定できます。
"column": ["c","b"]
。 ソースデータの最初の列と 2 番目の列は、MaxCompute テーブルの列 c と列 b にそれぞれ書き込まれます。 データの同期中、列 a は自動的に null に設定されます。 - 列設定エラー処理
冗長な列のデータの損失を回避し、高いデータ信頼性を保証するために、予想よりも多くの列が書き込まれる場合、MaxCompute Writer はエラーメッセージを返します。 たとえば、MaxCompute テーブルに a、b、c の 3 つの列があり、MaxCompute Writer が、3 つを超える列にデータを書き込むように設定されている場合、エラーメッセージを返します。
- パーティション設定
MaxCompute Writer は、最終レベルのパーティションにのみデータを書き込むことができ、フィールドに基づいて指定されたパーティションへのデータの書き込みをサポートしていません。 パーティション分割テーブルにデータを書き込むには、最終レベルのパーティションを指定します。 たとえば、3 レベルのパーティション分割テーブルにデータを書き込む場合は、partition パラメーターを、次のような最終レベルのパーティション情報を含む値に設定します。
pt=20150101, type=1, biz=2
。 partition パラメーターを次のように設定した場合、データを書き込むことができません。pt=20150101, type=1
またはpt=20150101
。 - ノードの再実行
書き込み操作のべき等性を保証するには、
truncate
パラメーターを true に設定します。 書き込みエラーが原因で失敗した同期ノードが再実行されると、MaxCompute Writer は、ソースデータを再度インポートする前に、書き込まれたデータを削除します。 これにより、再実行ごとに同じデータが書き込まれることが保証されます。 他の例外が原因で同期ノードが中断された場合、データをロールバックできず、ノードを自動的に再実行できません。 truncate パラメーターを true に設定することにより、書き込み操作のべき等性とデータの整合性を保証できます。注 truncate パラメーターが true に設定されている場合、指定されたパーティションまたはテーブルのすべてのデータは、再実行前に削除されます。 このパラメーターを true に設定する場合は注意が必要です。