すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:Flink を使用して MaxCompute にデータを書き込む

最終更新日:Nov 09, 2025

MaxCompute は、新しい Flink コネクタプラグインを提供します。このプラグインを使用して、Flink から MaxCompute の標準テーブルおよび Delta テーブルにデータを書き込むことができます。このトピックでは、新しい Flink コネクタの機能と、MaxCompute にデータを書き込むための主な手順について説明します。

背景情報

  • サポートされている書き込みモード

    新しい Flink コネクタは、upsert または insert モードで MaxCompute にデータを書き込むことをサポートしています。upsert モードでは、次の 2 つの方法のいずれかでデータを書き込むことができます。

    • プライマリキーによるグループ化

    • パーティションフィールドによるグループ化

      テーブルに多数のパーティションがある場合は、パーティションフィールドでデータをグループ化できます。このメソッドはデータスキューを引き起こす可能性があることに注意してください。

  • upsert モードでの Flink コネクタのデータ書き込みプロセスと推奨パラメーター構成の詳細については、「データウェアハウスへのリアルタイムデータインジェスト」をご参照ください。

  • Flink ジョブを構成して MaxCompute にデータを書き込む際に、Flink コネクタのパラメーターを設定して書き込みモードを指定できます。コネクタパラメーターの完全なリストについては、「付録: 新しい Flink コネクタのすべてのパラメーター」をご参照ください。

  • Flink upsert ジョブのチェックポイント間隔を 3 分以上に設定してください。そうしないと、書き込み効率が低下し、多数の小さなファイルが生成される可能性があります。

  • 次の表は、MaxCompute と Realtime Compute for Apache Flink の間のフィールドデータ型のマッピングを示しています。

    Flink データ型

    MaxCompute データ型

    CHAR(p)

    CHAR(p)

    VARCHAR(p)

    VARCHAR(p)

    STRING

    STRING

    BOOLEAN

    BOOLEAN

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    INT

    INT

    BIGINT

    LONG

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DECIMAL(p, s)

    DECIMAL(p, s)

    DATE

    DATE

    TIMESTAMP(9) WITHOUT TIME ZONE, TIMESTAMP_LTZ(9)

    TIMESTAMP

    TIMESTAMP(3) WITHOUT TIME ZONE, TIMESTAMP_LTZ(3)

    DATETIME

    BYTES

    BINARY

    ARRAY<T>

    LIST<T>

    MAP<K, V>

    MAP<K, V>

    ROW

    STRUCT

    説明

    Flink の TIMESTAMP データ型にはタイムゾーンが含まれていませんが、MaxCompute の TIMESTAMP データ型には含まれています。この違いにより、8 時間の時間のずれが生じます。タイムスタンプを合わせるには、TIMESTAMP_LTZ(9) を使用してください。

    -- FlinkSQL
    CREATE TEMPORARY TABLE odps_source(
      id BIGINT NOT NULL COMMENT 'id',
      created_time TIMESTAMP NOT NULL COMMENT '作成時間',
      updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT '更新時間',
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'maxcompute',
    ...
    );

セルフマネージドのオープンソース Flink クラスターから MaxCompute へのデータ書き込み

  1. 準備: MaxCompute テーブルの作成

    まず、Flink データを書き込むことができる MaxCompute テーブルを作成します。次の例では、パーティション化されていない Delta テーブルとパーティション化された Delta テーブルを作成して、Flink データを MaxCompute に書き込む主なプロセスを示します。テーブルのプロパティについては、「Delta テーブルのパラメーター」をご参照ください。

    -- パーティション化されていない Delta テーブルを作成します。
    CREATE TABLE mf_flink_tt (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, primary key (id)
    )
    tblproperties ("transactional"="true", 
                   "write.bucket.num" = "64", 
                   "acid.data.retain.hours"="12") ;
    
    -- パーティション化された Delta テーブルを作成します。
    CREATE TABLE mf_flink_tt_part (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, 
      primary key (id)
    )
      partitioned by (dd string, hh string) 
      tblproperties ("transactional"="true", 
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
    
  2. オープンソースの Flink クラスターを構築します。Flink バージョン 1.13、1.15、1.16、および 1.17 がサポートされています。お使いの Flink バージョンに対応する Flink コネクタを選択してください:

    説明
    • バージョン 1.16 用の Flink コネクタは Flink 1.17 で使用できます。

    • このトピックでは、Flink コネクタ 1.13 を例として使用します。パッケージをローカル環境にダウンロードして解凍してください。

  3. Flink コネクタをダウンロードし、Flink クラスターパッケージに追加します。

    1. Flink コネクタの JAR パッケージをローカル環境にダウンロードします。

    2. Flink コネクタの JAR パッケージを、解凍した Flink インストールパッケージの lib ディレクトリに追加します。

      mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
  4. Flink サービスを開始します。

    cd $FLINK_HOME/bin
    ./start-cluster.sh
  5. Flink SQL クライアントを起動します。

    cd $FLINK_HOME/bin
    ./sql-client.sh
  6. Flink テーブルを作成し、Flink コネクタのパラメーターを構成します。

    Flink SQL または Flink DataStream API のいずれかを使用して、Flink テーブルを作成し、パラメーターを構成できます。以下のセクションでは、両方のメソッドの主要な例を示します。

    Flink SQL の使用

    1. Flink SQL エディターに移動し、次のコマンドを実行してテーブルを作成し、パラメーターを構成します。

      -- Flink SQL に対応するパーティション化されていないテーブルを登録します。
      CREATE TABLE mf_flink (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt',
        'sink.operation' = 'upsert',
        'odps.access.id'='LTAI****************',
        'odps.access.key'='********************',
        'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        'odps.project.name'='mf_mc_bj'
      );
      
      -- Flink SQL に対応するパーティション化されたテーブルを登録します。
      CREATE TABLE mf_flink_part (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        dd STRING,
        hh STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) PARTITIONED BY (`dd`,`hh`)
      WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt_part',
        'sink.operation' = 'upsert',
        'odps.access.id'='LTAI****************',
        'odps.access.key'='********************',
        'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        'odps.project.name'='mf_mc_bj'
      );
    2. Flink テーブルにデータを書き込み、MaxCompute テーブルをクエリして、データが正常に書き込まれたことを確認します。

      -- Flink SQL クライアントでパーティション化されていないテーブルにデータを挿入します。
      INSERT INTO mf_flink VALUES (1,'Danny',27, false);
      
      -- MaxCompute でデータをクエリし、結果を確認します。
      SELECT * FROM mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 27   | false  |
      +------------+------+------+--------+
      
      -- Flink SQL クライアントでパーティション化されていないテーブルにデータを挿入します。
      INSERT INTO mf_flink VALUES (1,'Danny',28, false);
      -- MaxCompute でデータをクエリし、結果を確認します。
      SELECT * FROM mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 28   | false  |
      +------------+------+------+--------+
      
      -- Flink SQL クライアントでパーティション化されたテーブルにデータを挿入します。
      INSERT INTO mf_flink_part VALUES (1,'Danny',27, false, '01','01');
      -- MaxCompute でデータをクエリし、結果を確認します。
      SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 27   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+
      
      -- Flink SQL クライアントでパーティション化されたテーブルにデータを挿入します。
      INSERT INTO mf_flink_part VALUES (1,'Danny',30, false, '01','01');
      -- MaxCompute でデータをクエリし、結果を確認します。
      SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 30   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+

    DataStream API を使用する

    1. DataStream API を使用する場合は、まず次の依存関係を追加します。

      <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>flink-connector-maxcompute</artifactId>
                  <version>xxx</version>
                  <scope>system</scope>
                  <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
      </dependency>
      説明

      xxx を実際のバージョン番号に置き換えてください。

    2. 次のサンプルコードは、テーブルを作成してパラメーターを構成する方法を示しています。

      package com.aliyun.odps.flink.examples;
      
      // Flink と MaxCompute の連携の例
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.odps.table.OdpsOptions;
      import org.apache.flink.odps.util.OdpsConf;
      import org.apache.flink.odps.util.OdpsPipeline;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.data.RowData;
      
      public class Examples {
      
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(120 * 1000);
      
              StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
      
              Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table"); // ソーステーブルからデータを読み込む
              DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);
      
              Configuration config = new Configuration();
              config.set(OdpsOptions.SINK_OPERATION, "upsert"); // upsert モードを設定
              config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8); // commit スレッド数を設定
              config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100); // メジャーコンパクションの最小 commit 回数を設定
      
              OdpsConf odpsConfig = new OdpsConf("accessid", // AccessKey ID
                      "accesskey", // AccessKey Secret
                      "endpoint", // MaxCompute エンドポイント
                      "project", // MaxCompute プロジェクト名
                      "tunnel endpoint"); // Tunnel エンドポイント
      
              OdpsPipeline.Builder builder = OdpsPipeline.builder();
              builder.projectName("sql2_isolation_2a") // プロジェクト名
                      .tableName("user_ledger_portfolio") // テーブル名
                      .partition("") // パーティション
                      .configuration(config) // 構成
                      .odpsConf(odpsConfig) // ODPS 構成
                      .sink(input, false); // データをシンクに書き込む
              env.execute();
          }
      }

Alibaba Cloud 上のフルマネージド Flink から MaxCompute へのデータ書き込み

  1. 準備: MaxCompute テーブルの作成

    まず、Flink データを書き込むことができる MaxCompute テーブルを作成します。次の例では、Delta テーブルを作成する方法を示します。

    SET odps.sql.type.system.odps2=true;
    DROP TABLE mf_flink_upsert;
    CREATE TABLE mf_flink_upsert (
      c1 int not null, 
      c2 string, 
      gt timestamp,
      primary key (c1)
    ) 
      PARTITIONED BY (ds string)
      tblproperties ("transactional"="true",
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
  2. Realtime Compute for Apache Flink コンソールにログインし、Flink コネクタの情報を表示します。Flink コネクタは、Alibaba Cloud 上のフルマネージド Flink の Ververica Platform (VVP) にプリロードされています。

  3. Flink SQL ジョブを使用して Flink テーブルを作成し、リアルタイムの Flink データを生成します。ジョブを開発した後、デプロイできます。

    Flink ジョブ開発ページで、Flink SQL ジョブを作成および編集します。次の例では、Flink ソーステーブルと一時的な Flink シンクテーブルを作成します。この例には、リアルタイムデータを自動的に生成してソーステーブルに書き込むロジックも含まれています。その後、ジョブログロジックはソーステーブルから一時的なシンクテーブルにデータを書き込みます。SQL ジョブ開発の詳細については、「ジョブ開発マップ」をご参照ください。

    -- Flink ソーステーブルを作成します。
    CREATE TEMPORARY TABLE fake_src_table
    (
        c1 int,
        c2 VARCHAR,
        gt AS CURRENT_TIMESTAMP
    ) WITH (
      'connector' = 'faker',
      'fields.c2.expression' = '#{superhero.name}',
      'rows-per-second' = '100',
      'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
    );
    
    -- 一時的な Flink シンクテーブルを作成します。
    CREATE TEMPORARY TABLE test_c_d_g 
    (
        c1 int,
        c2 VARCHAR,
        gt TIMESTAMP,
        ds varchar,
        PRIMARY KEY(c1) NOT ENFORCED
     ) PARTITIONED BY(ds)
     WITH (
        		'connector' = 'maxcompute',
        		'table.name' = 'mf_flink_upsert',
        		'sink.operation' = 'upsert',
        		'odps.access.id'='LTAI****************',
        		'odps.access.key'='********************',
        		'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        		'odps.project.name'='mf_mc_bj',
        		'upsert.write.bucket.num'='64'
    );
    
    -- Flink 計算ロジック
    INSERT INTO test_c_d_g
    SELECT  c1 AS c1,
            c2 AS c2,
            gt AS gt,
            date_format(gt, 'yyyyMMddHH') AS ds
    FROM    fake_src_table;

    各項目の意味は次のとおりです。

    odps.end.point: 対応するリージョンの内部ネットワークエンドポイントを使用します。

    upsert.write.bucket.num: このパラメーターの値は、MaxCompute の Delta テーブルの write.bucket.num プロパティの値と同じでなければなりません。

  4. MaxCompute でデータをクエリして、Flink データが正常に書き込まれたことを確認します。

    SELECT * FROM mf_flink_upsert WHERE ds=2023061517;
    
    -- 結果: Flink データはランダムに生成されるため、MaxCompute でのクエリ結果は例と異なる場合があります。
    +------+----+------+----+
    | c1   | c2 | gt   | ds |
    +------+----+------+----+
    | 0    | Skaar | 2023-06-16 01:59:41.116 | 2023061517 |
    | 21   | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 |
    | 104  | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
    | 126  | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
    

付録: 新しい Flink コネクタのすべてのパラメーター

  • 基本パラメータ

    パラメータ

    必須

    デフォルト値

    説明

    connector

    はい

    なし

    コネクタタイプ。これを MaxCompute に設定します。

    odps.project.name

    はい

    なし

    MaxCompute プロジェクトの名前。

    odps.access.id

    はい

    なし

    Alibaba Cloud アカウントの AccessKey ID。この情報は AccessKey ペアページ で表示できます。

    odps.access.key

    はい

    なし

    Alibaba Cloud アカウントの AccessKey Secret。この情報は AccessKey ペアページ で表示できます。

    odps.end.point

    はい

    なし

    MaxCompute のエンドポイント。さまざまなリージョンにおける MaxCompute のエンドポイントについては、「エンドポイント」をご参照ください。

    odps.tunnel.end.point

    いいえ

    なし

    Tunnel サービスのパブリックエンドポイント。Tunnel エンドポイントを構成しない場合、トラフィックは MaxCompute サービスが配置されているネットワークに対応する Tunnel エンドポイントに自動的にルーティングされます。Tunnel エンドポイントを構成する場合、トラフィックは指定されたエンドポイントにルーティングされ、自動ルーティングは無効になります。

    さまざまなリージョンとネットワークタイプに対応する Tunnel エンドポイントについては、「エンドポイント」をご参照ください。

    odps.tunnel.quota.name

    いいえ

    なし

    MaxCompute にアクセスするために使用される Tunnel クォータの名前。

    table.name

    はい

    なし

    MaxCompute テーブルの名前。フォーマットは [project.][schema.]table です。

    odps.namespace.schema

    いいえ

    false

    3 層モデルを使用するかどうかを指定します。3 層モデルの詳細については、「スキーマ操作」をご参照ください。

    sink.operation

    はい

    insert

    書き込みモード。有効な値: insert および upsert

    説明

    MaxCompute Delta テーブルのみが upsert 書き込みをサポートしています。

    sink.parallelism

    いいえ

    なし

    書き込みの並列処理の次数。これが設定されていない場合、デフォルトで入力データの並列処理の次数が使用されます。

    説明

    write.bucket.num テーブルプロパティの値がこのパラメーターの値の整数倍であることを確認してください。これにより、最適な書き込みパフォーマンスが確保され、シンクノードのメモリを最大限に節約できます。

    sink.meta.cache.time

    いいえ

    400

    メタデータキャッシュのサイズ。

    sink.meta.cache.expire.time

    いいえ

    1200

    メタデータキャッシュのタイムアウト期間 (秒)。

    sink.coordinator.enable

    いいえ

    はい。

    コーディネーターモードを有効にするかどうかを指定します。

  • パーティションパラメータ

    パラメータ

    必須

    デフォルト値

    説明

    sink.partition

    いいえ

    なし

    書き込み先のパーティションの名前。

    動的パーティションを使用する場合、これは動的パーティションの親パーティションの名前です。

    sink.partition.default-value

    いいえ

    __DEFAULT_PARTITION__

    動的パーティションで使用するデフォルトのパーティション名。

    sink.dynamic-partition.limit

    いいえ

    100

    動的パーティションに書き込む場合、これは単一のチェックポイントで同時にインポートできるパーティションの最大数です。

    説明

    この値を大幅に増やさないでください。同時に多数のパーティションに書き込むと、シンクノードでメモリ不足 (OOM) エラーが発生する可能性があります。書き込みの同時パーティション数がしきい値を超えると、書き込みジョブは失敗します。

    sink.group-partition.enable

    いいえ

    false

    動的パーティションに書き込むときに、パーティションごとにデータをグループ化するかどうかを指定します。

    sink.partition.assigner.class

    いいえ

    なし

    PartitionAssigner 実装クラス。

  • FileCached モードでの書き込み用パラメーター

    多数の動的パーティションがある場合は、ファイルキャッシュモードを使用できます。次のパラメーターを使用して、データ書き込み用のキャッシュファイル情報を構成できます。

    パラメータ

    必須

    デフォルト値

    説明

    sink.file-cached.enable

    いいえ

    false

    FileCached モードでの書き込みを有効にするかどうかを指定します。有効な値:

    • false: モードを無効にします。

    • true: モードを有効にします。

      説明

      動的パーティションが多数ある場合は、ファイルキャッシュモードを使用してください。

    sink.file-cached.tmp.dirs

    いいえ

    ./local

    ファイルキャッシュモードでのキャッシュファイルのデフォルトディレクトリ。

    sink.file-cached.writer.num

    いいえ

    16

    ファイルキャッシュモードでデータをアップロードするための単一タスクの同時スレッド数。

    説明

    この値を大幅に増やさないでください。同時に多数のパーティションに書き込むと、OOM エラーが発生する可能性があります。

    sink.bucket.check-interval

    いいえ

    60000

    ファイルキャッシュモードでファイルサイズをチェックする間隔。単位: ミリ秒 (ms)。

    sink.file-cached.rolling.max-size

    いいえ

    16 M

    ファイルキャッシュモードでの単一キャッシュファイルの最大サイズ。

    ファイルがこのサイズを超えると、そのデータはサーバーにアップロードされます。

    sink.file-cached.memory

    いいえ

    64 M

    ファイルキャッシュモードでファイルの書き込みに使用されるオフヒープメモリの最大サイズ。

    sink.file-cached.memory.segment-size

    いいえ

    128 KB

    ファイルキャッシュモードでファイルの書き込みに使用されるバッファーのサイズ。

    sink.file-cached.flush.always

    いいえ

    true

    ファイルキャッシュモードで、ファイルの書き込み時にキャッシュを使用するかどうかを指定します。

    sink.file-cached.write.max-retries

    いいえ

    3

    ファイルキャッシュモードでのデータアップロードの再試行回数。

  • insert または upsert 書き込みのパラメーター

    Upsert 書き込みパラメーター

    パラメータ

    必須

    デフォルト値

    説明

    upsert.writer.max-retries

    いいえ

    3

    Upsert Writer がバケットへの書き込みに失敗した場合の再試行回数。

    upsert.writer.buffer-size

    いいえ

    64 m

    Flink の単一の Upsert Writer のキャッシュサイズ。

    説明
    • すべてのバケットの合計バッファーサイズがしきい値に達すると、システムは自動的にデータをサーバーにフラッシュします。

    • 単一の Upsert Writer は、同時に複数のバケットに書き込むことができます。この値を増やすと、書き込み効率が向上します。

    • 多数のパーティションに書き込む場合、OOM エラーのリスクがあります。この場合、この値を減らすことを検討してください。

    upsert.writer.bucket.buffer-size

    いいえ

    1 m

    Flink の単一バケットのキャッシュサイズ。Flink サーバーのメモリリソースが不足している場合は、この値を減らすことができます。

    upsert.write.bucket.num

    はい

    なし

    シンクテーブルのバケット数。この値は、シンクテーブルの write.bucket.num プロパティの値と同じでなければなりません。

    upsert.write.slot-num

    いいえ

    1

    単一セッションで使用される Tunnel スロットの数。

    upsert.commit.max-retries

    いいえ

    3

    upsert セッションコミットの再試行回数。

    upsert.commit.thread-num

    いいえ

    16

    upsert セッションコミットの並列処理の次数。

    このパラメーターを大きな値に設定しないでください。コミットの同時実行数が多いとリソース消費量が増加し、パフォーマンスの問題や過剰なリソース使用量が発生する可能性があります。

    upsert.major-compact.min-commits

    いいえ

    100

    メジャーコンパクションをトリガーするために必要なコミットの最小数。

    upsert.commit.timeout

    いいえ

    600

    upsert セッションコミットが待機するタイムアウト期間。単位: 秒 (s)。

    upsert.major-compact.enable

    いいえ

    false

    メジャーコンパクションを有効にするかどうかを指定します。

    upsert.flush.concurrent

    いいえ

    2

    単一パーティションで同時に書き込むことができるバケットの最大数。

    説明

    バケット内のデータがフラッシュされるたびに、Tunnel スロットが占有されます。

    説明

    upsert 書き込みの推奨パラメーター構成の詳細については、「Upsert 書き込みの推奨パラメーター構成」をご参照ください。

    Insert 書き込みパラメーター

    パラメータ

    必須

    デフォルト値

    説明

    insert.commit.thread-num

    いいえ

    16

    コミットセッションの並列処理の次数。

    insert.arrow-writer.enable

    いいえ

    false

    Arrow 形式を使用するかどうかを指定します。

    insert.arrow-writer.batch-size

    いいえ

    512

    Arrow バッチの最大行数。

    insert.arrow-writer.flush-interval

    いいえ

    100000

    ライターがデータをフラッシュする間隔。単位: ミリ秒 (ms)。

    insert.writer.buffer-size

    いいえ

    64 M

    バッファリングされたライターのキャッシュサイズ。