All Products
Search
Document Center

Platform For AI:Skrip PyAlink

Last Updated:Mar 14, 2026

Skrip PyAlink memanggil algoritma Alink melalui kode untuk tugas klasifikasi, regresi, dan rekomendasi. Komponen ini terintegrasi dengan komponen Designer lainnya guna membangun dan memvalidasi alur kerja bisnis.

Ikhtisar

Skrip PyAlink beroperasi dalam dua mode: standalone atau digabungkan dengan komponen Designer lainnya. Komponen ini mendukung ratusan algoritma Alink serta membaca dan menulis berbagai tipe data melalui kode. Lihat Tipe data baca dan tulis. Model PipelineModel yang dihasilkan oleh Skrip PyAlink dapat dideploy sebagai layanan Elastic Algorithm Service (EAS). Lihat Deploy model sebagai layanan EAS.

Istilah

Pahami istilah-istilah berikut sebelum menggunakan Skrip PyAlink.

Modul fungsional

Deskripsi

Operator

Setiap fitur algoritma dalam Alink merupakan Operator. Operator dibagi menjadi tipe batch dan streaming. Sebagai contoh, regresi logistik mencakup:

  • LogisticRegressionTrainBatchOp: Melatih model regresi logistik.

  • LogisticRegressionPredictBatchOp: Melakukan prediksi batch.

  • LogisticRegressionPredictStreamhOp: Melakukan prediksi streaming.

Sambungkan operator menggunakan Link atau LinkFrom:

# Definisikan data.
data = CsvSourceBatchOp()
# Latih model regresi logistik.
lrTrain = LogisticRegressionTrainBatchOp()
# Prediksi menggunakan model regresi logistik.
LrPredict = LogisticRegressionPredictBatchOp()
# Latih model.
data.link(lrTrain)
# Lakukan prediksi.
LrPredict.linkFrom(lrTrain, data)

Setiap operator memiliki parameter yang dapat dikonfigurasi. Parameter regresi logistik meliputi:

  • labelCol: Nama kolom target (wajib). Tipe: String.

  • featureCols: Nama kolom fitur. Tipe: String[]. Default: NULL (semua kolom).

Konfigurasikan parameter menggunakan `set` diikuti nama parameter:

lr = LogisticRegressionTrainBatchOp()\
            .setFeatureCols(colnames)\
            .setLabelCol("label")

Impor data (Source) dan ekspor data (Sink) adalah tipe operator khusus. Sambungkan ke komponen algoritma menggunakan Link atau LinkFrom:

image

Alink menyediakan sumber data streaming dan batch umum. Contoh:

df_data = pd.DataFrame([
    [2, 1, 1],
    [3, 2, 1],
    [4, 3, 2],
    [2, 4, 1],
    [2, 2, 1],
    [4, 3, 2],
    [1, 2, 1],
    [5, 3, 2]
])
input = BatchOperator.fromDataframe(df_data, schemaStr='f0 int, f1 int, label int')
# muat data
dataTest = input
colnames = ["f0","f1"]
lr = LogisticRegressionTrainBatchOp().setFeatureCols(colnames).setLabelCol("label")
model = input.link(lr)
predictor = LogisticRegressionPredictBatchOp().setPredictionCol("pred")
predictor.linkFrom(model, dataTest).print()

Pipeline

Pipeline menggabungkan pemrosesan data, rekayasa fitur, dan pelatihan model untuk pelatihan, prediksi, serta layanan online:

quantileDiscretizer = QuantileDiscretizer()\
            .setNumBuckets(2)\
            .setSelectedCols("sepal_length")

binarizer = Binarizer()\
            .setSelectedCol("petal_width")\
            .setOutputCol("bina")\
            .setReservedCols("sepal_length", "petal_width", "petal_length", "category")\
            .setThreshold(1.);

lda = Lda()\
            .setPredictionCol("lda_pred")\
            .setPredictionDetailCol("lda_pred_detail")\
            .setSelectedCol("category")\
            .setTopicNum(2)\
            .setRandomSeed(0)

pipeline = Pipeline()\
    .add(binarizer)\
    .add(binarizer)\
    .add(lda)

pipeline.fit(data1)
pipeline.transform(data2)

Vector

Alink mendukung dua format data vektor kustom:

  • SparseVector

    Format: $4$1:0.1 2:0.2. Angka di antara tanda dolar ($) menunjukkan panjang vektor. Nilai setelah tanda dolar menunjukkan indeks kolom dan nilai yang sesuai.

  • DenseVector

    Format: 0.1 0.2 0.3. Nilai dipisahkan spasi.

Catatan

Kolom bertipe Vector menggunakan nama parameter `vectorColName`.

Komponen yang didukung

Skrip PyAlink mendukung ratusan komponen Alink untuk pemrosesan data, rekayasa fitur, dan pelatihan model.

Catatan

Skrip PyAlink hanya mendukung komponen Pipeline dan batch. Komponen streaming tidak didukung.

Penggunaan standalone

Contoh ini memberikan skor pada dataset MovieLens dengan model ItemCf menggunakan Skrip PyAlink di Designer.

  1. Buka Designer dan buat pipeline kosong. Lihat Prosedur.

  2. Di daftar alur kerja, temukan pipeline tersebut dan klik Enter Workflow.

  3. Seret PyAlink Script dari daftar komponen ke kanvas untuk membuat PyAlink Script-1.

    image

  4. Pilih PyAlink Script-1. Konfigurasikan parameter pada tab Parameter Settings dan Execution Tuning.

    • Pada Parameter Settings, masukkan kode berikut:

      from pyalink.alink import *
      
      def main(sources, sinks, parameter):
          PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
          RATING_FILE = "ratings.csv"
          PREDICT_FILE = "predict.csv"
          RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
      
          ratingsData = CsvSourceBatchOp() \
                  .setFilePath(PATH + RATING_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
      
          predictData = CsvSourceBatchOp() \
                  .setFilePath(PATH + PREDICT_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
      
          itemCFModel = ItemCfTrainBatchOp() \
                  .setUserCol("user_id").setItemCol("item_id") \
                  .setRateCol("rating").linkFrom(ratingsData);
      
          itemCF = ItemCfRateRecommender() \
                  .setModelData(itemCFModel) \
                  .setItemCol("item_id") \
                  .setUserCol("user_id") \
                  .setReservedCols(["user_id", "item_id"]) \
                  .setRecommCol("prediction_score")
      
          result = itemCF.transform(predictData)
      
          result.link(sinks[0])
          BatchOperator.execute()

      Skrip PyAlink memiliki empat port output. Kode result.link(sinks[0]) menulis data output ke port pertama. Komponen downstream membaca data tersebut dengan menyambungkan ke port ini. Lihat Tipe data baca dan tulis.

    • Pada Execution Tuning, konfigurasikan mode running dan spesifikasi node.

      Parameter

      Deskripsi

      Select job running mode

      Mode Operasi:

      • DLC (single-node multi-concurrency): Untuk dataset kecil selama pengujian dan validasi.

      • MaxCompute (distributed): Untuk dataset besar atau tugas produksi.

      • Fully managed Flink (distributed): Menggunakan resource kluster Flink yang terhubung ke ruang kerja untuk eksekusi terdistribusi.

      Number of workers

      Wajib diisi ketika Select job running mode adalah MaxCompute (distributed) atau Fully managed Flink (distributed). Menentukan jumlah worker. Default: kosong (sistem mengalokasikan berdasarkan data tugas).

      Memory per worker (MB)

      Wajib diisi ketika Select job running mode adalah MaxCompute (distributed) atau Fully managed Flink (distributed). Menentukan memori per worker dalam MB (bilangan bulat positif). Default: 8192.

      Number of CPU Cores per Node

      Wajib diisi ketika Select job running mode adalah MaxCompute (distributed) atau Fully managed Flink (distributed). Menentukan jumlah core CPU per worker (bilangan bulat positif). Default: kosong.

      Select node specifications for the script

      Tipe resource node DLC. Default: 2 vCPU + 8 GB Mem-ecs.g6.large.

  5. Klik Save, lalu klik tombol Run image untuk menjalankan skrip.

  6. Setelah tugas selesai, klik kanan PyAlink Script-1. Pilih View Data > Output 0 untuk melihat hasil.

    Nama kolom

    Deskripsi

    user_id

    ID pengguna.

    item_id

    ID film.

    prediction_score

    Skor preferensi pengguna terhadap suatu film. Digunakan untuk rekomendasi film.

Penggunaan gabungan dengan komponen Designer

Port input dan output Skrip PyAlink kompatibel dengan komponen Designer lainnya untuk integrasi tanpa hambatan:组合使用

Tipe data baca dan tulis

  • Baca data

    • Baca tabel MaxCompute dari port input komponen upstream:

      train_data = sources[0]
      test_data = sources[1]

      `sources[0]` merepresentasikan tabel MaxCompute untuk port input pertama, `sources[1]` untuk yang kedua. Mendukung hingga empat port input.

    • Baca dari sistem file jaringan menggunakan komponen Source Alink (CsvSourceBatchOp, AkSourceBatchOp). Jenis file yang didukung:

      • File bersama jaringan HTTP:

        ratingsData = CsvSourceBatchOp() \
                    .setFilePath(PATH + RATING_FILE) \
                    .setFieldDelimiter("\t") \
                    .setSchemaStr(RATING_SCHEMA_STRING)
      • File jaringan OSS. Konfigurasikan path baca seperti ditunjukkan:image

        model_data = AkSourceBatchOp().setFilePath("oss://xxxxxxxx/model_20220323.ak")
  • Tulis data

    • Tulis ke tabel MaxCompute dan teruskan ke komponen downstream melalui port output:

      result0.link(sinks[0])
      result1.link(sinks[1])
      BatchOperator.execute()

      `result0.link(sinks[0])` menulis data ke port output pertama. Mendukung hingga empat port output.

    • Tulis ke file jaringan OSS. Konfigurasikan path tulis seperti ditunjukkan:image

      result.link(AkSinkBatchOp() \
                  .setFilePath("oss://xxxxxxxx/model_20220323.ak") \
                  .setOverwriteSink(True))
      BatchOperator.execute()

Deploy model sebagai layanan EAS

  1. Hasilkan model yang akan di-deploy

    Deploy model sebagai layanan EAS hanya jika model tersebut merupakan PipelineModel. Hasilkan file PipelineModel menggunakan kode berikut. Lihat Penggunaan standalone.

    from pyalink.alink import *
    
    def main(sources, sinks, parameter):
        PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
        RATING_FILE = "ratings.csv"
        PREDICT_FILE = "predict.csv"
        RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
    
        ratingsData = CsvSourceBatchOp() \
                .setFilePath(PATH + RATING_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
    
        predictData = CsvSourceBatchOp() \
                .setFilePath(PATH + PREDICT_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
    
        itemCFModel = ItemCfTrainBatchOp() \
                .setUserCol("user_id").setItemCol("item_id") \
                .setRateCol("rating").linkFrom(ratingsData);
    
        itemCF = ItemCfRateRecommender() \
                .setModelData(itemCFModel) \
                .setItemCol("item_id") \
                .setUserCol("user_id") \
                .setReservedCols(["user_id", "item_id"]) \
                .setRecommCol("prediction_score")
    
        model = PipelineModel(itemCF)
        model.save().link(AkSinkBatchOp() \
                .setFilePath("oss://<your_bucket_name>/model.ak") \
                .setOverwriteSink(True))
        BatchOperator.execute()

    <your_bucket_name> adalah nama bucket OSS Anda.

    Penting

    Verifikasi izin baca untuk path dataset di PATH. Tanpa izin, komponen gagal dijalankan.

  2. Hasilkan file konfigurasi EAS

    Jalankan skrip berikut untuk menulis output ke `config.json`:

    # File konfigurasi untuk EAS
    import json
    
    # Hasilkan konfigurasi model untuk EAS.
    model_config = {}
    # Skema data yang diterima oleh EAS.
    model_config['inputDataSchema'] = "id long, movieid long" 
    model_config['modelVersion'] = "v0.2"
    
    eas_config = {
        "name": "recomm_demo",
        "model_path": "http://xxxxxxxx/model.ak",
        "processor": "alink_outer_processor",
        "metadata": {
            "instance": 1,
            "memory": 2048,
            "region":"cn-beijing"
        },
        "model_config": model_config
    }
    print(json.dumps(eas_config, indent=4))

    Parameter utama `config.json`:

    • name: Nama layanan model.

    • model_path: Path OSS tempat menyimpan file PipelineModel. Ganti dengan path aktual Anda.

    Lihat Referensi perintah untuk parameter `config.json` lainnya.

  3. Deploy model sebagai layanan EAS

    Login ke klien eascmd untuk deploy layanan model. Lihat Unduh dan autentikasi klien. Untuk sistem Windows 64-bit, gunakan perintah berikut:

    eascmdwin64.exe create config.json