すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:SDK を使用して追跡データをコンシュームする

最終更新日:Nov 09, 2025

変更追跡チャネルの追跡タスクと使用者グループを作成した後、Data Transmission Service (DTS) SDK を使用して追跡データをコンシュームできます。このトピックでは、提供されているサンプルコードの使用方法について説明します。

説明

前提条件

注意事項

  • 追跡データをコンシュームする際、DefaultUserRecord の commit メソッドを呼び出してオフセットをコミットする必要があります。そうしないと、データが繰り返しコンシュームされます。

  • 異なるコンシュームプロセスは互いに独立しています。

  • コンソールの [現在のオフセット] は、クライアントによって送信されたオフセットではなく、追跡タスクの現在のオフセットを示します。

手順

  1. SDK サンプルコードファイルをダウンロードし、ファイルを解凍します。

  2. SDK コードのバージョンを確認します。

    1. ファイルを解凍したディレクトリに移動します。

    2. テキストエディターを使用して、ディレクトリ内の pom.xml ファイルを開きます。

    3. 変更追跡 SDK のバージョン (version) を最新バージョンに変更します。

      説明

      最新の Maven 依存関係は、dts-new-subscribe-sdk ページで確認できます。

      SDK バージョンパラメーターの場所 (クリックして展開)

      <name>dts-new-subscribe-sdk</name>
      <url>https://www.aliyun.com/product/dts</url>
      <description>The Aliyun new Subscribe SDK for Java used for accessing Data Transmission Service</description>
      <packaging>jar</packaging>
      <groupId>com.aliyun.dts</groupId>
      <artifactId>dts-new-subscribe-sdk</artifactId>
      <version>2.1.4</version>
  3. SDK コードを編集します。

    1. コードエディターを使用して、解凍したファイルを開きます。

    2. SDK クライアントのコンシュームモードに対応する Java ファイルを開きます。

      説明

      Java ファイルのパスは aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/ です。

      使用モード

      Java ファイル

      説明

      シナリオ

      ASSIGN モード

      DTSConsumerAssignDemo.java

      メッセージのグローバルな順序を保証するため、DTS は各追跡 Topic に 1 つのパーティション (パーティション 0) のみを割り当てます。SDK クライアントを ASSIGN モードで使用する場合、SDK クライアントを 1 つだけ起動することをお勧めします。

      同じ使用者グループ内で 1 つの SDK クライアントのみが追跡データをコンシュームします。

      SUBSCRIBE モード

      DTSConsumerSubscribeDemo.java

      メッセージのグローバルな順序を保証するため、DTS は各追跡 Topic に 1 つのパーティション (パーティション 0) のみを割り当てます。SDK クライアントを SUBSCRIBE モードで使用する場合、1 つの使用者グループで複数の SDK クライアントを同時に起動してディザスタリカバリを実装できます。これは、使用者グループでデータをコンシュームしているクライアントに障害が発生した場合、他の SDK クライアントがランダムかつ自動的にパーティション 0 に割り当てられ、コンシュームを継続するためです。

      同じ使用者グループ内で複数の SDK クライアントが追跡データをコンシュームします。これは、データディザスタリカバリシナリオに適用できます。

    3. Java コードでパラメーターを設定します。

      サンプルコード

      ******        
          public static void main(String[] args) {
              // kafka ブローカーの URL
              String brokerUrl = "dts-cn-***.com:18001";
              // コンシュームする Topic、パーティションは 0
              String topic = "cn_***_version2";
              // 認証用のユーザーパスワードと sid
              String sid = "dts***";
              String userName = "dts***";
              String password = "DTS***";
              // 最初のシークの初期チェックポイント (設定するタイムスタンプ、例: 1566180200 (Mon Aug 19 10:03:21 CST 2019) にしたい場合)
              String initCheckpoint = "1740472***";
              // subscribe モードを使用する場合、グループ構成が必要です。kafka 使用者グループが有効になります
              ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE;
        
              DTSConsumerSubscribeDemo consumerDemo = new DTSConsumerSubscribeDemo(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
              consumerDemo.start();
          }
      ******

      パラメーター

      説明

      取得方法

      brokerUrl

      変更追跡チャネルのネットワークアドレスとポート番号。

      説明
      • SDK クライアントをデプロイするサーバー (ECS インスタンスなど) と変更追跡インスタンスが同じ VPC にある場合は、データコンシュームに VPC アドレスを使用してネットワーク遅延を削減することをお勧めします。

      • ネットワークが不安定になる可能性があるため、パブリックエンドポイントの使用は推奨されません。

      DTS コンソールで、対象のサブスクリプションインスタンス ID をクリックします。これにより 基本情報 ページが開き、ネットワーク セクションからエンドポイントとポート番号を取得できます。

      topic

      変更追跡チャネルの追跡 Topic。

      DTS コンソールで、対象のサブスクリプションインスタンス ID をクリックします。基本情報 ページで、基本情報 セクションの トピック を見つけます。

      sid

      使用者グループ ID。

      DTS コンソールで、対象のサブスクリプションインスタンス ID をクリックします。データ消費 ページで、使用者グループの コンシューマーグループ ID /名前アカウント を取得します。

      userName

      使用者グループアカウントのユーザー名。

      警告

      このトピックで提供されているクライアントを使用していない場合は、ユーザー名を <使用者グループのユーザー名>-<使用者グループ ID> 形式に設定してください。例: dtstest-dtsae******bpv。そうしないと、接続は失敗します。

      password

      アカウントのパスワード。

      使用者グループを作成したときに設定した使用者グループアカウントのパスワード。

      initCheckpoint

      コンシューマオフセット。このパラメーターは、SDK クライアントがコンシュームする最初のデータレコードのタイムスタンプを指定します。値は UNIX タイムスタンプ (例: 1620962769) である必要があります。

      説明

      コンシューマオフセットは、次の目的で使用できます:

      • アプリケーションの中断後、最後にコンシュームされたオフセットからデータコンシュームを続行します。これにより、データの損失を防ぎます。

      • クライアントの起動時に特定のオフセットからデータコンシュームを開始します。これにより、特定の時点からデータをコンシュームできます。

      コンシューマオフセットは、変更追跡インスタンスのタイムスタンプ範囲内である必要があり、UNIX タイムスタンプに変換する必要があります。

      説明

      対象の変更追跡インスタンスのデータ範囲は、追跡タスクリストの データ範囲 列で確認できます。

      subscribeMode

      SDK クライアントのコンシュームモード。このパラメーターを変更する必要はありません。

      • ConsumerContext.ConsumerSubscribeMode.ASSIGN: ASSIGN モード。

      • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: SUBSCRIBE モード。

      なし

  4. コードエディターでプロジェクト構造を開き、このプロジェクトの OpenJDK バージョンが 1.8 であることを確認します。

  5. クライアントコードを実行します。

    説明

    初めてコードを実行するとき、コードエディターが必要なプラグインと依存関係を自動的にロードするのに時間がかかる場合があります。

    結果のサンプル (クリックして展開)

    正常な実行結果

    次の結果が返された場合、クライアントは期待どおりに実行されており、ソースデータベースからのデータ変更を追跡できます。

    ******
    [2025-02-25 18:47:22.991] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.consumer.KafkaConsumer:1587] - [Consumer clientId=consumer-dtsl5vy2ao5250****-1, groupId=dtsl5vy2ao5250****] Seeking to offset 8200 for partition cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0
    [2025-02-25 18:47:22.993] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap:116] - RecordFetcher consumer:  subscribe for [cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0] with checkpoint [Checkpoint[ topicPartition: cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0timestamp: 174048****, offset: 8200, info: 174048****]] start
    [2025-02-25 18:47:23.011] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174048044****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8200]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8201]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    ******

    正常な追跡結果

    次の結果は、クライアントがソースデータベースからのデータ変更 (UPDATE 操作) を正常に追跡したことを示します。

    ******
    [2025-02-25 18:48:24.905] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8413]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [UPDATE]
    Schema info [{, 
    recordFields= [{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}, {fieldName='name', rawDataTypeNum=253, isPrimaryKey=false, isUniqueKey=false, fieldPosition=1}], 
    databaseName='dtsdb', 
    tableName='person', 
    primaryIndexInfo [[indexType=PrimaryKey, indexFields=[{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}], cardinality=0, nullable=true, isFirstUniqueIndex=false, name=null]], 
    uniqueIndexInfo [[]], 
    partitionFields = null}]
    Before image {[Field [id] [3]
    Field [name] [test1]
    ]}
    After image {[Field [id] [3]
    Field [name] [test2]
    ]}
    ******

    異常な実行結果

    次の結果が返された場合、クライアントはソースデータベースへの接続に失敗しました。

    ******
    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:22.002] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****-1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:22.509] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    [2025-02-25 18:22:23.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":1740478943160,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:27.192] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:27.618] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    ******

    SDK クライアントは、データコンシュームに関する統計情報を定期的に収集して表示します。この情報には、送受信されたデータレコードの総数、総データ量、および受信した 1 秒あたりのレコード数 (RPS) が含まれます。

    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}

    パラメーター

    説明

    outCounts

    SDK クライアントによってコンシュームされたデータレコードの総数。

    outBytes

    SDK クライアントによってコンシュームされたデータの総量 (バイト単位)。

    outRps

    SDK クライアントがデータをコンシュームするときの RPS。

    outBps

    SDK クライアントがデータをコンシュームするときの 1 秒あたりの伝送ビット数。

    count

    SDK クライアントのデータコンシューム情報 (メトリック) 内のパラメーターの総数。

    説明

    これには count 自体は含まれません。

    inBytes

    DTS サーバーから送信されたデータの総量 (バイト単位)。

    DStoreRecordQueue

    DTS サーバーがデータを送信するときのデータキャッシュキューの現在のサイズ。

    inCounts

    DTS サーバーから送信されたデータレコードの総数。

    inBps

    DTS サーバーがデータを送信するときの 1 秒あたりの伝送ビット数。

    inRps

    DTS サーバーがデータを送信するときの RPS。

    __dt

    SDK クライアントがデータを受信したときの現在のタイムスタンプ (ミリ秒単位)。

    DefaultUserRecordQueue

    シリアル化後のデータキャッシュキューの現在のサイズ。

  6. 必要に応じてコードを編集して、追跡データをコンシュームします。

    サブスクライブしたデータをコンシュームする際、コンシューマオフセットを管理する必要があります。この方法により、データの損失や重複を防ぎ、オンデマンドでのコンシュームが可能になります。

よくある質問

  • 変更追跡インスタンスに接続できない場合はどうすればよいですか?

    エラーメッセージに基づいて問題をトラブルシューティングします。詳細については、「トラブルシューティング」をご参照ください。

  • 永続化後のコンシューマオフセットのデータ形式は何ですか?

    コンシューマオフセットが永続化されると、JSON 形式で返されます。永続化されたコンシューマオフセットは UNIX タイムスタンプであり、SDK に直接渡すことができます。次の例では、"timestamp" の後の 1700709977 が永続化されたコンシューマオフセットです。

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}
  • 追跡タスクは複数のクライアントによる並列コンシュームをサポートしていますか?

    いいえ。SUBSCRIBE モードでは複数のクライアントを並列で実行できますが、常に 1 つのクライアントのみがアクティブにデータをコンシュームできます。

  • SDK コードにはどのバージョンの Kafka クライアントがカプセル化されていますか?

    dts-new-subscribe-sdk のバージョン 2.0.0 以降は、Kafka クライアント (kafka-clients) のバージョン 2.7.0 をカプセル化しています。2.0.0 より前のバージョンは、Kafka クライアントのバージョン 1.0.0 をカプセル化しています。

付録

コンシューマオフセットの管理

SDK クライアントが初めて起動、再起動、または内部リトライを実行する場合、コンシューマオフセットをクエリして渡し、データコンシュームを開始または再開する必要があります。コンシューマオフセットは、SDK クライアントがコンシュームする最初のデータレコードの UNIX タイムスタンプです。

クライアントのコンシューマオフセットをリセットするには、次の表に示すように、コンシュームモード (SDK 使用モード) に基づいてコンシューマオフセットをクエリおよび変更できます。

シナリオ

SDK 使用モード

オフセット管理方法

コンシューマオフセットのクエリ

ASSIGN モード、SUBSCRIBE モード

  • SDK クライアントは 5 秒ごとにメッセージオフセットを保存し、DTS サーバーに送信します。最新のコンシューマオフセットをクエリするには、次の場所を確認します:

    • SDK クライアントが配置されているサーバー上の localCheckpointStore ファイル。

    • サブスクリプションチャネルの [データコンシューム] インターフェイス。

  • consumerContext.java ファイルで setUserRegisteredStore(new UserMetaStore()) を使用して、データベースなどの外部永続共有ストレージ媒体を設定した場合、そのストレージ媒体は 5 秒ごとにメッセージオフセットを保存し、クエリできるようになります。

SDK クライアントを初めて起動するときに、コンシューマオフセットを渡してデータをコンシュームする。

ASSIGN モード、SUBSCRIBE モード

SDK クライアントの使用モードに基づいて、DTSConsumerAssignDemo.java または DTSConsumerSubscribeDemo.java ファイルを選択します。次に、コンシューマオフセットを設定 (initCheckpoint) してデータをコンシュームします。

SDK クライアントは、内部リトライのためにデータコンシュームを継続するために、最後に記録されたコンシューマオフセットを再 する必要があります。

ASSIGN モード

次の順序で最後に記録されたコンシューマオフセットを検索します。見つかると、オフセット情報が返されます:

  1. consumerContext.java ファイルで setUserRegisteredStore(new UserMetaStore()) を使用して設定した外部ストレージ媒体。

  2. SDK クライアントが配置されているサーバー上の localCheckpointStore ファイル。

  3. DTSConsumerSubscribeDemo.java ファイルで initCheckpoint を使用して渡した開始タイムスタンプ。

SUBSCRIBE モード

次の順序で最後に記録されたコンシューマオフセットを検索します。見つかると、オフセット情報が返されます:

  1. consumerContext.java ファイルで setUserRegisteredStore(new UserMetaStore()) を使用して設定した外部ストレージ媒体。

  2. DTS サーバー (増分データ収集モジュール) によって保存されたオフセット。

    説明

    オフセットは、SDK クライアントが commit メソッドを呼び出した後にのみ更新されます。

  3. DTSConsumerSubscribeDemo.java ファイルで initCheckpoint を使用して渡した開始タイムスタンプ。

  4. DTS サーバー (新しい増分データ収集モジュール) の開始オフセットを使用します。

    重要

    増分データ収集モジュールのスイッチオーバーが発生した場合、新しいモジュールはクライアントの最後のコンシューマオフセットを保存できないため、古いオフセットからサブスクライブしたデータのコンシュームを開始する可能性があります。クライアント上で コンシューマオフセットを永続的に保存することをお勧めします。

SDK クライアントを再起動した後、最後に記録されたコンシューマオフセットを再 してデータコンシュームを続行します。

ASSIGN モード

consumerContext.java ファイルの setForceUseCheckpoint 設定に基づいてコンシューマオフセットをクエリします。見つかると、オフセット情報が返されます:

  • true に設定されている場合、SDK クライアントが再起動されるたびに、渡された initCheckpoint がコンシューマオフセットとして強制的に使用されます。

  • false に設定されているか、設定されていない場合、次の順序で最後に記録されたコンシューマオフセットを検索します:

    1. SDK クライアントが配置されているサーバー上の localCheckpointStore ファイル。

    2. DTS サーバー (増分データ収集モジュール) によって保存されたオフセット。

      説明

      オフセットは、SDK クライアントが commit メソッドを呼び出した後にのみ更新されます。

    3. consumerContext.java ファイルで setUserRegisteredStore(new UserMetaStore()) を使用して設定した外部ストレージ媒体。

SUBSCRIBE モード

このモードでは、consumerContext.java ファイルの setForceUseCheckpoint 設定は有効になりません。次の順序で最後に記録されたコンシューマオフセットを検索します:

  1. consumerContext.java ファイルで setUserRegisteredStore(new UserMetaStore()) を使用して設定した外部ストレージ媒体。

  2. DTS サーバー (増分データ収集モジュール) によって保存されたオフセット。

    説明

    オフセットは、SDK クライアントが commit メソッドを呼び出した後にのみ更新されます。

  3. DTSConsumerSubscribeDemo.java ファイルで initCheckpoint を使用して渡した開始タイムスタンプ。

  4. DTS サーバー (新しい増分データ収集モジュール) の開始オフセットを使用します。

コンシューマオフセットの永続ストレージの使用

増分データ収集モジュールでディザスタリカバリメカニズムがトリガーされた場合、特に SUBSCRIBE モードでは、新しいモジュールはクライアントの最後のコンシューマオフセットを取得できません。これにより、クライアントが古いオフセットからデータのコンシュームを開始し、既存データの繰り返しコンシュームが発生する可能性があります。たとえば、サービススイッチオーバーの前に、古いモジュールのオフセット範囲が 2023 年 11 月 11 日 08:00:00 から 2023 年 11 月 12 日 08:00:00 までであるとします。クライアントのコンシューマオフセットはこの範囲の最後にあります: 2023 年 11 月 12 日 08:00:00。スイッチオーバー後、新しいモジュールのオフセット範囲は 2023 年 11 月 8 日 10:00:00 から 2023 年 11 月 12 日 08:01:00 までです。このシナリオでは、クライアントは新しいモジュールの開始オフセット (2023 年 11 月 8 日 10:00:00) からデータのコンシュームを開始するため、既存データが再度コンシュームされます。

このスイッチオーバーシナリオで既存データの繰り返しコンシュームを避けるために、クライアントでコンシューマオフセットの永続ストレージ方法を設定することをお勧めします。次のセクションでは、要件に基づいて変更できるサンプルメソッドを提供します。

  1. AbstractUserMetaStore() メソッドを継承して実装する UserMetaStore() メソッドを作成します。

    たとえば、MySQL データベースを使用してオフセット情報を保存する場合、Java サンプルコードは次のようになります:

    public class UserMetaStore extends AbstractUserMetaStore {
    
        @Override
        protected void saveData(String groupID, String toStoreJson) {
            Connection con = getConnection();
    			  String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    
        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
    			  String sql = "select checkpoint from dts_checkpoint where group_id = ?";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
    						ResultSet rs = pres.executeQuery()
                              
                String checkpoint = rs.getString("checkpoint");
              
                return checkpoint;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    }
    
  2. consumerContext.java ファイルで、setUserRegisteredStore(new UserMetaStore()) メソッドを使用して外部ストレージ媒体を設定します。

トラブルシューティング

エラー

エラーメッセージ

原因

解決策

接続に失敗しました

ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

brokerUrl が正しくありません。

brokerUrluserName、および password パラメーターに有効な値を入力します。詳細については、「パラメーターの説明」をご参照ください。

telnet real node *** failed, please check the network

ブローカーアドレスを介して実 IP アドレスに接続できません。

ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

ユーザー名またはパスワードが正しくありません。

com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

consumerContext.java ファイルで、setUseCheckpointtrue に設定されていますが、コンシューマオフセットが変更追跡インスタンスのタイムスタンプ範囲内にありません。

変更追跡インスタンスのデータ範囲内のコンシューマオフセットを指定します。詳細については、このトピックの「パラメーターの説明」表をご参照ください。

データコンシュームが遅くなる

なし

  • 統計情報DStoreRecordQueue および DefaultUserRecordQueue キューのサイズをクエリすることで、データコンシュームが遅い原因を分析できます。

    • DStoreRecordQueue パラメーターが 0 のままである場合、DTS サーバーはより遅い速度でデータをプルしています。

    • DefaultUserRecordQueue パラメーターがデフォルト値の 512 のままである場合、SDK クライアントはより遅い速度でデータをコンシュームしています。

  • 必要に応じて、コード内のコンシューマオフset (initCheckpoint) を変更してオフセットをリセットします。