Building a Real-time Data Warehouse in the Financial Industry with Realtime Compute for Apache Flink

Introduction:

Finance is at the heart of the modern economy. China's financial industry has continued to develop through market-oriented reform and opening-up, and its total volume has grown substantially. Financial stability is directly tied to the course of national economic development, and the financial industry is a barometer of that development. Objectively analyzing the current state of China's financial industry and exploring its development trends helps eliminate hidden financial risks and steer the industry toward healthy, orderly growth.

Business Background



The role of big data in the financial industry:

1. Financial services and product innovation: Social networks and other platforms generate large volumes of user data that record the interests and preferences of user groups. Analyzing these customer behavior patterns leads to product innovations that better match customer needs.

2. Enhanced user experience: Big data analysis builds customer profiles, and the characteristics in those profiles are used to provide personalized services and improve the user experience.

Business Scenario

An insurance company has developed a financial APP. The company places insurance advertisements and publishes promotional offers in the APP, and users can apply for insurance through the APP.
The business involves the following parties:

1. APP: the user entry point. Users browse insurance advertisements and promotions in the APP and place insurance orders.
2. Backend system:
a. Operators:
(1) Count the total number of insured persons and the total insured amount over a specified time period, based on the orders users submit, to help optimize the operation plan.
(2) Analyze users' daily behavior to identify the information each user cares about most, and use it as a data source for the recommendation system.

b. Business managers:
Monitor changes in the insured amount of key customers, and push the key customers whose insured amount changes significantly to their business manager, who then takes targeted actions such as customer retention.

Technology Architecture

Architecture analysis:
Data collection: In this scenario, the data warehouse's data mainly comes from event-tracking (buried point) data in the APP and other systems, which is collected into DataHub in real time as the input data for Flink.
Real-time data warehouse architecture: Both the ETL and the BI parts of the real-time data warehouse are built entirely in Flink. Flink reads data from DataHub in real time, processes it, joins it with dimension tables, and writes the real-time statistical results to the downstream RDS database.

Business Metrics

•Operational data analysis
  •Insured persons per hour
  •Total premium per hour
  •Total policies per hour
•User behavior monitoring
  •User's original insured amount
  •User's current insured amount
•User behavior analysis
  •Type of the last page the user visited
  •URL of the last page the user visited

Data Structures

Scenario 1: Operational Data Analysis

This scenario calculates the total number of insured persons, the total premium, and the total number of policies per hour.

When a user applies for insurance, an order is generated that contains the user ID, user name, order number, and other fields. Flink reads the order information in real time and uses a WHERE clause to keep only the orders modified since 00:00 of the current day (data filtering). It then groups the orders by user ID and uses the LAST_VALUE function to obtain each user's most recent order (order deduplication). Finally, it aggregates by hour to compute the total premium, the number of policies, and the number of insured persons for each hour.
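To make the three steps concrete, here is a minimal in-memory Python model of the filtering, deduplication, and hourly aggregation logic. It is not Flink code: the `Order` class, the `hourly_order_stats` function, and the UTC+8 session time zone are assumptions for illustration, and LAST_VALUE is simplified to "keep the last order received for each user", which matches the SQL when the fields are non-null.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, Optional, Set, Tuple

# Assumption: the job's session time zone is UTC+8.
SESSION_TZ = timezone(timedelta(hours=8))


@dataclass
class Order:
    """Hypothetical subset of the user_order columns used by the job."""
    id: int                          # user id
    order_no: str
    customer_tel: str
    gmt_modified: int                # modification time, epoch milliseconds
    account_payble: Optional[float]  # amount payable (may be missing)


def hourly_order_stats(
    orders: Iterable[Order], day_start_ms: int
) -> Dict[str, Tuple[float, int, int]]:
    """Return {biz_date: (total_premium, policy_cnt, policy_holder_cnt)}."""
    # Data filtering + order deduplication: keep only today's orders and,
    # like GROUP BY id with LAST_VALUE, remember the last one seen per user.
    last_by_user: Dict[int, Order] = {}
    for order in orders:
        if order.gmt_modified >= day_start_ms:
            last_by_user[order.id] = order

    # Hourly aggregation over the deduplicated orders.
    premium: Dict[str, float] = defaultdict(float)
    policies: Dict[str, Set[str]] = defaultdict(set)
    holders: Dict[str, Set[str]] = defaultdict(set)
    for order in last_by_user.values():
        hour = datetime.fromtimestamp(order.gmt_modified // 1000, SESSION_TZ).strftime("%Y%m%d%H")
        premium[hour] += order.account_payble or 0.0  # sum(coalesce(account_payble, 0))
        policies[hour].add(order.order_no)            # count(distinct order_no)
        holders[hour].add(order.customer_tel)         # count(distinct customer_tel)
    return {h: (premium[h], len(policies[h]), len(holders[h])) for h in premium}
```

Because only each user's latest order survives deduplication, an earlier order from the same user no longer counts toward its hour. In the streaming job this is expected to happen by retracting the earlier aggregate result when the user's last order changes.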

input table

CREATE TABLE user_order
(
id bigint comment 'user id'
, order_no varchar comment 'order number'
, order_type bigint comment 'order type'
, pay_time bigint comment 'payment time'
, order_price double comment 'order price'
, customer_name varchar comment 'user name'
, customer_tel varchar comment 'user phone number'
, certificate_no varchar comment 'ID document number'
, gmt_created bigint comment 'creation time'
, gmt_modified bigint comment 'modification time (epoch milliseconds)'
, account_payble double comment 'amount payable'
) WITH (
type='datahub',
topic='user_order'
...
);
output table
CREATE TABLE hs_order (
biz_date varchar COMMENT 'yyyyMMddHH'
, total_premium DOUBLE COMMENT 'total premium'
, policy_cnt BIGINT COMMENT 'number of policies'
, policy_holder_cnt BIGINT COMMENT 'number of insured persons'
, PRIMARY KEY (biz_date)
) WITH
(
type = 'rds',
tableName = 'adm_pfm_zy_order_gmv_msx_hs'
...
)
;
Business code
1: Data cleaning
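-- One row per user: LAST_VALUE keeps the values of the most recently received order (order deduplication)
create view last_order
as
select
id as id
,last_value(order_no) as order_no
,last_value(customer_tel) as customer_tel
,last_value(gmt_modified) as gmt_modified
,last_value(account_payble) as account_payble
from user_order
-- Keep only orders modified since 00:00 of the current day (threshold converted to milliseconds)
where gmt_modified >= cast(UNIX_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'), 'yyyy-MM-dd')*1000 as bigint)
group by id
;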
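The threshold in the WHERE clause above is the start of the current day in epoch milliseconds: FROM_UNIXTIME formats the current time as a 'yyyy-MM-dd' date string, and UNIX_TIMESTAMP parses that string back to the seconds value of local midnight. The Python sketch below mirrors that round trip; the `start_of_day_ms` helper and the fixed UTC+8 time zone are assumptions for illustration, since the real result depends on the job's configured time zone.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the Flink job runs with a UTC+8 session time zone.
SESSION_TZ = timezone(timedelta(hours=8))


def start_of_day_ms(now_s: int, tz: timezone = SESSION_TZ) -> int:
    """Mirror cast(UNIX_TIMESTAMP(FROM_UNIXTIME(now, 'yyyy-MM-dd'), 'yyyy-MM-dd')*1000 as bigint)."""
    # FROM_UNIXTIME(now, 'yyyy-MM-dd'): format the current time as a local date string.
    day = datetime.fromtimestamp(now_s, tz).strftime("%Y-%m-%d")
    # UNIX_TIMESTAMP(day, 'yyyy-MM-dd'): parse it back to the epoch seconds of local midnight.
    midnight = datetime.strptime(day, "%Y-%m-%d").replace(tzinfo=tz)
    # * 1000: convert seconds to milliseconds to match gmt_modified.
    return int(midnight.timestamp()) * 1000


# 2023-11-15 06:13:20 in UTC+8 maps to 2023-11-15 00:00:00 in UTC+8.
print(start_of_day_ms(1_700_000_000))  # → 1699977600000
```

Because UNIX_TIMESTAMP() is expected to be evaluated as records are processed, the threshold moves forward at midnight, so the view effectively counts only the current day's orders.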
2: Data aggregation
insert into hs_order
select
biz_date
,cast(total_premium as double) as total_premium
,cast(policy_cnt as bigint) as policy_cnt
,cast(policy_holder_cnt as bigint) as policy_holder_cnt
from (
select
-- Hourly bucket of the order's modification time, in yyyyMMddHH format
from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH') as biz_date
,sum(coalesce(account_payble,0)) as total_premium
,count(distinct order_no) as policy_cnt
,count(distinct customer_tel) as policy_holder_cnt
from last_order a
-- The GROUP BY expression must match the biz_date expression exactly
group by
from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')
)a
;

Scenario 2: User Behavior Monitoring


This scenario monitors key users whose insured amount changes significantly (the total insured amount drops by more than 15%).

Flink reads users' new order data in real time. Each new order contains the user ID and the current insured amount, and a WHERE clause filters out orders that were not saved successfully. The dimension table stores data on the key users that business managers follow (such as the original insured amount). Flink joins each new order with the dimension table on user ID. If the user exists in the dimension table and the total insured amount has dropped by more than 15%, Flink pushes the user's details to the business manager and updates the user's insured amount and insurance information in the dimension table.
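The join-and-threshold rule can be sketched in Python as below. It is a hypothetical in-memory model, not Flink code: a dict stands in for the OTS dimension table, and `process_update` and the alert list are illustrative names. It treats the varchar amounts as numbers, alerts only when the amount drops by more than 15%, and writes the current amount back to the dimension table for every matched user.

```python
from typing import Dict, List

DROP_THRESHOLD = 0.15  # alert when the total insured amount drops by more than 15%


def process_update(update: dict, raw_info: Dict[int, Dict[str, str]], alerts: List[dict]) -> None:
    """Handle one new order from update_info against the key-user dimension table."""
    dim = raw_info.get(update["id"])
    if dim is None:
        return  # inner join: users who are not key users produce no output

    raw = float(dim["raw_premium"])      # amounts are stored as varchar
    now = float(update["now_premium"])
    if raw > 0 and (raw - now) / raw > DROP_THRESHOLD:
        alerts.append({
            "id": update["id"],
            "raw_info": dim["raw_info"],
            "now_info": update["now_info"],
            "raw_premium": dim["raw_premium"],
            "now_premium": update["now_premium"],
        })

    # Write the current amount and information back to the dimension table.
    raw_info[update["id"]] = {
        "raw_premium": update["now_premium"],
        "raw_info": update["now_info"],
    }


key_users = {1: {"raw_premium": "1000", "raw_info": "plan A"}}
pushed: List[dict] = []
process_update({"id": 1, "now_premium": "800", "now_info": "plan A, reduced"}, key_users, pushed)
print(len(pushed))  # → 1 (a 20% drop exceeds the 15% threshold)
```

The `raw > 0` guard avoids dividing by zero for a user whose recorded amount is zero.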

input table

create table update_info
(
id bigint comment 'user id'
, channel varchar comment 'channel ID'
, open_id varchar comment 'order id'
, event varchar comment 'event type'
, now_premium varchar comment 'current insured amount'
, creator varchar comment 'creator'
, modifier varchar comment 'last modified by'
, gmt_modified bigint comment 'modification time'
, now_info varchar comment 'current insurance information'
) with (
type = 'datahub',
topic = 'dh_prd_dm_account_wechat_trace'
...
);

dimension table
create table raw_info
(
id bigint comment 'user id'
, raw_premium varchar comment 'original insured amount'
, raw_info varchar comment 'original insurance information'
, PRIMARY KEY (id)
, PERIOD FOR SYSTEM_TIME
) WITH (
type = 'ots',
tableName = 'adm_zy_acct_sub_wechat_list'
...
);

output table
create table wechat_activity_user
(
id bigint comment 'user id'
, raw_info varchar comment 'original insurance information'
, now_info varchar comment 'current insurance information'
, raw_premium varchar comment 'original insured amount'
, now_premium varchar comment 'current insured amount'
, PRIMARY KEY (id)
) WITH (
type='rds',
tableName='wechat_activity_user'
...
);
Business code:
-- Join each new order with the key-user dimension table (lookup at processing time)
create view info_join as
select
t1.id as id
,t2.raw_info as raw_info
,t1.now_info as now_info
,t2.raw_premium as raw_premium
,t1.now_premium as now_premium
from update_info t1
inner join raw_info FOR SYSTEM_TIME AS OF PROCTIME() as t2
on t1.id = t2.id;
-- Push users whose total insured amount dropped by more than 15% (amounts are stored as varchar)
insert into wechat_activity_user
select
id as id
,raw_info as raw_info
,now_info as now_info
,raw_premium as raw_premium
,now_premium as now_premium
from info_join
where cast(raw_premium as double) > 0
and (cast(raw_premium as double) - cast(now_premium as double)) / cast(raw_premium as double) > 0.15;
-- Write the current amount and information back to the dimension table
insert into raw_info
select
id as id
,now_premium as raw_premium
,now_info as raw_info
from info_join;

Scenario 3: User Behavior Analysis


In this scenario, the name and type of the page each user accessed last are recorded as feature values of the user profile.
Flink reads the logs of users browsing APP pages, such as the account ID, page name, and page type. It groups the logs by account ID and device ID, uses the LAST_VALUE function to obtain the name and type of the last page each user accessed, and writes the result to RDS. This result serves as input data for the recommendation system, which pushes relevant advertisements to the user at the next login to improve the ad click-through rate and the order success rate.
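The grouping logic amounts to keeping, for each (account_id, device_id) pair, the most recent page type and page name. A minimal Python model follows; the function name and the dict-based log records are assumptions for illustration, and, as with LAST_VALUE in a streaming job, "last" here means last received rather than largest log_time, assuming the fields are non-null.

```python
from typing import Dict, Iterable, Tuple


def last_page_per_device(logs: Iterable[dict]) -> Dict[Tuple[str, str], Tuple[str, str]]:
    """Map (account_id, device_id) -> (bury_point_type, page_name) of the last log received."""
    latest: Dict[Tuple[str, str], Tuple[str, str]] = {}
    for log in logs:
        if log.get("account_id") is None:  # where account_id is not null
            continue
        key = (log["account_id"], log["device_id"])  # group by account_id, device_id
        # last_value(...): a later record overwrites an earlier one, in arrival order
        latest[key] = (log["bury_point_type"], log["page_name"])
    return latest
```

Note that the output table as declared uses account_id alone as its primary key, so if one account uses several devices, their rows would likely upsert the same RDS row and the most recently updated device wins.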

input table

create table user_log
(
log_time bigint comment 'log collection time (Unix timestamp)'
, device_id varchar comment 'device id'
, account_id varchar comment 'account id'
, bury_point_type varchar comment 'page type or post type'
, page_name varchar comment 'page name, or first-level directory of the tracking point'
) WITH (
type = 'datahub',
topic = 'edw_zy_evt_bury_point_log'
...
);
output table
create table user_last_log
(
account_id varchar comment 'account id'
, device_id varchar comment 'device id'
, bury_point_type varchar comment 'page type'
, page_name varchar comment 'page name'
, primary key (account_id)
) WITH (
type = 'rds',
tableName = 'adm_zy_moblie_charge_exchg_rs'
...
);
Business code

-- Keep the most recently received page type and page name per account and device
insert into user_last_log
select
account_id
,device_id
,last_value(bury_point_type) as bury_point_type
,last_value(page_name) as page_name
from user_log
where account_id is not null
group by account_id,device_id;
