すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:再帰CTEを書き換えてクエリのパフォーマンスを向上させる

最終更新日:Dec 24, 2024

AnalyticDB for PostgreSQLは、再帰的な共通テーブル式 (CTE) を限定的にサポートします。 分散シナリオでは、AnalyticDB for PostgreSQLは、正しい実行計画と結果を確保するために、再帰的CTEの実行で使用されるWorkTableScan演算子に制限を課します。 モーションは禁止されており、WorkTableScan演算子と連携する一時テーブルは、JOIN句の左端に配置する必要があります。 この場合、再帰CTEに大量のデータを持つテーブルが含まれ、AnalyticDB for PostgreSQLインスタンスに多数の計算ノードがある場合、再帰CTEの実行パフォーマンスは非常に低くなります。 AnalyticDB for PostgreSQLでは、一時テーブルの実行計画に制限はありません。 再帰的CTEを書き換えるには、PL/SQL関数と一時テーブルを使用することを推奨します。 各再帰計算の後、書き換えソリューションは、クエリのパフォーマンスを向上させ、ビジネス要件を満たすためのより良い実行計画を提供します。

例を書き直す

テストデータ

テストテーブルを準備し、テーブルにテストデータを挿入します。

CREATE TABLE city(id varchar(4), pid varchar(4), name varchar(10), gdp int);
INSERT INTO city VALUES('33', NULL, 'Zhejiang Province', 20134);
INSERT INTO city VALUES('3301', '33', 'Hangzhou', 5112);
INSERT INTO city VALUES('3302', '33', 'Ningbo', 3992);
INSERT INTO city VALUES('3303', '33', 'Wenzhou', 2125);
INSERT INTO city VALUES('3304', '33', 'Jiaxing', 1688);
INSERT INTO city VALUES('3306', '33', 'Shaoxing', 1852);
INSERT INTO city VALUES('3305', '33', 'Huzhou', 964);
INSERT INTO city VALUES('3307', '33', 'Jinhua', 1445);
INSERT INTO city VALUES('3308', '33', 'Quzhou', 507);
INSERT INTO city VALUES('3309', '33', 'Zhoushan', 491);
INSERT INTO city VALUES('3310', '33', 'Taizhou', 1486);
INSERT INTO city VALUES('3311', '33', 'Lishui', 472);
INSERT INTO city VALUES('32', NULL, 'Jiangsu Province', 30862);
INSERT INTO city VALUES('3201', '32', 'Nanjing', 4359);
INSERT INTO city VALUES('3202', '32', 'Wuxi', 3584);
INSERT INTO city VALUES('3203', '32', 'Xuzhou', 2118);
INSERT INTO city VALUES('3204', '32', 'Changzhou', 2269);
INSERT INTO city VALUES('3205', '32', 'Suzhou', 5548);
INSERT INTO city VALUES('3206', '32', 'Nantong', 2982);
INSERT INTO city VALUES('3207', '32', 'Lianyungang', 976);
INSERT INTO city VALUES('3208', '32', 'Huai'an', 1257);
INSERT INTO city VALUES('3209', '32', 'Yancheng', 1796);
INSERT INTO city VALUES('3210', '32', 'Yangzhou', 1868);
INSERT INTO city VALUES('3211', '32', 'Zhenjiang', 1372);
INSERT INTO city VALUES('3212', '32', 'Taizhou', 1752);
INSERT INTO city VALUES('3213', '32', 'Suqian', 981);

再帰CTEを使用したデータの照会

再帰CTEを使用して、州およびすべての下位都市の国内総生産 (GDP) を照会します。

WITH RECURSIVE CTE AS 
(
    SELECT id, CAST(name AS varchar(100)), gdp FROM city WHERE name = 'Zhejiang Province'
    UNION ALL
    SELECT son.id, CAST(parent.name || '>' || son.name AS varchar(100)), son.gdp AS name 
    FROM city son INNER JOIN CTE parent ON son.pid = parent.id
)
SELECT id, name, gdp FROM CTE ORDER BY gdp DESC;

実行プラン

実行計画を照会するには、上記のSQL文の前にEXPLAINを追加します。

実行計画は、都市テーブルのデータがすべての計算ノードにブロードキャストされることを示しています。 都市テーブルに大量のデータがある場合、クエリのパフォーマンスは大幅に低下します。 Orcaオプティマイザは再帰的CTEをサポートしていないため、クエリはプランナーオプティマイザにフォールバックします。

                                                     QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=13568.80..13971.85 rows=28451 width=242)
   Merge Key: city.gdp
   ->  Sort  (cost=13568.80..13592.51 rows=9484 width=242)
         Sort Key: city.gdp DESC
         ->  Recursive Union  (cost=0.00..12942.36 rows=9484 width=242)
               ->  Seq Scan on city  (cost=0.00..155.67 rows=10 width=242)
                     Filter: ((name)::text = 'Zhejiang Province'::text)
               ->  Hash Join  (cost=885.67..1259.70 rows=947 width=242)
                     Hash Cond: ((parent.id)::text = (son.pid)::text)
                     ->  WorkTable Scan on CTE parent  (cost=0.00..1.95 rows=97 width=238)
                     ->  Hash  (cost=520.67..520.67 rows=29200 width=82)
                           ->  Broadcast Motion 3:3  (slice2; segments: 3)  (cost=0.00..520.67 rows=29200 width=82)
                                 ->  Seq Scan on city son  (cost=0.00..131.33 rows=9733 width=82)
 Optimizer: Postgres-based planner
(14 rows)

クエリ結果

次の応答が返されます。

id	name	        gdp
33	Zhejiang Province	        20134
3301	Zhejiang Province > Hangzhou	5112
3302	Zhejiang Province > Ningbo	3992
3303	Zhejiang Province > Wenzhou	2125
3306	Zhejiang Province > Shaoxing	1852
3304	Zhejiang Province > Jiaxing	1688
3310	Zhejiang Province > Taizhou	1486
3307	Zhejiang Province > Jinhua	1445
3305	Zhejiang Province > Huzhou	964
3308	Zhejiang Province > Quzhou	507
3309	Zhejiang Province > Zhoushan	491
3311	Zhejiang Province > Lishui	472

書き換えられた再帰CTEを使用したデータのクエリ

UNION ALLシナリオでの再帰CTEの書き換え

次のサンプルコードは、UNION ALLシナリオでPL/SQL関数を使用して再帰CTEを書き換える方法の例を示しています。

-- The parameter of the function is the value of a variable parameter.
CREATE OR REPLACE FUNCTION city_gdp(
    target_name varchar(10)
) RETURNS TABLE(
    id varchar(4),
    name varchar(100),
    gdp int
) AS $$
-- The intermediate variables that can be used during the function execution.
DECLARE prev_count INT := 0;
        curr_count INT := 0;
        curr_level INT := 1;
BEGIN 
-- Create a temporary table. Compared with the recursive CTE, the temporary table has an additional field named level.  
CREATE TEMP TABLE temp_result(
    id varchar(4),
    name varchar(100),
    gdp int,
    level int
) ON COMMIT DROP DISTRIBUTED BY(id); 
-- Write the non-recursive part of the recursive CTE to the temporary table.
INSERT INTO temp_result
SELECT
    parent.id,
    CAST(parent.name AS varchar(100)),
    parent.gdp,
    1 AS level
FROM city parent
WHERE parent.name = target_name;
-- Count the number of rows in the current temporary table for subsequent loop termination.
prev_count := (
    SELECT COUNT(*) FROM temp_result
);
LOOP 
-- (Optional) Analyze the temporary table to obtain a better execution plan.
ANALYZE temp_result;
-- Write the recursive part of the recursive CTE to the temporary table. The value of the level field must be increased by 1. The data of the current level must be filtered in the WHERE clause.
INSERT INTO temp_result
SELECT
    son.id,
    CAST(parent.name || '>' || son.name AS varchar(100)),
    son.gdp,
    parent.level + 1 AS level
FROM city son
INNER JOIN temp_result parent ON parent.id = son.pid
WHERE parent.level = curr_level;
-- Count the number of rows in the current temporary table.
curr_count := (
    SELECT  COUNT(*) FROM temp_result
);
-- If no data is added to the temporary table, the loop exits.
IF curr_count = prev_count THEN EXIT;
END IF;
-- Update the prev_count and curr_level variables before the next loop.
prev_count := curr_count;
curr_level := curr_level + 1;
END LOOP;
-- Reference the entire temporary table as a recursive CTE and query data from the recursive CTE.
RETURN QUERY
SELECT
    CTE.id,
    CTE.name,
    CTE.gdp
FROM temp_result CTE
ORDER BY gdp DESC;
END;
$$ LANGUAGE plpgsql;

実行プラン

関数の実行プロセスは、Orcaオプティマイザが各ループに対してより良い実行プランを自動的に生成することを示しています。

-- The first loop.
LOG:  Insert on temp_result  (cost=0.00..862.04 rows=1 width=24)
  ->  Result  (cost=0.00..0.00 rows=0 width=0)
        ->  Redistribute Motion 3:3  (slice1; segments: 3)  (cost=0.00..862.00 rows=1 width=28)
              Hash Key: city.id
              ->  Hash Join  (cost=0.00..862.00 rows=1 width=34)
                    Hash Cond: ((city.pid)::text = (temp_result_1.id)::text)
                    ->  Redistribute Motion 3:3  (slice2; segments: 3)  (cost=0.00..431.00 rows=1 width=28)
                          Hash Key: city.pid
                          ->  Seq Scan on city  (cost=0.00..431.00 rows=1 width=28)
                    ->  Hash  (cost=431.00..431.00 rows=1 width=17)
                          ->  Seq Scan on temp_result temp_result_1  (cost=0.00..431.00 rows=1 width=17)
                                Filter: (level = 1)
Optimizer: GPORCA
-- The second loop.
LOG:  Insert on temp_result  (cost=0.00..862.04 rows=1 width=24)
  ->  Result  (cost=0.00..0.00 rows=0 width=0)
        ->  Redistribute Motion 3:3  (slice1; segments: 3)  (cost=0.00..862.00 rows=1 width=28)
              Hash Key: city.id
              ->  Hash Join  (cost=0.00..862.00 rows=1 width=43)
                    Hash Cond: ((temp_result_1.id)::text = (city.pid)::text)
                    ->  Seq Scan on temp_result temp_result_1  (cost=0.00..431.00 rows=5 width=27)
                          Filter: (level = 2)
                    ->  Hash  (cost=431.00..431.00 rows=1 width=28)
                          ->  Redistribute Motion 3:3  (slice2; segments: 3)  (cost=0.00..431.00 rows=1 width=28)
                                Hash Key: city.pid
                                ->  Seq Scan on city  (cost=0.00..431.00 rows=1 width=28)
Optimizer: GPORCA
説明

都市テーブルでANALYZEステートメントが実行されないため、上記の結果では最適な実行計画が表示されません。 上記の結果は、ループごとに異なる実行プランが生成されることを示しているだけです。 ANALYZEステートメントの詳細については、「手動収集統計」トピックの「使用法」セクションを参照してください。

クエリ結果

SQL文に有効なパラメーター値を入力します。 例: 浙江省。

SELECT * FROM city_gdp('Zhejiang Province');

次の応答が返されます。

id	name	        gdp
33	Zhejiang Province	        20134
3301	Zhejiang Province > Hangzhou	5112
3302	Zhejiang Province > Ningbo	3992
3303	Zhejiang Province > Wenzhou	2125
3306	Zhejiang Province > Shaoxing	1852
3304	Zhejiang Province > Jiaxing	1688
3310	Zhejiang Province > Taizhou	1486
3307	Zhejiang Province > Jinhua	1445
3305	Zhejiang Province > Huzhou	964
3308	Zhejiang Province > Quzhou	507
3309	Zhejiang Province > Zhoushan	491
3311	Zhejiang Province > Lishui	472

UNIONシナリオでの再帰CTEの書き換え

再帰CTEでUNION句を使用して、データ重複排除を実行し、無限再帰を防ぐことができます。

データ重複排除が必要なシナリオをシミュレートするには、2つの繰り返しデータを都市テーブルに書き込みます。

INSERT INTO city VALUES('1111', '1111', 'Virtual City', 1000);
INSERT INTO city VALUES('1111', '1111', 'Virtual City', 1000);

シミュレーションシナリオでは、次の再帰CTEでUNION句を使用して、繰り返しデータの重複を解消します。

WITH RECURSIVE CTE AS
(
    SELECT id,  gdp FROM city WHERE name = 'Virtual City'
    UNION
    SELECT son.id, son.gdp AS name FROM city son INNER JOIN  CTE parent ON son.pid = parent.id
)
SELECT id,  gdp FROM CTE ORDER BY gdp DESC;

再帰CTEを書き換えると、一時テーブルに一意のインデックスを作成し、INSERT INTO ON CONFLICTステートメントを使用して、データ重複排除後にデータを書き込むことができます。

-- The parameter of the function is the value of a variable parameter.
CREATE OR REPLACE FUNCTION city_gdp(
    target_name varchar(10)
) RETURNS TABLE(
    id varchar(4),
    gdp int
) AS $$
-- The intermediate variables that can be used during the function execution.
DECLARE prev_count INT := 0;
        curr_count INT := 0;
        curr_level INT := 1;
BEGIN 
-- Create a temporary table. Compared with the recursive CTE, the temporary table has an additional field named level.  
CREATE TEMP TABLE temp_result(
    id varchar(4),
    gdp int,
    level int
) ON COMMIT DROP DISTRIBUTED BY(id);
-- New operation in UNION scenarios: Create a unique index.
CREATE UNIQUE INDEX ON temp_result(id, gdp);  
-- Write the non-recursive part of the recursive CTE to the temporary table.
INSERT INTO temp_result
SELECT
    parent.id,
    parent.gdp,
    1 AS level
FROM city parent
WHERE parent.name = target_name
-- New operation in UNION scenarios.
GROUP BY 1,2;
-- Count the number of rows in the current temporary table for subsequent loop termination.
prev_count := (
    SELECT  COUNT(*) FROM temp_result
);
LOOP 
-- (Optional) Analyze the temporary table to obtain a better execution plan.
ANALYZE temp_result;
-- Write the recursive part of the recursive CTE to the temporary table. The value of the level field must be increased by 1. The data of the current level must be filtered in the WHERE clause.
INSERT INTO temp_result
SELECT
    son.id,
    son.gdp,
    parent.level + 1 AS level
FROM city son
INNER JOIN temp_result parent ON parent.id = son.pid
WHERE parent.level = curr_level
-- New operation in UNION scenarios.
GROUP BY 1,2,3
ON CONFLICT DO NOTHING;
-- Count the number of rows in the current temporary table.
curr_count := (
    SELECT COUNT(*) FROM temp_result
);
-- If no data is added to the temporary table, the loop exits.
IF curr_count = prev_count THEN EXIT;
END IF;
-- Update the prev_count and curr_level variables before the next loop.
prev_count := curr_count;
curr_level := curr_level + 1;
END LOOP;
-- Reference the entire temporary table as a recursive CTE and query data from the recursive CTE.
RETURN QUERY
SELECT
    CTE.id,
    CTE.gdp
FROM temp_result CTE ORDER BY gdp DESC;
END;
$$ LANGUAGE plpgsql;

再帰CTEを書き換えた後、SELECT * FROM city_gdppp('Virtual City') ステートメントを実行します。 クエリ結果は、書き換えの前後で同じです。

性能比較

テストデータ

テストテーブルを準備し、テーブルにテストデータを挿入します。

CREATE TABLE test_table(
        id varchar(100),
        parent_id varchar(100),
        float1 float,
        float2 float,
        varchar1 varchar(100),
        varchar2 varchar(100)) DISTRIBUTED BY (id);
INSERT INTO test_table VALUES('1-CCCCCCCCCCCCCCCCCCCC', '', 1.01, 1.01, 'AAAAAAAAAAAAAAAAAAAA', 'BBBBBBBBBBBBBBBBBBBB');
INSERT INTO test_table SELECT i || '-CCCCCCCCCCCCCCCCCCCC', '1-CCCCCCCCCCCCCCCCCCCC', 1.01, 1.01, 'AAAAAAAAAAAAAAAAAAAA', 'BBBBBBBBBBBBBBBBBBBB' FROM generate_series(2, 10000) i;
INSERT INTO test_table SELECT i || '-CCCCCCCCCCCCCCCCCCCC', 'test2-CCCCCCCCCCCCCCCCCCCC', 1.01, 1.01, 'AAAAAAAAAAAAAAAAAAAA', 'BBBBBBBBBBBBBBBBBBBB' FROM generate_series(10001, 5000000) i;

再帰CTEを使用したデータの照会

次のSQL文を実行して、再帰CTEを使用してデータを照会します。

WITH RECURSIVE CTE AS (
    SELECT
        parent.id,
        parent.parent_id,
        CAST(parent.id AS varchar(4000)) id_seq,
        parent.float1,
        parent.float2,
        parent.varchar1,
        parent.varchar2
    FROM test_table parent
    WHERE parent.id IN ('1-CCCCCCCCCCCCCCCCCCCC')
    UNION ALL
    SELECT
        son.id,
        son.parent_id,
        CAST(CTE.id_seq || '>' || son.id AS varchar(4000)) id_seq,
        son.float1,
        son.float2,
        son.varchar1,
        son.varchar2
    FROM test_table son
    INNER JOIN CTE ON CTE.id = son.parent_id
) SELECT * FROM CTE;

次のサンプルコードは、実行計画の例を示しています。

                                                                              QUERY PLAN                                                                              
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather Motion 2:1  (slice1; segments: 2)  (cost=0.00..4008018.27 rows=50000002 width=1404) (actual time=1698.654..1733.896 rows=10000 loops=1)
   ->  Recursive Union  (cost=0.00..3258018.24 rows=25000001 width=628) (actual time=0.062..1694.406 rows=10000 loops=1)
         ->  Seq Scan on test_table parent  (cost=0.00..42563.00 rows=1 width=628) (actual time=0.058..80.130 rows=1 loops=1)
               Filter: ((id)::text = '1-CCCCCCCCCCCCCCCCCCCC'::text)
               Rows Removed by Filter: 2499158
         ->  Hash Join  (cost=194565.00..271545.52 rows=2500000 width=628) (actual time=792.825..806.534 rows=5000 loops=2)
               Hash Cond: ((cte.id)::text = (son.parent_id)::text)
               Extra Text: (seg1)   Hash chain length 1666666.7 avg, 4990000 max, using 3 of 8388608 buckets.
               ->  WorkTable Scan on cte  (cost=0.00..0.20 rows=10 width=734) (actual time=0.002..0.415 rows=5000 loops=2)
               ->  Hash  (cost=111313.00..111313.00 rows=5000000 width=112) (actual time=1591.270..1591.270 rows=5000000 loops=1)
                     Buckets: 8388608  Batches: 1  Memory Usage: 0kB
                     ->  Broadcast Motion 2:2  (slice2; segments: 2)  (cost=0.00..111313.00 rows=5000000 width=112) (actual time=0.138..359.646 rows=5000000 loops=1)
                           ->  Seq Scan on test_table son  (cost=0.00..36313.00 rows=2500000 width=112) (actual time=0.081..179.119 rows=2500841 loops=1)
 Optimizer: Postgres-based planner
 Planning Time: 0.523 ms
   (slice0)    Executor memory: 581K bytes.
   (slice1)    Executor memory: 1224280K bytes avg x 2 workers, 1225892K bytes max (seg1).  Work_mem: 1223292K bytes max.
   (slice2)    Executor memory: 672K bytes avg x 2 workers, 672K bytes max (seg0).
 Memory used:  8388608kB
 Query Id: 5832811209844029710
 Execution Time: 1773.102 ms
(21 rows)

書き換えられた再帰CTEを使用したデータのクエリ

再帰CTEを書き換える前に、次のステートメントを実行して、実行メモリを8 GBに変更する必要があります。

SET statement_mem to '8GB';

再帰CTEを書き換えます。

CREATE OR REPLACE FUNCTION rewrite_query(
    parent_id_arr character varying []
) RETURNS TABLE(
    id varchar(100),
    parent_id varchar(100),
    id_seq varchar(4000),
    float1 float,
    float2 float2,
    varchar1 varchar(100),
    varchar2 varchar(100)
) AS $$
DECLARE prev_count INT := 0;
        curr_count INT := 0;
        curr_level INT := 1;
BEGIN 
CREATE TEMP TABLE temp_result(
    id varchar(100),
    parent_id varchar(100),
    id_seq varchar(4000),
    float1 float,
    float2 float2,
    varchar1 varchar(100),
    varchar2 varchar(100),
    level int
) ON COMMIT DROP DISTRIBUTED BY(id);
INSERT INTO temp_result
SELECT
    parent.id,
    parent.parent_id,
    CAST(parent.id AS varchar(4000)) id_seq,
    parent.float1,
    parent.float2,
    parent.varchar1,
    parent.varchar2,
    1 AS level
FROM test_table parent
WHERE parent.id = ANY(parent_id_arr);
prev_count := (
    SELECT COUNT(*) FROM temp_result
);
LOOP 
ANALYZE temp_result;
INSERT INTO temp_result
SELECT
    son.id,
    son.parent_id,
    CAST(temp_result.id_seq || '>' || son.id AS varchar(4000)) id_seq,
    son.float1,
    son.float2,
    son.varchar1,
    son.varchar2,
    temp_result.level + 1 AS level
FROM test_table son
    INNER JOIN temp_result ON temp_result.id = son.parent_id
WHERE temp_result.level = curr_level;
curr_count := (
    SELECT  COUNT(*) FROM temp_result
);
IF curr_count = prev_count THEN EXIT;
END IF;
prev_count := curr_count;
curr_level := curr_level + 1;
END LOOP;
RETURN QUERY
SELECT
    CTE.id,
    CTE.parent_id,
    CTE.id_seq,
    CTE.float1,
    CTE.float2,
    CTE.varchar1,
    CTE.varchar2
FROM temp_result CTE;
END;
$$ LANGUAGE plpgsql;

実行プランは、書き換えられた再帰CTEの実行期間が約0.9秒であることを示しています。 クエリのパフォーマンスは約2倍になります。

explain analyze SELECT * FROM rewrite_query(array['1-CCCCCCCCCCCCCCCCCCCC']);
                                                        QUERY PLAN                                                        
--------------------------------------------------------------------------------------------------------------------------
 Function Scan on rewrite_query  (cost=0.25..10.25 rows=1000 width=170) (actual time=875.001..875.652 rows=10000 loops=1)
 Optimizer: Postgres-based planner
 Planning Time: 0.040 ms
   (slice0)    Executor memory: 2644K bytes.  Work_mem: 2785K bytes max.
 Memory used:  1048576kB
 Query Id: 338320616919474816
 Execution Time: 875.896 ms
(7 rows)

サクセスストーリー

次の場合は、顧客の生産シナリオからです。 テストシナリオと比較して、このシナリオにはより多くの計算ノードとデータが含まれます。 再帰的CTEが書き換えられた後、実行期間は186秒から2秒未満に短縮されます。

再帰CTEを使用したデータの照会

WITH RECURSIVE CTE AS (
    SELECT
        parent.id,
        parent.parent_id,
        CAST(parent.parent_id AS varchar(4000)) id_seq,
        parent.float1,
        parent.float2,
        parent.varchar1,
        parent.varchar2
    FROM test_table parent
    WHERE
        parent.parent_id IN ('test_id1','test_id2')
    UNION ALL
    SELECT
        son.id,
        son.parent_id,
        CAST(CTE.id_seq || '>' || son.parent_id AS varchar(4000)) id_seq,
        son.float1,
        son.float2,
        son.varchar1,
        son.varchar2
    FROM test_table son
    INNER JOIN CTE ON CTE.id = son.parent_id
)
SELECT
    m.varchar1,
    m.varchar3,
    CTE.parent_id,
    CTE.id,
    split_part(CTE.id_seq, '>', 1) id_1,
    CTE.float1,
    SUM(CTE.float2) sum_float2
FROM CTE
INNER JOIN other_table m ON m.varchar1 = CTE.varchar1
GROUP BY
    m.varchar1,
    m.varchar3,
    CTE.parent_id,
    CTE.id,
    id_1,
    CTE.float1
ORDER BY 7 DESC;

書き換えられた再帰CTEを使用したデータのクエリ

CREATE OR REPLACE FUNCTION rewrite_query(
    parent_id_arr character varying []
) RETURNS TABLE(
    varchar1 varchar(100),
    varchar3 varchar(100),
    id varchar(100),
    parent_id varchar(100),
    id_1 text,
    float1 float,
    sum_float2 float
) AS $$
DECLARE prev_count INT := 0;
        curr_count INT := 0;
        curr_level INT := 1;
BEGIN 
CREATE TEMP TABLE temp_result(
    id varchar(100),
    parent_id varchar(100),
    id_seq varchar(4000),
    float1 float,
    float2 float2,
    varchar1 varchar(100),
    varchar2 varchar(100),
    level int
) ON COMMIT DROP DISTRIBUTED BY(id);
INSERT INTO temp_result
SELECT
    parent.id,
    parent.parent_id,
    CAST(parent.parent_id AS varchar(4000)) id_seq,
    parent.float1,
    parent.float2,
    parent.varchar1,
    parent.varchar2,
    1 AS level
FROM test_table parent
WHERE parent.parent_id = ANY(parent_id_arr);
prev_count := (
    SELECT  COUNT(*)  FROM temp_result
);
LOOP 
ANALYZE temp_result;
INSERT INTO temp_result
SELECT
    son.id,
    son.parent_id,
    CAST(temp_result.id_seq || '>' || son.parent_id as varchar(4000)) id_seq,
    son.float1,
    son.float2,
    son.varchar1,
    son.varchar2,
    temp_result.level + 1 AS level
FROM test_table son
INNER JOIN temp_result ON temp_result.id = son.parent_id
WHERE temp_result.level = curr_level;
curr_count := (
    SELECT COUNT(*) FROM temp_result
);
IF curr_count = prev_count THEN EXIT;
END IF;
prev_count := curr_count;
curr_level := curr_level + 1;
END LOOP;
RETURN QUERY
SELECT
    m.varchar1,
    m.varchar3,
    CTE.parent_id,
    CTE.id,
    split_part(CTE.id_seq, '>', 1) id_1,
    CTE.float1,
    SUM(CTE.float2) sum_float2
FROM temp_result CTE
INNER JOIN other_table m ON m.varchar1 = CTE.varchar1
GROUP BY
    m.varchar1,
    m.varchar3,
    CTE.parent_id,
    CTE.id,
    id_1,
    CTE.float1
ORDER BY 7 DESC;
END;
$$ LANGUAGE plpgsql;
SELECT * FROM rewrite_query(ARRAY['test_id1', 'test_id2']);