All Products
Search
Document Center

Realtime Compute for Apache Flink:JOIN statements for dimension tables

Last Updated:Dec 01, 2023

In Realtime Compute for Apache Flink, each data stream can be associated with a dimension table of an external data source. This allows you to perform associated queries in Realtime Compute for Apache Flink.

Background information

Most connectors allow you to specify the cache policy for the JOIN operations on dimension tables. Different connectors support different cache policies. For more information, see the related connector documentation. The following cache policies are supported:

  • None: Data is not cached. This is the default value.

  • LRU: Only specific data in the dimension table is cached. Each time the system receives a data record, the system searches the cache. If the system does not find the record in the cache, the system searches for the data record in the physical dimension table.

  • ALL: All data in the dimension table is cached. Before a deployment runs, the system loads all data in the dimension table to the cache. This way, the cache is searched for all subsequent queries in the dimension table. If the data that meets the requirement cannot be found in the cache, the key does not exist. The system reloads all data in the cache after cache entries expire. If the amount of data in a remote table is small and a large number of missing keys exist, we recommend that you set this parameter to ALL. The source table and dimension table cannot be associated based on the ON clause.

Note
  • You need to consider the balance between real-time performance and data processing performance based on your business requirements. If you want data to be updated in real time, you can allow the connector to directly read data from the dimension table without the need to use the cached data.

  • If you want to use a cache policy, you can set the cache policy to LRU and specify the time-to-live (TTL) to cache the latest data. You can set TTL to a small value, such as several seconds to tens of seconds. This way, data can be loaded from the source table at the specified interval.

  • If the cache policy is ALL, you must monitor the memory usage of the operator to prevent out of memory (OOM) errors.

  • If the cache policy is ALL, you must increase the memory of the operator for joining tables because the system asynchronously loads data from the dimension table. The increased memory size is twice that of the remote table.

Limits

  • You can associate a data stream only with the dimension table snapshot that is taken at the current moment.

  • Dimension tables support INNER JOIN and LEFT JOIN operations, and do not support RIGHT JOIN or FULL JOIN operations.

Precautions

  • If you want to perform a one-to-one table join, make sure that join conditions contain an equi-join that contains a unique field in the dimension table.

  • Each data stream is associated only with the latest data in the dimension table at the current time. This means that the JOIN operation is performed only at the processing time. Therefore, if the data in the dimension table is added, updated, or deleted after the JOIN operation is performed, the associated data remains unchanged. For more information about the behavior of specific dimension tables, see Supported connectors.

Syntax

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
Note
  • You must append FOR SYSTEM_TIME AS OF PROCTIME() to the end of the dimension table. This way, each data record in the dimension table that can be viewed at the current time is associated with the source data.

  • The ON condition must contain equivalent conditions for fields that can be randomly searched in the dimension table.

  • In the join conditions that are specified in the ON clause, the fields in the dimension table cannot use type conversion functions, such as CAST. If you want to convert data types, perform the conversion on the fields in the source table.

Join hints for dimension tables

Note
  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 4.0 or later supports join hints for dimension tables.

  • Only Realtime Compute for Apache Flink that uses VVR 8.0 or later supports LOOKUP hints.

  • In Realtime Compute for Apache Flink that uses VVR 8.0 or later, an alias can be specified in a join hint for a dimension table. If an alias is specified for a dimension table, the alias of the dimension table must be used in the join hint.

You can use SQL hints of Flink to specify the join strategy for the JOIN operations on dimension tables. This helps improve join performance. Join hints for dimension tables support LOOKUP hints and the following join strategies: SHUFFLE_HASH, REPLICATED_SHUFFLE_HASH, and SKEW. The following table describes the usage scenarios of join strategies based on the configuration of the cache policy for dimension tables.

Cache policy

SHUFFLE_HASH

REPLICATED_SHFFLE_HASH

(Equivalent to SKEW)

None

We recommend that you do not use this join strategy because mainstream data introduces additional network overheads.

We recommend that you do not use this join strategy because mainstream data introduces additional network overheads.

LRU

If the lookup I/O of dimension tables becomes a bottleneck, we recommend that you use this join strategy. If mainstream data has temporal locality on join keys, this join strategy can increase the cache hit ratio and reduce the number of I/O requests. This improves the total throughput.

Important

Mainstream data introduces additional network overheads. If mainstream data is skewed on join keys and a performance bottleneck exists, we recommend that you use the REPLICATED_SHUFFLE_HASH join strategy.

If the lookup I/O of dimension tables becomes a bottleneck and mainstream data is skewed on join keys, we recommend that you use this join strategy. If mainstream data has temporal locality on join keys, this join strategy can increase the cache hit ratio and reduce the number of I/O requests. This improves the total throughput.

ALL

If the memory usage of a dimension table becomes a bottleneck, we recommend that you use this join strategy. This way, the memory usage can be reduced to the value of 1/Parallelism.

Important

Mainstream data introduces additional network overheads. If mainstream data is skewed on join keys and a performance bottleneck exists, we recommend that you use the REPLICATED_SHUFFLE_HASH join strategy.

If the memory usage of a dimension table becomes a bottleneck and mainstream data is skewed on join keys, we recommend that you use this join strategy. This way, the memory usage can be reduced to the value of Number of buckets/Parallelism.

SHUFFLE_HASH

  • Effect

    The SHUFFLE_HASH join strategy allows mainstream data to be shuffled based on join keys before the JOIN operation is performed. If the cache policy is LRU, the cache hit ratio is increased and the number of I/O requests is reduced. If the cache policy is ALL, the memory usage is reduced. You can specify multiple dimension tables in each SHUFFLE_HASH join hint.

  • Limits

    If you use the SHUFFLE_HASH join strategy, the memory overhead is reduced. However, additional network overheads are introduced because upstream data needs to be shuffled based on join keys. Therefore, the SHUFFLE_HASH join strategy is not suitable for the following scenarios:

    • The mainstream data has severe data skew on join keys. If you use the SHUFFLE_HASH join strategy to join data, the join operator may cause a performance bottleneck due to data skew. This may cause severe backpressure in streaming deployments or severe long tails in batch deployments. In this scenario, we recommend that you use the REPLICATED_SHUFFLE_HASH join strategy.

    • If a dimension table contains a small amount of data and does not have a memory bottleneck during table loading when the cache policy is ALL, the memory overheads saved by using the SHUFFLE_HASH join strategy may not be cost-effective, compared with the additional network overheads introduced by using the SHUFFLE_HASH join strategy.

  • Sample code

    -- Enable the SHUFFLE_HASH join strategy only for the dimension table dim1. 
    SELECT /*+ SHUFFLE_HASH(dim1) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
    
    -- Enable the SHUFFLE_HASH join strategy for dimension tables dim1 and dim2. 
    SELECT /*+ SHUFFLE_HASH(dim1, dim2) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
    
    -- You must use the alias D1 for the dimension table dim1 in the hint to enable the SHUFFLE_HASH join strategy. 
    SELECT /*+ SHUFFLE_HASH(D1) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
    
    -- Use aliases for dimension tables dim1 and dim2 in the hint to enable the SHUFFLE_HASH join strategy. 
    SELECT /*+ SHUFFLE_HASH(D1, D2) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

REPLICATED_SHUFFLE_HASH

  • Effect

    The effect of REPLICATED_SHUFFLE_HASH is basically the same as the effect of SHUFFLE_HASH. However, REPLICATED_SHUFFLE_HASH randomly scatters the mainstream data that has the same key to the specified number of concurrent threads to resolve the performance bottleneck caused by data skew. You can specify multiple dimension tables in each REPLICATED_SHUFFLE_HASH join hint.

  • Limits

    • You must configure the table.exec.skew-join.replicate-num parameter to specify the number of buckets that contain skewed data. The default value of this parameter is 16. The value of this parameter cannot be greater than the number of concurrent threads on the join operator of the dimension table. For more information about how to configure this parameter, see How do I configure parameters for deployment running?

    • Update streams are not supported. If the mainstream is an update stream and you use the REPLICATED_SHUFFLE_HASH join strategy, an error is returned.

  • Sample code

    -- Enable the REPLICATED_SHUFFLE_HASH join strategy for the dimension table dim1.
    SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ 
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
    
    -- You must use the alias for the dimension table dim1 in the hint to enable the REPLICATED_SHUFFLE_HASH join.
    SELECT /*+ REPLICATED_SHUFFLE_HASH(D1) */ 
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a

SKEW

  • Effect

    If the specified table has data skew, the optimizer uses the REPLICATED_SHUFFLE_HASH join strategy for the JOIN operation on the dimension table. SKEW is only a syntactic sugar and the REPLICATED_SHUFFLE_HASH join strategy is actually used at the underlying layer.

  • Limits

    • You can specify only one table in each SKEW hint.

    • The name of the table must be the name of the primary table that has data skew instead of a dimension table.

    • Update streams are not supported. If the mainstream is an update stream and you use the SKEW join strategy, an error is returned.

  • Sample code

    SELECT /*+ SKEW(src) */  
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a

Examples

  • Test data

    • Table 1 kafka_input

      id (bigint)

      name (varchar)

      age (bigint)

      1

      lilei

      22

      2

      hanmeimei

      20

      3

      libai

      28

    • Table 2 phoneNumber

      name (varchar)

      phoneNumber (bigint)

      dufu

      1390000111

      baijuyi

      1390000222

      libai

      1390000333

      lilei

      1390000444

  • Test statements

    CREATE TEMPORARY TABLE kafka_input (
      id   BIGINT,
      name VARCHAR,
      age  BIGINT
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE phoneNumber(
      name VARCHAR,
      phoneNumber BIGINT,
      PRIMARY KEY(name) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE result_infor(
      id BIGINT,
      phoneNumber BIGINT,
      name VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO result_infor
    SELECT
      t.id,
      w.phoneNumber,
      t.name
    FROM kafka_input as t
    JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w
    ON t.name = w.name;
  • Test results

    id (bigint)

    phoneNumber (bigint)

    name (varchar)

    1

    1390000444

    lilei

    3

    1390000333

    libai