MaxCompute does not provide Graph development plug-ins. However, you can still develop MaxCompute Graph programs in Eclipse. The recommended development process is as follows:
- Write the Graph code and run basic tests in local debugging mode.
- Debug the job in a cluster and verify the results.
This section uses the SSSP (single-source shortest path) algorithm as an example to describe how to develop and debug a Graph program in Eclipse.
The procedure for SSSP development is as follows:
Create a Java project, for example, graph_examples.
Add the JAR packages in the lib directory of the MaxCompute client to the Build Path of the Eclipse project. The following figure shows a configured Eclipse project.
Develop the MaxCompute Graph program. In practice, developers often copy an existing example (such as SSSP) and then modify it. In this example, only the package path is changed, to package com.aliyun.odps.graph.example.
Compile and build the package. In Eclipse, right-click the source code directory (the src directory in the figure) and select Export > Java > JAR file to generate a JAR package. Specify the path for the target JAR package, for example, D:\odps\clt\odps-graph-example-sssp.jar.
Use the MaxCompute client to run SSSP. For details, see Run Graph in “Quick start”.
- For details about the related development procedure, see Introduction on the Graph development plug-in.
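The SSSP example code itself is not reproduced in this section. To make the vertex-centric logic it implements easier to follow, the following plain-Java sketch simulates SSSP superstep by superstep without the MaxCompute SDK. The class LocalSsspSketch and its methods are hypothetical and are not the SDK's Vertex or GraphJob API. Messages sent to the same vertex are merged by taking the minimum, and maxIterations follows the setMaxIteration convention that a value of 0 or less means no limit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, SDK-free model of vertex-centric SSSP (not the MaxCompute Graph API).
class LocalSsspSketch {

    /** Adds a directed edge and makes sure both endpoints exist as vertices. */
    static void addEdge(Map<Long, Map<Long, Long>> graph, long from, long to, long weight) {
        graph.computeIfAbsent(from, k -> new HashMap<>()).put(to, weight);
        graph.computeIfAbsent(to, k -> new HashMap<>());
    }

    /**
     * Returns the shortest distance from source (which must be a vertex of graph)
     * to every vertex, or Long.MAX_VALUE if unreachable. maxIterations <= 0 means no limit.
     */
    static Map<Long, Long> run(Map<Long, Map<Long, Long>> graph, long source, int maxIterations) {
        Map<Long, Long> dist = new HashMap<>();
        for (Long v : graph.keySet()) {
            dist.put(v, Long.MAX_VALUE);
        }
        // Messages delivered in the next superstep, already merged per target vertex (min).
        Map<Long, Long> inbox = new HashMap<>();
        inbox.put(source, 0L);
        int superstep = 0;
        // The run ends when no vertex sends a message, i.e. every vertex has halted.
        while (!inbox.isEmpty() && (maxIterations <= 0 || superstep < maxIterations)) {
            Map<Long, Long> outbox = new HashMap<>();
            for (Map.Entry<Long, Long> msg : inbox.entrySet()) {
                long vertex = msg.getKey();
                long candidate = msg.getValue();
                if (candidate < dist.get(vertex)) {
                    // A shorter path was found: update the value and notify the neighbors.
                    dist.put(vertex, candidate);
                    for (Map.Entry<Long, Long> edge : graph.get(vertex).entrySet()) {
                        outbox.merge(edge.getKey(), candidate + edge.getValue(), Long::min);
                    }
                }
            }
            inbox = outbox;
            superstep++;
        }
        return dist;
    }
}
```

A model like this can help you work out the expected distances for a small test graph before comparing them with the sssp_out table produced by the real job.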
MaxCompute Graph supports a local debugging mode, in which you can use Eclipse to set breakpoints and debug your program.
To perform breakpoint debugging, follow these steps:
Download the odps-graph-local Maven package.
In the Eclipse project, right-click the main program file of the Graph job (the file that contains the main function) and configure its run parameters (Run As > Run Configurations…).
On the Arguments tab, set Program arguments to 1 sssp_in sssp_out. These are passed to the main program as the source vertex ID, the input table, and the output table.
On the Arguments tab, set VM arguments to -Dodps.runner.mode=local -Dodps.project.name=<project.name> -Dodps.end.point=<end.point> -Dodps.access.id=<access.id> -Dodps.access.key=<access.key>.
If the job runs in local mode without odps.end.point specified, you must create the sssp_in and sssp_out tables in the local warehouse and add data to sssp_in. The input data is listed as follows. For details about the warehouse, see MapReduce local running.
- Click Run. SSSP runs locally.
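Before the job starts, it can help to confirm that the VM arguments above were actually passed. The following sketch is a hypothetical helper (not part of the MaxCompute SDK) that inspects a java.util.Properties object; in the main program you could pass System.getProperties(), which contains the -D values. Requiring odps.access.id and odps.access.key whenever odps.end.point is set is an assumption based on the parameter descriptions that follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for checking local-run VM arguments; not part of the MaxCompute SDK.
class LocalRunArgsCheck {

    /** Returns human-readable problems; an empty list means the settings look consistent. */
    static List<String> check(Properties props) {
        List<String> problems = new ArrayList<>();
        if (!"local".equals(props.getProperty("odps.runner.mode"))) {
            problems.add("odps.runner.mode must be local for local debugging");
        }
        if (isBlank(props.getProperty("odps.project.name"))) {
            problems.add("odps.project.name is mandatory");
        }
        // Assumption: the credentials only matter, and are only checked, when an endpoint is set.
        if (!isBlank(props.getProperty("odps.end.point"))
                && (isBlank(props.getProperty("odps.access.id"))
                    || isBlank(props.getProperty("odps.access.key")))) {
            problems.add("odps.access.id and odps.access.key should be set with odps.end.point");
        }
        return problems;
    }

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }
}
```

For example, calling LocalRunArgsCheck.check(System.getProperties()) at the start of main and printing any problems surfaces a missing or mistyped -D argument before the job runs.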
Note: Set these parameters by referring to conf/odps_config.ini of the MaxCompute client. The preceding parameters are the commonly used ones. All parameters are described as follows:
odps.runner.mode: Set the value to local. This parameter is mandatory for local debugging.
odps.project.name: Specifies the current project. This parameter is mandatory.
odps.end.point: Specifies the address of the current MaxCompute service. This parameter is optional. If it is not specified, table and resource metadata is read only from the local warehouse, and an exception is thrown if the table or resource does not exist there. If it is specified, data is read from the local warehouse first and, if it does not exist locally, from the remote MaxCompute service.
odps.access.id: Specifies the AccessKey ID used to connect to the MaxCompute service. This parameter takes effect only when odps.end.point is specified.
odps.access.key: Specifies the AccessKey secret used to connect to the MaxCompute service. This parameter takes effect only when odps.end.point is specified.
odps.cache.resources: Specifies the resources to use. This parameter has the same effect as the -resources option of the JAR command.
odps.local.warehouse: Specifies the local warehouse path. The default value is ./warehouse.
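The lookup order described for odps.end.point can be modeled as follows. This is a hypothetical sketch, not SDK code: tables are represented as name-to-data maps, and treating a table that is missing from both locations as an error when odps.end.point is set is an assumption.

```java
import java.util.Map;

// Hypothetical model of local-warehouse-first lookup; not SDK code.
class WarehouseLookupSketch {

    /**
     * local:  tables in the local warehouse (name -> data)
     * remote: tables on the MaxCompute service, consulted only when endpointSet is true
     */
    static String resolve(String table, Map<String, String> local,
                          Map<String, String> remote, boolean endpointSet) {
        String data = local.get(table);
        if (data != null) {
            return data; // the local warehouse is always checked first
        }
        if (endpointSet) {
            String fromServer = remote.get(table);
            if (fromServer != null) {
                return fromServer;
            }
        }
        // No endpoint, or (as an assumption) the server does not have the table either.
        throw new IllegalStateException("table not found: " + table);
    }
}
```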
After SSSP runs successfully in local debugging mode in Eclipse, the following information is output:
graph task finish
Note: In the preceding example, the sssp_in and sssp_out tables must exist in the local warehouse. For details about the sssp_in and sssp_out tables, see Run Graph in “Quick start”.
Each local debugging run creates a temporary directory in the Eclipse project directory, as shown in the following figure.
The temporary directory of a local Graph job run contains the following directories and files:
counters: Stores counter information about the job run.
inputs: Stores the input data of the job. Data is read from the local warehouse first. If it does not exist locally, the MaxCompute SDK reads it from the server (if odps.end.point is set). By default, only 10 records are read from each input. You can change this limit with the -Dodps.mapred.local.record.limit parameter; the maximum value is 10,000.
outputs: Stores the output data of the job. If an output table exists in the local warehouse, the result data in outputs overwrites that table after the job completes.
resources: Stores the resources used by the job. As with inputs, resources are read from the local warehouse first. If they do not exist locally, the MaxCompute SDK reads them from the server (if odps.end.point is set).
job.xml: Stores the job configuration.
superstep: Stores the persisted messages of each iteration.
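The record limit for local inputs can be sketched as below. The class is hypothetical; the default of 10 and the maximum of 10,000 come from this section, while clamping larger values to 10,000 and rejecting non-positive values are assumptions, because the framework's behavior for such values is not described here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of the local record limit; not the framework's own code.
class LocalRecordLimitSketch {
    static final int DEFAULT_LIMIT = 10;
    static final int MAX_LIMIT = 10_000;

    /** rawValue is the -Dodps.mapred.local.record.limit string, or null if it is not set. */
    static int effectiveLimit(String rawValue) {
        if (rawValue == null) {
            return DEFAULT_LIMIT;
        }
        int requested = Integer.parseInt(rawValue.trim());
        if (requested <= 0) {
            throw new IllegalArgumentException("limit must be positive: " + requested);
        }
        return Math.min(requested, MAX_LIMIT); // clamping is an assumption
    }

    /** Copies at most limit records from an input, as a local run does. */
    static <T> List<T> take(Iterator<T> records, int limit) {
        List<T> out = new ArrayList<>();
        while (records.hasNext() && out.size() < limit) {
            out.add(records.next());
        }
        return out;
    }
}
```

For example, effectiveLimit(System.getProperty("odps.mapred.local.record.limit")) returns 10 when the property is not set.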
- To output detailed logs during local debugging, place the following log4j configuration file in the src directory: log4j.properties_odps_graph_cluster_debug.
After local debugging, you can submit the job to a cluster for testing. The procedure is as follows:
- Configure the MaxCompute client.
- Run the add jar /path/work.jar -f; command to update the JAR package.
- Run the job with the JAR command, and view the run logs and result data, as shown below:
- For details about how to run Graph in a cluster, see Run Graph in “Quick start”.
This section describes common methods for tuning the performance of MaxCompute Graph jobs.
The following GraphJob settings affect performance:
- setSplitSize(long): Specifies the split size of an input table, in MB. The value must be greater than 0. The default value is 64.
- setNumWorkers(int): Specifies the number of Workers for a job. The value range is [1, 1000]. The default value is -1, which means that the number of Workers is determined by the number of input bytes and the split size.
- setWorkerCPU(int): Specifies the CPU resources of each Worker. One CPU core corresponds to 100 units. The value range is [50, 800], and the default value is 200.
- setWorkerMemory(int): Specifies the memory of each Worker, in MB. The value range is [256 MB, 12 GB], and the default value is 4,096 MB.
- setMaxIteration(int): Specifies the maximum number of iterations. The default value is -1. If the value is less than or equal to 0, the maximum number of iterations is not used as a termination condition.
- setJobPriority(int): Specifies the job priority. The value range is [0, 9], and the default value is 9. A larger value indicates a lower priority.
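As a hedged illustration, the documented ranges can be checked before a job is submitted. The helper below is hypothetical and does not call GraphJob; it only mirrors the ranges listed above. Treating 12 GB as 12,288 MB is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-submission check mirroring the documented ranges; not the SDK's validation.
class GraphJobSettingsCheck {

    static List<String> validate(long splitSizeMb, int numWorkers, int workerCpu,
                                 int workerMemoryMb, int jobPriority) {
        List<String> errors = new ArrayList<>();
        if (splitSizeMb <= 0) {
            errors.add("split size must be greater than 0 MB");
        }
        // -1 (the default) lets the framework derive the Worker count from input and split size.
        if (numWorkers != -1 && (numWorkers < 1 || numWorkers > 1000)) {
            errors.add("numWorkers must be in [1, 1000]");
        }
        if (workerCpu < 50 || workerCpu > 800) {
            errors.add("worker CPU must be in [50, 800] (100 = one core)");
        }
        if (workerMemoryMb < 256 || workerMemoryMb > 12 * 1024) {
            errors.add("worker memory must be in [256 MB, 12 GB]");
        }
        if (jobPriority < 0 || jobPriority > 9) {
            errors.add("job priority must be in [0, 9]");
        }
        return errors;
    }
}
```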
Common tuning methods include:
- Use the setNumWorkers() method to increase the number of Workers.
- Use the setSplitSize() method to reduce the split size, which speeds up data loading.
- Increase the CPU or memory of Workers.
- Set the maximum number of iterations. If your application does not require high result precision, reduce the number of iterations so that the job finishes sooner.
The setNumWorkers and setSplitSize methods can be used together to speed up data loading. Assume that workerNum is the value set by setNumWorkers, splitSize is the value set by setSplitSize, and inputSize is the total number of input bytes. The number of splits is then splitNum = inputSize/splitSize. The relationship between workerNum and splitNum is as follows:
- If splitNum = workerNum, each Worker loads one split.
- If splitNum > workerNum, each Worker loads one or more splits.
- If splitNum < workerNum, each Worker loads zero or one split.
Therefore, to load data quickly, adjust workerNum and splitSize so that one of the first two conditions holds. In the iteration phase, you only need to adjust workerNum. If you set runtime partitioning to false, we recommend that you use setSplitSize to control the number of Workers, or otherwise make sure that one of the first two conditions holds. In the third case, some Workers may be assigned no vertices. You can use set odps.graph.split.size=
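The relationship between splitNum and workerNum can be explored with the following sketch. Sizes are in MB for simplicity. Rounding splitNum up (so that a trailing partial split counts as a split) and assigning splits to Workers round-robin are assumptions, because the framework's actual assignment policy is not described here. SplitPlanSketch is a hypothetical name.

```java
// Hypothetical model of split-to-Worker assignment; the real policy may differ.
class SplitPlanSketch {

    /** splitNum = inputSize / splitSize, rounded up (an assumption). */
    static long splitNum(long inputSizeMb, long splitSizeMb) {
        return (inputSizeMb + splitSizeMb - 1) / splitSizeMb;
    }

    /** Splits loaded by each Worker under round-robin assignment (an assumption). */
    static int[] splitsPerWorker(long splitNum, int workerNum) {
        int[] counts = new int[workerNum];
        for (long s = 0; s < splitNum; s++) {
            counts[(int) (s % workerNum)]++;
        }
        return counts;
    }

    /** Workers that load no split start with no vertices (the third case above). */
    static int idleWorkers(long splitNum, int workerNum) {
        int idle = 0;
        for (int c : splitsPerWorker(splitNum, workerNum)) {
            if (c == 0) {
                idle++;
            }
        }
        return idle;
    }
}
```

For example, with 256 MB of input and a 64 MB split size, splitNum is 4: with four Workers each loads one split, with two Workers each loads two, and with eight Workers four of them stay idle.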
Another common performance problem is data skew. For example, Counters may show that some Workers process far more vertices or edges than other Workers.
Data skew usually occurs when the number of vertices, edges, or messages for some keys is much greater than for other keys. These heavy keys are processed by a small number of Workers, which then run much longer than the others. To resolve this problem, you can:
- Use a combiner to locally aggregate the messages for vertices of such keys, which reduces the number of messages sent.
- Improve the business logic.
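One simple way to spot the skew described above is to compare the largest per-Worker count from Counters with the mean. The sketch below is hypothetical; how you export per-Worker vertex or edge counts from Counters, and which threshold you treat as skewed, are left to you.

```java
// Hypothetical skew check over per-Worker counts copied from job Counters.
class SkewCheckSketch {

    /** Ratio of the largest per-Worker count to the mean; 1.0 means perfectly even. */
    static double skewRatio(long[] perWorkerCounts) {
        if (perWorkerCounts.length == 0) {
            throw new IllegalArgumentException("no Workers");
        }
        long max = 0;
        long total = 0;
        for (long c : perWorkerCounts) {
            max = Math.max(max, c);
            total += c;
        }
        if (total == 0) {
            return 1.0; // nothing processed, treat as even
        }
        double mean = (double) total / perWorkerCounts.length;
        return max / mean;
    }

    static boolean isSkewed(long[] perWorkerCounts, double threshold) {
        return skewRatio(perWorkerCounts) > threshold;
    }
}
```

For per-Worker vertex counts of 10, 10, 10, and 370, the ratio is 3.7, which points at the one Worker holding the heavy keys.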
You can define a combiner to reduce the memory used to store messages and the volume of network traffic, which shortens job execution time. For details, see the introduction to Combiner in the MaxCompute SDK.
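For SSSP, a vertex only needs the smallest candidate distance it receives, and taking a minimum is associative and commutative, so messages addressed to the same vertex can be merged before they are sent. The following sketch shows that merge in plain Java. It is hypothetical and deliberately does not reproduce the SDK's Combiner signature; see the Combiner documentation for the actual interface.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical min-combine illustration; not the MaxCompute SDK Combiner interface.
class MinCombinerSketch {

    /** One outgoing message: target vertex ID and candidate distance. */
    static final class Message {
        final long target;
        final long distance;

        Message(long target, long distance) {
            this.target = target;
            this.distance = distance;
        }
    }

    /** Collapses all messages for the same target into one, keeping the minimum distance. */
    static Map<Long, Long> combine(List<Message> outgoing) {
        Map<Long, Long> combined = new HashMap<>();
        for (Message m : outgoing) {
            combined.merge(m.target, m.distance, Long::min);
        }
        return combined;
    }
}
```

Here four messages to two vertices would shrink to two messages, which is where the memory and network savings come from.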
When the data volume is large, reading data from disk can take a significant share of the processing time. Reducing the number of bytes read therefore increases overall throughput and improves job performance. You can use either of the following methods:
- Reduce the input data volume: For decision-support applications, processing a sampled subset of the data affects only the precision of the results, not their overall correctness. You can therefore sample the data and import the sample into the input table for processing.
- Avoid reading unused fields: The TableInfo class of the MaxCompute Graph framework can read specific columns (passed as an array of column names) instead of the entire table or table partition. This reduces the input data volume and improves job performance.
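The two methods above can be sketched in plain Java as follows. Rows are modeled as column-name-to-value maps, and InputReductionSketch is a hypothetical name. On MaxCompute, sampling would happen before the data is imported into the input table, and column selection is done through TableInfo; this sketch only shows the effect on the data, using simple Bernoulli sampling as an example technique.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical in-memory model of input reduction; not MaxCompute SDK code.
class InputReductionSketch {

    /** Bernoulli sampling: keeps each row independently with probability rate (seeded). */
    static List<Map<String, String>> sample(List<Map<String, String>> rows, double rate, long seed) {
        Random rnd = new Random(seed);
        List<Map<String, String>> kept = new ArrayList<>();
        for (Map<String, String> row : rows) {
            if (rnd.nextDouble() < rate) {
                kept.add(row);
            }
        }
        return kept;
    }

    /** Keeps only the named columns, in the requested order. */
    static List<Map<String, String>> project(List<Map<String, String>> rows, String[] columns) {
        List<Map<String, String>> out = new ArrayList<>();
        for (Map<String, String> row : rows) {
            Map<String, String> slim = new LinkedHashMap<>();
            for (String col : columns) {
                slim.put(col, row.get(col));
            }
            out.add(slim);
        }
        return out;
    }
}
```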
By default, the following JAR packages are loaded into the JVMs that run Graph programs. You do not need to upload them as resources or specify them with -libjars on the command line.
- In the JVM classpath, the preceding built-in JAR packages come before your own JAR packages, which may cause version conflicts. For example, if your program calls a method of a class in commons-codec-1.5.jar that does not exist in commons-codec-1.3.jar, you must check whether an equivalent implementation is available in commons-codec-1.3.jar, or wait for MaxCompute to upgrade to a newer version.
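Because the built-in JAR packages come first on the classpath, a method you rely on may be missing at run time even though your code compiles against your own JAR. The following sketch uses standard Java reflection to check whether the class that was actually loaded exposes a given public method, and where that class was loaded from. ClasspathConflictCheck is a hypothetical helper; pass the class and method names from your own code.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper built only on standard Java reflection.
class ClasspathConflictCheck {

    /** True if the class visible on the classpath exposes a public method with this signature. */
    static boolean hasMethod(String className, String methodName, Class<?>... paramTypes) {
        try {
            Class<?> cls = Class.forName(className);
            cls.getMethod(methodName, paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    /** Location (for example, a JAR path) the class was loaded from, if the JVM reports one. */
    static String loadedFrom(String className) throws ClassNotFoundException {
        Class<?> cls = Class.forName(className);
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return src == null ? "<bootstrap or unknown>" : String.valueOf(src.getLocation());
    }
}
```

Logging loadedFrom for a suspect class during a cluster run shows whether the built-in JAR or your own JAR supplied it.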