Bagian ini menjelaskan cara menggunakan Python SDK untuk mengirimkan pekerjaan yang bertujuan menghitung jumlah kemunculan INFO, WARN, ERROR, dan DEBUG dalam file log.
Note: Pastikan Anda telah mendaftar layanan Batch Compute terlebih dahulu.
Contents:
Persiapkan pekerjaan
Unggah file data ke OSS
Unggah program tugas ke OSS
Gunakan SDK untuk mengirimkan pekerjaan
Periksa hasil
Pekerjaan ini bertujuan untuk menghitung jumlah kemunculan INFO, WARN, ERROR, dan DEBUG dalam file log.
Pekerjaan ini mencakup tugas-tugas berikut:
Tugas split digunakan untuk membagi file log menjadi tiga bagian.
Tugas count digunakan untuk menghitung jumlah kemunculan INFO, WARN, ERROR, dan DEBUG di setiap bagian file log. Dalam tugas count, InstanceCount harus diatur ke 3, yang menunjukkan bahwa tiga tugas count dimulai secara bersamaan.
Tugas merge digunakan untuk menggabungkan semua hasil penghitungan.
DAG

1.1. Unggah file data ke OSS
Unduh file data yang digunakan dalam contoh ini: log-count-data.txt.
Unggah file log-count-data.txt ke oss://your-bucket/log-count/log-count-data.txt.
your-bucketmenunjukkan bucket yang Anda buat sendiri. Dalam contoh ini, diasumsikan bahwa wilayahnya adalahcn-shenzhen.Untuk mengunggah file ke OSS, lihat Unggah file ke OSS.
1.2. Unggah program tugas ke OSS
Program pekerjaan yang digunakan dalam contoh ini dikompilasi menggunakan Python. Unduh programnya: log-count.tar.gz.
Dalam contoh ini, tidak perlu memodifikasi kode sampel. Anda dapat langsung mengunggah log-count.tar.gz ke OSS, misalnya oss://your-bucket/log-count/log-count.tar.gz.
Metode unggah telah dijelaskan sebelumnya.
Batch Compute hanya mendukung paket kompresi dengan ekstensi tar.gz. Pastikan Anda menggunakan metode sebelumnya (gzip) untuk pengepakan; jika tidak, paket tersebut tidak dapat diproses.
Jika Anda harus memodifikasi kode, dekompresi file, modifikasi kode, lalu ikuti langkah-langkah berikut untuk mengemas kode yang telah dimodifikasi:
Perintahnya adalah sebagai berikut:
> cd log-count # Beralih ke direktori. > tar -czf log-count.tar.gz * # Kemas semua file di bawah direktori ini ke log-count.tar.gz.Anda dapat menjalankan perintah berikut untuk memeriksa isi paket kompresi:
$ tar -tvf log-count.tar.gzDaftar berikut akan ditampilkan:
conf.py count.py merge.py split.py
2. Gunakan SDK untuk mengirimkan pekerjaan
Untuk informasi lebih lanjut tentang cara mengunduh dan menginstal Python SDK, klik di sini.
Jika versi SDK adalah v20151111, Anda harus menentukan ID kluster atau menggunakan parameter AutoCluster saat mengirimkan pekerjaan. Dalam contoh ini, AutoCluster digunakan. Anda harus mengonfigurasi parameter berikut untuk AutoCluster:
ID gambar yang tersedia. Anda dapat menggunakan gambar yang disediakan oleh sistem atau menyesuaikan gambar. Untuk informasi lebih lanjut tentang cara menyesuaikan gambar, lihat Gunakan gambar.
InstanceType. Untuk informasi lebih lanjut tentang jenis instans, lihat Jenis instans yang didukung saat ini.
Buat jalur untuk menyimpan StdoutRedirectPath (output program) dan StderrRedirectPath (log kesalahan) di OSS. Dalam contoh ini, jalur yang dibuat adalah oss://your-bucket/log-count/logs/.
Untuk menjalankan program dalam contoh ini, ubah variabel dengan komentar dalam program berdasarkan variabel yang dijelaskan sebelumnya dan variabel jalur OSS.
Berikut ini adalah template pengiriman program ketika Python SDK digunakan. Untuk makna spesifik dari parameter dalam program, klik di sini.
#encoding=utf-8
import sys
from batchcompute import Client, ClientError
from batchcompute import CN_SHENZHEN as REGION
from batchcompute.resources import (
JobDescription, TaskDescription, DAG, AutoCluster
)
ACCESS_KEY_ID='' # Masukkan AccessKeyID Anda
ACCESS_KEY_SECRET='' # Masukkan AccessKeySecret Anda
IMAGE_ID = 'img-ubuntu' # Masukkan ID gambar Anda
INSTANCE_TYPE = 'ecs.sn1.medium' # Masukkan jenis instans berdasarkan wilayah
WORKER_PATH = '' # 'oss://your-bucket/log-count/log-count.tar.gz' Masukkan jalur penyimpanan OSS dari log-count.tar.gz yang diunggah
LOG_PATH = '' # 'oss://your-bucket/log-count/logs/' Masukkan jalur penyimpanan OSS dari feedback error dan output tugas
OSS_MOUNT= '' # 'oss://your-bucket/log-count/' Mount pada "/home/inputs" dan "/home/outputs"
client = Client(REGION, ACCESS_KEY_ID, ACCESS_KEY_SECRET)
def main():
try:
job_desc = JobDescription()
# Buat kluster otomatis.
cluster = AutoCluster()
cluster.InstanceType = INSTANCE_TYPE
cluster.ResourceType = "OnDemand"
cluster.ImageId = IMAGE_ID
# Buat tugas split.
split_task = TaskDescription()
split_task.Parameters.Command.CommandLine = "python split.py"
split_task.Parameters.Command.PackagePath = WORKER_PATH
split_task.Parameters.StdoutRedirectPath = LOG_PATH
split_task.Parameters.StderrRedirectPath = LOG_PATH
split_task.InstanceCount = 1
split_task.AutoCluster = cluster
split_task.InputMapping[OSS_MOUNT]='/home/input'
split_task.OutputMapping['/home/output'] = OSS_MOUNT
# Buat tugas map.
count_task = TaskDescription(split_task)
count_task.Parameters.Command.CommandLine = "python count.py"
count_task.InstanceCount = 3
count_task.InputMapping[OSS_MOUNT] = '/home/input'
count_task.OutputMapping['/home/output'] = OSS_MOUNT
# Buat tugas merge
merge_task = TaskDescription(split_task)
merge_task.Parameters.Command.CommandLine = "python merge.py"
merge_task.InstanceCount = 1
merge_task.InputMapping[OSS_MOUNT] = '/home/input'
merge_task.OutputMapping['/home/output'] = OSS_MOUNT
# Buat dag tugas.
task_dag = DAG()
task_dag.add_task(task_name="split", task=split_task)
task_dag.add_task(task_name="count", task=count_task)
task_dag.add_task(task_name="merge", task=merge_task)
task_dag.Dependencies = {
'split': ['count'],
'count': ['merge']
}
# Buat deskripsi pekerjaan.
job_desc.DAG = task_dag
job_desc.Priority = 99 # 0-1000
job_desc.Name = "log-count"
job_desc.Description = "PythonSDKDemo"
job_desc.JobFailOnInstanceFail = True
job_id = client.create_job(job_desc).Id
print('pekerjaan dibuat: %s' % job_id)
except ClientError, e:
print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
if __name__ == '__main__':
sys.exit(main())3. Periksa status pekerjaan
Anda dapat melihat status pekerjaan dengan merujuk ke Dapatkan informasi pekerjaan.
jobInfo = client.get_job(job_id)
print (jobInfo.State)Pekerjaan mungkin berada dalam salah satu dari status berikut: Menunggu, Berjalan, Selesai, Gagal, dan Dihentikan.
4. Periksa hasil eksekusi pekerjaan
Anda dapat masuk ke Konsol OSS dan memeriksa file berikut di bawah bucket Anda: /log-count/merge_result.json.
Hasil yang diharapkan adalah sebagai berikut:
{"INFO": 2460, "WARN": 2448, "DEBUG": 2509, "ERROR": 2583}Sebagai alternatif, Anda dapat menggunakan Ikhtisar untuk mendapatkan hasilnya.