PV/UV statistics using Spark Streaming SQL

1. Background introduction

PV/UV statistics is a common scenario in traffic analysis. PV (page views) can be used to analyze site traffic and hotspots; for example, advertisers can use the PV of a landing page to estimate the traffic and advertising revenue it brings. Other scenarios require analyzing the visiting users themselves, such as studying users' page-click behavior, in which case UV (unique visitors) must be computed.

Spark Streaming SQL, combined with Redis, makes PV/UV statistics straightforward. This article describes how to use Streaming SQL to consume user access logs stored in Loghub, compute PV/UV over 1-minute windows, and write the results to Redis.

2. Preparations

Create an E-MapReduce Hadoop cluster, version 3.23.0 or later.
Download and compile the E-MapReduce SDK package:
git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git
cd aliyun-emapreduce-sdk
git checkout -b master-2.x origin/master-2.x
mvn clean package -DskipTests
After compilation, emr-datasources_shaded_${version}.jar is generated in the assembly/target directory, where ${version} is the SDK version.
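As a quick sanity check, you can confirm the artifact exists (the path follows the build layout described above):

ls assembly/target/emr-datasources_shaded_*.jar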

Data source

This article uses Loghub as the data source. For log collection and log parsing, please refer to Log Service.

3. Calculating PV/UV

In the common case, the computed PV/UV and the corresponding statistical time both need to be stored in Redis, producing one record per window. In other business scenarios, only the latest result is kept, and old data is continuously overwritten by new results. The following first walks through the first case.

3.1 Start the client

Start the streaming-sql client from the command line

streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar
You can also create a SQL statement file and run it through streaming-sql -f.
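For example, assuming the statements from the following sections are saved in a file (pv_uv.sql is a hypothetical name):

streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar -f pv_uv.sql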

3.2 Define the data tables

The data source table is defined as follows

CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP)
USING loghub
OPTIONS(
sls.project=${sls.project},
sls.store=${sls.store},
access.key.id=${access.key.id},
access.key.secret=${access.key.secret},
endpoint=${endpoint});
The data source table contains two fields, user_ip and __time__, which represent the user's IP address and the Loghub time column, respectively. Set the values of the configuration items in OPTIONS according to your actual environment.
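For illustration, a filled-in definition might look like this (the project, logstore, credentials, and endpoint below are placeholders, not real values):

CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP)
USING loghub
OPTIONS(
sls.project='sample_project',
sls.store='sample_logstore',
access.key.id='<your_access_key_id>',
access.key.secret='<your_access_key_secret>',
endpoint='cn-hangzhou.log.aliyuncs.com');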
The result (sink) table is defined as follows:

CREATE TABLE redis_sink
USING redis
OPTIONS(
table='statistic_info',
host=${redis_host},
key.column='interval');
Here, statistic_info is the name of the Redis table where results are stored, and interval corresponds to the interval field in the statistical results; set ${redis_host} according to your actual configuration.

3.3 Create the stream job

First, create a stream scan over the source table. The watermark settings tell the engine to track event time on the __time__ column and to wait up to 10 seconds for late-arriving data:

CREATE SCAN loghub_scan
ON loghub_source
USING STREAM
OPTIONS(
watermark.column='__time__',
watermark.delayThreshold='10 second');

Then create the stream job. It groups the data into 1-minute tumbling windows, counts records for PV, approximates the number of distinct user_ip values for UV, and emits each window's end time as the interval field:

CREATE STREAM job
OPTIONS(
checkpointLocation=${checkpoint_location})
INSERT INTO redis_sink
SELECT COUNT(user_ip) AS pv, approx_count_distinct(user_ip) AS uv, window.end AS interval
FROM loghub_scan
GROUP BY TUMBLING(__time__, interval 1 minute), window;

3.4 View the statistical results

The final statistical results are shown in the figure below.

As the figure shows, a new record is generated every minute; the key takes the form <table name>:<interval>, and the value contains the pv and uv counts.
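One way to spot-check the output is with redis-cli. This is only a sketch: the key below is illustrative, and whether the value reads back with GET or HGETALL depends on how the Redis sink encodes rows, which this article does not specify.

redis-cli KEYS 'statistic_info:*'
redis-cli HGETALL 'statistic_info:<window end time>'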

3.5 Implement overwrite updates

Change the result table's key.column configuration item to a fixed value, for example:

CREATE TABLE redis_sink
USING redis
OPTIONS(
table='statistic_info',
host=${redis_host},
key.column='statistic_type');
The SQL that creates the stream job changes to:

CREATE STREAM job
OPTIONS(
checkpointLocation='/tmp/spark-test/checkpoint')
INSERT INTO redis_sink
SELECT "PV_UV" as statistic_type, COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval
FROM loghub_scan
GROUP BY TUMBLING(__time__, interval 1 minute), window;
The final statistical results are shown in the figure below.

As the figure shows, Redis now retains a single key, which is updated every minute; its value contains the pv, uv, and interval values.
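Following the <table name>:<key> pattern above, with key.column set to statistic_type and the fixed value "PV_UV", the single key should be statistic_info:PV_UV. A spot-check sketch, with the same caveat about value encoding as before:

redis-cli HGETALL 'statistic_info:PV_UV'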

4. Summary

This article briefly introduced how to implement PV/UV statistics in stream processing using Spark Streaming SQL combined with Redis. Subsequent articles will introduce more features of Spark Streaming SQL.
