To use the custom algorithm upload feature, you must develop an algorithm package and then create and publish algorithms based on that package. This topic describes how to develop an algorithm package.

Procedure

  1. Prepare a development environment.
    1. Download and decompress the Spark client package.
      tar xzvf spark-2.3.0-odps0.32.0.tar.gz
    2. Optional: Configure the JAVA_HOME and SPARK_HOME environment variables.
      Note: If a Java environment is already installed and configured on your computer, you can skip the JAVA_HOME configuration.
      • JAVA_HOME
        export JAVA_HOME=/path/to/jdk
        export PATH=$JAVA_HOME/bin/:$PATH
      • SPARK_HOME
        export SPARK_HOME=/path/to/spark_extracted_package
        export PATH=$SPARK_HOME/bin/:$PATH
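      To verify that the environment variables take effect, you can open a new terminal and run commands similar to the following. The reported versions depend on the JDK and the Spark client package that you installed:
        java -version
        spark-submit --version
        echo $SPARK_HOME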
  2. Configure the spark-defaults.conf file. A complete example of the finished file is shown at the end of this step.
    1. Initialize the configuration file. Commands:
      cd $SPARK_HOME/conf
      cp spark-defaults.conf.template spark-defaults.conf
    2. Configure account information. Add the following configurations:
      spark.hadoop.odps.project.name=<The name of the MaxCompute project>
      spark.hadoop.odps.access.id=<The AccessKey ID of your account>
      spark.hadoop.odps.access.key=<The AccessKey secret of your account>
      spark.hadoop.odps.end.point=<The endpoint of MaxCompute>
    3. Optional: To use Spark SQL to access MaxCompute, add the following configuration:
      spark.sql.catalogImplementation=odps
    4. Configure resources. Add the following configurations:
      Note: You can modify the values as needed.
      spark.executor.instances=
      spark.executor.cores=
      spark.executor.memory=
      spark.driver.cores=
      spark.driver.memory=
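    For reference, a completed spark-defaults.conf might look similar to the following. All values below are illustrative placeholders, not real credentials or recommended sizes; replace them with the values for your own project and workload:
      spark.hadoop.odps.project.name=my_maxcompute_project
      spark.hadoop.odps.access.id=<your AccessKey ID>
      spark.hadoop.odps.access.key=<your AccessKey secret>
      spark.hadoop.odps.end.point=<the MaxCompute endpoint of your region>
      spark.sql.catalogImplementation=odps
      spark.executor.instances=2
      spark.executor.cores=2
      spark.executor.memory=4g
      spark.driver.cores=2
      spark.driver.memory=4g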
  3. Write code.
    In this example, PySpark is used and the code file is named read_example.py.
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SQLContext, DataFrame, SparkSession
    import sys
    def mainFunc():
        # Process input parameters.
        arg_dict = {}
        for arg in sys.argv:
            argParam = arg.split('=', 1)
            if len(argParam) > 1:
                arg_dict[argParam[0]] = argParam[1]
        # Read the input and output parameters.
        INPUT_TABLE = arg_dict["inputTable1"]
        OUTPUT_TABLE = arg_dict["outputTable1"]
        ID_COL = arg_dict["idCol"]
        CONTENT_COL = arg_dict["contentCol"]
        conf = SparkConf().setAppName("odps_pyspark")
        sc = SparkContext(conf=conf)
        sql_context = SQLContext(sc)
        spark = SparkSession.builder.appName("spark sql").getOrCreate()
        # Drop the original output table if it exists, then recreate it.
        spark.sql("DROP TABLE IF EXISTS " + OUTPUT_TABLE)
        spark.sql("CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE + "(id STRING,content STRING)")
        print ("Create odps table finished")
        normal_df = spark.sql("SELECT * FROM " + INPUT_TABLE)
        print ("Read normal odps table finished")
        spark.sql("INSERT INTO " + OUTPUT_TABLE + " SELECT " + ID_COL + " as id," + CONTENT_COL + " as content FROM " + INPUT_TABLE)
        print ("Write normal odps table finished")
        result_df = spark.sql("SELECT * FROM " + OUTPUT_TABLE)
        for i in result_df.collect():
            print (i)
    • Entry parameters
      read_example.mainFunc
    • User-defined parameters (passed to the entry function as key=value arguments; see the parsing sketch after this list)
      • inputTable1: the input table that corresponds to the input port of the algorithm component.
      • outputTable1: the output table that corresponds to the output port of the algorithm component.
      • idCol: the ID column of the input table. The values in this column must be of the STRING type.
      • contentCol: the content column of the input table. The values in this column must be of the STRING type.
      • currentProject: the name of the project.
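    The following standalone sketch reuses the parsing loop from read_example.py with hypothetical values to show how these key=value arguments end up in arg_dict:
      import sys
      # Simulated argument list: the key=value pairs that follow the script path
      # on the spark-submit command line. All names here are hypothetical.
      sys.argv = ["read_example.py", "inputTable1=demo_input", "outputTable1=demo_output",
                  "idCol=id", "contentCol=content", "currentProject=demo_project"]
      arg_dict = {}
      for arg in sys.argv:
          arg_param = arg.split('=', 1)
          if len(arg_param) > 1:
              arg_dict[arg_param[0]] = arg_param[1]
      print(arg_dict)
      # {'inputTable1': 'demo_input', 'outputTable1': 'demo_output', 'idCol': 'id',
      #  'contentCol': 'content', 'currentProject': 'demo_project'}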
  4. Debug your code. PySpark is used in this example.
    1. Save read_example.py to your computer. Create a file named a.py and add the following content:
      from read_example import mainFunc
      if __name__ == '__main__':
          mainFunc()
    2. Add the following configuration to the spark-defaults.conf file:
      spark.master=local[4]
    3. Run the following command to debug the code:
      cd $SPARK_HOME
      ./bin/spark-submit --driver-class-path cupid/odps-spark-datasource_2.11-3.3.8.jar \
        --py-files python/lib/pyspark.zip,python/lib/py4j-0.10.6-src.zip \
        <the path to the a.py file> \
        inputTable1=<the name of the input table> \
        outputTable1=<the name of the output table> \
        idCol=<the name of the ID column> \
        contentCol=<the name of the content column>
      In this command, <the path to the a.py file> is the local path where you saved a.py, for example ~/Desktop/a.py.
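      For example, assuming that a.py is saved on the desktop and that the input table uses columns named id and content, the command might look like the following (all table and column names here are hypothetical):
      ./bin/spark-submit --driver-class-path cupid/odps-spark-datasource_2.11-3.3.8.jar \
        --py-files python/lib/pyspark.zip,python/lib/py4j-0.10.6-src.zip \
        ~/Desktop/a.py inputTable1=demo_input outputTable1=demo_output idCol=id contentCol=content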