MaxCompute does not provide a dedicated development plug-in for Graph, but you can use Eclipse to develop MaxCompute Graph programs. The development process is as follows:
- Write your Graph code and use local debugging for basic testing.
- Perform cluster debugging to validate the results.
Development example
This section uses the SSSP algorithm as an example to demonstrate how to develop and debug a Graph program in Eclipse.
- Create a Java project named graph_examples.
- Add the JAR packages from the lib directory of the MaxCompute client to the Java Build Path of your Eclipse project. On the Libraries tab of the Java Build Path page, select mapreduce-api.jar, expand it, and then double-click Javadoc location. Select Javadoc URL and enter http://odps.alibaba-inc.com/doc/prdoc/odps_graph/api/.
- Develop the MaxCompute Graph program.
A common practice is to copy and modify an existing example, such as the Single Source Shortest Path (SSSP) example. In this example, only the package name is changed to com.aliyun.odps.graph.example.
- Compile and package the program.
In Eclipse, right-click the source code directory (src) and export it as a JAR package. Choose a destination path for the JAR file, such as D:\odps\clt\odps-graph-example-sssp.jar.
- Use the MaxCompute client to run the SSSP job. For more information, see Run a Graph job.
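To make the algorithm behind the copied SSSP example concrete, the following plain-Java sketch simulates its superstep logic on the same graph as the sample sssp_in data used for local debugging. It is an illustration under stated assumptions, not the MaxCompute Graph API: the class SsspSimulation and its methods are hypothetical, the whole graph is held in memory, and the framework's workers, vertex classes, and table I/O are omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of vertex-centric (superstep-based) SSSP.
// Hypothetical illustration only: it does NOT use the MaxCompute Graph API.
class SsspSimulation {

    /** graph maps each vertex ID to its outgoing edges (neighbor ID -> edge weight).
     *  Assumes every edge target is also a key of graph. */
    static Map<Long, Long> run(Map<Long, Map<Long, Long>> graph, long source) {
        Map<Long, Long> dist = new HashMap<>();
        for (Long vertex : graph.keySet()) {
            dist.put(vertex, Long.MAX_VALUE); // "infinity" until a path is found
        }

        // The first superstep delivers distance 0 to the source vertex only.
        Map<Long, Long> inbox = new HashMap<>();
        inbox.put(source, 0L);

        // Each loop iteration is one superstep; the run halts when no messages remain.
        while (!inbox.isEmpty()) {
            Map<Long, Long> outbox = new HashMap<>();
            for (Map.Entry<Long, Long> msg : inbox.entrySet()) {
                long vertex = msg.getKey();
                long candidate = msg.getValue();
                // A vertex propagates only when it learns a strictly shorter distance.
                if (candidate < dist.get(vertex)) {
                    dist.put(vertex, candidate);
                    for (Map.Entry<Long, Long> edge : graph.get(vertex).entrySet()) {
                        // Keep only the smallest message per target vertex.
                        outbox.merge(edge.getKey(), candidate + edge.getValue(), Math::min);
                    }
                }
            }
            inbox = outbox;
        }
        return dist;
    }

    /** The sample sssp_in graph, read as vertex -> (neighbor -> weight). */
    static Map<Long, Map<Long, Long>> sampleGraph() {
        Map<Long, Map<Long, Long>> g = new HashMap<>();
        g.put(1L, Map.of(2L, 2L, 3L, 1L, 4L, 4L));
        g.put(2L, Map.of(1L, 2L, 3L, 2L, 4L, 1L));
        g.put(3L, Map.of(1L, 1L, 2L, 2L, 5L, 1L));
        g.put(4L, Map.of(1L, 4L, 2L, 1L, 5L, 1L));
        g.put(5L, Map.of(3L, 1L, 4L, 1L));
        return g;
    }

    public static void main(String[] args) {
        // Start node 1, matching the program argument "1" used later.
        System.out.println(new TreeMap<>(run(sampleGraph(), 1L))); // prints {1=0, 2=2, 3=1, 4=3, 5=2}
    }
}
```

Vertex 4 illustrates why several supersteps are needed: its first message (distance 4, direct edge) is later improved to 3 through vertex 2.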
Local debugging
MaxCompute Graph supports a local debugging mode that lets you perform breakpoint debugging in Eclipse.
- Download the odps-graph-local Maven package.
- In your Eclipse project, right-click the main program file of the Graph job (the file that contains the main function) and open its run configuration.
- On the Arguments tab, set Program arguments to 1 sssp_in sssp_out.
- On the Arguments tab, set VM arguments as follows:
-Dodps.runner.mode=local -Dodps.project.name=<project.name> -Dodps.end.point=<end.point> -Dodps.access.id=<access.id> -Dodps.access.key=<access.key>
The program arguments 1 sssp_in sssp_out specify the start node, the input table name, and the output table name, respectively.
- In local mode, where the odps.end.point parameter is not specified, you must create the sssp_in and sssp_out tables in the local warehouse directory and add data to the input table sssp_in. The following is sample input data:
1,"2:2,3:1,4:4"
2,"1:2,3:2,4:1"
3,"1:1,2:2,5:1"
4,"1:4,2:1,5:1"
5,"3:1,4:1"
For more information about the warehouse directory, see Run jobs locally.
- Click Run to run the SSSP job locally.
Note: For parameter settings, refer to the conf/odps_config.ini file of the MaxCompute client. The parameters used above are the most common ones and are described as follows:
- odps.runner.mode: The value must be set to local. This parameter is required for local debugging.
- odps.project.name: Specifies the current project. This parameter is required.
- odps.end.point: Specifies the endpoint of the MaxCompute service. This parameter is optional. If you omit this parameter, tables and resources are read from the local warehouse directory. An exception is thrown if the data or metadata does not exist. If you specify this parameter, the system first tries to read from the local warehouse. If the requested data or metadata does not exist, the system falls back to reading from MaxCompute.
- odps.access.id: The AccessKey ID for connecting to the MaxCompute service. This parameter is valid only when odps.end.point is specified.
- odps.access.key: The AccessKey Secret for connecting to the MaxCompute service. This parameter is valid only when odps.end.point is specified.
- odps.cache.resources: Specifies the list of resources to use. This is equivalent to the -resources option of the JAR command.
- odps.local.warehouse: The path to the local warehouse directory. If this parameter is not specified, the default path is ./warehouse.
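The parameter rules above can be summarized in code. The following is a minimal sketch, assuming a hypothetical LocalRunConfig helper that reads the -D values from a java.util.Properties object (for example, System.getProperties()). It is not part of the MaxCompute SDK, and treating odps.cache.resources as a comma-separated list is an assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of any MaxCompute SDK) that applies the documented
// rules for the local-debugging -D parameters.
class LocalRunConfig {
    final String projectName;
    final String endPoint;        // null: read tables and resources only from the local warehouse
    final String warehouse;
    final List<String> resources;

    private LocalRunConfig(String projectName, String endPoint, String warehouse, List<String> resources) {
        this.projectName = projectName;
        this.endPoint = endPoint;
        this.warehouse = warehouse;
        this.resources = resources;
    }

    static LocalRunConfig from(Properties p) {
        // odps.runner.mode=local is required for local debugging.
        if (!"local".equals(p.getProperty("odps.runner.mode"))) {
            throw new IllegalArgumentException("odps.runner.mode must be set to local");
        }
        String project = p.getProperty("odps.project.name");
        if (project == null || project.isEmpty()) {
            throw new IllegalArgumentException("odps.project.name is required");
        }
        // odps.local.warehouse defaults to ./warehouse.
        String warehouse = p.getProperty("odps.local.warehouse", "./warehouse");
        // Assumption: odps.cache.resources is a comma-separated list.
        String res = p.getProperty("odps.cache.resources", "");
        List<String> resources = res.isEmpty()
                ? Collections.<String>emptyList()
                : Arrays.asList(res.split(","));
        return new LocalRunConfig(project, p.getProperty("odps.end.point"), warehouse, resources);
    }

    /** Server fallback (and therefore the AccessKey pair) applies only when an endpoint is set. */
    boolean canFallBackToServer() {
        return endPoint != null;
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("odps.runner.mode", "local");
        p.setProperty("odps.project.name", "my_project"); // hypothetical project name
        LocalRunConfig cfg = LocalRunConfig.from(p);
        System.out.println(cfg.warehouse + " fallback=" + cfg.canFallBackToServer()); // prints ./warehouse fallback=false
    }
}
```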
The following is the debug output from running the SSSP job locally in Eclipse:
Counters: 3
com.aliyun.odps.graph.local.COUNTER
TASK_INPUT_BYTE=211
TASK_INPUT_RECORD=5
TASK_OUTPUT_BYTE=161
TASK_OUTPUT_RECORD=5
graph task finish
Note: In the preceding example, the local warehouse directory must contain the sssp_in and sssp_out tables. For more information about these tables, see Write a Graph program.
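The sample sssp_in rows appear to encode a vertex ID followed by a quoted list of neighbor:weight pairs. The following hypothetical parser shows that reading of the format. Interpreting each pair as neighbor:weight is an assumption, and a real Graph job reads records from the input table through the framework rather than parsing text like this.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for the text form of the sample sssp_in rows, assuming each row is
// a vertex ID followed by a quoted list of neighbor:weight pairs.
class SsspInputParser {

    static Map.Entry<Long, Map<Long, Long>> parseLine(String line) {
        int comma = line.indexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("expected <id>,\"<edges>\": " + line);
        }
        long vertexId = Long.parseLong(line.substring(0, comma).trim());
        String edges = line.substring(comma + 1).trim();
        // Strip the surrounding double quotes.
        if (edges.length() >= 2 && edges.startsWith("\"") && edges.endsWith("\"")) {
            edges = edges.substring(1, edges.length() - 1);
        }
        Map<Long, Long> adjacency = new LinkedHashMap<>(); // keeps the input order
        for (String pair : edges.split(",")) {
            String[] parts = pair.split(":");
            adjacency.put(Long.parseLong(parts[0].trim()), Long.parseLong(parts[1].trim()));
        }
        return new AbstractMap.SimpleEntry<>(vertexId, adjacency);
    }

    public static void main(String[] args) {
        Map.Entry<Long, Map<Long, Long>> row = parseLine("5,\"3:1,4:1\"");
        System.out.println(row.getKey() + " -> " + row.getValue()); // prints 5 -> {3=1, 4=1}
    }
}
```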
Temporary directory for local jobs
Each local run creates a temporary directory, such as graph_20130816154834_240_5772, that contains the following subdirectories and files:
- counters: Contains counter information generated during the job run.
- inputs: Contains the input data for the job, such as zhemin_test1.sssp_in. The system first attempts to read the data from the local warehouse. If the data is not found and the odps.end.point parameter is set, the system uses the MaxCompute SDK to read the data from the server. By default, at most 10 records are read for each input. You can change this limit with the -Dodps.mapred.local.record.limit parameter, but the value cannot exceed 10,000.
- outputs: Contains the output data of the job, including the _default_ subdirectory. After the job completes, the result data in this directory overwrites the corresponding table in the local warehouse.
- resources: Contains the resources used by the job, including the centers subdirectory. As with inputs, the system first attempts to read resources from the local warehouse. If they are not found and the odps.end.point parameter is set, the system uses the MaxCompute SDK to read them from the server.
- job.xml: Contains the job configuration.
- superSteps: Stores persistence information for each iteration (superstep).
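The lookup order described for inputs and resources, and the per-input record limit, can be sketched as follows. LocalInputResolver is a hypothetical name, and rejecting out-of-range limits (rather than clamping them) is an assumption; the framework's actual behavior may differ.

```java
// Hypothetical sketch (not part of any MaxCompute SDK) of the documented lookup order
// for local-job inputs and resources, plus the record limit for each input.
class LocalInputResolver {
    static final int DEFAULT_RECORD_LIMIT = 10;
    static final int MAX_RECORD_LIMIT = 10_000;

    enum Source { LOCAL_WAREHOUSE, MAXCOMPUTE_SERVER }

    /** Local warehouse first; the server only if odps.end.point is set. */
    static Source resolve(String name, boolean foundLocally, boolean endPointSet) {
        if (foundLocally) {
            return Source.LOCAL_WAREHOUSE;
        }
        if (endPointSet) {
            return Source.MAXCOMPUTE_SERVER;
        }
        throw new IllegalStateException(name + " is not in the local warehouse and odps.end.point is not set");
    }

    /**
     * Effective value of -Dodps.mapred.local.record.limit. Assumption: values outside
     * [1, 10000] are rejected here; the real framework's handling may differ.
     */
    static int recordLimit(String configured) {
        if (configured == null) {
            return DEFAULT_RECORD_LIMIT;
        }
        int limit = Integer.parseInt(configured.trim());
        if (limit < 1 || limit > MAX_RECORD_LIMIT) {
            throw new IllegalArgumentException("record limit must be in [1, " + MAX_RECORD_LIMIT + "]: " + limit);
        }
        return limit;
    }

    public static void main(String[] args) {
        System.out.println(resolve("sssp_in", false, true)); // prints MAXCOMPUTE_SERVER
        // Prints 10 unless -Dodps.mapred.local.record.limit is set.
        System.out.println(recordLimit(System.getProperty("odps.mapred.local.record.limit")));
    }
}
```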
Cluster debugging
After you complete local debugging, you can submit the job to a cluster for testing.
- Configure the MaxCompute client.
- Use the add jar /path/work.jar -f; command to update the JAR package.
- Run the job by using the JAR command, and check the run log and result data.
Performance optimization
The following job configuration methods affect performance:
- setSplitSize(long): The split size for the input table, in MB. The value must be greater than 0. Default value: 64.
- setNumWorkers(int): The number of workers for the job. Valid values: [1, 1000]. Default value: 1. The optimal number of workers depends on the job's input size in bytes and the splitSize.
- setWorkerCPU(int): The CPU resources for each worker. A value of 100 represents one CPU core. Valid values: [50, 800]. Default value: 200.
- setWorkerMemory(int): The memory resources for each worker, in MB. Valid values: [256, 12288]. Default value: 4096.
- setMaxIteration(int): The maximum number of iterations. Default value: -1. A value less than or equal to 0 means that the number of iterations is not used as a job termination condition.
- setJobPriority(int): The priority of the job. Valid values: [0, 9]. Default value: 9. A larger value indicates a lower priority.
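The following hypothetical check encodes the documented ranges so that invalid values can be caught before they are passed to the setters listed above. It is a sketch, not an SDK utility; GraphJobSettingsCheck and its methods are illustrative names.

```java
// Hypothetical pre-submission check (not part of the MaxCompute SDK) that encodes
// the documented valid ranges for the performance-related settings.
class GraphJobSettingsCheck {

    static void check(long splitSizeMb, int numWorkers, int workerCpu,
                      int workerMemoryMb, int jobPriority) {
        require(splitSizeMb > 0, "splitSize must be greater than 0 MB");
        require(numWorkers >= 1 && numWorkers <= 1000, "numWorkers must be in [1, 1000]");
        // 100 = one CPU core.
        require(workerCpu >= 50 && workerCpu <= 800, "workerCPU must be in [50, 800]");
        require(workerMemoryMb >= 256 && workerMemoryMb <= 12288, "workerMemory must be in [256, 12288] MB");
        // A larger value means a lower priority.
        require(jobPriority >= 0 && jobPriority <= 9, "jobPriority must be in [0, 9]");
    }

    /** maxIteration <= 0 means the iteration count is not a termination condition. */
    static boolean iterationCapApplies(int maxIteration) {
        return maxIteration > 0;
    }

    private static void require(boolean ok, String message) {
        if (!ok) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        check(64, 1, 200, 4096, 9);                  // the documented defaults pass
        System.out.println(iterationCapApplies(-1)); // prints false: the default -1 sets no cap
    }
}
```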
To improve job performance, consider the following adjustments:
- Increase the number of workers by using the setNumWorkers method.
- Reduce the split size by using the setSplitSize method to accelerate data loading.
- Increase the CPU or memory resources for each worker.
- Set the maximum number of iterations. For applications that do not require high precision, reduce the number of iterations to end the job sooner.
The setNumWorkers and setSplitSize methods can be used together to improve data loading speed. Assume that setNumWorkers is set to workerNum, setSplitSize is set to splitSize, and the total input size in bytes is inputSize. The number of input splits is then splitNum = inputSize / splitSize, with splitSize converted from MB to bytes. The relationship between workerNum and splitNum falls into one of three scenarios:
- Scenario 1: If splitNum equals workerNum, each worker loads exactly one split.
- Scenario 2: If splitNum is greater than workerNum, each worker loads one or more splits.
- Scenario 3: If splitNum is less than workerNum, each worker loads zero or one split.
Therefore, adjust workerNum and splitSize so that scenario 1 or scenario 2 applies, which speeds up data loading. During the iteration phase, you only need to adjust workerNum. If you set runtime partitioning to false, use setSplitSize to control the number of workers, or make sure that scenario 1 or scenario 2 applies; in scenario 3, some workers are assigned zero splits. You can also run set odps.graph.split.size=<m>; set odps.graph.worker.num=<n>; before the JAR command, which is equivalent to calling setSplitSize and setNumWorkers, respectively.
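The splitNum arithmetic above can be sketched as a small planning helper. SplitPlanner is hypothetical; it assumes that 1 MB equals 1024 * 1024 bytes and that a trailing partial split counts as a whole split (ceiling division), which the formula splitNum = inputSize / splitSize does not state explicitly.

```java
// Hypothetical planning helper (not part of the MaxCompute SDK) for the splitNum/workerNum
// relationship. Assumptions: 1 MB = 1024 * 1024 bytes, and a trailing partial split
// counts as a full split (ceiling division).
class SplitPlanner {
    private static final long BYTES_PER_MB = 1024L * 1024L;

    enum Scenario { ONE_SPLIT_PER_WORKER, SEVERAL_SPLITS_PER_WORKER, SOME_WORKERS_IDLE }

    /** splitNum = inputSize / splitSize, with splitSize (must be > 0) converted from MB to bytes. */
    static long splitNum(long inputSizeBytes, long splitSizeMb) {
        long splitBytes = splitSizeMb * BYTES_PER_MB;
        return (inputSizeBytes + splitBytes - 1) / splitBytes;
    }

    static Scenario classify(long splitNum, int workerNum) {
        if (splitNum == workerNum) {
            return Scenario.ONE_SPLIT_PER_WORKER;      // scenario 1
        }
        if (splitNum > workerNum) {
            return Scenario.SEVERAL_SPLITS_PER_WORKER; // scenario 2
        }
        return Scenario.SOME_WORKERS_IDLE;             // scenario 3: avoid this
    }

    /** Largest whole-MB split size that still yields at least workerNum splits,
     *  provided the input is at least workerNum MB. */
    static long maxSplitSizeMbFor(long inputSizeBytes, int workerNum) {
        return Math.max(1L, inputSizeBytes / workerNum / BYTES_PER_MB);
    }

    public static void main(String[] args) {
        long input = 1024L * BYTES_PER_MB;        // a hypothetical 1 GB input
        long splits = splitNum(input, 64);        // default split size: 16 splits
        System.out.println(classify(splits, 32)); // prints SOME_WORKERS_IDLE
        long size = maxSplitSizeMbFor(input, 32); // 32 MB
        System.out.println(size + " MB -> " + classify(splitNum(input, size), 32)); // prints 32 MB -> ONE_SPLIT_PER_WORKER
    }
}
```

With this hypothetical 1 GB input, 32 workers and the default 64 MB split size fall into scenario 3; lowering the split size to 32 MB produces 32 splits, one per worker.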
- Use a Combiner to aggregate messages locally for keys that receive a large number of messages, which reduces the number of messages sent.
Developers can define a Combiner to reduce memory usage for message storage and decrease network traffic, thereby shortening job execution time.
- Optimize your business logic. When dealing with large datasets, reading data from disks can consume a significant portion of processing time. Reducing the amount of data that must be read can improve overall throughput and job performance. You can make the following improvements:
- Reduce the input data volume: For certain decision-making applications, processing a sampled subset of the data might only affect the precision of the result, not its overall accuracy. In such cases, consider sampling the data before importing it into the input table.
- Avoid reading unnecessary fields: The TableInfo class in the MaxCompute Graph framework lets you read specific columns, which are passed as an array of column names, instead of the entire table or partition. This also reduces the input data volume and improves job performance.
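The Combiner tip above can be illustrated with the distance messages of the SSSP example, where only the smallest distance sent to a vertex matters. The following plain-Java sketch shows the combining logic only; MinCombinerDemo is a hypothetical name, and in a real job you would instead subclass the framework's Combiner class (check the API reference for its exact signature).

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of what a min-Combiner does for SSSP distance messages.
// Hypothetical: this is NOT the MaxCompute Graph Combiner class.
class MinCombinerDemo {

    /** Folds a new message into the combined one; only the shorter distance matters. */
    static long combine(long combinedMessage, long messageToCombine) {
        return Math.min(combinedMessage, messageToCombine);
    }

    /**
     * Combines outgoing messages locally. Each row is {targetVertexId, distance};
     * the result holds at most one message per target vertex.
     */
    static Map<Long, Long> combineOutgoing(long[][] messages) {
        Map<Long, Long> perTarget = new HashMap<>();
        for (long[] message : messages) {
            perTarget.merge(message[0], message[1], MinCombinerDemo::combine);
        }
        return perTarget;
    }

    public static void main(String[] args) {
        long[][] outgoing = { {2, 5}, {2, 3}, {4, 7}, {2, 9} };
        Map<Long, Long> combined = combineOutgoing(outgoing);
        // Vertex 2 keeps distance 3 and vertex 4 keeps 7.
        System.out.println(outgoing.length + " -> " + combined.size()); // prints 4 -> 2
    }
}
```

Because taking the minimum is associative and commutative, combining before sending does not change the final distances; it only reduces message storage and network traffic.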
Built-in JAR packages
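The following JAR packages are built in, so you do not need to upload them or specify them with the -libjars option in your command:
- commons-codec-1.3.jar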
- commons-io-2.0.1.jar
- commons-lang-2.5.jar
- commons-logging-1.0.4.jar
- commons-logging-api-1.0.4.jar
- guava-14.0.jar
- json.jar
- log4j-1.2.15.jar
- slf4j-api-1.4.3.jar
- slf4j-log4j12-1.4.3.jar
- xmlenc-0.52.jar