Quickly build a Lambda big data analysis architecture on the cloud in 5 minutes

Background

The Spark China community, together with the Alibaba Cloud EMR technology exchange group and the Tablestore technology exchange group, recently held a joint live technical broadcast. The topic was "Real-time computing and processing of massive structured data": it introduced Tablestore's ability to capture and subscribe to data changes in real time, and showed how this enables a lightweight Lambda architecture on the cloud. The broadcast included a demo, and this article provides the step-by-step instructions for that demo, so that you can build the same architecture on Alibaba Cloud and process data both in real time and offline.

Demo scenario introduction

The demo simulates an e-commerce order scenario: massive volumes of orders are injected in real time, stream computing aggregates order counts and transaction amounts over 10-second windows, and the results are displayed on a real-time large screen. A sample of the complete order large screen is as follows:

For the large screen, we use Alibaba Cloud DataV connected to a Tablestore data source. Next, let's look at the process and operation steps that take the raw order data to the aggregated data behind the large screen.

The overall architecture behind the demo is roughly as follows:

1. On ECS (or locally), run an order generator that injects order data into Tablestore in real time.
2. Create a tunnel for the table in the Tablestore console.
3. Purchase a Spark cluster in the EMR console.
4. Download the latest EMR SDK.
5. Execute the table creation statements and SQL commands provided below to run the real-time computation; the result table is written back to Tablestore.
6. Display the result table data on a real-time large screen through DataV.
Step 1: Log in to the Tablestore console on the Alibaba Cloud official website and create an instance and table

After creating an instance, create a table whose primary key schema matches the one sketched below:
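The schema screenshot from the original article is not reproduced here. Judging from the column catalog in the Spark DDL below, the table uses UserId and OrderId as string primary keys, with price and timestamp as long attribute columns. A minimal sketch of creating such a table with the Tablestore Python SDK (endpoint, credentials, and all names are placeholders) might look like this:

# pip install tablestore  (Alibaba Cloud Tablestore Python SDK)
from tablestore import (OTSClient, TableMeta, TableOptions,
                        ReservedThroughput, CapacityUnit)

# Placeholder endpoint, credentials, and instance name -- fill in your own.
client = OTSClient('https://your-instance.cn-hangzhou.ots.aliyuncs.com',
                   'your-access-key-id', 'your-access-key-secret',
                   'your-instance')

# Assumed primary keys: UserId and OrderId, both strings. Attribute columns
# (price, timestamp) are schema-free in Tablestore and need no declaration.
table_meta = TableMeta('order_test', [('UserId', 'STRING'), ('OrderId', 'STRING')])

# Tablestore is serverless, so reserved throughput can simply be (0, 0).
client.create_table(table_meta, TableOptions(), ReservedThroughput(CapacityUnit(0, 0)))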

Then start a client injection program that writes random order data; a minimal sketch of such a generator follows:
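The broadcast did not publish the generator's source code; the Python sketch below is an assumption-based example, using the schema above and nanosecond timestamps (the streaming SQL below divides timestamp by 1000000000 before calling to_timestamp):

import random
import time
import uuid

from tablestore import OTSClient, Row, Condition, RowExistenceExpectation

# Placeholder endpoint and credentials -- fill in your own.
client = OTSClient('https://your-instance.cn-hangzhou.ots.aliyuncs.com',
                   'your-access-key-id', 'your-access-key-secret',
                   'your-instance')

while True:
    # One random order: a user, a unique order id, a price, and the
    # current time as a nanosecond timestamp.
    primary_key = [('UserId', 'user_%04d' % random.randint(0, 999)),
                   ('OrderId', str(uuid.uuid4()))]
    attribute_columns = [('price', random.randint(1, 10000)),
                         ('timestamp', int(time.time() * 1000000000))]
    client.put_row('order_test', Row(primary_key, attribute_columns),
                   Condition(RowExistenceExpectation.IGNORE))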

Tablestore is a serverless product: you do not need to choose sizes or specifications when purchasing, and the service scales out horizontally and automatically with your business load.

Step 2: Log in to the EMR console on the Alibaba Cloud official website and purchase a Spark cluster
The size of the Spark cluster can be chosen flexibly according to business needs. In our tests, a three-node cluster easily consumed and aggregated 1,000,000 records per second in real time.

Step 3: Log in to the EMR cluster and execute the job scripts
Log in to the master node of the EMR cluster and run the following commands to start the streaming tasks:

1. Start the streaming SQL interactive shell

Get the latest version of the EMR SDK (1.8 at the time of writing) from the EMR official website, then start the shell:

streaming-sql --driver-class-path emr-datasources_shaded_2.11-1.8.0.jar --jars emr-datasources_shaded_2.11-1.8.0.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2

2. Create a streaming source table

DROP TABLE IF EXISTS ots_order_test;
CREATE TABLE ots_order_test
USING tablestore
OPTIONS(
endpoint="fill in the VPC address of your Tablestore instance",
access.key.id="",
access.key.secret="",
instance.name="",
table.name="",
tunnel.id="fill in the ID of the tunnel to consume, found in the Tablestore console",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"}, "price": {"col": "price", "type": "long"}, "timestamp": {"col": "timestamp", "type": "long"}}}'
);

3. Create a streaming sink table

DROP TABLE IF EXISTS ots_order_sink_test;
CREATE TABLE ots_order_sink_test
USING tablestore
OPTIONS(
endpoint="",
access.key.id="",
access.key.secret="",
instance.name="",
table.name="",
tunnel.id="",
catalog='{"columns": {"begin": {"col": "begin", "type": "string"},"end": {"col": "end", "type": "string "}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "long"}}}'
);

4. Create a Streaming job

CREATE SCAN ots_table_stream on ots_order_test USING STREAM OPTIONS ("maxoffsetsperchannel"="10000");
CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/test1',
outputMode='update'
)
insert into ots_order_sink_test
SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, sum(price) AS totalPrice FROM ots_table_stream GROUP BY window(to_timestamp(timestamp / 1000000000), "10 seconds");
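The job groups orders into 10-second tumbling windows; timestamp is divided by 1000000000 because the source rows carry nanosecond timestamps while to_timestamp expects seconds. Besides viewing the results in DataV, you can spot-check the sink table directly with the Tablestore Python SDK. The sketch below is assumption-laden: it supposes SDK v5 and that the sink table's primary key is the pair of window strings (begin, end), which is hypothetical:

from tablestore import OTSClient, Direction, INF_MIN, INF_MAX

# Placeholder endpoint and credentials -- fill in your own.
client = OTSClient('https://your-instance.cn-hangzhou.ots.aliyuncs.com',
                   'your-access-key-id', 'your-access-key-secret',
                   'your-instance')

# Scan the full primary-key range of the (hypothetical) sink table and
# print the first ten aggregated windows.
start_pk = [('begin', INF_MIN), ('end', INF_MIN)]
end_pk = [('begin', INF_MAX), ('end', INF_MAX)]
consumed, next_pk, rows, token = client.get_range(
    'order_sink_test', Direction.FORWARD, start_pk, end_pk, limit=10)
for row in rows:
    print(row.primary_key, row.attribute_columns)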
