Data Channel

Last Updated: Nov 12, 2017

MaxCompute provides two methods for importing and exporting data: running Tunnel commands directly on the console, or using the Tunnel SDK for Java.

Tunnel Commands

Data Preparation

Suppose that we have prepared a local file named wc_example.txt, whose contents are as follows:

  I LOVE CHINA!
  MY NAME IS MAGGIE.I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL!

Here we save the file into the directory: D:\odps\odps\bin.

Create a MaxCompute Table

To import the data above into a MaxCompute table, we first need to create the table:

  CREATE TABLE wc_in (word string);

Execute Tunnel Command

After the input table is created, you can import the data on the MaxCompute console with the tunnel command, as follows:

  tunnel upload D:\odps\odps\bin\wc_example.txt wc_in;

After the upload succeeds, check the records in the table wc_in, as follows:

  odps@ $odps_project>select * from wc_in;
  ID = 20150918110501864g5z9c6
  +------+
  | word |
  +------+
  | I LOVE CHINA! |
  | MY NAME IS MAGGIE.I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL! |
  +------+
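
The tunnel command can also export data from MaxCompute. As a minimal sketch (the local output path below is only an example, not part of the original scenario), the table can be downloaded back to a local file as follows:

  tunnel download wc_in D:\odps\odps\bin\wc_example_download.txt;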


Tunnel SDK

The following simple scenario shows how to upload data using the Tunnel SDK. Scenario description: upload data into MaxCompute, where the project is “odps_public_dev”, the table name is “tunnel_sample_test”, and the partition is “pt=20150801,dt=hangzhou”.

  1. Create a table and add the corresponding partition:

    CREATE TABLE IF NOT EXISTS tunnel_sample_test(
        id STRING,
        name STRING)
    PARTITIONED BY (pt STRING, dt STRING); -- Create a table.
    ALTER TABLE tunnel_sample_test
        ADD IF NOT EXISTS PARTITION (pt='20150801', dt='hangzhou'); -- Add the partition.
  2. Create the program directory structure of UploadSample, as follows:

    |---pom.xml
    |---src
        |---main
            |---java
                |---com
                    |---aliyun
                        |---odps
                            |---tunnel
                                |---example
                                    |---UploadSample.java

    UploadSample.java is the Tunnel source file; pom.xml is the Maven project file.

  3. Write the UploadSample program, as follows:

    package com.aliyun.odps.tunnel.example;

    import java.io.IOException;
    import java.util.Date;

    import com.aliyun.odps.Column;
    import com.aliyun.odps.Odps;
    import com.aliyun.odps.PartitionSpec;
    import com.aliyun.odps.TableSchema;
    import com.aliyun.odps.account.Account;
    import com.aliyun.odps.account.AliyunAccount;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.data.RecordWriter;
    import com.aliyun.odps.tunnel.TableTunnel;
    import com.aliyun.odps.tunnel.TunnelException;
    import com.aliyun.odps.tunnel.TableTunnel.UploadSession;

    public class UploadSample {
        private static String accessId = "####";
        private static String accessKey = "####";
        private static String tunnelUrl = "http://dt-corp.odps.aliyun-inc.com";
        private static String odpsUrl = "http://service-corp.odps.aliyun-inc.com/api";
        private static String project = "odps_public_dev";
        private static String table = "tunnel_sample_test";
        private static String partition = "pt=20150801,dt=hangzhou";

        public static void main(String args[]) {
            Account account = new AliyunAccount(accessId, accessKey);
            Odps odps = new Odps(account);
            odps.setEndpoint(odpsUrl);
            odps.setDefaultProject(project);
            try {
                // Create an upload session for the target table and partition.
                TableTunnel tunnel = new TableTunnel(odps);
                tunnel.setEndpoint(tunnelUrl);
                PartitionSpec partitionSpec = new PartitionSpec(partition);
                UploadSession uploadSession = tunnel.createUploadSession(project,
                        table, partitionSpec);
                System.out.println("Session Status is : "
                        + uploadSession.getStatus().toString());
                TableSchema schema = uploadSession.getSchema();
                // Open a writer for block 0 and fill one record according to the column types.
                RecordWriter recordWriter = uploadSession.openRecordWriter(0);
                Record record = uploadSession.newRecord();
                for (int i = 0; i < schema.getColumns().size(); i++) {
                    Column column = schema.getColumn(i);
                    switch (column.getType()) {
                        case BIGINT:
                            record.setBigint(i, 1L);
                            break;
                        case BOOLEAN:
                            record.setBoolean(i, true);
                            break;
                        case DATETIME:
                            record.setDatetime(i, new Date());
                            break;
                        case DOUBLE:
                            record.setDouble(i, 0.0);
                            break;
                        case STRING:
                            record.setString(i, "sample");
                            break;
                        default:
                            throw new RuntimeException("Unknown column type: "
                                    + column.getType());
                    }
                }
                // Write the same record 10 times, then commit block 0.
                for (int i = 0; i < 10; i++) {
                    recordWriter.write(record);
                }
                recordWriter.close();
                uploadSession.commit(new Long[]{0L});
                System.out.println("upload success!");
            } catch (TunnelException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    Note: the accessId and accessKey above are placeholders. In actual operation, replace them with your own accessId and accessKey. (A multi-block variant of this upload is sketched after this procedure.)

  4. The configuration of pom.xml is shown as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.aliyun.odps.tunnel.example</groupId>
        <artifactId>UploadSample</artifactId>
        <version>1.0-SNAPSHOT</version>
        <dependencies>
            <dependency>
                <groupId>com.aliyun.odps</groupId>
                <artifactId>odps-sdk-core-internal</artifactId>
                <version>0.20.7</version>
            </dependency>
        </dependencies>
        <repositories>
            <repository>
                <id>alibaba</id>
                <name>alibaba Repository</name>
                <url>http://mvnrepo.alibaba-inc.com/nexus/content/groups/public/</url>
            </repository>
        </repositories>
    </project>
  5. Compile and run. Compile the UploadSample program:

    mvn package

    Run the UploadSample program. Here we use Eclipse to import the Maven project: right-click the Java project and click Import > Maven > Existing Maven Projects.

Right-click on ‘UploadSample.java’ and click Run As > Run Configurations.

Click Run. After it runs successfully, the console output is as follows:

  Session Status is : NORMAL
  upload success!
  6. Check the running result. Enter the following statement on the console:

    select * from tunnel_sample_test;

    The result is as follows:

    +----+------+----+----+
    | id | name | pt | dt |
    +----+------+----+----+
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    +----+------+----+----+

    Notes:

    • As an independent service in MaxCompute, Tunnel provides its own access endpoints for users. In addition, multiple Tunnel services are deployed inside MaxCompute to meet the needs of different production segments and loads. To prevent confusion caused by multiple sets of access addresses, MaxCompute routes requests to the correct endpoint automatically. The detailed access policies are as follows:
    • If you access the Tunnel service from the production network, you only need to specify the MaxCompute endpoint (http://service.odps.aliyun-inc.com/api). You do not need to configure the Tunnel endpoint; it is routed automatically according to the MaxCompute endpoint.
    • If you access the Tunnel service from the office network, you only need to specify the MaxCompute endpoint (http://service-corp.odps.aliyun-inc.com/api). You do not need to configure the Tunnel endpoint; it is routed automatically according to the MaxCompute endpoint.
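
The UploadSample program writes all records through a single block (block ID 0). For larger uploads, one upload session can be split across several blocks, each with its own RecordWriter, and all blocks can be committed in one call. The following is a minimal sketch of that pattern, reusing the classes and placeholder values from UploadSample above; the class name MultiBlockUploadSample and the block count are illustrative, and the Tunnel endpoint is deliberately not set so that the automatic routing described in the notes above applies.

  package com.aliyun.odps.tunnel.example;

  import com.aliyun.odps.Odps;
  import com.aliyun.odps.PartitionSpec;
  import com.aliyun.odps.account.AliyunAccount;
  import com.aliyun.odps.data.Record;
  import com.aliyun.odps.data.RecordWriter;
  import com.aliyun.odps.tunnel.TableTunnel;
  import com.aliyun.odps.tunnel.TableTunnel.UploadSession;

  public class MultiBlockUploadSample {
      public static void main(String[] args) throws Exception {
          // Placeholder credentials and names, as in UploadSample above.
          Odps odps = new Odps(new AliyunAccount("####", "####"));
          odps.setEndpoint("http://service-corp.odps.aliyun-inc.com/api");
          odps.setDefaultProject("odps_public_dev");

          // No explicit Tunnel endpoint: rely on automatic routing from the MaxCompute endpoint.
          TableTunnel tunnel = new TableTunnel(odps);
          PartitionSpec spec = new PartitionSpec("pt=20150801,dt=hangzhou");
          UploadSession session = tunnel.createUploadSession("odps_public_dev",
                  "tunnel_sample_test", spec);

          int blockCount = 4;                    // illustrative: one writer per block ID
          Long[] blockIds = new Long[blockCount];
          for (int blockId = 0; blockId < blockCount; blockId++) {
              RecordWriter writer = session.openRecordWriter(blockId);
              for (int i = 0; i < 10; i++) {
                  Record record = session.newRecord();
                  record.setString(0, "sample");  // id column
                  record.setString(1, "sample");  // name column
                  writer.write(record);
              }
              writer.close();                    // each block must be closed before commit
              blockIds[blockId] = (long) blockId;
          }

          // Commit all written blocks in a single call.
          session.commit(blockIds);
          System.out.println("upload success!");
      }
  }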

Fluentd Import Scheme

In addition to the MaxCompute console and the Java SDK, data can also be imported through Fluentd.

Fluentd is open source software used to collect logs from a variety of sources (including application logs, access logs, and system logs). It allows users to select plug-ins to filter the log data and store it in different backends, including MySQL, Oracle, MongoDB, Hadoop, Treasure Data, AWS Services, Google Services, and MaxCompute. Fluentd is known for being compact and flexible, allowing users to customize data sources, filter processing, and target destinations. Currently, 300+ plug-ins run on the Fluentd architecture, and they are all open source. MaxCompute has also open-sourced its data import plug-in for Fluentd.

Environment Preparation

To import data into MaxCompute through Fluentd, prepare the following environment:

  • Ruby 2.1.0 or later.

  • Gem 2.4.5 or later.

  • Fluentd 0.10.49, or check the latest version on the Fluentd official website. Fluentd provides different versions for different operating systems. For details, refer to Fluentd Articles.

  • Protobuf 3.5.1 or later (Ruby protobuf).

Install Import Plug-in

Next, you can install the MaxCompute Fluentd import plug-in in either of the following two ways.

Method 1: install it through RubyGems.

  $ gem install fluent-plugin-aliyun-odps

MaxCompute has published this plug-in to RubyGems under the name ‘fluent-plugin-aliyun-odps’; you only need to install it with the ‘gem install’ command. (If the gem source cannot be accessed during installation, search the Internet for ‘change gem source’ to solve this problem.)

Method 2: install it from the plug-in source code.

  $ gem install protobuf
  $ gem install fluentd --no-ri --no-rdoc
  $ git clone https://github.com/aliyun/aliyun-odps-fluentd-plugin.git
  $ cp aliyun-odps-fluentd-plugin/lib/fluent/plugin/* {YOUR_FLUENTD_DIRECTORY}/lib/fluent/plugin/ -r

In the commands above, the second command installs Fluentd; if it is already installed, you can skip this command. The source code of the MaxCompute Fluentd plug-in is hosted on GitHub. After cloning it, copy it into the ‘plugin’ directory of Fluentd.

Use Plug-in

To import data using Fluentd, the key step is to configure Fluentd's ‘conf’ file. For more details about the conf file, refer to Fluentd Configuration File Introduction.

Example 1: import Nginx logs. The ‘source’ configuration in the conf file is as follows:

  <source>
    type tail
    path /opt/log/in/in.log
    pos_file /opt/log/in/in.log.pos
    refresh_interval 5s
    tag in.log
    format /^(?<remote>[^ ]*) - - \[(?<datetime>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*) "-" "(?<agent>[^\"]*)"$/
    time_format %Y%b%d %H:%M:%S %z
  </source>

Fluentd uses ‘tail’ to monitor whether the content of the specified file has changed. For more tail configuration, refer to Fluentd Articles. The ‘match’ configuration is as follows:

  <match in.**>
    type aliyun_odps
    aliyun_access_id ************
    aliyun_access_key *********
    aliyun_odps_endpoint http://service.odps.aliyun.com/api
    aliyun_odps_hub_endpoint http://dh.odps.aliyun.com
    buffer_chunk_limit 2m
    buffer_queue_limit 128
    flush_interval 5s
    project projectforlog
    <table in.log>
      table nginx_log
      fields remote,method,path,code,size,agent
      partition ctime=${datetime.strftime('%Y%m%d')}
      time_format %d/%b/%Y:%H:%M:%S %z
    </table>
  </match>

The data will be imported into the table nginx_log in the project ‘projectforlog’. The ‘datetime’ column in the source data is used as the partition key, and the plug-in automatically creates a partition when it encounters a new value.

Example 2: import data from MySQL. To import data from MySQL, first install ‘fluent-plugin-sql’ as the data source plug-in.

  $ gem install fluent-plugin-sql

Configure ‘source’ in ‘conf’:

  <source>
    type sql
    host 127.0.0.1
    database test
    adapter mysql
    username xxxx
    password xxxx
    select_interval 10s
    select_limit 100
    state_file /path/sql_state
    <table>
      table test_table
      tag in.sql
      update_column id
    </table>
  </source>

This example selects data from test_table, reading 100 records every 10 seconds. When selecting, the id column is taken as the primary key (id is an auto-increment field). For more descriptions of ‘fluent-plugin-sql’, refer to Fluentd SQL Plug-in Description.

The configuration of ‘match’ is shown as follows:

  <match in.**>
    type aliyun_odps
    aliyun_access_id ************
    aliyun_access_key *********
    aliyun_odps_endpoint http://service.odps.aliyun.com/api
    aliyun_odps_hub_endpoint http://dh.odps.aliyun.com
    buffer_chunk_limit 2m
    buffer_queue_limit 128
    flush_interval 5s
    project your_projectforlog
    <table in.sql>
      table mysql_data
      fields id,field1,field2,field3
    </table>
  </match>

The data will be imported into the table ‘mysql_data’ in the project ‘your_projectforlog’. The imported fields are id, field1, field2, and field3.

Plug-in Parameter Description

To import data into MaxCompute, configure the MaxCompute plug-in in the ‘match’ item of the conf file. The supported parameters are described as follows:

  • type (fixed): the fixed value aliyun_odps.
  • aliyun_access_id (required): your access ID.
  • aliyun_access_key (required): your access key.
  • aliyun_odps_hub_endpoint (required): if your server is deployed on ECS, set this value to ‘http://dh-ext.odps.aliyun-inc.com’; otherwise, set it to ‘http://dh.odps.aliyun.com’.
  • aliyun_odps_endpoint (required): if your server is deployed on ECS, set this value to ‘http://odps-ext.aliyun-inc.com/api’; otherwise, set it to ‘http://service.odps.aliyun.com/api’.
  • buffer_chunk_limit (optional): block size; the supported units are “k” (KB), “m” (MB), and “g” (GB). The default value is 8 MB; the suggested value is 2 MB.
  • buffer_queue_limit (optional): the length of the block queue. Together with ‘buffer_chunk_limit’, this value determines the size of the entire buffer.
  • flush_interval (optional): the forced sending interval. When this interval is reached and the block is still not full, the buffered data is sent anyway. The default value is 60s.
  • project (required): the project name.
  • table (required): the table name.
  • fields (required): corresponds to ‘source’; the field names must exist in ‘source’.
  • partition (optional): set this item if the table is a partitioned table. The partition name can be set in the following ways:
    • fixed value: partition ctime=20150804
    • keyword: partition ctime=${remote} (remote is a field in ‘source’).
    • time format keyword: partition ctime=${datetime.strftime(‘%Y%m%d’)} (‘datetime’ is a time field in ‘source’; its value formatted as %Y%m%d is used as the partition value).
  • time_format (optional): if a ‘time format keyword’ is used in ‘partition’, set this parameter. For example, if source[datetime]=”29/Aug/2015:11:10:16 +0800”, set ‘time_format’ to “%d/%b/%Y:%H:%M:%S %z”.

Flume

Besides using Fluentd to import data, MaxCompute also supports importing data through Flume. Flume is open source software from Apache, and MaxCompute has open-sourced an import plug-in based on it. For more details, refer to Flume MaxCompute Plug-in.
