このトピックでは、オープンソースの Apache Flink から AnalyticDB for MySQL データウェアハウス版 クラスタにデータをインポートする方法について説明します。
前提条件
Apache Flink ドライバがダウンロードされ、すべての Apache Flink ノードの ${Flink デプロイメントディレクトリ}/lib ディレクトリにデプロイされています。 Apache Flink のバージョンに基づいてドライバをダウンロードできます。 次のリストは、Apache Flink のバージョンに対応するドライバパッケージのダウンロードリンクを示しています。
Apache Flink 1.11: flink-connector-jdbc_2.11-1.11.0.jar
Apache Flink 1.12: flink-connector-jdbc_2.11-1.12.0.jar
Apache Flink 1.13: flink-connector-jdbc_2.11-1.13.0.jar
他の Apache Flink バージョンのドライバをダウンロードするには、JDBC SQL コネクタ ページにアクセスしてください。
MySQL ドライバがダウンロードされ、すべての Apache Flink ノードの ${Flink デプロイメントディレクトリ}/lib ディレクトリにデプロイされています。
説明MySQL ドライバのバージョンは 5.1.40 以降である必要があります。 MySQL ドライバをダウンロードするには、mysql/mysql-connector-java ページにアクセスしてください。
すべての JAR パッケージがデプロイされた後、Apache Flink クラスタが再起動されます。 Apache Flink クラスタの起動方法の詳細については、「ステップ 2: クラスタを起動する」をご参照ください。
書き込むデータを格納するために、AnalyticDB for MySQL クラスタにデータベースとテーブルが作成されています。 データベースとテーブルの作成方法については、「CREATE DATABASE」および「CREATE TABLE」をご参照ください。
説明この例では、次の文を実行して、
tpch
という名前のデータベースが作成されます。CREATE DATABASE IF NOT EXISTS tpch; /* データベース tpch を作成する */
この例では、次の文を実行して、
person
という名前のテーブルが作成されます。CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int); /* テーブル person を作成する */
AnalyticDB for MySQL クラスターがエラスティックモードの場合、Eniネットワーク を ネットワーク情報 セクションの クラスター情報 ページで有効にする必要があります。
重要ENI を有効または無効にすると、データベース接続が約 2 分間中断される場合があります。 この間、読み取りまたは書き込み操作を実行することはできません。 ENI を有効または無効にする場合は注意してください。
使用上の注意
このトピックでは、Apache Flink SQL を使用してテーブルを作成し、AnalyticDB for MySQL にデータを書き込む方法についてのみ説明します。 Apache Flink Java Database Connectivity (JDBC) API を使用してデータを書き込む方法については、「JDBC コネクタ」をご参照ください。
このトピックで説明されている方法は、Apache Flink 1.11 以降にのみ適用されます。 他の Apache Flink バージョンから AnalyticDB for MySQL クラスタにデータを書き込むには、次のいずれかの方法を使用できます。
Apache Flink 1.9 および 1.10 からデータを書き込む方法については、「Flink v1.10」をご参照ください。
Apache Flink 1.8 以前からデータを書き込む方法については、「Flink v1.8」をご参照ください。
手順
この例では、書き込むソースデータとして CSV ファイルを使用します。
ステップ | 説明 |
CSV ファイルを作成し、ファイルにデータを書き込み、すべての Apache Flink ノードの /root ディレクトリにファイルをデプロイします。 | |
SQL 文を実行して、Apache Flink にソーステーブルと結果テーブルを作成し、テーブルを使用してソースデータを AnalyticDB for MySQL クラスタに書き込みます。 | |
AnalyticDB for MySQL データベースにログインして、ソースデータが書き込まれているかどうかを確認します。 |
ステップ 1: データを準備する
Apache Flink ノードのルートディレクトリで、
vim /root/data.csv
コマンドを実行して、data.csv という名前の CSV ファイルを作成します。CSV ファイルには次のデータが含まれています。 書き込むデータ量を増やすために、追加のデータ行をコピーできます。
0,json00,20 1,json01,21 2,json02,22 3,json03,23 4,json04,24 5,json05,25 6,json06,26 7,json07,27 8,json08,28 9,json09,29
CSV ファイルを作成した後、他の Apache Flink ノードの /root ディレクトリにファイルをデプロイします。
ステップ 2: データを書き込む
Apache Flink SQL アプリケーションを起動して実行します。 詳細については、「SQL クライアント CLI の起動」をご参照ください。
csv_person
という名前のソーステーブルを作成するには、次の文を実行します。CREATE TABLE if not exists csv_person ( /* ソーステーブル csv_person を作成する */ `user_id` STRING, `user_name` STRING, `age` INT ) WITH ( 'connector' = 'filesystem', 'path' = 'file:///root/data.csv', 'format' = 'csv', 'csv.ignore-parse-errors' = 'true', 'csv.allow-comments' = 'true' );
説明ソーステーブルの列名とデータ型は、AnalyticDB for MySQL クラスタ内のターゲットテーブルの列名とデータ型と同じである必要があります。
上記の文の
path
フィールドは、data.csv ファイルのオンプレミスディレクトリを指定します。 すべての Apache Flink ノード上のファイルのディレクトリが同じであることを確認してください。 data.csv ファイルがオンプレミスデバイスに保存されていない場合は、ファイルの実際のディレクトリを指定してください。上記の文の他のパラメータについては、「FileSystem SQL コネクタ」をご参照ください。
mysql_person
という名前の結果テーブルを作成するには、次の文を実行します。CREATE TABLE mysql_person ( /* 結果テーブル mysql_person を作成する */ user_id String, user_name String, age INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://<エンドポイント:ポート>/<DB名>?useServerPrepStmts=false&rewriteBatchedStatements=true', 'table-name' = '<テーブル名>', 'username' = '<ユーザー名>', 'password' = '<パスワード>', 'sink.buffer-flush.max-rows' = '10', 'sink.buffer-flush.interval' = '1s' );
説明結果テーブルの列名とデータ型は、AnalyticDB for MySQL クラスタ内のターゲットテーブルの列名とデータ型と同じである必要があります。
次の表は、AnalyticDB for MySQL クラスタに接続するために必要なパラメータを示しています。 オプションのパラメータについては、JDBC SQL コネクタのトピックの「コネクタオプション」セクションをご参照ください。
パラメータ
説明
connector
Apache Flink が使用するコネクタの型。 このパラメータを
jdbc
に設定します。url
AnalyticDB for MySQL クラスタの JDBC URL。
形式:
jdbc:mysql://<エンドポイント:ポート>/<DB名>?useServerPrepStmts=false&rewriteBatchedStatements=true'
。エンドポイント
: AnalyticDB for MySQL クラスタのエンドポイント。説明パブリックエンドポイントを使用して AnalyticDB for MySQL クラスタに接続する場合は、最初に パブリックエンドポイントを申請 する必要があります。
DB名
: AnalyticDB for MySQL クラスタ内のターゲットデータベースの名前。useServerPrepStmts=false&rewriteBatchedStatements=true
: AnalyticDB for MySQL クラスタにデータをバッチ書き込みするために必要な構成。 この構成は、書き込みパフォーマンスを向上させ、AnalyticDB for MySQL クラスタの負荷を軽減するために使用されます。
例:
jdbc:mysql://am-**********.ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true
。table-name
書き込むデータを格納するために使用される、AnalyticDB for MySQL クラスタ内のターゲットテーブルの名前。 この例では、ターゲットテーブルの名前は
person
です。username
書き込み権限を持つ AnalyticDB for MySQL データベースアカウントの名前。
説明SHOW GRANTS 文を実行して、現在のアカウントの権限をクエリできます。
GRANT 文を実行して、アカウントに権限を付与できます。
password
書き込み権限を持つ AnalyticDB for MySQL データベースアカウントのパスワード。
sink.buffer-flush.max-rows
Apache Flink から AnalyticDB for MySQL クラスタに一度に書き込むことができる最大行数。 Apache Flink はリアルタイムでデータを受信します。 Apache Flink が受信するデータ行数がこのパラメータの値に達すると、データ行は AnalyticDB for MySQL クラスタにバッチ書き込みされます。 有効な値:
0:
sink.buffer-flush.interval
パラメータで指定された最大時間間隔に達した場合にのみ、Apache Flink はデータをバッチ書き込みします。0 以外の値。 特定の行数を指定します。 例: 1000 または 2000。
説明このパラメータを 0 に設定しないことをお勧めします。 このパラメータを 0 に設定すると、書き込みパフォーマンスが低下し、同時クエリ中に AnalyticDB for MySQL クラスタの負荷が増加します。
sink.buffer-flush.max-rows
パラメータとsink.buffer-flush.interval
パラメータの両方を 0 以外の値に設定すると、次のバッチ書き込みルールが適用されます。Apache Flink が受信するデータ行数が
sink.buffer-flush.max-rows
パラメータの値に達したが、最大時間間隔がsink.buffer-flush.interval
パラメータの値に達していない場合、Apache Flink は最大時間間隔が経過するのを待たずに、AnalyticDB for MySQL クラスタにデータをバッチ書き込みします。Apache Flink が受信するデータ行数が
sink.buffer-flush.max-rows
パラメータの値に達していないが、最大時間間隔がsink.buffer-flush.interval
パラメータの値に達した場合、Apache Flink は Apache Flink が受信するデータ量に関係なく、AnalyticDB for MySQL クラスタにデータをバッチ書き込みします。
sink.buffer-flush.interval
Apache Flink が AnalyticDB for MySQL クラスタにデータをバッチ書き込みする最大時間間隔。 次のバッチ書き込み操作までの最大待機時間でもあります。 有効な値:
0:
sink.buffer-flush.max-rows
パラメータで指定された最大データ行数に達した場合にのみ、Apache Flink はデータをバッチ書き込みします。0 以外の値。 特定の時間間隔を指定します。 例: 1d、1h、1min、1s、または 1ms。
説明オフピーク時にソースデータの量が小さい場合にデータがタイムリーに書き込まれるように、このパラメータを 0 に設定しないことをお勧めします。
INSERT INTO
文を実行してデータをインポートします。 プライマリキーに重複する値がある場合、データは繰り返し挿入されず、INSERT INTO 文はINSERT IGNORE INTO
文と同じです。 詳細については、「INSERT INTO」をご参照ください。INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person; /* csv_person から mysql_person にデータを挿入する */
ステップ 3: データを確認する
データが書き込まれた後、tpch
AnalyticDB for MySQL クラスタの person
データベースにログインし、次の文を実行して、ソースデータが テーブルに書き込まれているかどうかを確認します。
SELECT * FROM person; /* person テーブルのデータを確認する */