- Compile the Graph code and perform basic tests using local debugging.
- Perform cluster debugging and verify the result.
This section uses the SSSP algorithm as an example to describe how to use Eclipse to develop and debug a Graph program.
- Create a Java project, for example, graph_examples.
- Add the JAR package in the lib directory of the MaxCompute client to Build Path of the Eclipse project. The following figure shows a configured Eclipse project:
- Develop a MaxCompute Graph program.
In the actual development process, an example (such as SSSP) is often copied and then modified. In this example, only the package path is changed to package com.aliyun.odps.graph.example.
- Compile and build the package.
In an Eclipse environment, right-click the source code directory (the src directory in the figure) and select Export > Java > JAR file to generate a JAR package. Select the path for storing the target JAR package, for example,
- Use the MaxCompute console to run SSSP. For more information about the related operations, see Run Graph in “Quick start”.
For more information about the related development procedure, see Introduction on the Graph development plug-in.
MaxCompute Graph supports the local debugging mode. Use Eclipse to perform breakpoint debugging.
- Download an odps-graph-local maven package.
- Select the Eclipse project, right-click the main program file (including the main function) of the Graph job, and configure its running parameters (by selecting Run As > Run Configurations).
- On the Arguments tab page, set Program arguments to 1 sssp_in sssp_out as the input parameters of the main program.
- On the Arguments tab page, set VM arguments to the following:
-Dodps.runner.mode=local -Dodps.project.name=<project.name> -Dodps.end.point=<end.point> -Dodps.access.id=<access.id> -Dodps.access.key=<access.key>
- If the job runs in local mode without a remote endpoint (odps.end.point is not specified), you must create the sssp_in and sssp_out tables in the local warehouse and add data to sssp_in. The input data is as follows:
1,"2:2,3:1,4:4"
2,"1:2,3:2,4:1"
3,"1:1,2:2,5:1"
4,"1:4,2:1,5:1"
5,"3:1,4:1"
For more information about the warehouse, see MapReduce local running.
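To make the input format concrete, the following minimal sketch parses records of the form shown above (a vertex ID followed by a quoted list of neighbor:weight pairs) and computes the shortest distances from vertex 1 in plain Java. It does not use the MaxCompute Graph SDK. The class and method names are hypothetical, and the relaxation loop only imitates an iterate-until-stable computation; it is not the framework's SSSP example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of the MaxCompute SDK.
class SsspInputSketch {

    // Extracts the vertex ID from a record such as 1,"2:2,3:1,4:4".
    static long parseVertexId(String record) {
        return Long.parseLong(record.substring(0, record.indexOf(',')).trim());
    }

    // Parses the quoted edge list of a record into {neighborId -> weight}.
    static Map<Long, Long> parseEdges(String record) {
        String edgeList = record.substring(record.indexOf(',') + 1).replace("\"", "").trim();
        Map<Long, Long> edges = new TreeMap<>();
        if (edgeList.isEmpty()) {
            return edges;
        }
        for (String pair : edgeList.split(",")) {
            String[] parts = pair.split(":");
            edges.put(Long.parseLong(parts[0].trim()), Long.parseLong(parts[1].trim()));
        }
        return edges;
    }

    // Relaxes all edges repeatedly until no distance changes.
    static Map<Long, Long> shortestPaths(List<String> records, long source) {
        Map<Long, Map<Long, Long>> graph = new TreeMap<>();
        for (String r : records) {
            graph.put(parseVertexId(r), parseEdges(r));
        }
        Map<Long, Long> dist = new TreeMap<>();
        for (Long v : graph.keySet()) {
            dist.put(v, Long.MAX_VALUE); // MAX_VALUE marks "not reached yet"
        }
        dist.put(source, 0L);
        boolean changed = true;
        while (changed) {
            changed = false;
            for (Map.Entry<Long, Map<Long, Long>> v : graph.entrySet()) {
                long d = dist.get(v.getKey());
                if (d == Long.MAX_VALUE) {
                    continue; // unreachable so far, nothing to propagate
                }
                for (Map.Entry<Long, Long> e : v.getValue().entrySet()) {
                    long candidate = d + e.getValue();
                    if (candidate < dist.getOrDefault(e.getKey(), Long.MAX_VALUE)) {
                        dist.put(e.getKey(), candidate);
                        changed = true;
                    }
                }
            }
        }
        return dist;
    }

    // The five sample records listed in this section.
    static List<String> sampleRecords() {
        return Arrays.asList(
            "1,\"2:2,3:1,4:4\"",
            "2,\"1:2,3:2,4:1\"",
            "3,\"1:1,2:2,5:1\"",
            "4,\"1:4,2:1,5:1\"",
            "5,\"3:1,4:1\"");
    }

    public static void main(String[] args) {
        // Start vertex 1 corresponds to the first program argument used above.
        System.out.println(shortestPaths(sampleRecords(), 1L)); // prints {1=0, 2=2, 3=1, 4=3, 5=2}
    }
}
```

A small reference result like this can be compared with the contents of sssp_out after a local run, provided that the job's output uses the same vertex IDs and distance semantics.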
- Click Run.
Note Refer to the settings in conf/odps_config.ini of the MaxCompute client when you set these parameters. The preceding parameters are the commonly used ones. All parameters, including the less common ones, are described as follows:
- odps.runner.mode: The parameter value is local. This parameter is required for the local debugging function.
- odps.project.name: (Required). Specifies the current project.
- odps.end.point: (Optional). Specifies the address of the current MaxCompute service. If this parameter is not specified, metadata of tables or resources is read only from the warehouse, and an exception is thrown if the table or resource does not exist there. If this parameter is specified, data is read from the warehouse first, and then from the remote MaxCompute service if it does not exist in the warehouse.
- odps.access.id: Indicates the ID to connect to the MaxCompute service. This parameter is valid only when odps.end.point is specified.
- odps.access.key: Indicates the key to connect to the MaxCompute service. This parameter is valid only when odps.end.point is specified.
- odps.cache.resources: Specifies the resource list in use. This parameter has the same effect as -resources of the JAR command.
- odps.local.warehouse: Specifies the local warehouse path. If this parameter is not specified, ./warehouse is used by default.
After SSSP is debugged locally in Eclipse, the following information is output:
Counters: 3
    com.aliyun.odps.graph.local.COUNTER
        TASK_INPUT_BYTE=211
        TASK_INPUT_RECORD=5
        TASK_OUTPUT_BYTE=161
        TASK_OUTPUT_RECORD=5
graph task finish
Note In the preceding example, the sssp_in and sssp_out tables must exist in the local warehouse. For more information about the sssp_in and sssp_out tables, see Run Graph in “Quick start”.
Temporary directory of local job
- counters: Stores counting information about job running.
- inputs: Stores input data of the job. Data is preferentially obtained from the local warehouse. If such data does not exist locally, the MaxCompute SDK reads data from the server (if odps.end.point is set). An input reads only 10 data records by default. This threshold can be modified using the -Dodps.mapred.local.record.limit parameter, whose maximum value is 10,000.
- outputs: Stores output data of the job. If the local warehouse has an output table, the result data in outputs overwrites the corresponding table in the local warehouse after the job is complete.
- resources: Stores resources used by the job. Similar to inputs, data is preferentially obtained from the local warehouse. If such data does not exist locally, the data is read from the server using MaxCompute SDK (when odps.end.point is set).
- job.xml: Indicates job configuration.
- superstep: Stores information about message persistence in each iteration.
Note If a detailed log must be output during local debugging, the following log4j configuration file must be placed in the src directory:
After local debugging, submit the job to a cluster for testing.
- Configure the MaxCompute client.
- Run the add jar /path/work.jar -f; command to update the JAR package.
- Run a JAR command to run the job, and view the running log and result data.
For more information about how to run Graph in a cluster, see Run Graph in “Quick start”.
The following section describes common performance tuning methods on the MaxCompute Graph framework.
Job Parameter Configuration
- setSplitSize(long): Specifies the split size of an input table, in MB. The value must be greater than 0, and the default value is 64.
- setNumWorkers(int): Specifies the number of Workers for a job. The value range is [1, 1000], and the default value is -1. With the default value, the number of Workers is determined by the number of input bytes of the job and the split size.
- setWorkerCPU(int): Specifies the CPU resources of a Worker. One CPU core corresponds to 100 resources. The value range is [50, 800], and the default value is 200.
- setWorkerMemory(int): Specifies the memory resources of a Worker, in MB. The value range is [256 MB, 12 GB], and the default value is 4,096 MB.
- setMaxIteration(int): Specifies the maximum number of iterations. The default value is -1. If the value is smaller than or equal to 0, the maximum number of iterations is not a condition for job termination.
- setJobPriority(int): Specifies the job priority. The value range is [0, 9], and the default value is 9. A larger value indicates a lower priority.
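The ranges and defaults listed above can be checked before a job is submitted. The following is a minimal sketch in plain Java. The class GraphJobSettings and its validate method are hypothetical and only encode the values stated in this section; they are not part of the MaxCompute SDK. In a real program, the validated values would be passed to the corresponding setters.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder for the tuning parameters listed above; not an SDK class.
class GraphJobSettings {
    long splitSizeMb = 64;      // must be greater than 0
    int numWorkers = -1;        // -1 = let the framework decide; otherwise [1, 1000]
    int workerCpu = 200;        // 100 = one core; range [50, 800]
    int workerMemoryMb = 4096;  // range [256 MB, 12 GB]
    int maxIteration = -1;      // <= 0 means no iteration limit, so nothing to check
    int jobPriority = 9;        // range [0, 9]; a larger value is a lower priority

    // Returns one message per violated range; an empty list means all values are valid.
    List<String> validate() {
        List<String> errors = new ArrayList<>();
        if (splitSizeMb <= 0) {
            errors.add("splitSize must be greater than 0");
        }
        if (numWorkers != -1 && (numWorkers < 1 || numWorkers > 1000)) {
            errors.add("numWorkers must be in [1, 1000]");
        }
        if (workerCpu < 50 || workerCpu > 800) {
            errors.add("workerCPU must be in [50, 800]");
        }
        if (workerMemoryMb < 256 || workerMemoryMb > 12 * 1024) {
            errors.add("workerMemory must be in [256, 12288] MB");
        }
        if (jobPriority < 0 || jobPriority > 9) {
            errors.add("jobPriority must be in [0, 9]");
        }
        return errors;
    }

    public static void main(String[] args) {
        GraphJobSettings settings = new GraphJobSettings();
        settings.numWorkers = 2000; // out of range on purpose
        settings.workerCpu = 400;   // four cores, within range
        System.out.println(settings.validate());
    }
}
```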
- You can use the setNumWorkers() method to increase the number of Workers.
- You can use the setSplitSize() method to reduce the split size and increase the speed for a job to load data.
- Increase the CPU or memory of Workers.
- Set the maximum number of iterations. If applications do not have high requirements on result precision, you can reduce the number of iterations to speed up the process.
- If splitNum == workerNum, each Worker is responsible for loading one split.
- If splitNum > workerNum, each Worker is responsible for loading one or multiple splits.
- If splitNum < workerNum, each Worker is responsible for loading zero or one split.
Therefore, if the first two conditions are met, you can adjust workerNum and splitSize to enable fast data loading. In the iteration phase, you only need to adjust workerNum.
If you set runtime partitioning to false, we recommend that you use setSplitSize to control the number of Workers. In the third case (splitNum < workerNum), some Workers may hold zero vertices. You can also run set odps.graph.split.size=<m>; set odps.graph.worker.num=<n>; before the JAR command, which has the same effect as setSplitSize and setNumWorkers.
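The three cases above can be explored with a small calculation. The following sketch rests on two assumptions that this section does not state: the number of splits is the input size divided by the split size, rounded up, and splits are distributed as evenly as possible across Workers. The class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical planning helper; the split formula and the even distribution are assumptions.
class SplitPlanSketch {

    // Assumed split count: ceil(inputBytes / splitSize), with splitSize given in MB.
    static long splitNum(long inputBytes, long splitSizeMb) {
        if (inputBytes < 0 || splitSizeMb <= 0) {
            throw new IllegalArgumentException("inputBytes must be >= 0 and splitSizeMb > 0");
        }
        long splitBytes = splitSizeMb * 1024L * 1024L;
        return (inputBytes + splitBytes - 1) / splitBytes;
    }

    // Number of splits each Worker loads if splits are spread as evenly as possible.
    static long[] splitsPerWorker(long splitNum, int workerNum) {
        if (workerNum <= 0) {
            throw new IllegalArgumentException("workerNum must be > 0");
        }
        long[] load = new long[workerNum];
        for (int w = 0; w < workerNum; w++) {
            // The first (splitNum % workerNum) Workers take one extra split.
            load[w] = splitNum / workerNum + (w < splitNum % workerNum ? 1 : 0);
        }
        return load;
    }

    public static void main(String[] args) {
        long splits = splitNum(1024L * 1024L * 1024L, 64); // 1 GB input, 64 MB splits
        System.out.println("splitNum = " + splits);        // prints splitNum = 16
        System.out.println(Arrays.toString(splitsPerWorker(splits, 16))); // one split each
        System.out.println(Arrays.toString(splitsPerWorker(splits, 5)));  // prints [4, 3, 3, 3, 3]
        System.out.println(Arrays.toString(splitsPerWorker(splits, 20))); // four Workers load nothing
    }
}
```

Under these assumptions, reducing the split size raises splitNum, which gives more Workers something to load. Adding Workers beyond splitNum leaves the extra Workers idle during loading.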
Another common performance problem is data skew. For example, the Counters may show that some Workers process far more vertices or edges than other Workers.
Data skew usually occurs when the number of vertices, edges, or messages corresponding to some keys is much greater than that corresponding to other keys. These high-volume keys are processed by a small number of Workers, so those Workers run much longer than the others.
- Use a combiner to locally aggregate messages of vertices corresponding to such keys to reduce the number of sent messages.
- Improve the service logic.
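Before applying either remedy, it can help to quantify the skew from per-Worker counter values. The following sketch compares the busiest Worker with the average. The class name, the metric, and the sample counts in main are illustrative assumptions, not values or tools from MaxCompute.

```java
// Hypothetical skew check; the counts in main are made-up illustrative values.
class SkewCheckSketch {

    // Ratio of the largest per-Worker count to the mean count; 1.0 means perfectly balanced.
    static double skewRatio(long[] perWorkerCounts) {
        if (perWorkerCounts.length == 0) {
            throw new IllegalArgumentException("at least one Worker is required");
        }
        long max = 0;
        long sum = 0;
        for (long count : perWorkerCounts) {
            max = Math.max(max, count);
            sum += count;
        }
        if (sum == 0) {
            return 1.0; // nothing processed anywhere, treat as balanced
        }
        double mean = (double) sum / perWorkerCounts.length;
        return max / mean;
    }

    public static void main(String[] args) {
        long[] verticesPerWorker = {100, 120, 90, 1690};
        // One Worker handles most of the vertices, so the ratio is well above 1.
        System.out.println("skew ratio = " + skewRatio(verticesPerWorker));
    }
}
```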
Use a Combiner
Define a Combiner to reduce the memory used to store messages and the volume of network traffic, which shortens the job execution time. For more information, see the introduction to Combiner in MaxCompute SDK.
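For an algorithm such as SSSP, where a vertex only needs the smallest distance among its incoming messages, the effect of a Combiner can be sketched in plain Java as follows. The Message class and the combine method are hypothetical and do not reproduce the SDK's Combiner interface. The sketch only shows how keeping one minimum per target vertex reduces the number of messages to send.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of message combining; not the SDK's Combiner class.
class MinCombinerSketch {

    // A message carrying a candidate distance to a target vertex.
    static final class Message {
        final long targetVertex;
        final long distance;

        Message(long targetVertex, long distance) {
            this.targetVertex = targetVertex;
            this.distance = distance;
        }
    }

    // Keeps only the smallest distance per target vertex.
    static Map<Long, Long> combine(List<Message> outgoing) {
        Map<Long, Long> combined = new TreeMap<>();
        for (Message m : outgoing) {
            combined.merge(m.targetVertex, m.distance, Long::min);
        }
        return combined;
    }

    // Made-up messages produced on one Worker during a single iteration.
    static List<Message> sampleMessages() {
        return Arrays.asList(
            new Message(4, 4),
            new Message(4, 3),
            new Message(5, 2),
            new Message(4, 3),
            new Message(5, 4));
    }

    public static void main(String[] args) {
        List<Message> outgoing = sampleMessages();
        Map<Long, Long> combined = combine(outgoing);
        System.out.println(combined); // prints {4=3, 5=2}
        System.out.println(outgoing.size() + " messages combined into " + combined.size());
    }
}
```

Combining is only safe when the aggregation gives the same result regardless of grouping and order, as taking a minimum does.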
Reduce the Data Input Volume
- Reduce the input data volume: For decision-making applications, results obtained from processing subsets after data sampling only affect the result precision, instead of the overall accuracy. Therefore, you can perform special data sampling and import the data to the input table for processing.
- Avoid reading fields that are not used: The TableInfo class of the MaxCompute Graph framework supports reading specific columns (transmitted using column name arrays), rather than reading the entire table or table partition. This reduces the input data volume and improves job performance.
Built-in JAR Packages
Note In the classpath of the JVM that runs the job, the preceding built-in JAR packages are placed before users’ JAR packages, which may result in a version conflict. For example, if your program uses a function of a class in commons-codec-1.5.jar that does not exist in commons-codec-1.3.jar, check whether an equivalent implementation exists in commons-codec-1.3.jar, or wait for MaxCompute to upgrade to a supported version.