Realtime Compute for Apache Flink: Key parameters

Last Updated: Apr 29, 2024

This topic describes key parameters that you may need to configure when you develop an SQL draft and provides examples of how to configure them.

table.exec.sink.keyed-shuffle

When you write data to a table that has a primary key, the data may arrive out of order. To resolve this issue, you can configure the table.exec.sink.keyed-shuffle parameter to perform hash shuffling. Hash shuffling distributes records that have the same primary key to the same subtask of the sink operator, which reduces the probability of out-of-order writes.

Precautions

  • Hash shuffling is helpful only when the upstream operator can guarantee that update records are produced in the correct order on the primary key fields.

  • If you change the parallelism of an operator for a deployment that runs in expert mode, the following parallelism rules do not apply.

Valid values

  • AUTO: If the parallelism of the sink operator is not 1 and differs from the parallelism of the upstream operator, Realtime Compute for Apache Flink automatically performs hash shuffling on the primary key field when data flows to the sink operator. This is the default value.

  • FORCE: If the parallelism of the sink operator is not 1, Realtime Compute for Apache Flink forcefully performs hash shuffling on the primary key field when data flows to the sink operator.

  • NONE: Realtime Compute for Apache Flink does not perform hash shuffling, regardless of the parallelism of the sink operator and the upstream operator.
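
You can set this parameter in the Other Configuration field of the Parameters section by using the same key: value format that the examples below use. The following line is a minimal sketch; the value FORCE is chosen only for illustration:

  table.exec.sink.keyed-shuffle: FORCE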

Examples

  • AUTO

    1. Create an SQL streaming draft, copy the following test SQL statements to the code editor, and then deploy the draft. In this example, the parallelism of the sink operator is explicitly set to 2.

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        ts TIMESTAMP(3)
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.ts.kind'='random',
        'fields.ts.max-past'='5s',
        'fields.b.kind'='random',
        'fields.b.min'='0',
        'fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE sink (
        a INT,
        b INT,
        ts TIMESTAMP(3),
        PRIMARY KEY (a) NOT ENFORCED
      ) WITH (
        'connector'='print',
        -- You can configure the sink.parallelism parameter to specify the parallelism of the sink operator.
        'sink.parallelism'='2'
      );
      
      INSERT INTO sink SELECT * FROM s1;
      -- You can also configure the dynamic table options to specify the parallelism of the sink operator. 
      --INSERT INTO sink /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM s1;
    2. In the Resources section of the Configuration tab on the Deployments page, set the Parallelism parameter to 1. In the Parameters section, do not configure the table.exec.sink.keyed-shuffle parameter. Alternatively, add the table.exec.sink.keyed-shuffle: AUTO configuration to the Other Configuration field.

    3. Start the deployment. On the Status tab, the data connection mode between the sink operator and the upstream operator is HASH.

  • FORCE

    1. Create an SQL streaming draft, copy the following test SQL statements to the code editor, and then deploy the draft. You do not need to specify the parallelism of the sink operator.

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        ts TIMESTAMP(3)
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.ts.kind'='random',
        'fields.ts.max-past'='5s',
        'fields.b.kind'='random',
        'fields.b.min'='0',
        'fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE sink (
        a INT,
        b INT,
        ts TIMESTAMP(3),
        PRIMARY KEY (a) NOT ENFORCED
      ) WITH (
        'connector'='print'
      );
      
      INSERT INTO sink
      SELECT * FROM s1;
    2. In the Resources section of the Configuration tab on the Deployments page, set the Parallelism parameter to 2. In the Parameters section, add the table.exec.sink.keyed-shuffle: FORCE configuration to the Other Configuration field.

    3. Start the deployment. On the Status tab, the parallelism of both the sink operator and the upstream operator is 2, and the data connection mode between them changes to HASH.

table.exec.mini-batch.size

This parameter specifies the maximum number of input records that a compute node can cache for micro-batch operations. When the number of cached records reaches this value, the final calculation is performed and the results are emitted. This parameter takes effect only when it is used together with the table.exec.mini-batch.enabled and table.exec.mini-batch.allow-latency parameters. For more information about miniBatch-related optimizations, see MiniBatch Aggregation and MiniBatch Regular Joins.

Precautions

If you do not explicitly configure this parameter in the Parameters section before you start a deployment, managed memory is used to cache data in miniBatch processing mode. Final calculation and data output are triggered when one of the following conditions is met:

  • The compute node receives the watermark message sent by the MiniBatchAssigner operator.

  • The managed memory is full.

  • The CHECKPOINT command is received and checkpointing has not been performed.

  • The deployment is canceled.

Valid values

  • -1: The managed memory is used to cache data. This is the default value.

  • Other negative values of the LONG type: Data is processed in the same way as with the default value.

  • A positive value of the LONG type: The heap memory is used to cache data. When the number of cached input records reaches the specified value, data output is triggered.
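
As a sketch, the three miniBatch parameters might be combined in the Other Configuration field as follows. The allow-latency and size values shown here are placeholders for illustration, not recommended settings:

  table.exec.mini-batch.enabled: true
  table.exec.mini-batch.allow-latency: 2s
  table.exec.mini-batch.size: 10000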

Example

  1. Create an SQL streaming draft, copy the following test SQL statements, and then deploy the draft.

    CREATE TEMPORARY TABLE s1 (
      a INT,
      b INT,
      ts TIMESTAMP(3),
      PRIMARY KEY (a) NOT ENFORCED,
      WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
    ) WITH (
      'connector'='datagen',
      'rows-per-second'='1',
      'fields.ts.kind'='random',
      'fields.ts.max-past'='5s',
      'fields.b.kind'='random',
      'fields.b.min'='0',
      'fields.b.max'='10'
    );
    
    CREATE TEMPORARY TABLE sink (
      a INT,
      b BIGINT,
      PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
      'connector'='print'
    );
    
    INSERT INTO sink SELECT a, sum(b) FROM s1 GROUP BY a;
  2. In the Parameters section of the Configuration tab on the Deployments page, add the table.exec.mini-batch.enabled: true and table.exec.mini-batch.allow-latency: 2s configurations to the Other Configuration field. Do not configure the table.exec.mini-batch.size parameter.

  3. Start the deployment. On the Status tab, the topology of the deployment contains the MiniBatchAssigner, LocalGroupAggregate, and GlobalGroupAggregate operators.

References

Why is data output suspended on the LocalGroupAggregate operator for a long period of time without any data being generated?