このトピックでは、DataHub を使用してログデータを MaxCompute にストリーミングし、バッチ処理を行う方法について説明します。DataHub プロジェクトと Topic を作成し、MaxCompute DataConnector を設定して、データが MaxCompute テーブルにフローすることを確認します。
前提条件
MaxCompute へのアクセスが許可されているアカウントに、次の権限が付与されていることを確認します。
MaxCompute プロジェクトに対する CreateInstance 権限
MaxCompute テーブルを表示、変更、および更新する権限
詳細については、「MaxCompute 権限」をご参照ください。
仕組み
DataHub は、ストリーミングデータを処理するために設計されたプラットフォームです。データが DataHub Topic にアップロードされると、リアルタイム処理のために保存されます。DataHub 内の MaxCompute DataConnector は、受信レコードを定期的にバッチ処理し、MaxCompute テーブルに書き込みます。MaxCompute テーブルでは、SQL クエリを実行してバッチ処理を行うことができます。
デフォルトでは、DataHub は 5分間隔、またはバッファリングされたデータが 64 MB に達した場合のいずれか早い方で、MaxCompute への同期をトリガーします。このパイプラインを設定するには、DataHub で DataConnector を作成および構成するだけで済みます。
Log Source ---> DataHub Topic ---> MaxCompute DataConnector ---> MaxCompute Table
(streaming) (batch sync every (partitioned,
5 min or 64 MB) offline query)操作手順
ステップ 1: MaxCompute テーブルの作成
odpscmd クライアント (MaxCompute コマンドラインツール) で、DataHub から同期されるデータを保存するテーブルを作成します。たとえば、次の SQL ステートメントを実行してパーティションテーブルを作成します。
CREATE TABLE test(f1 string, f2 string, f3 double) partitioned by (ds string);ステップ 2: DataHub プロジェクトの作成
DataHub コンソールにログインします。左上隅でリージョンを選択します。
左側のナビゲーションウィンドウで、[プロジェクト] をクリックします。
「プロジェクト」ページの右上隅で、[プロジェクトの作成] をクリックします。
[プロジェクトの作成] パネルで、[名前] と [説明] を設定し、[作成] をクリックします。
ステップ 3: Topic の作成
「プロジェクト」ページで、目的のプロジェクトを見つけ、[操作]列の表示をクリックします。
プロジェクトの詳細ページで、右上隅の[トピックの作成]をクリックします。
[トピックの作成] パネルで、作成タイプとして [MaxCompute テーブルのインポート] を選択し、その他のパラメーターを設定します。

「[次へ]」をクリックして、Topic の構成を完了します。
注 - [スキーマ] は MaxCompute テーブルに対応します。 スキーマで指定するフィールド名、データの型、フィールドの順序は、MaxCompute テーブルと一致している必要があります。 これら 3 つの条件がすべて満たされた場合にのみ、DataConnector を作成できます。 - [TUPLE] 型および [BLOB] 型の Topic を MaxCompute テーブルに移行できます。 - デフォルトでは、最大 20 個の Topic を作成できます。 さらに多くの Topic が必要な場合は、チケットを起票してください。 - DataHub Topic のオーナーまたは Creator アカウントのみが、DataConnector を管理する権限を持ちます。 たとえば、DataConnector の作成や削除ができます。
ステップ 4: MaxCompute DataConnector の作成
プロジェクト詳細ページの [Topic リスト] タブで、新規作成した Topic を探し、[操作] 列の [表示] をクリックします。
Topic 詳細ページで、右上隅にある[コネクタ]をクリックします。
「[コンネクタの作成]」パネルで、[MaxCompute] をクリックし、パラメーターを設定してから、[作成] をクリックします。
ステップ 5: DataConnector の詳細を表示
左側のナビゲーションウィンドウで、[プロジェクト] をクリックします。
「[プロジェクト]」ページで、目的のプロジェクトを見つけ、「[表示]」を操作列でクリックします。
「[Topic リスト]」タブで、トピックを見つけ、[操作]列の[表示]をクリックします。
トピック詳細ページで、[コネクタ] タブをクリックします。
新しく作成された DataConnector を見つけ、[表示] をクリックして DataConnector の詳細を表示します。
デフォルトでは、DataHub は5分間隔またはデータ量が64 MBに達したときに、MaxCompute テーブルにデータを移行します。[Sync Offset] は、移行されたデータエントリの数を示します。

ステップ 6: 移行の検証
次の SQL ステートメントを実行して、ログデータが MaxCompute に移行されたかどうかを確認します。
SELECT * FROM test;次の図に示すように結果が返された場合、ログデータは MaxCompute に正常に移行されています。

次のステップ
データパイプラインが機能していることを確認したら、次の操作を検討してください。
DataConnector のステータスを監視する: トピックの [コネクタ] タブを定期的に確認し、[同期オフセット] が増加していることと、エラーが発生していないことを確認します。
パーティションフィルターでクエリ: クエリでパーティションフィルターを使用し (例:
SELECT * FROM test WHERE ds='<partition_value>';)、大規模なデータセットでのクエリパフォーマンスを向上させます。パイプラインをスケーリング: より高いスループットが必要な場合は、DataHub Topic のシャード数を増やすことができます。
付録: データの型マッピング
次の表に、MaxCompute と DataHub の間のデータの型マッピングを示します。DataHub Topic を作成する際は、スキーマで互換性のあるデータの型を使用する必要があります。
| MaxCompute | DataHub | 注記 |
|---|---|---|
| BIGINT | BIGINT | 直接マッピング。 |
| STRING | STRING | 直接マッピング。 |
| BOOLEAN | BOOLEAN | 直接マッピング。 |
| DOUBLE | DOUBLE | 直接マッピング。 |
| DATETIME | TIMESTAMP | DataHub の TIMESTAMP は MaxCompute の DATETIME にマッピングされます。 |
| DECIMAL | DECIMAL | 直接マッピング。 |
| TINYINT | TINYINT | 直接マッピング。 |
| SMALLINT | SMALLINT | 直接マッピング。 |
| INT | INTEGER | DataHub は INTEGER を使用し、MaxCompute は INT を使用します。 |
| FLOAT | FLOAT | 直接マッピング。 |
| BLOB | STRING | MaxCompute の BLOB データは DataHub の STRING にマッピングされます。 |
| MAP | サポートされていません | MAP 型は DataHub に同期できません。 |
| ARRAY | サポートされていません | ARRAY 型は DataHub に同期できません。 |