MaxCompute Practice - Data Storage

Posted by Blanche (Engineer) on Sep 12, 2016 at 16:38
MaxCompute (formerly ODPS) is a massive-data processing platform developed by Alibaba. It is mainly used to store and compute massive amounts of structured data, and it also offers data warehouse solutions and analysis and modeling services for big data, supporting PB-level data analysis. Common scenarios include data warehousing and BI analysis for large internet companies, website log analysis, transaction analysis for e-commerce sites, mining of user characteristics and interests, and so on.
We use MaxCompute mainly for two purposes: website log analysis, and mining of user characteristics and interests. Website log analysis is already well established within the project team, while the mining of user characteristics and interests is still being explored.
However, neither data analysis nor data mining can be done without data. MaxCompute does not collect data; it only processes it, like a yogurt machine without milk (MaxCompute being the yogurt machine and the data being the milk). So the first step is to collect the massive log data.
The metadata in MaxCompute is stored in Table Store, another open table storage service from Alibaba Cloud; it mainly includes user space metadata, Table/Partition schemas, ACLs, job metadata, the security system, and so on.
Now that the concepts are clear, we can get started. Next, let's talk about how to collect the massive logs.
I. Data Collection
We don't care much about the data source, which can be MySQL or a plain text file. In our business scenario, however, the data is mainly a large volume of logs (text files), such as Nginx request logs, PHP, Java and Node service logs, self-built event-tracking ("burying point") logs, and so on.
First, we need to collect the logs. We use the TTLog service (a self-built log service; there are many similar log processing services, such as Kafka and LogHub, which differ only in the details, so TTLog is what is described here). With TTLog we can easily create a task that reads the log files on the servers in near real time. To put it simply, you only need to configure the log path on the server; TTLog then pulls the logs down and stores them in MaxCompute's table storage, after which you can analyze the data through MaxCompute.
Using TTLog to manage data collection really saves a lot of manpower, and it is quite convenient; we can get it done in a few minutes. In exchange for that convenience, though, TTLog requires us to follow its rules. For example, the table it creates in MaxCompute's table storage (we will simply call it the MaxCompute table from here on) must use the fields defined by TTLog, and the four fields are as follows:
1. content: the log content
2. ds (day): generated automatically by TTLog
3. hh (hour): generated automatically by TTLog
4. mm (minute): generated automatically by TTLog
As you can see, content is the only field under our control; the rest are generated automatically by TTLog. To analyze the data you therefore end up writing numerous LIKE statements, which can slow queries down. The bigger problems are that anything beyond the simplest analysis becomes hard to implement, and that it is awkward to imitate "equal to", "not equal to", "contains" and "does not contain" with LIKE. You finally get the platform up with some effort (not that much, actually) and marvel at how awesome the service is; some time later you find you cannot produce the report your PM asks for, and the PM starts to think your work is severely lacking.
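
To make the pain concrete, here is a minimal sketch (the table name, date and keywords are hypothetical) of what even a simple analysis looks like when everything sits in a single content column: every condition has to be imitated with LIKE.

#!/usr/bin/python3
# coding=utf-8
# Hypothetical example: counting PHP error lines for one day when the table only
# has the TTLog fields (content, ds, hh, mm). Table name and keywords are made up.
from odps import ODPS

o = ODPS('fill in your own ID', 'fill in your own key', project='your project name',
         endpoint='http://service.odps.aliyun-inc.com/api')

# "contains" and "does not contain" both degrade into LIKE matching on the raw line
sql = """
select ds, count(*) as err_cnt
from ttlog_raw_log
where ds = '20160910'
  and content like '%PHP%'
  and content like '%ERROR%'
  and content not like '%healthcheck%'
group by ds
"""
with o.execute_sql(sql).open_reader() as reader:
    for row in reader:
        print(row['ds'], row['err_cnt'])
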
We can then work around the data format with a few tricks when designing the event tracking: use various symbols as separators, define a set of log format standards, and so on; there are plenty of ways to patch over the problem. But as Mr. Ma said, the best solution to a problem is to leave no chance for it to occur... so please read on.
II. Creation of MaxCompute Data Source
MaxCompute provides complete functional interfaces, which is what TTLog builds on; the interface calls themselves are simple, but configuring the environment is quite complicated. If the business requires in-depth analysis and mining of the data, you will have to set up the environment yourself. Of course, this also demonstrates how powerful MaxCompute is (we tend to regard anything costly to use as powerful)...
1. First, we need to create a table, much the same as in MySQL.
create table if not exists sale_detail(
shop_name     string,
customer_id   string,
total_price   double)
partitioned by (sale_date string, region string); -- set the partition columns


Partitioning has to be considered from the start: a MaxCompute query can display at most 5,000 rows, and LIMIT does not support OFFSET, so online queries through the development kit (Data IDE) become difficult when the data volume is large. Exporting data to the local disk also requires partition fields; the larger a partition is, the more data a single request can export, so it is essential to set up the partitions reasonably.
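
As a small sketch of why this matters (the partition values below are hypothetical): with reasonable partitions you can read exactly one partition at a time instead of fighting the 5,000-row display limit, for example through PyODPS's table reader.

#!/usr/bin/python3
# coding=utf-8
# Hypothetical example: read one (sale_date, region) partition of the sale_detail
# table created above; only that partition is scanned, however big the table is.
from odps import ODPS

o = ODPS('fill in your own ID', 'fill in your own key', project='your project name',
         endpoint='http://service.odps.aliyun-inc.com/api')

t = o.get_table('sale_detail')
with t.open_reader(partition="sale_date='20160910',region='hangzhou'") as reader:
    print('rows in this partition: %d' % reader.count)
    for record in reader:
        print(record['shop_name'], record['total_price'])
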
2. Upload the data via the MaxCompute DataHub Service (DHS), generally referred to as the DataHub service.
The DataHub service provides a Java SDK with which real-time uploading can be implemented. The DataHub interface does not require us to create a MaxCompute task, so it is very fast and can provide data storage support for services with high QPS (queries per second) and large throughput. Data in DataHub is kept for only seven days and then deleted; before deletion it is archived into the MaxCompute table. The interface can also be invoked asynchronously to synchronize the data in DataHub to the MaxCompute table.
Java-based services can use the SDK directly; for non-Java services you need to weigh the cost. There should be platforms similar to TTLog, and you can search the MaxCompute wiki for them.
III. Summary
Because it would cost too much for our PHP production environment to invoke DHS directly, we temporarily use TTLog to collect the log data, download it to the local disk via PyODPS, and write it into the MaxCompute table after processing.
IV. End
To wrap up, here is the core Python code for reference.
1. We first encapsulate the MaxCompute connection, so that we can enjoy the powerful functions of MaxCompute whenever and wherever needed;
#!/usr/bin/python3
# coding=utf-8
__author__ = 'layne.fyc@gmail.com'
from odps import ODPS

# Test endpoint: http://service-corp.odps.aliyun-inc.com/api
# Production endpoint: http://service.odps.aliyun-inc.com/api
debug = True
onlineUrl = 'http://service.odps.aliyun-inc.com/api'
localUrl = 'http://service-corp.odps.aliyun-inc.com/api'
accessId = 'fill in your own ID'
accessKey = 'fill in your own key'

class OdpsConnect:
    def __init__(self):
        # choose the endpoint according to the debug switch
        self.model = ODPS(accessId, accessKey, project='your project name',
                          endpoint=(localUrl if debug else onlineUrl))
    def getIntense(self):
        return self.model
    def exe(self, sql):
        return self.model.execute_sql(sql)
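
A quick usage sketch of the wrapper above (the SQL and table name are just placeholders):

#!/usr/bin/python3
# coding=utf-8
# Hypothetical usage of the OdpsConnect wrapper defined above.
from OdpsConnect import OdpsConnect

conn = OdpsConnect()
# exe() runs the SQL synchronously and returns an instance whose results can be read
with conn.exe('select count(*) from sale_detail').open_reader() as reader:
    for row in reader:
        print(row[0])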


2. Download the data in MaxCompute to the local disk via Tunnel;
#!/usr/bin/python3
# coding=utf-8
__author__ = 'layne.fyc@gmail.com'
import datetime
from OdpsConnect import OdpsConnect
#PyODPS encapsulates tunnel
from odps.tunnel import TableTunnel

odps = OdpsConnect().getIntense()
table = odps.get_table('table name')

# init hours: TTLog partitions the uploaded data every 15 minutes, so we build the hour and minute lists ourselves.
hours = [
    '00','01','02','03','04','05',
    '06','07','08','09','10','11',
    '12','13','14','15','16','17',
    '18','19','20','21','22','23'
]
#init minute
minute = ['00','15','30','45']

#Download the data over the past five days
i = 5
while i > 0 :
    #Obtain the data i day(s) before, for example, 20160606
    last_time = (datetime.datetime.now() + datetime.timedelta(days=-i)).strftime('%Y%m%d')
    file = 'log/%s.log' % last_time
    output = open(file, 'wb')
    tunnel = TableTunnel(odps)
    for hour in hours:
        for mnt in minute:
            # tunnel downloads require the full partition spec, so it must be written like this
            dt = "ds='%s',hh='%s',mm='%s'" % (last_time, hour, mnt)
            print('%s start!' % dt)
            download_session = tunnel.create_download_session(table.name, partition_spec=dt)
            if download_session.count == 0:
                continue
            with download_session.open_record_reader(0, download_session.count) as reader:
                for record in reader:
                    if hasattr(record, 'content'):
                        # write the content as bytes; otherwise there will be garbled characters
                        output.write(bytes(record.content, encoding="utf8"))
    output.close()
    i = i-1

exit()


3. Upload the data to MaxCompute via Tunnel;
#!/usr/bin/python3
# coding=utf-8
__author__ = 'layne.fyc@gmail.com'
import datetime
from OdpsConnect import OdpsConnect
from odps.models import Schema, Column, Partition
from odps.tunnel import TableTunnel

odps = OdpsConnect().getIntense()
t = odps.get_table('table name')
tunnel = TableTunnel(odps)

# the files from the past five days will be uploaded by default
sz = 5
while sz > 0 :
    #Get the date
    last_time = (datetime.datetime.now() + datetime.timedelta(days=-sz)).strftime('%Y%m%d')
    # the partition must be created before inserting data into MaxCompute; otherwise nothing can be written...
    t.create_partition('dt='+last_time, if_not_exists=True)
    #Create a session based on the partitions
    upload_session = tunnel.create_upload_session(t.name, partition_spec='dt='+last_time)
    print('%s start!' % last_time)
    file = 'log/%s.log' % last_time
    # the upload session writes into the current partition row by row, and it is fast: in a test with 50 million rows it was nearly as fast as writing to MySQL.
    with upload_session.open_record_writer(0) as writer:
        for line in open(file,'r',encoding='utf8'):
            # Core step: write each record into MaxCompute in the table's column order.
            # The parsing applied to each line after it is read involves specific business
            # logic, so it is omitted here and replaced with placeholder values.
            writer.write(t.new_record([
                1, 2, 3, 4, 5, 6
            ]))
    upload_session.commit([0])
    sz = sz - 1
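
The business-specific parsing above is omitted on purpose. Purely as a hypothetical sketch (the tab separator and the six-column layout are assumptions, not the real log format), the line-to-record step could look like this:

#!/usr/bin/python3
# coding=utf-8
# Hypothetical parsing helper: the tab separator and six-field layout are
# assumptions for illustration only.

def line_to_values(line):
    # split one raw log line into the target table's column order
    parts = line.rstrip('\n').split('\t')
    if len(parts) < 6:
        return None  # skip malformed lines
    return parts[:6]

if __name__ == '__main__':
    print(line_to_values('a\tb\tc\td\te\tf'))
    # in the upload loop above, the placeholder values would become:
    # values = line_to_values(line)
    # if values:
    #     writer.write(t.new_record(values))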


PS: When using MaxCompute, remember to put the data first and give priority to partitions: whether you are storing or downloading data, set up the partitions first, and only then process the data.
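
A tiny sketch of that habit (the partition value is hypothetical): resolve the partition before touching any data.

#!/usr/bin/python3
# coding=utf-8
# Hypothetical reminder: check or create the partition before reading or writing.
from OdpsConnect import OdpsConnect

t = OdpsConnect().getIntense().get_table('table name')
spec = "dt='20160910'"  # hypothetical partition value
if not t.exist_partition(spec):
    t.create_partition(spec)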