
Platform for AI: PyAlink Script

Last Updated: Mar 12, 2024

The PyAlink Script component allows you to call all algorithms of Alink by writing code. For example, you can use the PyAlink Script component to call classification, regression, or recommendation algorithms of Alink. You can also use the PyAlink Script component together with other algorithm components of Machine Learning Designer to create pipelines and verify their effects. This topic describes how to use the PyAlink Script component.

Background information

You can use the PyAlink Script component alone or together with other algorithm components. PyAlink Script supports hundreds of Alink components and allows you to read and write different types of data by writing code. For more information, see Method 1: Use the PyAlink Script component alone, Method 2: Use the PyAlink Script component together with other algorithm components of Machine Learning Designer, and Methods in which the PyAlink Script component reads and writes different types of data. You can also deploy a PipelineModel generated by the PyAlink Script component as a service in Elastic Algorithm Service (EAS). For more information, see Example: Deploy a model generated by the PyAlink Script component as a service in EAS.

Concepts

Before you use the PyAlink Script component, familiarize yourself with the following concepts.

Operator

In Alink, an operator is an algorithm component. An operator can be a batch operator or a stream operator. For example, the following logistic regression operators are available in batch and stream versions:

  • LogisticRegressionTrainBatchOp: logistic regression-based batch training operator

  • LogisticRegressionPredictBatchOp: logistic regression-based batch prediction operator

  • LogisticRegressionPredictStreamOp: logistic regression-based stream prediction operator

You can use the link or linkFrom method to connect operators. Example:

# Define the data source (parameters omitted for brevity).
data = CsvSourceBatchOp()
# Use LogisticRegressionTrainBatchOp for training.
lrTrain = LogisticRegressionTrainBatchOp()
# Use LogisticRegressionPredictBatchOp for prediction.
lrPredict = LogisticRegressionPredictBatchOp()
# Perform training operations.
data.link(lrTrain)
# Perform prediction operations.
lrPredict.linkFrom(lrTrain, data)

Each operator comes with a set of parameters. For example, a logistic regression operator comes with the following parameters:

  • labelCol: the label column of the input table. This parameter is required. The value of this parameter is of the STRING type.

  • featureCols: the feature columns. The value of this parameter is of the STRING[] type. Default value: NULL. The value NULL indicates that all columns are selected.

To set a parameter, call the setter method whose name is set followed by the parameter name. Example:

lr = LogisticRegressionTrainBatchOp()\
            .setFeatureCols(colnames)\
            .setLabelCol("label")

Source and sink operators are special operators that must be defined first. Then, you can use the link or linkFrom method to connect them with other algorithm components.

Alink provides commonly used batch and stream data source components. Example:

import pandas as pd
from pyalink.alink import *

df_data = pd.DataFrame([
    [2, 1, 1],
    [3, 2, 1],
    [4, 3, 2],
    [2, 4, 1],
    [2, 2, 1],
    [4, 3, 2],
    [1, 2, 1],
    [5, 3, 2]
])
# Load the DataFrame as a batch data source.
input = BatchOperator.fromDataframe(df_data, schemaStr='f0 int, f1 int, label int')
dataTest = input
colnames = ["f0", "f1"]
lr = LogisticRegressionTrainBatchOp().setFeatureCols(colnames).setLabelCol("label")
model = input.link(lr)
predictor = LogisticRegressionPredictBatchOp().setPredictionCol("pred")
predictor.linkFrom(model, dataTest).print()

Pipeline

Pipelines are another way to use Alink algorithms. You can integrate data processing, feature generation, and model training into a single pipeline to provide online training and prediction services. The following code provides an example of pipeline usage:

quantileDiscretizer = QuantileDiscretizer()\
            .setNumBuckets(2)\
            .setSelectedCols("sepal_length")

binarizer = Binarizer()\
            .setSelectedCol("petal_width")\
            .setOutputCol("bina")\
            .setReservedCols("sepal_length", "petal_width", "petal_length", "category")\
            .setThreshold(1.)

lda = Lda()\
            .setPredictionCol("lda_pred")\
            .setPredictionDetailCol("lda_pred_detail")\
            .setSelectedCol("category")\
            .setTopicNum(2)\
            .setRandomSeed(0)

pipeline = Pipeline()\
    .add(quantileDiscretizer)\
    .add(binarizer)\
    .add(lda)

# fit() returns a PipelineModel, which is then applied to new data.
model = pipeline.fit(data1)
model.transform(data2)
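
In the preceding code, data1 and data2 are BatchOperator instances whose schemas contain the columns referenced by the setters (sepal_length, petal_width, petal_length, and category). As a minimal, hypothetical sketch, you can construct them from an in-memory DataFrame; the sample rows below are made up for illustration:

import pandas as pd
from pyalink.alink import *

# Hypothetical iris-like rows; only the schema matters for the pipeline above.
df = pd.DataFrame([
    [5.1, 3.5, 1.4, 0.2, "Iris-setosa"],
    [7.0, 3.2, 4.7, 1.4, "Iris-versicolor"],
    [6.3, 3.3, 6.0, 2.5, "Iris-virginica"],
    [5.8, 2.7, 5.1, 1.9, "Iris-virginica"]
])
schema = 'sepal_length double, sepal_width double, petal_length double, petal_width double, category string'
data1 = BatchOperator.fromDataframe(df, schemaStr=schema)
data2 = BatchOperator.fromDataframe(df, schemaStr=schema)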

Vector

VECTOR is a custom data type in Alink. The following VECTOR types are available:

  • Sparse vectors

    Example: $4$1:0.1 2:0.2. In this example, the number between the two $ signs indicates the vector length. The entries after the second $ sign are the non-zero elements of the vector in the index:value format.

  • Dense vectors

    Example: 0.1 0.2 0.3. In this example, the element values are separated by spaces.

Note

In Alink, if a column is of the VECTOR type, the vectorColName parameter is often used to specify the column name.
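
The following minimal sketch (not part of the original example) shows how a vector stored as a string column can be consumed through a vector column parameter. It assumes the standard Alink batch operators KMeansTrainBatchOp and KMeansPredictBatchOp with the setVectorCol and setPredictionCol setters:

from pyalink.alink import *
import pandas as pd

# Each row stores a dense vector as a space-separated string.
df = pd.DataFrame([
    ["0.1 0.2 0.3"],
    ["0.2 0.2 0.4"],
    ["9.0 9.5 9.2"],
    ["9.1 9.3 9.6"]
])
source = BatchOperator.fromDataframe(df, schemaStr='vec string')

# Point the training operator at the vector column by its name.
kmeansModel = KMeansTrainBatchOp() \
    .setVectorCol("vec") \
    .setK(2) \
    .linkFrom(source)

predictor = KMeansPredictBatchOp().setPredictionCol("cluster_id")
predictor.linkFrom(kmeansModel, source).print()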

Alink components supported by PyAlink Script

PyAlink Script provides hundreds of Alink components that cover fields such as data processing, feature engineering, and model training.

Note

PyAlink Script supports pipeline and batch components but not stream components.

Method 1: Use the PyAlink Script component alone

This section describes how to use the PyAlink Script component in Machine Learning Designer based on Alibaba Cloud resources. In this example, the item-based collaborative filtering (ItemCF) model is used to score the MovieLens dataset. Perform the following steps:

  1. Go to the Visualized Modeling (Designer) page and create a blank pipeline. For more information, see Procedure.

  2. On the Pipelines tab, find and click the pipeline that you created. Then, click Open.

  3. In the search box above the left-side component list, search for PyAlink Script. Then, drag PyAlink Script to the canvas on the right. A pipeline node named PyAlink Script-1 is automatically generated on the canvas.


  4. On the canvas, click the PyAlink Script-1 node. In the right-side pane, configure parameters on the Parameters Setting and Tuning tabs.

    • Write the following code on the Parameters Setting tab.

      from pyalink.alink import *
      
      def main(sources, sinks, parameter):
          PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
          RATING_FILE = "ratings.csv"
          PREDICT_FILE = "predict.csv"
          RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
      
          ratingsData = CsvSourceBatchOp() \
                  .setFilePath(PATH + RATING_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
      
          predictData = CsvSourceBatchOp() \
                  .setFilePath(PATH + PREDICT_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
      
          itemCFModel = ItemCfTrainBatchOp() \
                  .setUserCol("user_id").setItemCol("item_id") \
                  .setRateCol("rating").linkFrom(ratingsData)
      
          itemCF = ItemCfRateRecommender() \
                  .setModelData(itemCFModel) \
                  .setItemCol("item_id") \
                  .setUserCol("user_id") \
                  .setReservedCols(["user_id", "item_id"]) \
                  .setRecommCol("prediction_score")
      
          result = itemCF.transform(predictData)
      
          result.link(sinks[0])
          BatchOperator.execute()

      The PyAlink Script component supports a maximum of four output ports. In the script, result.link(sinks[0]) is used to write the output data to the first output port of the PyAlink Script component. A downstream node can connect to the first output port to read the output data. For more information about how the PyAlink Script component reads and writes different types of data, see Methods in which the PyAlink Script component reads and writes different types of data.

    • On the Tuning tab, configure the parameters related to the running mode and node specifications:

      • Choose Running Mode: Valid values:

        • DLC (Multi-Threads): We recommend that you select this value if you run a task that involves a small amount of data in the debugging phase.

        • MaxCompute (Distributed): We recommend that you select this value if you run a task that involves a large amount of data or run a production task.

        • Flink (Distributed): The task runs in distributed mode on the resources of the Flink cluster that is associated with the current workspace.

      • The number of workers: This parameter is available only if you set Choose Running Mode to MaxCompute (Distributed) or Flink (Distributed). It specifies the number of workers. By default, this parameter is left empty, and the system automatically determines a value based on the task data.

      • The memory of each worker, in MB: This parameter is available only if you set Choose Running Mode to MaxCompute (Distributed) or Flink (Distributed). It specifies the memory size of each worker in MB. The value must be a positive integer. Default value: 8192.

      • The cpu cores of each worker: This parameter is available only if you set Choose Running Mode to MaxCompute (Distributed) or Flink (Distributed). It specifies the number of CPU cores of each worker. The value must be a positive integer. By default, this parameter is left empty.

      • Choose Node Specification to Run script: This parameter specifies the specifications of the Deep Learning Containers (DLC) node. Default value: 2vCPU+8GB Mem-ecs.g6.large.

  5. Click Save above the canvas, and then click the run button to run the PyAlink Script node.

  6. After the task is complete, right-click the PyAlink Script-1 node on the canvas and choose View Data > Output 0 to view the result. The result contains the following columns:

    • user_id: the ID of the user.

    • item_id: the ID of the movie.

    • prediction_score: indicates how much the user likes the movie. The value is used as a reference for movie recommendations.

Method 2: Use the PyAlink Script component together with other algorithm components of Machine Learning Designer

The input and output ports of the PyAlink Script component are the same as those of other algorithm components in Machine Learning Designer. You can connect the PyAlink Script component to other algorithm components and use them together in a pipeline.

Methods in which the PyAlink Script component reads and writes different types of data

  • Read data

    • Read data from MaxCompute tables: The PyAlink Script component reads data from upstream nodes by using its input ports. The following sample code provides an example:

      train_data = sources[0]
      test_data = sources[1]

      In the code, sources[0] indicates the MaxCompute table that corresponds to the first input port, sources[1] indicates the MaxCompute table that corresponds to the second input port, and so on. The PyAlink Script component supports a maximum of four input ports.

    • Read data from network file systems: The PyAlink Script component reads data in code by using the Alink source components CsvSourceBatchOp and AkSourceBatchOp. The component can read data from the following types of files:

      • Network shared files that are accessible over HTTP. The following sample code provides an example:

        ratingsData = CsvSourceBatchOp() \
                    .setFilePath(PATH + RATING_FILE) \
                    .setFieldDelimiter("\t") \
                    .setSchemaStr(RATING_SCHEMA_STRING)
      • Files stored in Object Storage Service (OSS). Specify the OSS path from which you want to read data. The following sample code provides an example:

        model_data = AkSourceBatchOp().setFilePath("oss://xxxxxxxx/model_20220323.ak")
  • Write data

    • Write data to MaxCompute tables: The PyAlink Script component writes data to downstream nodes by using the output ports. The following sample code provides an example:

      result0.link(sinks[0])
      result1.link(sinks[1])
      BatchOperator.execute()

      In the code, result0.link(sinks[0]) indicates that data is written to the result table that corresponds to the first output port. The written data can be accessed by using the first output port. The PyAlink Script component can write data to a maximum of four result tables.

    • Write data to OSS objects: Specify the OSS path to which you want to write data. The following sample code provides an example:

      result.link(AkSinkBatchOp() \
                  .setFilePath("oss://xxxxxxxx/model_20220323.ak") \
                  .setOverwriteSink(True))
      BatchOperator.execute()
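
Putting the preceding read and write methods together, the following minimal sketch of a main() function reads the two upstream MaxCompute tables and writes one of them back unchanged; the pass-through step is a placeholder for your own logic:

from pyalink.alink import *

def main(sources, sinks, parameter):
    # Tables connected to the first two input ports.
    train_data = sources[0]
    test_data = sources[1]

    # Placeholder transformation: pass the first input through unchanged.
    result = train_data

    # Write the result to the first output port.
    result.link(sinks[0])
    BatchOperator.execute()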

Example: Deploy a model generated by the PyAlink Script component as a service in EAS

  1. Generate the model to be deployed.

    Only a PipelineModel generated by the PyAlink Script component can be deployed as a service in EAS. Run the following sample code to generate a PipelineModel file. For more information, see Method 1: Use the PyAlink Script component alone.

    from pyalink.alink import *
    
    def main(sources, sinks, parameter):
        PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
        RATING_FILE = "ratings.csv"
        PREDICT_FILE = "predict.csv"
        RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
    
        ratingsData = CsvSourceBatchOp() \
                .setFilePath(PATH + RATING_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
    
        predictData = CsvSourceBatchOp() \
                .setFilePath(PATH + PREDICT_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
    
        itemCFModel = ItemCfTrainBatchOp() \
                .setUserCol("user_id").setItemCol("item_id") \
                .setRateCol("rating").linkFrom(ratingsData)
    
        itemCF = ItemCfRateRecommender() \
                .setModelData(itemCFModel) \
                .setItemCol("item_id") \
                .setUserCol("user_id") \
                .setReservedCols(["user_id", "item_id"]) \
                .setRecommCol("prediction_score")
    
        model = PipelineModel(itemCF)
        model.save().link(AkSinkBatchOp() \
                .setFilePath("oss://<your_bucket_name>/model.ak") \
                .setOverwriteSink(True))
        BatchOperator.execute()

    Replace <your_bucket_name> in the code with the name of the OSS bucket.

    Important

    Make sure that you can read data from the dataset path specified by the PATH parameter. Otherwise, this component cannot be run.

  2. Generate an EAS configuration file.

    Run the following code to write the output data to the config.json file:

    # Generate an EAS configuration file.
    import json
    
    # Generate EAS model configurations.
    model_config = {}
    # Specify the schema of the input data in EAS.
    model_config['inputDataSchema'] = "id long, movieid long"
    model_config['modelVersion'] = "v0.2"
    
    eas_config = {
        "name": "recomm_demo",
        "model_path": "http://xxxxxxxx/model.ak",
        "processor": "alink_outer_processor",
        "metadata": {
            "instance": 1,
            "memory": 2048,
            "region": "cn-beijing"
        },
        "model_config": model_config
    }
    
    # Write the configuration to the config.json file.
    with open("config.json", "w") as f:
        json.dump(eas_config, f, indent=4)
    print(json.dumps(eas_config, indent=4))

    Important parameters in the config.json file:

    • name: the name of the service to be deployed.

    • model_path: the OSS path in which the PipelineModel file is stored. Replace the value of the model_path parameter in the code with the actual OSS path.

    For more information about the other parameters in the config.json file, see Run commands to use the EASCMD client.

  3. Deploy the model as a service in EAS.

    You can use the EASCMD client to deploy the model. For more information about how to log on to the EASCMD client, see Download the EASCMD client and complete user authentication. For example, if you use the EASCMD client for 64-bit Windows, run the following command to deploy the model as a service in EAS:

    eascmdwin64.exe create config.json