Use the DataWorks data synchronization feature to copy data from an Alibaba Cloud Elasticsearch cluster into MaxCompute for offline analytics.
Prerequisites
Before you begin, make sure that you have:
MaxCompute activated. For details, see Activate MaxCompute.
DataWorks activated. For details, see Activate DataWorks.
A MaxCompute data source added to DataStudio. For details, see Add a MaxCompute data source.
A workflow created in your DataWorks workspace. This guide uses a workspace in basic mode. For details, see Create a workflow.
A running Alibaba Cloud Elasticsearch cluster. For details, see Getting started.
The Elasticsearch cluster used in this guide has the following configuration:
| Parameter | Value |
|---|---|
| Region | China (Shanghai) |
| Zone | Zone B |
| Version | Elasticsearch 5.5.3 with Commercial Feature |
Background information
Alibaba Cloud Elasticsearch is available in the following versions: Elasticsearch 5.5.3 with Commercial Feature, Elasticsearch 6.3.2 with Commercial Feature, and Elasticsearch 6.7.0 with Commercial Feature. The service includes the commercial X-Pack plug-in and supports scenarios such as data analysis and search. Compared with open source Elasticsearch, it also provides enterprise-grade access control, security monitoring and alerting, and automatic reporting.
How it works
Populate the source Elasticsearch index by synchronizing data into it from MaxCompute (Step 1).
Create a destination table in MaxCompute that matches the Elasticsearch field schema.
Configure and run an offline synchronization node in DataWorks. The node uses the Elasticsearch Reader to scroll through the index and the MaxCompute Writer to load rows into the destination table.
Query the destination table in MaxCompute to verify the migration.
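The following toy, in-memory sketch makes the Reader side concrete. It models sorted, paged reading in the spirit of the `sort` and `pageSize` settings in the Step 3 script. It does not contact Elasticsearch, DataWorks, or MaxCompute, and it is not how the Elasticsearch Reader is actually implemented. The function names and sample documents are hypothetical.

```python
from typing import Dict, Iterator, List, Optional, Tuple

# Hypothetical document shape: field name -> value, all strings.
Document = Dict[str, str]


def scroll_pages(docs: List[Document], sort_field: str, page_size: int) -> Iterator[List[Document]]:
    """Yield documents ordered by sort_field, page_size documents at a time."""
    if page_size < 1:
        raise ValueError("page_size must be at least 1")
    # Plain string ordering in this toy; a real index sorts by the field's mapped type.
    ordered = sorted(docs, key=lambda doc: doc[sort_field])
    for start in range(0, len(ordered), page_size):
        yield ordered[start:start + page_size]


def copy_index(
    docs: List[Document], columns: List[str], sort_field: str, page_size: int
) -> Tuple[List[List[Optional[str]]], int]:
    """Project every document onto `columns` and count how many pages were fetched."""
    rows: List[List[Optional[str]]] = []
    pages = 0
    for page in scroll_pages(docs, sort_field, page_size):
        pages += 1  # one page stands in for one scroll request
        for doc in page:
            # A field missing from a document becomes None in this toy model.
            rows.append([doc.get(column) for column in columns])
    return rows, pages


if __name__ == "__main__":
    sample = [
        {"age": "41", "job": "admin."},
        {"age": "30", "job": "technician"},
        {"age": "35"},
    ]
    print(copy_index(sample, ["age", "job"], "age", page_size=1))     # 3 pages
    print(copy_index(sample, ["age", "job"], "age", page_size=1000))  # 1 page
```

With a page size of 1, the toy fetches one page per document. A larger page size returns the same rows in far fewer requests. This is why the Reader parameter table recommends raising `pageSize` for large datasets.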
Step 1: Create a source table in Elasticsearch
Follow Use DataWorks to synchronize data from MaxCompute to Alibaba Cloud Elasticsearch to populate the Elasticsearch index that you will migrate from.
Step 2: Create a destination table in MaxCompute
Log on to the DataWorks console. In the top navigation bar, select a region. In the left-side navigation pane, choose Data Development and O&M > Data Development. Select your workspace from the drop-down list and click Go to Data Development.
In the Scheduled Workflow pane of DataStudio, right-click your workflow name and choose Create Table > MaxCompute > Table.
In the Create Table dialog box, enter a name in the Name field and click Create.
Note: If multiple MaxCompute data sources are associated with DataStudio, select the data source you want to use.
In the toolbar of the table configuration tab, click DDL.
In the DDL dialog box, enter the following table creation statement and click Generate Table Schema:
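```sql
create table elastic2mc_bankdata (
  age string,
  job string,
  marital string,
  education string,
  `default` string,
  housing string,
  loan string,
  contact string,
  month string,
  day_of_week string,
  duration string,
  campaign string,
  pdays string,
  previous string,
  poutcome string,
  emp_var_rate string,
  cons_price_idx string,
  cons_conf_idx string,
  euribor3m string,
  nr_employed string,
  y string
);
```

The table defines every column listed in the Writer's `column` parameter in Step 3, because the MaxCompute Writer can write only to columns that exist in the destination table. The `default` column is quoted with backticks because DEFAULT is a SQL keyword.
Click Submit to Production Environment.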
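If you keep the column list in one place, a small helper can generate the DDL so that the destination table always matches the Writer's `column` parameter. This is a hypothetical Python sketch, not a DataWorks feature. It assumes every column is stored as `string`, as in the DDL above, and it copies its column list from the Step 3 synchronization script.

```python
from typing import List

# Column list copied from the Reader/Writer "column" parameters in Step 3.
BANK_COLUMNS: List[str] = [
    "age", "job", "marital", "education", "default", "housing", "loan",
    "contact", "month", "day_of_week", "duration", "campaign", "pdays",
    "previous", "poutcome", "emp_var_rate", "cons_price_idx",
    "cons_conf_idx", "euribor3m", "nr_employed", "y",
]


def build_ddl(table: str, columns: List[str]) -> str:
    """Return a MaxCompute CREATE TABLE statement with one string column per name."""
    if len(set(columns)) != len(columns):
        raise ValueError("duplicate column names")
    for name in columns:
        # Rough check: rejects names with spaces or hyphens, such as "day of week".
        if not name.isidentifier():
            raise ValueError(f"invalid column name: {name!r}")
    # Backticks let keywords such as `default` be used as column names.
    body = ",\n".join(f"  `{name}` string" for name in columns)
    return f"create table if not exists {table} (\n{body}\n);"


if __name__ == "__main__":
    print(build_ddl("elastic2mc_bankdata", BANK_COLUMNS))
```

You can paste the printed statement into the DDL dialog box. Because the check rejects names such as `day of week`, it also catches column names that MaxCompute would not accept unquoted.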
Step 3: Synchronize data
In the Scheduled Workflow pane, right-click your workflow and choose New > Data Integration > Offline Synchronization.
In the Create Node dialog box, enter a name in the Name field and click Confirm.
In the toolbar of the node configuration tab, click the Conversion Script icon to switch to script mode.
In script mode, click the Import Template icon.
In the Import Template dialog box, select the Elasticsearch source type and its data source, then the MaxCompute (ODPS) target type and its data source, and click Confirm.
Replace the generated script with the following configuration:
```json
{
  "type": "job",
  "steps": [
    {
      "stepType": "elasticsearch",
      "parameter": {
        "retryCount": 3,
        "column": [
          "age", "job", "marital", "education", "default", "housing", "loan",
          "contact", "month", "day_of_week", "duration", "campaign", "pdays",
          "previous", "poutcome", "emp_var_rate", "cons_price_idx",
          "cons_conf_idx", "euribor3m", "nr_employed", "y"
        ],
        "scroll": "1m",
        "index": "es_index",
        "pageSize": 1,
        "sort": { "age": "asc" },
        "type": "elasticsearch",
        "connTimeOut": 1000,
        "retrySleepTime": 1000,
        "endpoint": "http://es-cn-xxxx.xxxx.xxxx.xxxx.com:9200",
        "password": "xxxx",
        "search": { "match_all": {} },
        "readTimeOut": 5000,
        "username": "xxxx"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "odps",
      "parameter": {
        "partition": "",
        "truncate": true,
        "compress": false,
        "datasource": "odps_source",
        "column": [
          "age", "job", "marital", "education", "default", "housing", "loan",
          "contact", "month", "day_of_week", "duration", "campaign", "pdays",
          "previous", "poutcome", "emp_var_rate", "cons_price_idx",
          "cons_conf_idx", "euribor3m", "nr_employed", "y"
        ],
        "emptyAsNull": false,
        "table": "elastic2mc_bankdata"
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "version": "2.0",
  "order": {
    "hops": [{ "from": "Reader", "to": "Writer" }]
  },
  "setting": {
    "errorLimit": { "record": "0" },
    "speed": { "throttle": false, "concurrent": 1, "dmu": 1 }
  }
}
```

The key Reader parameters are:
| Parameter | Description | Value in this example |
|---|---|---|
| endpoint | Public endpoint and port of the Elasticsearch cluster. You can find both on the Basic Information page of your cluster. | `http://es-cn-xxxx.xxxx.xxxx.xxxx.com:9200` |
| index | Name of the Elasticsearch index to read from. | `es_index` |
| scroll | How long Elasticsearch keeps each scroll context alive between requests. Increase this value if your dataset is large and each batch takes longer to process. | `1m` |
| pageSize | Number of documents fetched per scroll request. A value of 1 means one request per document. Increase this value for large datasets to improve throughput. | `1` |
| sort | Field and order used to sort documents before reading. | `{ "age": "asc" }` |
| retryCount | Number of retries after a failed request. | `3` |
| connTimeOut | Connection timeout, in milliseconds. | `1000` |
| readTimeOut | Read timeout, in milliseconds. Increase this value if scroll batches are slow to return. | `5000` |

The key Writer parameters are:
| Parameter | Description | Value in this example |
|---|---|---|
| datasource | Name of the MaxCompute data source configured in DataStudio. | `odps_source` |
| table | Name of the destination table in MaxCompute. | `elastic2mc_bankdata` |
| truncate | Whether to clear the destination table before loading. Set to `false` for incremental loads. | `true` |
| emptyAsNull | Whether to write empty strings as NULL. | `false` |

For the full parameter reference, see Elasticsearch Reader.
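A mismatch between the Reader's column list, the Writer's column list, and the destination table is an easy mistake to make. The following hypothetical Python pre-flight check loads the script as JSON and reports such problems before you run the node. It assumes the layout shown in this guide: one step with `stepType` `elasticsearch` and one with `odps`. It also assumes that columns are matched by position. This is not DataWorks' own validation.

```python
import json
from typing import List


def check_job(script_text: str, table_columns: List[str]) -> List[str]:
    """Return human-readable problems found in a Reader/Writer sync script."""
    job = json.loads(script_text)
    # Index steps by stepType; assumes one "elasticsearch" reader and one "odps" writer.
    params = {step["stepType"]: step["parameter"] for step in job["steps"]}
    reader, writer = params["elasticsearch"], params["odps"]
    problems: List[str] = []
    if reader["column"] != writer["column"]:
        problems.append("Reader and Writer column lists differ (same names, same order expected)")
    missing = [name for name in writer["column"] if name not in table_columns]
    if missing:
        problems.append(f"columns not in destination table: {missing}")
    page_size = reader.get("pageSize")
    if not isinstance(page_size, int) or page_size < 1:
        problems.append("pageSize should be a positive integer")
    return problems


if __name__ == "__main__":
    # Trimmed, hypothetical script with only the fields this check reads.
    script = json.dumps({
        "steps": [
            {"stepType": "elasticsearch", "parameter": {"column": ["age", "job"], "pageSize": 1}},
            {"stepType": "odps", "parameter": {"column": ["age", "job"], "table": "elastic2mc_bankdata"}},
        ]
    })
    print(check_job(script, ["age"]))  # flags "job" as missing from the table
```

For the real job, pass the full script text and the column names from your DDL. An empty list means none of these checks found a problem.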
Click the run icon in the toolbar to execute the synchronization job.
On the Runtime Logs tab, confirm that the job finishes without errors.
Step 4: Verify the migration
In the Scheduled Workflow pane, right-click your workflow and choose New > MaxCompute > ODPS SQL.
In the Create Node dialog box, enter a name in the Name field and click Submit.
In the ODPS SQL node editor, enter:
```sql
SELECT * FROM elastic2mc_bankdata;
```

Click the Run icon to execute the query.
On the Operation Log tab, review the results.
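A full `SELECT *` confirms that rows arrived, but comparing counts is a quicker completeness check. The following hypothetical Python helper compares per-value counts from both sides. For example, you could take counts of the `y` field from an Elasticsearch terms aggregation and from a `GROUP BY y` query on `elastic2mc_bankdata`. Collecting those counts is up to you, and the sample numbers below are made up.

```python
from typing import Dict, Tuple


def diff_counts(source: Dict[str, int], dest: Dict[str, int]) -> Dict[str, Tuple[int, int]]:
    """Return {value: (source_count, dest_count)} for every value whose counts differ."""
    mismatches: Dict[str, Tuple[int, int]] = {}
    for key in sorted(set(source) | set(dest)):
        src, dst = source.get(key, 0), dest.get(key, 0)
        if src != dst:
            mismatches[key] = (src, dst)
    return mismatches


if __name__ == "__main__":
    es_counts = {"no": 90, "yes": 10}  # made-up numbers
    mc_counts = {"no": 90, "yes": 9}
    print(diff_counts(es_counts, mc_counts))  # {'yes': (10, 9)}
```

An empty result means every value has the same count on both sides. In this guide's script, `truncate` is `true`, so the destination table is cleared before each load and rerunning the job does not duplicate rows. After a successful run, the counts should therefore match exactly.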