All Products
Search
Document Center

Platform For AI:Skrip PyAlink

Last Updated:Jun 21, 2026

Komponen Skrip PyAlink memungkinkan Anda memanggil algoritma Alink apa pun melalui penulisan kode. Komponen ini dapat digunakan untuk menjalankan berbagai tugas seperti klasifikasi, regresi, dan rekomendasi, serta terintegrasi secara mulus dengan komponen lain di Machine Learning Designer sehingga Anda dapat membangun dan memvalidasi pipeline bisnis end-to-end. Topik ini menjelaskan cara menggunakan komponen Skrip PyAlink.

Latar Belakang

Komponen Skrip PyAlink dapat digunakan dengan dua cara: secara mandiri atau digabungkan dengan komponen Machine Learning Designer lainnya. Komponen ini menyediakan akses ke ratusan komponen Alink dan mendukung pembacaan dan penulisan tipe data melalui kode. Model PipelineModel yang dihasilkan juga dapat di-deploy sebagai layanan EAS. Untuk informasi lebih lanjut, lihat Contoh: Deploy model yang dihasilkan oleh Skrip PyAlink sebagai layanan EAS.

Konsep utama

Sebelum menggunakan komponen Skrip PyAlink, pahami konsep-konsep utama berikut.

Konsep

Deskripsi

operator

Di Alink, operator merepresentasikan fungsi algoritma. Operator dikategorikan sebagai operator batch atau stream. Sebagai contoh, regresi logistik mencakup operator berikut:

  • LogisticRegressionTrainBatchOp: Melakukan pelatihan regresi logistik.

  • LogisticRegressionPredictBatchOp: Melakukan prediksi batch untuk regresi logistik.

  • LogisticRegressionPredictStreamOp: Melakukan prediksi stream untuk regresi logistik.

Operator dihubungkan menggunakan metode link atau linkFrom. Kode berikut memberikan contohnya.

# Definisikan data.
data = CsvSourceBatchOp()
# Pelatihan regresi logistik.
lrTrain = LogisticRegressionTrainBatchOp()
# Prediksi regresi logistik.
LrPredict = LogisticRegressionPredictBatchOp()
# Latih.
data.link(lrTrain)
# Prediksi.
LrPredict.linkFrom(lrTrain, data)

Setiap operator memiliki parameter. Sebagai contoh, regresi logistik mencakup parameter berikut.

  • labelCol: Nama kolom label pada tabel input. Ini adalah parameter yang diperlukan bertipe String.

  • featureCols: Array nama kolom fitur. Parameter ini bertipe String[]. Nilai default-nya NULL, yang berarti semua kolom dipilih.

Untuk mengonfigurasi parameter, gunakan awalan set diikuti nama parameter dalam CamelCase. Kode berikut memberikan contohnya.

lr = LogisticRegressionTrainBatchOp()\
            .setFeatureCols(colnames)\
            .setLabelCol("label")

Sumber data (data sources) dan sink adalah jenis operator khusus. Setelah didefinisikan, Anda dapat menghubungkannya ke komponen algoritma menggunakan metode link atau linkFrom.

Alink mencakup sumber data stream dan batch umum. Kode berikut memberikan contohnya.

df_data = pd.DataFrame([
    [2, 1, 1],
    [3, 2, 1],
    [4, 3, 2],
    [2, 4, 1],
    [2, 2, 1],
    [4, 3, 2],
    [1, 2, 1],
    [5, 3, 2]
])
input = BatchOperator.fromDataframe(df_data, schemaStr='f0 int, f1 int, label int')
# muat data
dataTest = input
colnames = ["f0","f1"]
lr = LogisticRegressionTrainBatchOp().setFeatureCols(colnames).setLabelCol("label")
model = input.link(lr)
predictor = LogisticRegressionPredictBatchOp().setPredictionCol("pred")
predictor.linkFrom(model, dataTest).print()

pipeline

Anda juga dapat menggunakan algoritma Alink dalam pipeline, menggabungkan pemrosesan data, pembuatan fitur, dan pelatihan model ke dalam satu pipeline untuk pelatihan, prediksi, dan layanan online. Kode berikut memberikan contohnya.

quantileDiscretizer = QuantileDiscretizer()\
            .setNumBuckets(2)\
            .setSelectedCols("sepal_length")

binarizer = Binarizer()\
            .setSelectedCol("petal_width")\
            .setOutputCol("bina")\
            .setReservedCols("sepal_length", "petal_width", "petal_length", "category")\
            .setThreshold(1.);

lda = Lda()\
            .setPredictionCol("lda_pred")\
            .setPredictionDetailCol("lda_pred_detail")\
            .setSelectedCol("category")\
            .setTopicNum(2)\
            .setRandomSeed(0)

pipeline = Pipeline()\
    .add(binarizer)\
    .add(binarizer)\
    .add(lda)

pipeline.fit(data1)
pipeline.transform(data2)

vector

Tipe data kustom di Alink yang mendukung dua format:

  • sparse vector (SparseVector)

    Contoh: $4$1:0.1 2:0.2. Angka di antara tanda dolar ($) adalah panjang vektor. Nilai setelah tanda dolar kedua adalah pasangan indeks-nilai.

  • dense vector (DenseVector)

    Contoh: 0.1 0.2 0.3. Ini merepresentasikan urutan nilai yang dipisahkan spasi.

Catatan

Di Alink, jika suatu kolom bertipe vector, nama parameternya biasanya vectorColName.

Komponen Alink yang didukung oleh Skrip PyAlink

Anda dapat menggunakan ratusan komponen Alink dalam Skrip PyAlink, termasuk komponen untuk pemrosesan data, rekayasa fitur, dan pelatihan model.

Catatan

Komponen Skrip PyAlink saat ini mendukung komponen pipeline dan batch, tetapi tidak mendukung komponen stream.

Metode 1: Gunakan Skrip PyAlink secara mandiri

Topik ini menggunakan contoh pemberian skor dataset movielens dengan model ItemCf untuk menjelaskan cara menggunakan platform Machine Learning Designer dan sumber daya Alibaba Cloud guna menjalankan alur kerja yang diimplementasikan dengan Skrip PyAlink. Prosedurnya sebagai berikut.

  1. Buka halaman Machine Learning Designer dan buat pipeline kosong. Untuk informasi lebih lanjut, lihat Prosedur.

  2. Di daftar pipeline, pilih pipeline kosong yang telah Anda buat dan klik Open.

  3. Di kotak pencarian daftar komponen di sebelah kiri, cari PyAlink Script dan seret PyAlink Script ke kanvas di sebelah kanan. Node pipeline bernama PyAlink Script-1 akan dibuat secara otomatis di kanvas.

  4. Di kanvas, pilih node PyAlink Script-1. Di panel sebelah kanan, konfigurasikan parameter pada tab Parameter Settings dan Execution tuning.

    • Pada tab Parameter Settings, tulis kode Anda. Kode berikut merupakan contohnya.

      from pyalink.alink import *
      def main(sources, sinks, parameter):
          PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
          RATING_FILE = "ratings.csv"
          PREDICT_FILE = "predict.csv"
          RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
          ratingsData = CsvSourceBatchOp() \
                  .setFilePath(PATH + RATING_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
          predictData = CsvSourceBatchOp() \
                  .setFilePath(PATH + PREDICT_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
          itemCFModel = ItemCfTrainBatchOp() \
                  .setUserCol("user_id").setItemCol("item_id") \
                  .setRateCol("rating").linkFrom(ratingsData);
          itemCF = ItemCfRateRecommender() \
                  .setModelData(itemCFModel) \
                  .setItemCol("item_id") \
                  .setUserCol("user_id") \
                  .setReservedCols(["user_id", "item_id"]) \
                  .setRecommCol("prediction_score")
          result = itemCF.transform(predictData)
          result.link(sinks[0])
          BatchOperator.execute()

      Skrip PyAlink mendukung empat port output. Di skrip, gunakan result.link(sinks[0]) untuk menulis data ke port output pertama. Komponen downstream dapat membaca data dari skrip dengan menghubungkan ke port output pertama ini. Untuk informasi lebih lanjut, lihat Membaca dan menulis berbagai tipe data di Skrip PyAlink.

    • Pada tab Execution tuning, atur mode running dan spesifikasi node.

      Parameter

      Deskripsi

      Select job running mode

      Mode berikut didukung:

      • DLC (Single-machine, multi-concurrency): Direkomendasikan untuk tugas dengan dataset kecil serta untuk tujuan debugging dan validasi.

      • MaxCompute (Distributed): Direkomendasikan untuk tugas dengan dataset besar atau untuk tugas produksi.

      • Fully-managed Flink: Menjalankan pekerjaan pada kluster Fully-managed Flink yang terikat ke ruang kerja.

      Number of workers

      Parameter ini hanya diperlukan ketika job running mode diatur ke MaxCompute (Distributed) atau Fully-managed Flink (Distributed). Parameter ini menentukan jumlah node eksekusi. Jika Anda tidak mengisi parameter ini, sistem akan mengalokasikan node secara otomatis berdasarkan data tugas. Secara default, parameter ini kosong.

      Memory per worker (MB)

      Anda perlu mengonfigurasi parameter ini hanya ketika running mode of the job diatur ke MaxCompute (Distributed) atau Fully-managed Flink (Distributed). Parameter ini menentukan ukuran memori satu node dalam MB. Nilainya harus bilangan bulat positif, dan default-nya 8192.

      CPU cores per worker

      Parameter ini hanya diperlukan ketika Job running mode diatur ke MaxCompute (distributed) atau Fully-managed Flink (distributed). Parameter ini menentukan jumlah core CPU untuk satu node. Nilainya harus bilangan bulat positif dan secara default kosong.

      Select node specification to run script

      Jenis resource node DLC. Default-nya adalah 2 vCPU + 8 GB Mem-ecs.g6.large.

  5. Di atas kanvas, klik Save, lalu klik ikon Run image untuk menjalankan Skrip PyAlink.

  6. Ketika tugas selesai, klik kanan node PyAlink Script-1 di kanvas dan pilih View Data > Output 0 untuk melihat hasilnya.

    Nama kolom

    Deskripsi

    user_id

    ID pengguna.

    item_id

    ID film.

    prediction_score

    Menunjukkan preferensi pengguna terhadap film tersebut. Skor ini digunakan sebagai referensi untuk rekomendasi.

Metode 2: Gabungkan Skrip PyAlink dengan komponen lain

Port input dan output komponen Skrip PyAlink identik dengan komponen algoritma lain di Machine Learning Designer. Anda dapat menghubungkannya untuk membuat pipeline gabungan, seperti yang ditunjukkan pada gambar berikut.组合使用

Baca dan tulis data di Skrip PyAlink

  • Baca data.

    • Baca dari tabel MaxCompute: Skrip membaca data yang diteruskan dari komponen upstream melalui port input. Kode berikut memberikan contohnya.

      train_data = sources[0]
      test_data = sources[1]

      Dalam kode, sources[0] merepresentasikan tabel MaxCompute yang terhubung ke port input pertama, dan sources[1] merepresentasikan tabel yang terhubung ke port input kedua. Komponen ini mendukung hingga empat port input.

    • Baca dari sistem file jaringan: Skrip membaca data menggunakan komponen sumber Alink, seperti CsvSourceBatchOp dan AkSourceBatchOp, di dalam kode. Anda dapat membaca jenis file berikut:

      • Baca file bersama dari jaringan melalui HTTP. Kode berikut memberikan contohnya:

        ratingsData = CsvSourceBatchOp() \
                    .setFilePath(PATH + RATING_FILE) \
                    .setFieldDelimiter("\t") \
                    .setSchemaStr(RATING_SCHEMA_STRING)
      • Baca file OSS. Pertama, temukan path penyimpanan data sementara untuk pipeline di tab pipeline properties. Path ini, diformat sebagai oss://<bucket-name>/<path>, digunakan untuk operasi baca/tulis OSS. Kode berikut memberikan contohnya.

        model_data = AkSourceBatchOp().setFilePath("oss://xxxxxxxx/model_20220323.ak")
  • Tulis data.

    • Tulis ke tabel MaxCompute: Skrip menulis data ke komponen downstream melalui port output. Kode berikut memberikan contohnya.

      result0.link(sinks[0])
      result1.link(sinks[1])
      BatchOperator.execute()

      Baris result0.link(sinks[0]) menulis data dan membuatnya dapat diakses melalui port output pertama. Anda dapat menulis hingga empat tabel hasil, sesuai dengan empat port output.

    • Tulis ke file OSS. Gunakan path OSS yang didefinisikan di field temporary data storage path pipeline, yang terdapat di tab pipeline properties. Kode berikut memberikan contohnya.

      result.link(AkSinkBatchOp() \
                  .setFilePath("oss://xxxxxxxx/model_20220323.ak") \
                  .setOverwriteSink(True))
      BatchOperator.execute()

Contoh: Deploy model sebagai layanan EAS

  1. Hasilkan model yang akan di-deploy.

    Anda hanya dapat men-deploy model sebagai layanan EAS jika model tersebut merupakan PipelineModel yang dihasilkan oleh komponen Skrip PyAlink. Gunakan kode berikut untuk menghasilkan file PipelineModel. Untuk instruksi menjalankan skrip, lihat Metode 1: Gunakan komponen Skrip PyAlink secara mandiri.

    from pyalink.alink import *
    def main(sources, sinks, parameter):
        PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
        RATING_FILE = "ratings.csv"
        PREDICT_FILE = "predict.csv"
        RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
        ratingsData = CsvSourceBatchOp() \
                .setFilePath(PATH + RATING_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
        predictData = CsvSourceBatchOp() \
                .setFilePath(PATH + PREDICT_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
        itemCFModel = ItemCfTrainBatchOp() \
                .setUserCol("user_id").setItemCol("item_id") \
                .setRateCol("rating").linkFrom(ratingsData);
        itemCF = ItemCfRateRecommender() \
                .setModelData(itemCFModel) \
                .setItemCol("item_id") \
                .setUserCol("user_id") \
                .setReservedCols(["user_id", "item_id"]) \
                .setRecommCol("prediction_score")
        model = PipelineModel(itemCF)
        model.save().link(AkSinkBatchOp() \
                .setFilePath("oss://<your_bucket_name>/model.ak") \
                .setOverwriteSink(True))
        BatchOperator.execute()

    Ganti <your_bucket_name> dengan nama bucket OSS Anda.

    Penting

    Pastikan Anda memiliki izin baca untuk path dataset yang dikonfigurasi di PATH. Jika tidak, komponen akan gagal dijalankan.

  2. Hasilkan file konfigurasi EAS.

    Jalankan skrip berikut untuk menulis output ke file config.json.

    # File konfigurasi EAS
    import json
    # Hasilkan konfigurasi model EAS
    model_config = {}
    # Skema data yang diterima EAS
    model_config['inputDataSchema'] = "id long, movieid long" 
    model_config['modelVersion'] = "v0.2"
    eas_config = {
        "name": "recomm_demo",
        "model_path": "http://xxxxxxxx/model.ak",
        "processor": "alink_outer_processor",
        "metadata": {
            "instance": 1,
            "memory": 2048,
            "region":"China (Beijing)"
        },
        "model_config": model_config
    }
    print(json.dumps(eas_config, indent=4))

    Parameter utama dalam file config.json:

    • name: Nama layanan model yang di-deploy.

    • model_path: Path OSS tempat file PipelineModel disimpan. Anda harus mengubahnya ke path OSS aktual file model Anda.

    Untuk penjelasan parameter lain dalam file config.json, lihat Referensi perintah.

  3. Deploy model sebagai layanan EAS.

    Deploy model menggunakan klien eascmd. Untuk instruksi penyiapan klien, lihat Unduh dan konfigurasi klien. Sebagai contoh, pada sistem Windows 64-bit, jalankan perintah ini:

    eascmdwin64.exe create config.json