All Products
Search
Document Center

Realtime Compute for Apache Flink:Performance white paper (Nexmark performance test)

Last Updated:Jan 19, 2026

This topic describes how to use Nexmark to test the performance of Realtime Compute for Apache Flink.

Performance

With a 1 compute unit (CU) compute resource configuration in Realtime Compute for Apache Flink, the performance of 19 Nexmark queries ranges from a minimum of 5,000 records per second (RPS) to a maximum of 55,000 RPS.

  • For simple operations, such as single-stream filtering and string transformations, 1 CU can process 40,000 to 55,000 records per second.

  • For complex operations, such as JOIN, GROUP BY, or window function operations, 1 CU can process 5,000 to 10,000 records per second.

Prerequisites

  • Java Development Kit (JDK) 1.8.x or a later version is installed.

  • Maven 3.8.2 is installed.

  • Git is installed. To download Git, see Git.

  • Nexmark is installed. For more information about Nexmark, see Nexmark.

  • A workspace is created. For more information, see Activate Realtime Compute for Apache Flink.

    Note

    Nexmark and Git are third-party websites. You may experience access delays or failures.

Test tools

测试图片

  • Nexmark source table: Generates test data based on the required transactions per second (TPS) for the test.

  • Transformations: The 19 queries of Nexmark.

  • Blackhole sink table: Excludes the performance impact of upstream and downstream storage to focus on testing the performance of Flink.

Data preparation

  1. Run the following commands in the command-line interface (CLI) to download and compile the Nexmark source code.

    cd <path>
    git clone https://github.com/nexmark/nexmark.git
    cd nexmark/nexmark-flink
    mvn clean package
    Note

    <path> specifies the custom path where the nexmark.git file is stored.

  2. Create a Nexmark connector.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. In the Actions column of the target workspace, click Console.

    3. On the right side of the Connectors page, click Create Custom Connector.

    4. In the dialog box that appears, click Select File. The following figure shows an example.Upload a job

      Note

      The package is in the `/nexmark/nexmark-flink/target` directory. The nexmark-flink-0.2-SNAPSHOT.jar file is the compiled package.

    5. Click Next and then click Finish.

The following table describes the parameters of the Nexmark connector.

Parameter

Parameter Value

Description

first-event.rate

10000000

The data generation rate.

next-event.rate

events.num

100000000

The number of events to generate.

bid.proportion

92%

The percentage of Bid events.

auction.proportion

6%

The percentage of Auction events.

person.proportion

2%

The percentage of Person events.

Test procedure

  1. Create the following query jobs. For more information about how to create a job, see Job development map.

q0

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        bid;

q1

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price DECIMAL(23, 3),
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        0.908 * price as price, -- convert dollar to euro
        channel,
        url,
        dateTime,
        extra 
    FROM
        bid;

q2

CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        bid 
    WHERE
        MOD(auction, 123) = 0;

q3

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (name VARCHAR, city VARCHAR, state VARCHAR, id BIGINT) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW person AS
    SELECT 
        person.id,
        person.name,
        person.emailAddress,
        person.creditCard,
        person.city,
        person.state,
        dateTime,
        person.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 0;

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        nexmark_table 
    WHERE
        event_type = 1;

INSERT INTO discard_sink
    SELECT 
        P.name, P.city, P.state, A.id 
    FROM
        auction AS A 
        INNER JOIN person AS P
            on A.seller = P.id 
    WHERE
        A.category = 10 
            and(P.state = 'OR' 
            OR P.state = 'ID' 
            OR P.state = 'CA');

q4

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (id BIGINT, final BIGINT) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        nexmark_table 
    WHERE
        event_type = 1;

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        Q.category, AVG(Q.final) 
    FROM
        (
            SELECT 
                MAX(B.price) AS final, A.category 
            FROM
                auction A, bid B 
            WHERE
                A.id = B.auction 
                    AND B.dateTime BETWEEN A.dateTime AND A.expires 
            GROUP BY
                A.id, A.category
        ) Q 
    GROUP BY
        Q.category;

q5

CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (auction BIGINT, num BIGINT) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        AuctionBids.auction, AuctionBids.num 
    FROM
        (
            SELECT 
                B1.auction,
                count(*) AS num,
                HOP_START(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
                HOP_END(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime 
            FROM
                bid B1 
            GROUP BY
                B1.auction,
                HOP(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
        ) AS AuctionBids 
        JOIN (
            SELECT 
                max(CountBids.num) AS maxn,
                CountBids.starttime,
                CountBids.endtime 
            FROM
                (
                    SELECT 
                        count(*) AS num,
                        HOP_START(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
                        HOP_END(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime 
                    FROM
                        bid B2 
                    GROUP BY
                        B2.auction,
                        HOP(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
                ) AS CountBids 
            GROUP BY
                CountBids.starttime, CountBids.endtime
        ) AS MaxBids
            ON AuctionBids.starttime = MaxBids.starttime 
                AND AuctionBids.endtime = MaxBids.endtime 
                AND AuctionBids.num >= MaxBids.maxn;

q7

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        B.auction, B.price, B.bidder, B.dateTime, B.extra 
    from
        bid B 
        JOIN (
            SELECT 
                MAX(B1.price) AS maxprice,
                TUMBLE_ROWTIME(B1.dateTime, INTERVAL '10' SECOND) as dateTime 
            FROM
                bid B1 
            GROUP BY
                TUMBLE(B1.dateTime, INTERVAL '10' SECOND)
        ) B1
            ON B.price = B1.maxprice 
    WHERE
        B.dateTime BETWEEN B1.dateTime - INTERVAL '10' SECOND AND B1.dateTime;

q8

CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (id BIGINT, name VARCHAR, stime TIMESTAMP(3)) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW person AS
    SELECT 
        person.id,
        person.name,
        person.emailAddress,
        person.creditCard,
        person.city,
        person.state,
        dateTime,
        person.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 0;

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;

INSERT INTO discard_sink
    SELECT 
        P.id, P.name, P.starttime 
    FROM
        (
            SELECT 
                P.id,
                P.name,
                TUMBLE_START(P.dateTime, INTERVAL '10' SECOND) AS starttime,
                TUMBLE_END(P.dateTime, INTERVAL '10' SECOND) AS endtime 
            FROM
                person P 
            GROUP BY
                P.id,
                P.name,
                TUMBLE(P.dateTime, INTERVAL '10' SECOND)
        ) P 
        JOIN (
            SELECT 
                A.seller,
                TUMBLE_START(A.dateTime, INTERVAL '10' SECOND) AS starttime,
                TUMBLE_END(A.dateTime, INTERVAL '10' SECOND) AS endtime 
            FROM
                auction A 
            GROUP BY
                A.seller,
                TUMBLE(A.dateTime, INTERVAL '10' SECOND)
        ) A
            ON P.id = A.seller 
                AND P.starttime = A.starttime 
                AND P.endtime = A.endtime;

q9

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    id BIGINT,
    itemName VARCHAR,
    description VARCHAR,
    initialBid BIGINT,
    reserve BIGINT,
    dateTime TIMESTAMP(3),
    expires TIMESTAMP(3),
    seller BIGINT,
    category BIGINT,
    extra VARCHAR,
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    bid_dateTime TIMESTAMP(3),
    bid_extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        id,
        itemName,
        description,
        initialBid,
        reserve,
        dateTime,
        expires,
        seller,
        category,
        extra,
        auction,
        bidder,
        price,
        bid_dateTime,
        bid_extra 
    FROM
        (
            SELECT 
                A.*,
                B.auction,
                B.bidder,
                B.price,
                B.dateTime AS bid_dateTime,
                B.extra AS bid_extra,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            A.id
                        ORDER BY
                            B.price DESC, B.dateTime ASC
                    ) AS rownum 
            FROM
                auction A, bid B 
            WHERE
                A.id = B.auction 
                    AND B.dateTime BETWEEN A.dateTime AND A.expires
        ) 
    WHERE
        rownum <= 1;

q11

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    bidder BIGINT,
    bid_count BIGINT,
    starttime TIMESTAMP(3),
    endtime TIMESTAMP(3)
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        B.bidder,
        count(*) as bid_count,
        SESSION_START(B.dateTime, INTERVAL '10' SECOND) as starttime,
        SESSION_END(B.dateTime, INTERVAL '10' SECOND) as endtime 
    FROM
        bid B 
    GROUP BY
        B.bidder, SESSION(B.dateTime, INTERVAL '10' SECOND);

q12

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    bidder BIGINT,
    bid_count BIGINT,
    starttime TIMESTAMP(3),
    endtime TIMESTAMP(3)
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        B.bidder,
        count(*) as bid_count,
        TUMBLE_START(B.p_time, INTERVAL '10' SECOND) as starttime,
        TUMBLE_END(B.p_time, INTERVAL '10' SECOND) as endtime 
    FROM
        (
            SELECT 
                *, PROCTIME() as p_time 
            FROM
                bid
        ) B 
    GROUP BY
        B.bidder, TUMBLE(B.p_time, INTERVAL '10' SECOND);

q15

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    `day` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    total_bidders BIGINT,
    rank1_bidders BIGINT,
    rank2_bidders BIGINT,
    rank3_bidders BIGINT,
    total_auctions BIGINT,
    rank1_auctions BIGINT,
    rank2_auctions BIGINT,
    rank3_auctions BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        count(distinct bidder) AS total_bidders,
        count(distinct bidder) 
            filter(where price < 10000) AS rank1_bidders,
        count(distinct bidder) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bidders,
        count(distinct bidder) 
            filter(where price >= 1000000) AS rank3_bidders,
        count(distinct auction) AS total_auctions,
        count(distinct auction) 
            filter(where price < 10000) AS rank1_auctions,
        count(distinct auction) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_auctions,
        count(distinct auction) 
            filter(where price >= 1000000) AS rank3_auctions 
    FROM
        bid 
    GROUP BY
        DATE_FORMAT(dateTime, 'yyyy-MM-dd');

q16

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    channel VARCHAR,
    `day` VARCHAR,
    `minute` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    total_bidders BIGINT,
    rank1_bidders BIGINT,
    rank2_bidders BIGINT,
    rank3_bidders BIGINT,
    total_auctions BIGINT,
    rank1_auctions BIGINT,
    rank2_auctions BIGINT,
    rank3_auctions BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        channel,
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        max(DATE_FORMAT(dateTime, 'HH:mm')) as `minute`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        count(distinct bidder) AS total_bidders,
        count(distinct bidder) 
            filter(where price < 10000) AS rank1_bidders,
        count(distinct bidder) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bidders,
        count(distinct bidder) 
            filter(where price >= 1000000) AS rank3_bidders,
        count(distinct auction) AS total_auctions,
        count(distinct auction) 
            filter(where price < 10000) AS rank1_auctions,
        count(distinct auction) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_auctions,
        count(distinct auction) 
            filter(where price >= 1000000) AS rank3_auctions 
    FROM
        bid 
    GROUP BY
        channel, DATE_FORMAT(dateTime, 'yyyy-MM-dd');

q17

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    `day` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    min_price BIGINT,
    max_price BIGINT,
    avg_price BIGINT,
    sum_price BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        min(price) AS min_price,
        max(price) AS max_price,
        avg(price) AS avg_price,
        sum(price) AS sum_price 
    FROM
        bid 
    GROUP BY
        auction, DATE_FORMAT(dateTime, 'yyyy-MM-dd');

q18

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        (
            SELECT 
                *,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            bidder, auction
                        ORDER BY
                            dateTime DESC
                    ) AS rank_number 
            FROM
                bid
        ) 
    WHERE
        rank_number <= 1;

q19

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR,
    rank_number BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        * 
    FROM
        (
            SELECT 
                *,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            auction
                        ORDER BY
                            price DESC
                    ) AS rank_number 
            FROM
                bid
        ) 
    WHERE
        rank_number <= 10;

q20

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    bid_dateTime TIMESTAMP(3),
    bid_extra VARCHAR,
    itemName VARCHAR,
    description VARCHAR,
    initialBid BIGINT,
    reserve BIGINT,
    auction_dateTime TIMESTAMP(3),
    expires TIMESTAMP(3),
    seller BIGINT,
    category BIGINT,
    auction_extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        url,
        B.dateTime,
        B.extra,
        itemName,
        description,
        initialBid,
        reserve,
        A.dateTime,
        expires,
        seller,
        category,
        A.extra 
    FROM
        bid AS B 
        INNER JOIN auction AS A
            on B.auction = A.id 
    WHERE
        A.category = 10;

q21

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    channel_id VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        CASE 
            WHEN lower(channel) = 'apple' 
            THEN '0' 
            WHEN lower(channel) = 'google' 
            THEN '1' 
            WHEN lower(channel) = 'facebook' 
            THEN '2' 
            WHEN lower(channel) = 'baidu' 
            THEN '3' 
            ELSE REGEXP_EXTRACT(url, '(&|^)channel_id=([^&]*)', 2) 
        END AS channel_id 
    FROM
        bid 
    where
        REGEXP_EXTRACT(url, '(&|^)channel_id=([^&]*)', 2) is not null 
            or lower(channel) in ('apple', 'google', 'facebook', 'baidu');

q22

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '10000000',
    'next-event.rate' = '10000000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    dir1 VARCHAR,
    dir2 VARCHAR,
    dir3 VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        SPLIT_INDEX(url, '/', 3) as dir1,
        SPLIT_INDEX(url, '/', 4) as dir2,
        SPLIT_INDEX(url, '/', 5) as dir3 
    FROM
        bid;
  1. Click the tab on the right, and under More Configurations, set Engine Version to Vvr-6.0.4-flink-1.15.

  2. In the upper-right corner of the SQL editor, click Deploy.

  3. On the Operation Center > Job O&M page, click the target job. On the Deployment Details tab, in the Resource Configuration section, click Edit in the upper-right corner. Set the Concurrency parameter to 1, the TaskManager Memory parameter to 4 GiB, and the TaskManager CPU Cores parameter to 1 Core. The following figure shows the configuration.Resource configuration settings

  4. After the configuration is complete, click Save in the upper-right corner of the Resource Configuration section.

  5. For each target job, click Start in the Actions column.

  6. After the status of the target job changes to Completed, click the name of the target job. On the Running Events tab, view the Start Time and End Time of the job, as shown below.

    • Start time: The job started successfully.

    • End time: The job finished successfully.

Expected results

Note

Duration is the difference between the End Time and the Start Time. RPS is the result of events.num divided by Duration.

Query

Start Time

End Time

Duration (seconds)

RPS

q0

2023-02-07 11:55:13

2023-02-07 12:26:00

1847

54,141

q1

2023-02-07 12:28:06

2023-02-07 12:58:53

1847

54,141

q2

2023-02-07 12:29:13

2023-02-07 13:00:02

1,849

54,083

q3

2023-02-07 12:47:12

2023-02-07 13:18:01

1849

54083

q4

2023-02-07 13:30:08

2023-02-07 14:04:46

2,078

48,123

q5

2023-02-07 14:15:07

2023-02-07 14:45:45

1838

54,406

q7

2023-02-07 14:24:13

2023-02-07 17:08:42

9,869

10,132

q8

2023-02-07 14:27:09

2023-02-07 14:57:47

1838

54,406

q9

2023-02-07 14:35:53

2023-02-07 15:38:16

3743

26716

q11

2023-02-07 14:38:14

2023-02-07 15:08:53

1839

54,377

q12

2023-02-07 14:40:36

2023-02-07 15:11:15

1839 s

54,377

q15

2023-02-07 14:42:43

2023-02-07 15:13:22

1839

54,377

q16

2023-02-07 14:46:17

2023-02-07 16:03:09

4,012

24,925

q17

2023-02-07 16:02:19

2023-02-07 16:33:07

1848

54112

q18

2023-02-07 16:04:43

2023-02-07 16:41:08

2,185

45,766

q19

2023-02-07 16:07:13

2023-02-07 16:40:01

1968

50,813

q20

2023-02-07 16:13:29

2023-02-07 16:59:13

2,744

36,443

q21

2023-02-07 16:16:32

2023-02-07 16:47:11

1849

54,083

q22

2023-02-07 16:18:53

2023-02-07 16:49:41

1848

54,112