Data Integration は、ウィザードモードとスクリプトモードでのデータ同期をサポートします。 スクリプトモードはより柔軟ですが、ウィザードモードはより簡単です。
ここでは、Data Integration のスクリプトモードを使用して、Table Store の増分データを OpenSearch に同期させる方法について説明します。
チャンネル
Data Integration のスクリプトモード
- リーダー: OTSStream Reader
- ライター: OSSWriter
Table Store の設定
事前の設定は不要です。
OSS の設定
事前の設定は不要です。
Data Integration の設定
- Table Store データソースを作成します。
注
- Table Store データソースを既に作成している場合は、この手順をスキップしてください。
- データソースを作成したくない場合は、後続の設定ページでエンドポイント、instanceName、AccessKeyID および AccessKeySecretを指定できます。
データソースの作成方法の詳細については、 「Table Store データソースを作成する」をご参照ください。
- OSS データソースの作成
この手順は手順 1 とほぼ同じです。 データソースとして OSS を選択するだけです。注 OSS データソースのパラメーター設定中、Endpoint に bucketName は含まれません。
- 同期タスクの構成
- Data Integration コンソールにログインします。
- 「同期タスク」ページで、 [スクリプトモード] をクリックします。
- 表示される「インポートテンプレート」ダイアログボックスで、 ソースのタイプを Table Store ストリーム (OTS ストリーム) に、対象のタイプを OSS に設定します。
- [OK] をクリックして構成を終了します。
- 設定項目の設定
- 設定ページでは、OTSStreamReader と OSSWriter のテンプレートが提供されています。 以下の注釈を参照して構成を完了してください。
{ "type": "job", "version": "1.0", "configuration": { "setting": { "errorLimit": { "record": "0" # Allowed number of errors. エラー数が値を上回る場合、同期タスクは失敗します。 }, "speed": { "mbps": "1", # Maximum traffic of each synchronization task. "concurrent": "1" # Number of concurrent synchronization tasks each time. } }, "reader": { "plugin": "otsstream", # Name of the Reader plugin. "parameter": { "datasource": "", # Name of the Table Store data source. このパラメーターが設定されている場合、エンドポイント、accessID、acessKey、instanceNameの設定は不要です。 "dataTable": "", # Name of the table in Table Store. "statusTable": "TableStoreStreamReaderStatusTable", # Table Store Stream ステータスを保存しているテーブルは、デフォルト値を使用することを推奨します。 "startTimestampMillis": "", # Start time of the export. 増分エクスポートモードでは、タスクは周期的に実行される必要があり、開始時間はそれぞれの実行によって異なります。 そのため、たとえば $(start_time) のような変数の設定が必要です。 "endTimestampMillis": "", # End time of the export. $(end_time) のような変数の設定が必要です。 "date": "yyyyMMdd", # Date from which data is exported. このパラメータは、startTimestampMillis および endTimestampMillis と同じであるため、削除する必要があります。 "mode": "single_version_and_update_only", # Table Store Stream からのデータエクスポートのフォーマット。 パラメーターはsingle_version_and_update_only に設定します。 設定テンプレートにこのパラメーターが含まれていない場合、追加します。 "column":[ # Names of the columns to be exported from Table Store to OSS. 設定テンプレートにこのパラメーターが含まれていない場合、追加します。 必要に応じてこのパラメーターを設定します。 { "name": "uid" # Name of the column. Table Store のプライマリキー列です。 }, { "name": "name" # Name of the column. Table Storeの属性列です。 }, ], "isExportSequenceInfo": false, # このパラメーターは single_version_and_update_only モード時のみ false に設定できます。 "maxRetries": 30 # Maximum number of retry times. } }, "writer": { "plugin": "oss", # Name of the Writer plugin "parameter": { "datasource": "", # Name of the OSS data source "object": "", # Prefix of the name of the last file to be backed up to OSS. 推奨する値は Table Store インスタンス名、テーブル名、データ、あるいはたとえば "instance/table/{date}" です。 "writeMode": "truncate", # truncate, append, and nonConflict are supported. truncate は同じ名前の既存ファイルを削除する際に使用され、append は同じ名前の既存ファイルにデータを追加する際に使用されます。nonConflict は同じ名前のファイルが存在する時にエラーを返します。 "fileFormat": "csv", # File format "encoding": "UTF-8", # Encoding mode "nullFormat": "null", # Mode of representation in a TXT file under control "dateFormat": "yyyy-MM-dd HH:mm:ss", # # Time format "fieldDelimiter": "," # Delimiter of each column } } } }
注 詳しい設定の説明は、「OTSStreamReader の設定」および「OSSWriter の設定」をご参照ください。 - [保存] をクリックします。
- 設定ページでは、OTSStreamReader と OSSWriter のテンプレートが提供されています。 以下の注釈を参照して構成を完了してください。
- タスクの実行
- [操作] をクリックします。
- 表示されたダイアログボックスで、変数パラメーターを設定します。
- [OK] をクリックします。
- タスクが完了したら、 OSS コンソールにログインして、ファイルがバックアップされているかどうかを確認します。
- スケジューリングを設定します。
- [送信] をクリックします。
- 表示されたダイアログボックスで、スケジューリングパラメーターを設定します。
パラメーターは次のように記述されます。
パラメーター 説明 スケジューリングのタイプ [サイクル制御] をクリックします。 自動的再試行 このパラメーターは、タスクが失敗した場合に、タスクが 2 分間隔で 3 回再実行されることを示します。 開始日 デフォルト値が推奨されます。これは 1970 年 1 月 1 日から 100 年後までです。 スケジューリング周期 分を選択します。 開始時間 「00:00 から 23:59」を選択します。これは、1 日中スケジュールが必要であることを示します。 間隔 5 分を選択します。 start_time "$[yyyymmddhh24miss-10/24/60]" と入力します。これは、スケジューリングタスクの時間から 10 分を引いた時間を示します。 end_time "$[yyyymmddhh24miss-5/24/60]" と入力します。これは、スケジューリングタスクの時間から 5 分引いた時間を示しています。 date "${bdp.system.bizdate}" と入力します。これは、スケジュール日を示します。 依存属性 依存関係が存在する場合は、このパラメーターを設定してください。 依存関係が存在しない場合は、このパラメーターを設定しないでください。 クロスサイクル依存 自己依存は、前のスケジューリングサイクルが完了した後にのみ操作を続行できます。 - [OK] をクリックします。
定期同期タスクが設定されており、設定ファイルのステータスは読み取り専用です。
- タスクを確認します。
- ページの上部にある [オペレーションセンター] をクリックします。
- 左側のナビゲーションウィンドウで、 をクリックします。作成した同期タスクを表示します。
- 新しいタスクは翌日の 00:00 に実行を開始します。
- 左側のナビゲーション ウィンドウ、 をクリックします。 その日の事前に作成された各同期タスクを表示します。 スケジューリング間隔は 5 分で、各タスクは過去 5 分から 10 分のデータを処理します。
-
インスタンス名をクリックして詳細を表示します。
- タスクの実行中または完了後にログを表示できます。
- OSS にエクスポートされたデータを確認してください。
OSS コンソールにログインします。新しいファイルが生成されたかどうか、およびファイルの内容が正しいかどうかを確認します。
上記の設定が完了すると、Table Store のデータは 5 〜 10 分のレイテンシで OSS に自動的に同期されます。