Platform For AI:PyAlink Script

Last Updated: Mar 13, 2026

PyAlink Script calls Alink algorithms through code for classification, regression, and recommendation tasks. The component integrates with other Designer components to build and validate business workflows.

Overview

PyAlink Script operates in two modes: standalone, or combined with other Designer components. The component supports hundreds of Alink algorithms and can read and write various data types through code. See Read and write data types. PipelineModel models generated by PyAlink Script can be deployed as Elastic Algorithm Service (EAS) services. See Deploy model as EAS service.

Terms

Familiarize yourself with these terms before using PyAlink Script.

Functional module

Description

Operator

Each algorithm feature in Alink is an Operator. Operators are divided into batch and streaming types. For example, logistic regression includes:

  • LogisticRegressionTrainBatchOp: Trains logistic regression models.

  • LogisticRegressionPredictBatchOp: Performs batch predictions.

  • LogisticRegressionPredictStreamOp: Performs streaming predictions.

Connect operators using Link or LinkFrom:

# Define the data source.
data = CsvSourceBatchOp()
# Define the logistic regression trainer.
lrTrain = LogisticRegressionTrainBatchOp()
# Define the logistic regression predictor.
lrPredict = LogisticRegressionPredictBatchOp()
# Train the model.
data.link(lrTrain)
# Make predictions: linkFrom takes the trained model first, then the data.
lrPredict.linkFrom(lrTrain, data)

Each operator has configurable parameters. Logistic regression parameters include:

  • labelCol: Target column name (required). Type: String.

  • featureCols: Feature column names. Type: String[]. Default: NULL (all columns).

Configure parameters using `set` followed by the parameter name:

lr = LogisticRegressionTrainBatchOp()\
            .setFeatureCols(colnames)\
            .setLabelCol("label")

Data import (Source) and export (Sink) are special operator types. Connect them to algorithm components using Link or LinkFrom.

Alink includes common streaming and batch data sources. Example:

import pandas as pd
from pyalink.alink import *

df_data = pd.DataFrame([
    [2, 1, 1],
    [3, 2, 1],
    [4, 3, 2],
    [2, 4, 1],
    [2, 2, 1],
    [4, 3, 2],
    [1, 2, 1],
    [5, 3, 2]
])
# Load the DataFrame as a batch data source.
input = BatchOperator.fromDataframe(df_data, schemaStr='f0 int, f1 int, label int')
dataTest = input
colnames = ["f0", "f1"]
lr = LogisticRegressionTrainBatchOp().setFeatureCols(colnames).setLabelCol("label")
model = input.link(lr)
predictor = LogisticRegressionPredictBatchOp().setPredictionCol("pred")
predictor.linkFrom(model, dataTest).print()
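The `schemaStr` used above is a comma-separated list of `name type` pairs. As an illustration only (the helper below is hypothetical, not part of the Alink API), a schema string can be split into column names and types like this:

```python
# Hypothetical helper, not part of the Alink API: split a schemaStr such as
# 'f0 int, f1 int, label int' into (name, type) pairs.
def parse_schema_str(schema_str):
    columns = []
    for field in schema_str.split(","):
        name, col_type = field.strip().split()
        columns.append((name, col_type))
    return columns

print(parse_schema_str('f0 int, f1 int, label int'))
# → [('f0', 'int'), ('f1', 'int'), ('label', 'int')]
```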

Pipeline

A Pipeline chains data processing, feature engineering, and model training steps, and can be used for training, prediction, and online serving:

quantileDiscretizer = QuantileDiscretizer()\
            .setNumBuckets(2)\
            .setSelectedCols("sepal_length")

binarizer = Binarizer()\
            .setSelectedCol("petal_width")\
            .setOutputCol("bina")\
            .setReservedCols("sepal_length", "petal_width", "petal_length", "category")\
            .setThreshold(1.0)

lda = Lda()\
            .setPredictionCol("lda_pred")\
            .setPredictionDetailCol("lda_pred_detail")\
            .setSelectedCol("category")\
            .setTopicNum(2)\
            .setRandomSeed(0)

pipeline = Pipeline()\
    .add(quantileDiscretizer)\
    .add(binarizer)\
    .add(lda)

model = pipeline.fit(data1)
model.transform(data2)
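The fit/transform pattern above can be sketched generically in plain Python. This is an analogy, not Alink's implementation: fit learns state from one dataset, and transform applies that state to another.

```python
# Generic fit/transform sketch; an analogy to the Alink Pipeline, not its code.
class MeanCenterer:
    def fit(self, data):
        # Learn state (the mean) from the training data.
        self.mean = sum(data) / len(data)
        return self

    def transform(self, data):
        # Apply the learned state to new data.
        return [x - self.mean for x in data]

model = MeanCenterer().fit([1.0, 2.0, 3.0])
print(model.transform([4.0]))  # → [2.0]
```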

Vector

Alink supports two custom vector data formats:

  • SparseVector

    Format: $4$1:0.1 2:0.2. The number between the dollar signs ($) is the vector length. The values after the second dollar sign are space-separated index:value pairs for the nonzero elements.

  • DenseVector

    Format: 0.1 0.2 0.3. Space-separated values.
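To make the SparseVector notation concrete, the following is a minimal, hypothetical parser (not an Alink API) that expands the string form into a dense Python list:

```python
# Hypothetical parser for illustration only, not part of the Alink API.
# Expands a SparseVector string such as '$4$1:0.1 2:0.2' into a dense list.
def parse_sparse_vector(text):
    # The number between the two '$' signs is the vector length.
    _, length_part, pairs_part = text.split("$")
    dense = [0.0] * int(length_part)
    # The rest is space-separated index:value pairs.
    for pair in pairs_part.split():
        index, value = pair.split(":")
        dense[int(index)] = float(value)
    return dense

print(parse_sparse_vector("$4$1:0.1 2:0.2"))  # → [0.0, 0.1, 0.2, 0.0]
```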

Note

Vector type columns use the parameter name `vectorColName`.

Supported components

PyAlink Script supports hundreds of Alink components for data processing, feature engineering, and model training.

Note

PyAlink Script supports Pipeline and batch components only. Stream components are not supported.

Standalone usage

This example scores the MovieLens dataset with an ItemCf model using PyAlink Script on Designer.

  1. Open Designer and create a blank pipeline. See Procedure.

  2. In the workflow list, find the pipeline and click Enter Workflow.

  3. Drag PyAlink Script from the component list to the canvas to create PyAlink Script-1.

  4. Select PyAlink Script-1. Configure parameters on the Parameter Settings and Execution Tuning tabs.

    • On Parameter Settings, enter the code:

      from pyalink.alink import *
      
      def main(sources, sinks, parameter):
          PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
          RATING_FILE = "ratings.csv"
          PREDICT_FILE = "predict.csv"
          RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
      
          ratingsData = CsvSourceBatchOp() \
                  .setFilePath(PATH + RATING_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
      
          predictData = CsvSourceBatchOp() \
                  .setFilePath(PATH + PREDICT_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
      
          itemCFModel = ItemCfTrainBatchOp() \
                  .setUserCol("user_id").setItemCol("item_id") \
                  .setRateCol("rating").linkFrom(ratingsData)
      
          itemCF = ItemCfRateRecommender() \
                  .setModelData(itemCFModel) \
                  .setItemCol("item_id") \
                  .setUserCol("user_id") \
                  .setReservedCols(["user_id", "item_id"]) \
                  .setRecommCol("prediction_score")
      
          result = itemCF.transform(predictData)
      
          result.link(sinks[0])
          BatchOperator.execute()

      PyAlink Script has four output ports. The code result.link(sinks[0]) writes output data to the first port. Downstream components read the data by connecting to this port. See Read and write data types.

    • On Execution Tuning, configure the running mode and node specifications.

      Parameter

      Description

      Select job running mode

      Running modes:

      • DLC (single-node multi-concurrency): For small datasets during testing and validation.

      • MaxCompute (distributed): For large datasets or production tasks.

      • Fully managed Flink (distributed): Uses Flink cluster resources attached to the workspace for distributed execution.

      Number of workers

      Required when Select job running mode is MaxCompute (distributed) or Fully managed Flink (distributed). Specifies worker count. Default: empty (system allocates based on task data).

      Memory per worker (MB)

      Required when Select job running mode is MaxCompute (distributed) or Fully managed Flink (distributed). Specifies worker memory in MB (positive integer). Default: 8192.

      Number of CPU Cores per Node

      Required when Select job running mode is MaxCompute (distributed) or Fully managed Flink (distributed). Specifies CPU cores per worker (positive integer). Default: empty.

      Select node specifications for the script

      DLC node resource type. Default: 2 vCPU + 8 GB Mem-ecs.g6.large.

  5. Click Save, then click the Run button to run the script.

  6. After the task completes, right-click PyAlink Script-1. Select View Data > Output 0 to view results.

    Column name

    Description

    user_id

    User ID.

    item_id

    Movie ID.

    prediction_score

    User's preference score for a movie. Used for movie recommendations.

Combined usage with Designer components

PyAlink Script input and output ports are compatible with other Designer components for seamless integration.

Read and write data types

  • Read data

    • Read MaxCompute tables from upstream component input ports:

      train_data = sources[0]
      test_data = sources[1]

      `sources[0]` represents the MaxCompute table for the first input port, `sources[1]` for the second. Supports up to four input ports.
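As a hedged illustration of the `main(sources, sinks, parameter)` contract, the sketch below uses plain Python lists to stand in for the table handles that Designer actually passes; real code would link Alink operators instead.

```python
# Illustration only: plain lists stand in for the MaxCompute table handles
# that Designer passes to main(). Real code would link Alink operators.
def main(sources, sinks, parameter):
    train_data = sources[0]   # first input port
    test_data = sources[1]    # second input port
    # In real code: result.link(sinks[0]); here we just record a row count.
    sinks[0].append(len(train_data) + len(test_data))

sinks = [[], [], [], []]      # up to four output ports
main([["row"] * 3, ["row"] * 2], sinks, {})
print(sinks[0])  # → [5]
```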

    • Read from network file systems using Alink Source components (CsvSourceBatchOp, AkSourceBatchOp). Supported file types:

      • HTTP network shared files:

        ratingsData = CsvSourceBatchOp() \
                    .setFilePath(PATH + RATING_FILE) \
                    .setFieldDelimiter("\t") \
                    .setSchemaStr(RATING_SCHEMA_STRING)
      • OSS network files. Configure the read path:

        model_data = AkSourceBatchOp().setFilePath("oss://xxxxxxxx/model_20220323.ak")
  • Write data

    • Write to MaxCompute tables and pass to downstream components through output ports:

      result0.link(sinks[0])
      result1.link(sinks[1])
      BatchOperator.execute()

      `result0.link(sinks[0])` writes data to the first output port. Supports up to four output ports.

    • Write to OSS network files. Configure the write path:

      result.link(AkSinkBatchOp() \
                  .setFilePath("oss://xxxxxxxx/model_20220323.ak") \
                  .setOverwriteSink(True))
      BatchOperator.execute()

Deploy model as EAS service

  1. Generate the model to deploy

    A model can be deployed as an EAS service only if it is a PipelineModel. Generate a PipelineModel file using the following code. See Standalone usage.

    from pyalink.alink import *
    
    def main(sources, sinks, parameter):
        PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
        RATING_FILE = "ratings.csv"
        PREDICT_FILE = "predict.csv"
        RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
    
        ratingsData = CsvSourceBatchOp() \
                .setFilePath(PATH + RATING_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
    
        predictData = CsvSourceBatchOp() \
                .setFilePath(PATH + PREDICT_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
    
        itemCFModel = ItemCfTrainBatchOp() \
                .setUserCol("user_id").setItemCol("item_id") \
                .setRateCol("rating").linkFrom(ratingsData)
    
        itemCF = ItemCfRateRecommender() \
                .setModelData(itemCFModel) \
                .setItemCol("item_id") \
                .setUserCol("user_id") \
                .setReservedCols(["user_id", "item_id"]) \
                .setRecommCol("prediction_score")
    
        model = PipelineModel(itemCF)
        model.save().link(AkSinkBatchOp() \
                .setFilePath("oss://<your_bucket_name>/model.ak") \
                .setOverwriteSink(True))
        BatchOperator.execute()

    <your_bucket_name> is your OSS bucket name.

    Important

    Verify read permissions for the dataset path in PATH. Without permissions, the component fails to run.

  2. Generate the EAS configuration file

    Run this script and save the printed JSON as `config.json`:

    # The configuration file for EAS
    import json
    
    # Generate the model configuration for EAS.
    model_config = {}
    # The schema for the data that EAS accepts.
    model_config['inputDataSchema'] = "id long, movieid long" 
    model_config['modelVersion'] = "v0.2"
    
    eas_config = {
        "name": "recomm_demo",
        "model_path": "http://xxxxxxxx/model.ak",
        "processor": "alink_outer_processor",
        "metadata": {
            "instance": 1,
            "memory": 2048,
            "region":"cn-beijing"
        },
        "model_config": model_config
    }
    print(json.dumps(eas_config, indent=4))

    Key `config.json` parameters:

    • name: Model service name.

    • model_path: OSS path storing the PipelineModel file. Replace with your actual path.

    See Command reference for other `config.json` parameters.
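The configuration can also be written to `config.json` directly instead of being printed and saved by hand. A minimal sketch using the standard library; the field values are the same placeholders as above, not a working deployment configuration:

```python
import json

# Minimal sketch: persist an EAS configuration dict to config.json.
# The field values below are placeholders, not a working deployment config.
eas_config = {
    "name": "recomm_demo",
    "model_path": "http://xxxxxxxx/model.ak",
    "processor": "alink_outer_processor",
    "metadata": {"instance": 1, "memory": 2048, "region": "cn-beijing"},
    "model_config": {"inputDataSchema": "id long, movieid long",
                     "modelVersion": "v0.2"},
}

# Write the configuration file that eascmd will consume.
with open("config.json", "w") as f:
    json.dump(eas_config, f, indent=4)
```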

  3. Deploy the model as an EAS service

    Log on to the eascmd client to deploy the model service. See Download and authenticate the client. For Windows 64-bit systems, use this command:

    eascmdwin64.exe create config.json