すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:Kafka データの MaxCompute への移行に関するベストプラクティス

最終更新日:Feb 25, 2026

このトピックでは、DataWorks Data Integration を使用して Kafka クラスターから MaxCompute にデータを移行する方法について説明します。

前提条件

  • MaxCompute が有効化されていること

  • MaxCompute データソースが追加されていること。 詳細については、「MaxCompute 計算リソースの関連付け」をご参照ください。

  • DataWorks ワークスペースにワークフローが作成されていること。 このトピックでは、基本モードのワークスペースのワークフローを例として使用します。 詳細については、「ワークフローの作成」をご参照ください。

  • Kafka クラスターがセットアップされていること。

    データ移行を実行する前に、ご利用の Kafka クラスター環境が動作可能であることを確認してください。 このトピックでは、Alibaba Cloud EMR サービスを使用して Kafka クラスターを自動的にデプロイします。 詳細な手順については、「Kafka クイックスタート」をご参照ください。

    このトピックで使用される EMR Kafka クラスターには、次の構成があります:

    • EMR バージョン: EMR-3.12.1

    • クラスタータイプ: Kafka

    • ソフトウェア: Ganglia 3.7.2、ZooKeeper 3.4.12、Kafka 2.11-1.0.1、および Kafka-Manager 1.3.X.XX

    Kafka クラスターは、中国 (杭州) リージョンの VPC を使用します。 プライマリインスタンスグループの ECS 計算リソースは、パブリック IP アドレスとプライベート IP アドレスで構成されています。

背景情報

Kafka は、分散型のパブリッシュ/サブスクライブミドルウェアです。 高いパフォーマンスとスループットにより広く使用されており、1 秒あたり数百万のメッセージを処理できます。 Kafka はストリームデータの処理に適しており、主にユーザー行動追跡やログ収集などのシナリオで使用されます。

一般的な Kafka クラスターには、複数のプロデューサー、ブローカー、コンシューマー、および ZooKeeper クラスターが含まれます。 Kafka クラスターは ZooKeeper を使用して構成を管理し、サービスを調整します。

トピックはメッセージのコレクションであり、Kafka クラスターの基本的な概念です。 トピックはメッセージストレージの論理的な概念です。 トピックは物理ディスクには保存されません。 代わりに、トピック内のメッセージは、クラスター内の異なるノードのディスク上のパーティションに保存されます。 複数のプロデューサーがトピックにメッセージを送信でき、複数のコンシューマーがトピックからメッセージをプル (消費) できます。

メッセージがパーティションに追加されると、オフセットが割り当てられます。 オフセットは 0 から始まる番号付けを使用し、そのパーティション内のメッセージの一意の ID として機能します。

ステップ 1: Kafka データの準備

Kafka クラスターでテストデータを作成します。 EMR クラスターのヘッダーホストにログインでき、MaxCompute と DataWorks がホストと通信できるようにするには、EMR クラスターのヘッダーホストのセキュリティグループを構成して、TCP ポート 22 と 9092 でのトラフィックを許可する必要があります。

  1. EMR クラスターのヘッダーホストにログインします。

    1. EMR Hadoop コンソールに移動します。

    2. 上部のナビゲーションバーで、[クラスター管理] をクリックします。

    3. 表示されたページで、テストデータを作成するクラスターを見つけ、その詳細ページに移動します。

    4. クラスターの詳細ページで、ホストリストをクリックし、EMR クラスターのヘッダーホストのアドレスを見つけ、SSH 接続を使用してホストにリモートでログインします。

  2. テストトピックを作成します。

    次のコマンドを実行して、テストトピック testkafka を作成します:

    kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka  --create
  3. テストデータを書き込みます。

    testkafka トピックにデータを書き込むプロデューサーをシミュレートするには、次のコマンドを実行します。 Kafka はストリームデータを処理するため、トピックに継続的にデータを書き込むことができます。 有効なテスト結果を保証するために、10 件以上のデータレコードを書き込んでください。

    kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka

    別の SSH ウィンドウを開き、次のコマンドを実行して、データが Kafka に正常に書き込まれたかどうかを検証するコンシューマーをシミュレートします。 データが正常に書き込まれると、データが表示されます。

    kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning

ステップ 2: DataWorks でターゲットテーブルを作成

DataWorks でターゲットテーブルを作成し、Kafka からのデータを保存します。

  1. [データ開発] ページに移動します。

    1. DataWorks コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、[データ開発とO&M] > [データ開発] をクリックします。

    3. ドロップダウンリストから対応するワークスペースを選択し、 [Data Studio に移動] をクリックします。

  2. 目的のビジネスフローを右クリックし、[テーブルの作成] > [MaxCompute] > [テーブル]を選択します。

  3. [テーブルの作成] ダイアログボックスで、テーブル名を入力し、[作成] をクリックします。

    説明
    • テーブル名は文字で始まる必要があり、漢字や特殊文字を含めることはできません。

    • Data Studio に複数の MaxCompute データソースをアタッチする場合、適切な [MaxCompute エンジンインスタンス] を選択してください。

  4. テーブル編集ページで、[DDL 文] をクリックします。

  5. [DDL] ダイアログボックスで、次のテーブル定義文を入力し、[テーブルスキーマの生成] をクリックします。

    CREATE TABLE testkafka 
    (
     key             string,
     value           string,
     partition1      string,
     timestamp1      string,
     offset          string,
     t123            string,
     event_id        string,
     tag             string
    ) ;

    各列は、DataWorks Data Integration の Kafka リーダーのデフォルト列に対応しています:

    • __key__:メッセージキー。

    • __value__:完全なメッセージ内容。

    • __partition__:現在のメッセージが存在するパーティション。

    • __headers__:現在のメッセージのヘッダー。

    • __offset__:現在のメッセージのオフセット。

    • __timestamp__:現在のメッセージのタイムスタンプ。

    カスタム名を指定することもできます。 詳細については、「Kafka リーダー」をご参照ください。

  6. [本番環境にコミット][確認] をクリックします。

ステップ 3: データの同期

  1. Data Integration 専用リソースグループを作成します。

    Kafka リーダーは、Data Integration 用共有リソースグループでは期待どおりに実行できません。 データを同期するには、Data Integration 専用リソースグループを使用する必要があります。 詳細については、「Data Integration 専用リソースグループの使用」をご参照ください。

  2. データ統合ノードを作成します。

    1. Data Studio ページに移動し、必要なワークフローを右クリックして、[ノードの作成] > [Data Integration] > [オフライン同期] を選択します。

    2. [ノードの作成] ダイアログボックスで、ノードの [名前] を入力し、[確認] をクリックします。

  3. 上部のナビゲーションバーで、Conversion script アイコンを選択します。

  4. スクリプトモードで、** アイコンをクリックします。

  5. スクリプトを構成します。 サンプルコードは次のとおりです:

    {
        "type": "job",
        "steps": [
            {
                "stepType": "kafka",
                "parameter": {
                    "server": "47.xxx.xxx.xxx:9092",
                    "kafkaConfig": {
                        "group.id": "console-consumer-83505"
                    },
                    "valueType": "ByteArray",
                    "column": [
                        "__key__",
                        "__value__",
                        "__partition__",
                        "__timestamp__",
                        "__offset__",
                        "'123'",
                        "event_id",
                        "tag.desc"
                    ],
                    "topic": "testkafka",
                    "keyType": "ByteArray",
                    "waitTime": "10",
                    "beginOffset": "0",
                    "endOffset": "3"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "truncate": true,
                    "compress": false,
                    "datasource": "odps_source",// MaxCompute データソース名
                    "column": [
                        "key",
                        "value",
                        "partition1",
                        "timestamp1",
                        "offset",
                        "t123",
                        "event_id",
                        "tag"
                    ],
                    "emptyAsNull": false,
                    "table": "testkafka"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "throttle": false,
                "concurrent": 1
            }
        }
    }

    EMR Kafka クラスターのマスターノードで `kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list` コマンドを実行して、コンシューマーグループ名と `group.id` パラメーターの値を確認できます。

    • コマンド例

      kafka-consumer-groups.sh  --bootstrap-server emr-header-1:9092  --list
    • 結果

      _emr-client-metrics-handler-group
      console-consumer-69493
      console-consumer-83505
      console-consumer-21030
      console-consumer-45322
      console-consumer-14773

    たとえば、`console-consumer-83505` の場合、ヘッダーホストで `kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505` コマンドを実行して、`beginOffset` および `endOffset` パラメーターの値を確認できます。

    • コマンド例

      kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505
    • 結果

      TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
      testkafka                      6          0               0               0          -                                                 -                              -
      test                           6          3               3               0          -                                                 -                              -
      testkafka                      0          0               0               0          -                                                 -                              -
      testkafka                      1          1               1               0          -                                                 -                              -
      testkafka                      5          0               0               0          -                                                 -                              -
  6. スケジュールリソースグループを構成します。

    1. ノード編集ページの右側のナビゲーションバーで、 [スケジュール設定] ボタンをクリックします。

    2. [リソースプロパティ] セクションで、[スケジュールリソースグループ] に、作成した Data Integration 専用リソースグループを選択します。

      説明

      Kafka データを MaxCompute に定期的に (たとえば 1 時間ごとに) 書き込むには、`beginDateTime` および `endDateTime` パラメーターを使用してデータ読み取り間隔を 1 時間に設定します。 その後、データ統合タスクを 1 時間に 1 回実行するようにスケジュールできます。 詳細については、「Kafka リーダー」をご参照ください。

  7. ** アイコンをクリックしてコードを実行します。

  8. [操作ログ] で結果を表示できます。

次のステップ

Data Studio でタスクを作成して SQL 文を実行し、ApsaraMQ for Kafka からのデータがテーブルに同期されているかどうかを確認できます。 このトピックでは、select * from testkafka 文を例として使用します。 タスクを作成する手順は次のとおりです:

  1. [データ開発] ページに移動します。

    1. DataWorks コンソールにログインします。

    2. 左側のナビゲーションウィンドウで [ワークスペース] をクリックして、[ワークスペース] ページに移動します。

    3. 上部のナビゲーションバーでターゲットリージョンに切り替え、作成したワークスペースを見つけ、[アクション] 列で [ショートカット] > [データ開発] をクリックして Data Studio ページに移動します。

  2. DataStudio ページの左側のナビゲーションウィンドウで、image アイコンをクリックして [アドホッククエリ] ペインに移動します。 [アドホッククエリ] ペインで、image アイコンをクリックし、[作成] > [ODPS SQL] を選択します。

  3. [ノードの作成] ダイアログボックスで、[パス][名前] パラメーターを構成できます。

  4. [確認] をクリックします。

  5. ノード構成ページで、select * from testkafka 文を入力し、image アイコンをクリックしてノードを実行し、操作ログを表示します。

    image