All Products
Search
Document Center

OpenSearch: Panduan pengembang untuk model pengurutan kustom

Last Updated:Jun 26, 2025

Topik ini menjelaskan cara mengonfigurasi model pengurutan kustom dalam file JSON dan menyediakan contoh kode untuk implementasi kustom.

Ikhtisar

Topik ini menjelaskan cara mengonfigurasi model pengurutan kustom dalam file JSON dan menyediakan contoh kode untuk implementasi kustom.

Memulai

Anda harus mengimplementasikan kelas CustomModel yang mewarisi dari kelas BaseModel.

Kerangka kerja membuat graf dengan memanggil fungsi build. Metode berikut dipanggil:

def build(self):
    """Assemble the graph by running every build stage in a fixed order.

    The framework calls this function to create the graph. The stages
    ``build_model``, ``reg_loss``, ``loss_op``, ``update_op`` and
    ``training_op`` must be implemented by the custom model; for
    ``build_placeholder``, ``mark_output`` and ``trace_sample_op`` the guide
    recommends keeping the framework's default logic.
    """

    self.build_placeholder()
    self.build_model()
    # The sample CustomModel.training_op passes self.global_step to the
    # optimizer, so the step variable is created before that stage.
    self.setup_global_step()
    # reg_loss runs before loss_op: the sample CustomModel.loss_op adds
    # self.reg_loss, which the sample reg_loss stage assigns.
    self.reg_loss()
    self.loss_op()
    # update_op runs before training_op: the sample CustomModel.training_op
    # passes self.update_ops, which the sample update_op stage assigns.
    self.update_op()
    self.training_op()
    # Per the model specification, logits are converted to the final score
    # with a sigmoid unless predictions_op is overridden.
    self.predictions_op()
    self.mark_output()
    # The sample CustomModel.summary_op iterates self.metrics -- presumably
    # populated by metrics_op; confirm against BaseModel.
    self.metrics_op()
    self.summary_op()
    self.trace_sample_op()

Anda harus mengimplementasikan metode berikut:

def build_model(self):
    """Required override: build the model network.

    Called by the framework's ``build`` right after ``build_placeholder``.
    The model specification requires the logits to be stored in
    ``self.logits``; the sample CustomModel does so from this stage.
    """

def update_op(self):
    """Required override: prepare the model's update ops.

    Called by the framework's ``build`` before ``training_op``. The sample
    CustomModel stores the selected ops in ``self.update_ops``.
    """

def reg_loss(self):
    """Required override: compute the regularization loss.

    Called by the framework's ``build`` before ``loss_op``. The model
    specification requires the result to be stored in ``self.reg_loss``.
    """

def training_op(self):
    """Required override: create the training op.

    Called by the framework's ``build`` after ``update_op``. The sample
    CustomModel stores the op in ``self.train_op``.
    """

def loss_op(self):
    """Required override: compute the training loss.

    Called by the framework's ``build`` after ``reg_loss``. The model
    specification requires the result to be stored in ``self.loss``.
    """

Contoh Kode untuk Kelas CustomModel

from collections import OrderedDict
import tensorflow as tf
from tensorflow.contrib import layers
from tensorflow.contrib.framework.python.ops import arg_scope
from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope
from model_ops.tflog import tflogger as logging

import model_ops.optimizer_ops as myopt
from model.base_model import BaseModel

from model_ops import ops as base_ops
from model_ops import utils


class CustomModel(BaseModel):
    """Sample custom ranking model derived from ``BaseModel``.

    The graph consists of an embedding input layer, a stack of fully
    connected layers with batch normalization, and a final linear layer with
    a single output that is stored in ``self.logits``. ``self.name`` prefixes
    the scoring and logits variable scopes and is the prefix used to select
    this model's update ops and regularization losses from the graph-wide
    collections.
    """

    def __init__(self,
                 config,
                 name="CTR"):
        """Register the feature columns and set the hyper-parameters.

        Args:
            config: Configuration object forwarded to ``BaseModel``. Other
                methods of this class read ``self.config.ps_num`` and
                ``self.config.predict`` (presumably the same object; confirm
                against ``BaseModel``).
            name: Model name forwarded to ``BaseModel``; defaults to "CTR".
        """
        super(CustomModel, self).__init__(config, name)

        # Names of the collections that gather the DNN variables and the DNN
        # layer outputs; summary_op reads both.
        self.collections_dnn_hidden_layer = "{}_dnn_hidden_layer".format(self.name)
        self.collections_dnn_hidden_output = "{}_dnn_hidden_output".format(self.name)

        # Filled by embedding_layer() and read by dnn_layer().
        self.layer_dict = OrderedDict()

        # Features that are fed through embedding columns.
        self.embedding_columns = ["feature1", "feature2"]
        for column_name in self.embedding_columns:
            self.generate_embedding_feature_column(
                column_name,
                hash_bucket_size=1000,
                dimension=16,
                initializer=tf.zeros_initializer,
                combiner="sum",
                is_share_embedding=False,
                shared_embedding_name=None)

        # The bare string below is a disabled sample (a no-op expression) that
        # shows how real-valued and sparse-id columns would be registered.
        """ 
        
        self.real_valued_columns = ['feature3','feature4']
        for feature_name in self.real_valued_columns:
            self.generate_real_valued_feature_column(feature_name,dtype="Float",value_dimension=1)

        self.sparse_id_columns = ['feature5','feature6']
        for feature_name in self.sparse_id_columns:
            self.generate_sparse_id_feature_column(feature_name,hash_bucket_size=1000,dimension=16,combiner="sum",is_share_embedding=False,shared_embedding_name=None)
        """

        # Hyper-parameters. The attribute name "embedding_partitino_size" is
        # misspelled in the original and is kept because embedding_layer()
        # reads it under that name.
        self.embedding_partitino_size = 4 * 1024 * 1024
        self.dnn_partition_size = 64 * 1024
        self.dnn_l2_reg = 1e-6
        self.clip_gradients = 5.0
        self.dnn_hidden_units = [1024, 512, 256]

    def build_placeholder(self):
        """Bind ``self.is_training`` to the graph's ``training`` tensor.

        A tensor named ``training:0`` in the default graph is reused when the
        lookup succeeds; a ``tf.bool`` placeholder named ``training`` is
        created only when the lookup raises ``KeyError``.
        """
        graph = tf.get_default_graph()
        try:
            training_flag = graph.get_tensor_by_name("training:0")
        except KeyError:
            training_flag = tf.placeholder(tf.bool, name="training")
        self.is_training = training_flag

    def setup_global_step(self):
        """Create the non-trainable int64 ``global_step`` variable, initialised to 0."""
        step_collections = [tf.GraphKeys.GLOBAL_STEP, tf.GraphKeys.GLOBAL_VARIABLES]
        self.global_step = tf.Variable(
            initial_value=0,
            name="global_step",
            trainable=False,
            dtype=tf.int64,
            collections=step_collections)

    def embedding_layer(self):
        """Build the input tensor from the embedding feature columns.

        The result is stored in ``self.layer_dict['dnn']``.
        """
        embedding_partitioner = base_ops.partitioner(self.config.ps_num,
                                                     self.embedding_partitino_size)
        with tf.variable_scope(name_or_scope="Embedding_Layer",
                               partitioner=embedding_partitioner,
                               reuse=tf.AUTO_REUSE) as scope:
            logging.info('ps num: {}, embedding prtition size: {} \n scope :{}'.format(
                self.config.ps_num, self.embedding_partitino_size, scope))
            columns = self.feature_columns_from_column_names(self.embedding_columns)
            self.layer_dict['dnn'] = layers.input_from_feature_columns(
                self.features,
                columns,
                scope=scope)

    def dnn_layer(self):
        """Stack the hidden layers on top of the embedding output.

        For every entry of ``self.dnn_hidden_units`` an L2 regularizer is
        applied to the current ``self.dnn_net`` tensor and a fully connected
        layer with batch normalization follows. The last layer's output is
        left in ``self.dnn_net``.
        """
        dnn_inputs = [self.layer_dict['dnn']]
        with tf.variable_scope(name_or_scope="{}_Score_Network".format(self.name),
                               partitioner=base_ops.partitioner(self.config.ps_num,
                                                                self.dnn_partition_size)):
            self.dnn_net = tf.concat(values=dnn_inputs, axis=1)
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                for depth, width in enumerate(self.dnn_hidden_units):
                    with variable_scope.variable_scope(
                            "hiddenlayer_{}".format(depth)) as hidden_scope:
                        # NOTE(review): the tensor being regularized is the
                        # layer input, not the layer weights -- confirm that
                        # this is intended. The return value is not used here.
                        l2_penalty = layers.l2_regularizer(float(self.dnn_l2_reg))
                        layers.apply_regularization(
                            regularizer=l2_penalty,
                            weights_list=[self.dnn_net])
                        self.dnn_net = layers.fully_connected(
                            self.dnn_net,
                            width,
                            utils.getActivationFunctionOp("llrelu"),
                            scope=hidden_scope,
                            variables_collections=[self.collections_dnn_hidden_layer],
                            outputs_collections=[self.collections_dnn_hidden_output],
                            normalizer_fn=layers.batch_norm,
                            normalizer_params={"scale": True,
                                               "is_training": self.is_training})

    def logits_layer(self):
        """Map ``self.dnn_net`` to one output with a linear layer; store it in ``self.logits``."""
        with tf.variable_scope(name_or_scope="{}_Logits".format(self.name),
                               partitioner=base_ops.partitioner(self.config.ps_num,
                                                                self.dnn_partition_size)) as logits_scope:
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                self.logits = layers.linear(
                    self.dnn_net,
                    1,
                    scope=logits_scope,
                    variables_collections=[self.collections_dnn_hidden_layer],
                    outputs_collections=[self.collections_dnn_hidden_output])

    def build_model(self):
        """Build the network: embedding layer, then hidden layers, then logits."""
        self.embedding_layer()
        self.dnn_layer()
        self.logits_layer()

    def update_op(self):
        """Store in ``self.update_ops`` the UPDATE_OPS entries whose name starts with ``self.name``."""
        self.update_ops = [
            candidate
            for candidate in tf.get_collection(tf.GraphKeys.UPDATE_OPS)
            if candidate.name.startswith(self.name)
        ]

    def reg_loss(self):
        """Sum the regularization losses whose name starts with ``self.name``.

        The selected losses are kept in ``self.reg_losses``. The final
        assignment rebinds ``self.reg_loss`` on the instance from this method
        to the summed tensor, which is what ``loss_op`` later adds to the loss.
        """
        self.reg_losses = [
            term
            for term in tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
            if term.name.startswith(self.name)
        ]
        self.reg_loss = tf.reduce_sum(self.reg_losses)

    def training_op(self):
        """Create ``self.train_op``; it is ``None`` when ``self.config.predict`` is truthy."""
        if self.config.predict:
            self.train_op = None
            return
        with tf.variable_scope(name_or_scope="Optimize_Layer",
                               reuse=tf.AUTO_REUSE):
            # Fetched for the log line only; the optimizer below is given
            # self.global_step.
            step_for_log = tf.train.get_or_create_global_step()
            logging.info("Global_step:{},{}".format(self.name, str(step_for_log)))
            logging.info("Model_name:{},train_op_final_loss:{}".format(self.name, str(self.loss)))

            adam = tf.train.AdamAsyncOptimizer(learning_rate=0.01, beta1=0.9,
                                               beta2=0.999, epsilon=1e-8,
                                               use_locking=False)
            trainable_vars = ops.get_collection(ops.GraphKeys.TRAINABLE_VARIABLES)
            self.train_op, _, _ = myopt.optimize_loss(
                loss=self.loss,
                global_step=self.global_step,
                learning_rate=0.01,
                optimizer=adam,
                update_ops=self.update_ops,
                clip_gradients=self.clip_gradients,
                variables=trainable_vars,
                increment_global_step=True,
                summaries=myopt.OPTIMIZER_SUMMARIES)

    def loss_op(self):
        """Set ``self.loss`` to the mean sigmoid cross-entropy plus ``self.reg_loss``."""
        with tf.name_scope("{}_Loss_Op".format(self.name)):
            cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(
                logits=self.logits,
                labels=self.label)
            self.loss = tf.reduce_mean(cross_entropy) + self.reg_loss

    def metrics_op(self):
        """Delegate to the parent implementation, as the model specification requires."""
        super(CustomModel, self).metrics_op()

    def summary_op(self):
        """Emit a scalar summary per metric and the layer summaries for the DNN collections."""
        with tf.name_scope("{}_Metrics_Scalar".format(self.name)):
            for metric_name, metric_tensor in self.metrics.items():
                tf.summary.scalar(name=metric_name, tensor=metric_tensor)

        with tf.name_scope("{}_Layer_Summary".format(self.name)):
            base_ops.add_norm2_summary(self.collections_dnn_hidden_layer)
            base_ops.add_dense_output_summary(self.collections_dnn_hidden_output)
            base_ops.add_weight_summary(self.collections_dnn_hidden_layer)

Fitur

Anda dapat mengakses input sampel melalui self.features. Setiap key di dalamnya adalah nama fitur yang Anda tentukan.

Penting

Gunakan fungsi contrib.layers.input_from_feature_columns untuk melakukan penyematan pada fitur. Jangan gunakan fungsi penyematan lainnya, karena dapat menyebabkan masalah online tak terduga. Jenis kolom fitur yang didukung meliputi: sparse_column_with_hash_bucket, embedding_column, real_valued_column, dan shared_embedding_columns. Kolom penyematan tidak dapat digunakan dua kali; gunakan kolom penyematan bersama sebagai gantinya.

Operasi API

Untuk mencegah masalah ketidakcocokan dengan model online, OpenSearch menyediakan operasi API terkait kolom fitur yang dienkapsulasi. Kami sarankan Anda langsung menggunakan operasi API terenkapsulasi berikut:

# Signature sketches of the encapsulated feature-column operations.
# Arguments without a default (feature_name, hash_bucket_size, dimension,
# feature_list) stand for values you supply.

# Generate an embedding column.
self.generate_embedding_feature_column(
    feature_name,
    hash_bucket_size,
    dimension,
    initializer=tf.zeros_initializer,
    combiner="sum",
    is_share_embedding=False,
    shared_embedding_name=None
)

# Generate a real-valued column.
self.generate_real_valued_feature_column(
    feature_name,
    dtype="Float", # Only Float and Int are supported.
    value_dimension=1
)

# Generate a sparse column.
# NOTE(review): the disabled sample in CustomModel.__init__ also passes
# dimension, is_share_embedding and shared_embedding_name to this method --
# confirm the full signature.
self.generate_sparse_id_feature_column(
    feature_name,
    hash_bucket_size,
    combiner="sum"
)

# Look up the configured feature columns by name.
self.feature_columns_from_column_names(
    feature_list
)

Spesifikasi model

Pastikan model Anda memenuhi spesifikasi berikut untuk kompatibilitas dengan layanan online:

CustomModel: Saat menginisialisasi kelas CustomModel, panggil konstruktor kelas induk dengan kode berikut: super(CustomModel, self).__init__(config, name).

logits: Simpan logits di self.logits. OpenSearch menggunakan fungsi sigmoid untuk mengonversi logits menjadi skor akhir. Jika ingin menggunakan fungsi lain untuk menghitung skor, timpa (override) metode predictions_op.

loss: Teruskan loss ke self.loss.

reg_loss: Teruskan reg_loss ke self.reg_loss.

metrics_op: Saat menimpa (override) metode metrics_op, panggil implementasi kelas induk dengan kode berikut: super(CustomModel, self).metrics_op(). OpenSearch memantau beberapa metrik sistem umum.

Disarankan untuk tidak mengimplementasikan metode berikut:

build_placeholder, mark_output, dan trace_sample_op. Gunakan logika default dari kerangka kerja.

Catatan

Variabel dan bobot

Jika Anda membuat variabel lain dengan tf.Variable, atau menggunakan fungsi di luar modul contrib, tambahkan variabel tersebut ke koleksi MODEL_VARIABLES, karena bobot dimuat secara online berdasarkan koleksi ini. Anda hanya perlu menambahkan variabel yang bobotnya harus dimuat secara online; variabel global_step tidak perlu ditambahkan ke koleksi tersebut.

Contoh:

from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope as vs

# Register the variable in MODEL_VARIABLES in addition to GLOBAL_VARIABLES:
# per the note above this example, weights are loaded based on the
# MODEL_VARIABLES collection.
self._weights = vs.get_variable(
          _WEIGHTS_VARIABLE_NAME, [total_arg_size, output_size],
          dtype=dtype,
          initializer=kernel_initializer,
          collections=[ops.GraphKeys.GLOBAL_VARIABLES, ops.GraphKeys.MODEL_VARIABLES])