すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Apache Flink からデータをインポートする

最終更新日:Apr 15, 2025

このトピックでは、オープンソースの Apache Flink から AnalyticDB for MySQL データウェアハウス版 クラスタにデータをインポートする方法について説明します。

前提条件

  • Apache Flink ドライバがダウンロードされ、すべての Apache Flink ノードの ${Flink デプロイメントディレクトリ}/lib ディレクトリにデプロイされています。 Apache Flink のバージョンに基づいてドライバをダウンロードできます。 次のリストは、Apache Flink のバージョンに対応するドライバパッケージのダウンロードリンクを示しています。

    他の Apache Flink バージョンのドライバをダウンロードするには、JDBC SQL コネクタ ページにアクセスしてください。

  • MySQL ドライバがダウンロードされ、すべての Apache Flink ノードの ${Flink デプロイメントディレクトリ}/lib ディレクトリにデプロイされています。

    説明

    MySQL ドライバのバージョンは 5.1.40 以降である必要があります。 MySQL ドライバをダウンロードするには、mysql/mysql-connector-java ページにアクセスしてください。

  • すべての JAR パッケージがデプロイされた後、Apache Flink クラスタが再起動されます。 Apache Flink クラスタの起動方法の詳細については、「ステップ 2: クラスタを起動する」をご参照ください。

  • 書き込むデータを格納するために、AnalyticDB for MySQL クラスタにデータベースとテーブルが作成されています。 データベースとテーブルの作成方法については、「CREATE DATABASE」および「CREATE TABLE」をご参照ください。

    説明
    • この例では、次の文を実行して、tpch という名前のデータベースが作成されます。

      CREATE DATABASE IF NOT EXISTS tpch; /* データベース tpch を作成する */
    • この例では、次の文を実行して、person という名前のテーブルが作成されます。

      CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int); /* テーブル person を作成する */
  • AnalyticDB for MySQL クラスターがエラスティックモードの場合、Eniネットワークネットワーク情報 セクションの クラスター情報 ページで有効にする必要があります。

    重要

    ENI を有効または無効にすると、データベース接続が約 2 分間中断される場合があります。 この間、読み取りまたは書き込み操作を実行することはできません。 ENI を有効または無効にする場合は注意してください。

使用上の注意

  • このトピックでは、Apache Flink SQL を使用してテーブルを作成し、AnalyticDB for MySQL にデータを書き込む方法についてのみ説明します。 Apache Flink Java Database Connectivity (JDBC) API を使用してデータを書き込む方法については、「JDBC コネクタ」をご参照ください。

  • このトピックで説明されている方法は、Apache Flink 1.11 以降にのみ適用されます。 他の Apache Flink バージョンから AnalyticDB for MySQL クラスタにデータを書き込むには、次のいずれかの方法を使用できます。

    • Apache Flink 1.9 および 1.10 からデータを書き込む方法については、「Flink v1.10」をご参照ください。

    • Apache Flink 1.8 以前からデータを書き込む方法については、「Flink v1.8」をご参照ください。

手順

説明

この例では、書き込むソースデータとして CSV ファイルを使用します。

ステップ

説明

ステップ 1: データを準備する

CSV ファイルを作成し、ファイルにデータを書き込み、すべての Apache Flink ノードの /root ディレクトリにファイルをデプロイします。

ステップ 2: データを書き込む

SQL 文を実行して、Apache Flink にソーステーブルと結果テーブルを作成し、テーブルを使用してソースデータを AnalyticDB for MySQL クラスタに書き込みます。

ステップ 3: データを確認する

AnalyticDB for MySQL データベースにログインして、ソースデータが書き込まれているかどうかを確認します。

ステップ 1: データを準備する

  1. Apache Flink ノードのルートディレクトリで、vim /root/data.csv コマンドを実行して、data.csv という名前の CSV ファイルを作成します。

    CSV ファイルには次のデータが含まれています。 書き込むデータ量を増やすために、追加のデータ行をコピーできます。

    0,json00,20
    1,json01,21
    2,json02,22
    3,json03,23
    4,json04,24
    5,json05,25
    6,json06,26
    7,json07,27
    8,json08,28
    9,json09,29
  2. CSV ファイルを作成した後、他の Apache Flink ノードの /root ディレクトリにファイルをデプロイします。

ステップ 2: データを書き込む

  1. Apache Flink SQL アプリケーションを起動して実行します。 詳細については、「SQL クライアント CLI の起動」をご参照ください。

  2. csv_person という名前のソーステーブルを作成するには、次の文を実行します。

    CREATE TABLE if not exists csv_person ( /* ソーステーブル csv_person を作成する */
      `user_id` STRING,
      `user_name` STRING,
      `age` INT
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'file:///root/data.csv',
      'format' = 'csv',
      'csv.ignore-parse-errors' = 'true',
      'csv.allow-comments' = 'true'
    );
    説明
    • ソーステーブルの列名とデータ型は、AnalyticDB for MySQL クラスタ内のターゲットテーブルの列名とデータ型と同じである必要があります。

    • 上記の文の path フィールドは、data.csv ファイルのオンプレミスディレクトリを指定します。 すべての Apache Flink ノード上のファイルのディレクトリが同じであることを確認してください。 data.csv ファイルがオンプレミスデバイスに保存されていない場合は、ファイルの実際のディレクトリを指定してください。

      上記の文の他のパラメータについては、「FileSystem SQL コネクタ」をご参照ください。

  3. mysql_person という名前の結果テーブルを作成するには、次の文を実行します。

    CREATE TABLE mysql_person ( /* 結果テーブル mysql_person を作成する */
      user_id String,
      user_name String,
      age INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://<エンドポイント:ポート>/<DB名>?useServerPrepStmts=false&rewriteBatchedStatements=true',
      'table-name' = '<テーブル名>',
      'username' = '<ユーザー名>',
      'password' = '<パスワード>',
      'sink.buffer-flush.max-rows' = '10',
      'sink.buffer-flush.interval' = '1s'
      );
    説明
    • 結果テーブルの列名とデータ型は、AnalyticDB for MySQL クラスタ内のターゲットテーブルの列名とデータ型と同じである必要があります。

    • 次の表は、AnalyticDB for MySQL クラスタに接続するために必要なパラメータを示しています。 オプションのパラメータについては、JDBC SQL コネクタのトピックの「コネクタオプション」セクションをご参照ください。

    パラメータ

    説明

    connector

    Apache Flink が使用するコネクタの型。 このパラメータを jdbc に設定します。

    url

    AnalyticDB for MySQL クラスタの JDBC URL。

    形式: jdbc:mysql://<エンドポイント:ポート>/<DB名>?useServerPrepStmts=false&rewriteBatchedStatements=true'

    • エンドポイント: AnalyticDB for MySQL クラスタのエンドポイント。

      説明

      パブリックエンドポイントを使用して AnalyticDB for MySQL クラスタに接続する場合は、最初に パブリックエンドポイントを申請 する必要があります。

    • DB名: AnalyticDB for MySQL クラスタ内のターゲットデータベースの名前。

    • useServerPrepStmts=false&rewriteBatchedStatements=true: AnalyticDB for MySQL クラスタにデータをバッチ書き込みするために必要な構成。 この構成は、書き込みパフォーマンスを向上させ、AnalyticDB for MySQL クラスタの負荷を軽減するために使用されます。

    例: jdbc:mysql://am-**********.ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true

    table-name

    書き込むデータを格納するために使用される、AnalyticDB for MySQL クラスタ内のターゲットテーブルの名前。 この例では、ターゲットテーブルの名前は person です。

    username

    書き込み権限を持つ AnalyticDB for MySQL データベースアカウントの名前。

    説明
    • SHOW GRANTS 文を実行して、現在のアカウントの権限をクエリできます。

    • GRANT 文を実行して、アカウントに権限を付与できます。

    password

    書き込み権限を持つ AnalyticDB for MySQL データベースアカウントのパスワード。

    sink.buffer-flush.max-rows

    Apache Flink から AnalyticDB for MySQL クラスタに一度に書き込むことができる最大行数。 Apache Flink はリアルタイムでデータを受信します。 Apache Flink が受信するデータ行数がこのパラメータの値に達すると、データ行は AnalyticDB for MySQL クラスタにバッチ書き込みされます。 有効な値:

    • 0: sink.buffer-flush.interval パラメータで指定された最大時間間隔に達した場合にのみ、Apache Flink はデータをバッチ書き込みします。

    • 0 以外の値。 特定の行数を指定します。 例: 1000 または 2000

    説明

    このパラメータを 0 に設定しないことをお勧めします。 このパラメータを 0 に設定すると、書き込みパフォーマンスが低下し、同時クエリ中に AnalyticDB for MySQL クラスタの負荷が増加します。

    sink.buffer-flush.max-rows パラメータと sink.buffer-flush.interval パラメータの両方を 0 以外の値に設定すると、次のバッチ書き込みルールが適用されます。

    • Apache Flink が受信するデータ行数が sink.buffer-flush.max-rows パラメータの値に達したが、最大時間間隔が sink.buffer-flush.interval パラメータの値に達していない場合、Apache Flink は最大時間間隔が経過するのを待たずに、AnalyticDB for MySQL クラスタにデータをバッチ書き込みします。

    • Apache Flink が受信するデータ行数が sink.buffer-flush.max-rows パラメータの値に達していないが、最大時間間隔が sink.buffer-flush.interval パラメータの値に達した場合、Apache Flink は Apache Flink が受信するデータ量に関係なく、AnalyticDB for MySQL クラスタにデータをバッチ書き込みします。

    sink.buffer-flush.interval

    Apache Flink が AnalyticDB for MySQL クラスタにデータをバッチ書き込みする最大時間間隔。 次のバッチ書き込み操作までの最大待機時間でもあります。 有効な値:

    • 0: sink.buffer-flush.max-rows パラメータで指定された最大データ行数に達した場合にのみ、Apache Flink はデータをバッチ書き込みします。

    • 0 以外の値。 特定の時間間隔を指定します。 例: 1d1h1min1s、または 1ms

    説明

    オフピーク時にソースデータの量が小さい場合にデータがタイムリーに書き込まれるように、このパラメータを 0 に設定しないことをお勧めします。

  4. INSERT INTO 文を実行してデータをインポートします。 プライマリキーに重複する値がある場合、データは繰り返し挿入されず、INSERT INTO 文は INSERT IGNORE INTO 文と同じです。 詳細については、「INSERT INTO」をご参照ください。

    INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person; /* csv_person から mysql_person にデータを挿入する */

ステップ 3: データを確認する

データが書き込まれた後、tpchAnalyticDB for MySQL クラスタの person データベースにログインし、次の文を実行して、ソースデータが テーブルに書き込まれているかどうかを確認します。

SELECT * FROM person; /* person テーブルのデータを確認する */