OpenSearch: Developer guide for custom sorting models

Last Updated: Jul 15, 2024

Overview

This topic describes how to configure a custom sorting model in a JSON file and provides sample code to which you can add custom implementations.

Getting started

You must implement the CustomModel class that inherits from the BaseModel class.

The framework creates the graph by calling the build function. The build function calls the following methods in order:

def build(self):

    self.build_placeholder()
    self.build_model()
    self.setup_global_step()
    self.reg_loss()
    self.loss_op()
    self.update_op()
    self.training_op()
    self.predictions_op()
    self.mark_output()
    self.metrics_op()
    self.summary_op()
    self.trace_sample_op()

You must implement the following methods:

def build_model(self):
    # Build the network, for example the embedding, DNN, and logits layers,
    # and pass the final logits to self.logits.
    pass

def update_op(self):
    # Collect the update operations of this model, such as batch normalization
    # updates, into self.update_ops.
    pass

def reg_loss(self):
    # Collect the regularization losses of this model and pass their sum to
    # self.reg_loss.
    pass

def training_op(self):
    # Create the optimizer and pass the training operation to self.train_op.
    pass

def loss_op(self):
    # Compute the training loss and pass it to self.loss.
    pass

Sample code for the CustomModel class

from collections import OrderedDict
import tensorflow as tf
from tensorflow.contrib import layers
from tensorflow.contrib.framework.python.ops import arg_scope
from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope
from model_ops.tflog import tflogger as logging

import model_ops.optimizer_ops as myopt
from model.base_model import BaseModel

from model_ops import ops as base_ops
from model_ops import utils


class CustomModel(BaseModel):
    def __init__(self,
                 config,
                 name="CTR"):
        super(CustomModel, self).__init__(config, name)

        # Define model variables collection
        self.collections_dnn_hidden_layer = "{}_dnn_hidden_layer".format(self.name)
        self.collections_dnn_hidden_output = "{}_dnn_hidden_output".format(self.name)

        self.layer_dict = OrderedDict()

        self.embedding_columns = ['feature1','feature2']
        for feature_name in self.embedding_columns:
            self.generate_embedding_feature_column(feature_name,
                                                   hash_bucket_size=1000,
                                                   dimension=16,
                                                   initializer=tf.zeros_initializer,
                                                   combiner="sum",
                                                   is_share_embedding=False,
                                                   shared_embedding_name=None)

        """ 
        
        self.real_valued_columns = ['feature3','feature4']
        for feature_name in self.real_valued_columns:
            self.generate_real_valued_feature_column(feature_name,dtype="Float",value_dimension=1)

        self.sparse_id_columns = ['feature5','feature6']
        for feature_name in self.sparse_id_columns:
            self.generate_sparse_id_feature_column(feature_name,hash_bucket_size=1000,dimension=16,combiner="sum",is_share_embedding=False,shared_embedding_name=None)
        """

        self.embedding_partition_size = 4 * 1024 * 1024
        self.dnn_partition_size = 64 * 1024
        self.dnn_l2_reg = 1e-6
        self.clip_gradients = 5.0
        self.dnn_hidden_units = [1024, 512, 256]


    def build_placeholder(self):
        try:
            self.is_training = tf.get_default_graph().get_tensor_by_name("training:0")
        except KeyError:
            self.is_training = tf.placeholder(tf.bool, name="training")

    def setup_global_step(self):
        global_step = tf.Variable(
            initial_value=0,
            name="global_step",
            trainable=False,
            dtype=tf.int64,
            collections=[tf.GraphKeys.GLOBAL_STEP, tf.GraphKeys.GLOBAL_VARIABLES])
        self.global_step = global_step


    def embedding_layer(self):
        with tf.variable_scope(name_or_scope="Embedding_Layer",
                               partitioner=base_ops.partitioner(self.config.ps_num,
                                                                self.embedding_partition_size),
                               reuse=tf.AUTO_REUSE) as scope:
            logging.info('ps num: {}, embedding partition size: {} \n scope: {}'.format(
                self.config.ps_num, self.embedding_partition_size, scope))
            self.layer_dict['dnn'] = layers.input_from_feature_columns(
                self.features,
                self.feature_columns_from_column_names(self.embedding_columns),
                scope=scope)


    def dnn_layer(self):
        dnn_layer = []
        dnn_layer.append(self.layer_dict['dnn'])
        with tf.variable_scope(name_or_scope="{}_Score_Network".format(self.name),
                               partitioner=base_ops.partitioner(self.config.ps_num,
                                                                self.dnn_partition_size)):
            self.dnn_net = tf.concat(values=dnn_layer, axis=1)
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                for layer_id, num_hidden_units in enumerate(self.dnn_hidden_units):
                    with variable_scope.variable_scope("hiddenlayer_{}".format(layer_id)) as dnn_hidden_layer_scope:
                        tf.contrib.layers.apply_regularization(
                            regularizer=tf.contrib.layers.l2_regularizer(float(self.dnn_l2_reg)),
                            weights_list=[self.dnn_net])
                        self.dnn_net = layers.fully_connected(
                            self.dnn_net,
                            num_hidden_units,
                            utils.getActivationFunctionOp("llrelu"),
                            scope=dnn_hidden_layer_scope,
                            variables_collections=[self.collections_dnn_hidden_layer],
                            outputs_collections=[self.collections_dnn_hidden_output],
                            normalizer_fn=layers.batch_norm,
                            normalizer_params={"scale": True, "is_training": self.is_training})



    def logits_layer(self):
        with tf.variable_scope(name_or_scope="{}_Logits".format(self.name),
                               partitioner=base_ops.partitioner(self.config.ps_num,
                                                                self.dnn_partition_size)) as dnn_logits_scope:
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                self.logits = layers.linear(
                    self.dnn_net,
                    1,
                    scope=dnn_logits_scope,
                    variables_collections=[self.collections_dnn_hidden_layer],
                    outputs_collections=[self.collections_dnn_hidden_output])

    def build_model(self):
        self.embedding_layer()
        self.dnn_layer()
        self.logits_layer()

    def update_op(self):
        update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
        self.update_ops = []
        for update_op in update_ops:
            if update_op.name.startswith(self.name):
                self.update_ops.append(update_op)

    def reg_loss(self):
        reg_losses = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
        self.reg_losses = []
        for reg_loss in reg_losses:
            if reg_loss.name.startswith(self.name):
                self.reg_losses.append(reg_loss)
        self.reg_loss = tf.reduce_sum(self.reg_losses)

    def training_op(self):
        if self.config.predict:
            self.train_op = None
            return
        with tf.variable_scope(name_or_scope="Optimize_Layer",
                               reuse=tf.AUTO_REUSE):
            gs = tf.train.get_or_create_global_step()
            logging.info("Global_step:{},{}".format(self.name, str(gs)))
            logging.info("Model_name:{},train_op_final_loss:{}".format(self.name, str(self.loss)))

            self.train_op, _, _ = myopt.optimize_loss(
                loss=self.loss,
                global_step=self.global_step,
                learning_rate=0.01,
                optimizer=tf.train.AdamAsyncOptimizer(learning_rate=0.01, beta1=0.9,
                                                      beta2=0.999, epsilon=1e-8,
                                                      use_locking=False),
                update_ops=self.update_ops,
                clip_gradients=self.clip_gradients,
                variables=ops.get_collection(ops.GraphKeys.TRAINABLE_VARIABLES),
                increment_global_step=True,
                summaries=myopt.OPTIMIZER_SUMMARIES)

    def loss_op(self):
        with tf.name_scope("{}_Loss_Op".format(self.name)):
            label = self.label
            self.loss = tf.reduce_mean(
                tf.nn.sigmoid_cross_entropy_with_logits(
                    logits=self.logits,
                    labels=label))
            self.loss = self.loss + self.reg_loss



    def metrics_op(self):
        super(CustomModel, self).metrics_op()

    def summary_op(self):
        with tf.name_scope("{}_Metrics_Scalar".format(self.name)):
            for key, metric in self.metrics.items():
                tf.summary.scalar(name=key, tensor=metric)

        with tf.name_scope("{}_Layer_Summary".format(self.name)):
            base_ops.add_norm2_summary(self.collections_dnn_hidden_layer)
            base_ops.add_dense_output_summary(self.collections_dnn_hidden_output)
            base_ops.add_weight_summary(self.collections_dnn_hidden_layer)

Features

You can obtain sample inputs from self.features. The key is the feature name that you specify.
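For example, assuming that self.features can be indexed by feature name like a dictionary, you could read one raw input tensor inside build_model. The feature name feature1 in the following sketch is only an example:

# A minimal sketch: look up one sample input by the feature name that you configured.
# 'feature1' is an example feature name, not part of the framework.
feature1_input = self.features['feature1']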

Important

You must use the contrib.layers.input_from_feature_columns function to perform embedding on features. Do not use other embedding functions. Otherwise, unexpected online issues may occur.

Only the following types of feature columns are supported: sparse_column_with_hash_bucket, embedding_column, real_valued_column, and shared_embedding_columns.

An embedding column cannot be used for more than one feature. If multiple features need to share an embedding, use shared_embedding_columns instead.
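For example, if two features must share one embedding table, you can declare both of them as shared embedding columns instead of reusing a single embedding column. The following sketch uses the generate_embedding_feature_column wrapper described in the next section; the feature names and the shared_embedding_name value are examples only:

# A minimal sketch: feature1 and feature2 share one embedding table.
# The feature names and the shared embedding name are examples only.
for feature_name in ['feature1', 'feature2']:
    self.generate_embedding_feature_column(feature_name,
                                           hash_bucket_size=1000,
                                           dimension=16,
                                           initializer=tf.zeros_initializer,
                                           combiner="sum",
                                           is_share_embedding=True,
                                           shared_embedding_name="shared_feature_embedding")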

API operations

To prevent incompatibility issues with online models, OpenSearch provides encapsulated API operations related to feature columns. We recommend that you directly use the following encapsulated API operations:

# Generates an embedding column.
self.generate_embedding_feature_column(
    feature_name,
    hash_bucket_size,
    dimension,
    initializer=tf.zeros_initializer,
    combiner="sum",
    is_share_embedding=False,
    shared_embedding_name=None
)

# Generates a real valued column.
self.generate_real_valued_feature_column(
    feature_name,
    dtype="Float", # Only Float and Int are supported.
    value_dimension=1
)

# Generates a sparse column.
self.generate_sparse_id_feature_column(
    feature_name,
    hash_bucket_size,
    combiner="sum"
)

# Queries the feature columns that are configured.
self.feature_columns_from_column_names(
    feature_list
)
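For example, the following sketch declares a real valued column in CustomModel.__init__ and later queries it by name when you build an input layer. The feature name price_feature is only an example:

# A minimal sketch of using the wrappers above. 'price_feature' is an example feature name.
self.generate_real_valued_feature_column('price_feature', dtype="Float", value_dimension=1)

# Later, for example when you build the input layer:
price_columns = self.feature_columns_from_column_names(['price_feature'])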

Model specifications

To ensure compatibility with the online services, make sure that your model meets the following specifications:

CustomModel: When you initialize the CustomModel class, use the following code to call the parent class: super(CustomModel, self).__init__(config, name).

logits: Pass the logits to self.logits. OpenSearch uses the sigmoid function to convert the logits to the final score. If you want to use other functions to calculate scores, you must rewrite the predictions_op method, as shown in the sketch after this list.

loss: Pass loss to self.loss.

reg_loss: Pass reg_loss to self.reg_loss.

metrics_op: When you call the metrics_op method, use the following code to call the parent class: super(CustomModel, self).metrics_op(). OpenSearch monitors some common system metrics.

We recommend that you do not implement the following methods:

build_placeholder, mark_output, and trace_sample_op. Use the default logic of the framework for these methods.
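If you rewrite the predictions_op method to replace the default sigmoid score, a minimal sketch could look like the following. The attribute name self.predictions and the use of tf.identity are assumptions for illustration; adapt them to the actual implementation in BaseModel:

    def predictions_op(self):
        # A minimal sketch, not the framework's default implementation.
        # 'self.predictions' is an assumed attribute name for illustration.
        with tf.name_scope("{}_Predictions_Op".format(self.name)):
            # Example: use the raw logits as the final score instead of sigmoid(logits).
            self.predictions = tf.identity(self.logits, name="predictions")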

Notes

Variables and weights

If you use tf.Variable to create additional variables, or use functions other than those in the contrib module, add the variables to the MODEL_VARIABLES collection. Weights are loaded online based on this collection. Add only the variables whose weights need to be loaded online. You do not need to add the global_step variable to this collection.

Example:

from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope as vs

# _WEIGHTS_VARIABLE_NAME, total_arg_size, output_size, dtype, and kernel_initializer
# are defined by the surrounding code. The key point is the collections argument,
# which adds the variable to MODEL_VARIABLES so that its weights can be loaded online.
self._weights = vs.get_variable(
    _WEIGHTS_VARIABLE_NAME, [total_arg_size, output_size],
    dtype=dtype,
    initializer=kernel_initializer,
    collections=[ops.GraphKeys.GLOBAL_VARIABLES, ops.GraphKeys.MODEL_VARIABLES])