Overview
This topic describes how to configure a custom sorting model in a JSON file and provides sample code to which you can add custom implementations.
Getting started
You must implement the CustomModel class that inherits from the BaseModel class.
The framework creates a graph by calling the build function. The following methods are called:
def build(self):
    self.build_placeholder()
    self.build_model()
    self.setup_global_step()
    self.reg_loss()
    self.loss_op()
    self.update_op()
    self.training_op()
    self.predictions_op()
    self.mark_output()
    self.metrics_op()
    self.summary_op()
    self.trace_sample_op()
You must implement the following methods:
def build_model(self):
    pass

def update_op(self):
    pass

def reg_loss(self):
    pass

def training_op(self):
    pass

def loss_op(self):
    pass
Sample code for the CustomModel class
from collections import OrderedDict
import tensorflow as tf
from tensorflow.contrib import layers
from tensorflow.contrib.framework.python.ops import arg_scope
from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope
from model_ops.tflog import tflogger as logging
import model_ops.optimizer_ops as myopt
from model.base_model import BaseModel
from model_ops import ops as base_ops
from model_ops import utils
class CustomModel(BaseModel):
    def __init__(self,
                 config,
                 name="CTR"):
        super(CustomModel, self).__init__(config, name)
        # Define the model variable collections.
        self.collections_dnn_hidden_layer = "{}_dnn_hidden_layer".format(self.name)
        self.collections_dnn_hidden_output = "{}_dnn_hidden_output".format(self.name)
        self.layer_dict = OrderedDict()
        self.embedding_columns = ['feature1', 'feature2']
        for feature_name in self.embedding_columns:
            self.generate_embedding_feature_column(feature_name,
                                                   hash_bucket_size=1000,
                                                   dimension=16,
                                                   initializer=tf.zeros_initializer,
                                                   combiner="sum",
                                                   is_share_embedding=False,
                                                   shared_embedding_name=None)
        """
        self.real_valued_columns = ['feature3', 'feature4']
        for feature_name in self.real_valued_columns:
            self.generate_real_valued_feature_column(feature_name, dtype="Float", value_dimension=1)
        self.sparse_id_columns = ['feature5', 'feature6']
        for feature_name in self.sparse_id_columns:
            self.generate_sparse_id_feature_column(feature_name,
                                                   hash_bucket_size=1000,
                                                   dimension=16,
                                                   combiner="sum",
                                                   is_share_embedding=False,
                                                   shared_embedding_name=None)
        """
        self.embedding_partition_size = 4 * 1024 * 1024
        self.dnn_partition_size = 64 * 1024
        self.dnn_l2_reg = 1e-6
        self.clip_gradients = 5.0
        self.dnn_hidden_units = [1024, 512, 256]
    def build_placeholder(self):
        try:
            self.is_training = tf.get_default_graph().get_tensor_by_name("training:0")
        except KeyError:
            self.is_training = tf.placeholder(tf.bool, name="training")

    def setup_global_step(self):
        global_step = tf.Variable(
            initial_value=0,
            name="global_step",
            trainable=False,
            dtype=tf.int64,
            collections=[tf.GraphKeys.GLOBAL_STEP, tf.GraphKeys.GLOBAL_VARIABLES])
        self.global_step = global_step
    def embedding_layer(self):
        with tf.variable_scope(name_or_scope="Embedding_Layer",
                               partitioner=base_ops.partitioner(self.config.ps_num,
                                                                self.embedding_partition_size),
                               reuse=tf.AUTO_REUSE) as scope:
            logging.info('ps num: {}, embedding partition size: {} \n scope: {}'.format(
                self.config.ps_num, self.embedding_partition_size, scope))
            self.layer_dict['dnn'] = layers.input_from_feature_columns(self.features,
                                                                       self.feature_columns_from_column_names(
                                                                           self.embedding_columns),
                                                                       scope=scope)
    def dnn_layer(self):
        dnn_layer = []
        dnn_layer.append(self.layer_dict['dnn'])
        with tf.variable_scope(name_or_scope="{}_Score_Network".format(self.name),
                               partitioner=base_ops.partitioner(self.config.ps_num,
                                                                self.dnn_partition_size)):
            self.dnn_net = tf.concat(values=dnn_layer, axis=1)
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                for layer_id, num_hidden_units in enumerate(self.dnn_hidden_units):
                    with variable_scope.variable_scope("hiddenlayer_{}".format(layer_id)) as dnn_hidden_layer_scope:
                        tf.contrib.layers.apply_regularization(
                            regularizer=tf.contrib.layers.l2_regularizer(float(self.dnn_l2_reg)),
                            weights_list=[self.dnn_net])
                        self.dnn_net = layers.fully_connected(
                            self.dnn_net,
                            num_hidden_units,
                            utils.getActivationFunctionOp("llrelu"),
                            scope=dnn_hidden_layer_scope,
                            variables_collections=[self.collections_dnn_hidden_layer],
                            outputs_collections=[self.collections_dnn_hidden_output],
                            normalizer_fn=layers.batch_norm,
                            normalizer_params={"scale": True, "is_training": self.is_training})
    def logits_layer(self):
        with tf.variable_scope(name_or_scope="{}_Logits".format(self.name),
                               partitioner=base_ops.partitioner(self.config.ps_num,
                                                                self.dnn_partition_size)) as dnn_logits_scope:
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                self.logits = layers.linear(
                    self.dnn_net,
                    1,
                    scope=dnn_logits_scope,
                    variables_collections=[self.collections_dnn_hidden_layer],
                    outputs_collections=[self.collections_dnn_hidden_output])

    def build_model(self):
        self.embedding_layer()
        self.dnn_layer()
        self.logits_layer()
    def update_op(self):
        update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
        self.update_ops = []
        for update_op in update_ops:
            if update_op.name.startswith(self.name):
                self.update_ops.append(update_op)

    def reg_loss(self):
        reg_losses = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
        self.reg_losses = []
        for reg_loss in reg_losses:
            if reg_loss.name.startswith(self.name):
                self.reg_losses.append(reg_loss)
        self.reg_loss = tf.reduce_sum(self.reg_losses)
    def training_op(self):
        if self.config.predict:
            self.train_op = None
            return
        with tf.variable_scope(name_or_scope="Optimize_Layer",
                               reuse=tf.AUTO_REUSE):
            gs = tf.train.get_or_create_global_step()
            logging.info("Global_step:{},{}".format(self.name, str(gs)))
            logging.info("Model_name:{},train_op_final_loss:{}".format(self.name, str(self.loss)))
            self.train_op, _, _ = myopt.optimize_loss(
                loss=self.loss,
                global_step=self.global_step,
                learning_rate=0.01,
                optimizer=tf.train.AdamAsyncOptimizer(learning_rate=0.01, beta1=0.9,
                                                      beta2=0.999, epsilon=1e-8,
                                                      use_locking=False),
                update_ops=self.update_ops,
                clip_gradients=self.clip_gradients,
                variables=ops.get_collection(ops.GraphKeys.TRAINABLE_VARIABLES),
                increment_global_step=True,
                summaries=myopt.OPTIMIZER_SUMMARIES)
    def loss_op(self):
        with tf.name_scope("{}_Loss_Op".format(self.name)):
            label = self.label
            self.loss = tf.reduce_mean(
                tf.nn.sigmoid_cross_entropy_with_logits(
                    logits=self.logits,
                    labels=label))
            self.loss = self.loss + self.reg_loss

    def metrics_op(self):
        super(CustomModel, self).metrics_op()

    def summary_op(self):
        with tf.name_scope("{}_Metrics_Scalar".format(self.name)):
            for key, metric in self.metrics.items():
                tf.summary.scalar(name=key, tensor=metric)
        with tf.name_scope("{}_Layer_Summary".format(self.name)):
            base_ops.add_norm2_summary(self.collections_dnn_hidden_layer)
            base_ops.add_dense_output_summary(self.collections_dnn_hidden_output)
            base_ops.add_weight_summary(self.collections_dnn_hidden_layer)
Features
You can obtain sample inputs by using self.features, which is a dictionary whose keys are the feature names that you specify.
You must use the contrib.layers.input_from_feature_columns function to perform embedding lookups on features. Do not use other embedding functions; otherwise, unexpected issues may occur online. Only the following types of feature columns are supported: sparse_column_with_hash_bucket, embedding_column, real_valued_column, and shared_embedding_columns. An embedding column cannot be reused for multiple features. If multiple features must share an embedding, use shared embedding columns instead, as shown in the following sketch.
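The following sketch shows how two hypothetical features could be configured to share one embedding table and then be looked up in embedding_layer. The feature names user_id and item_id, the sizes, and the shared_embedding_name value are illustrative assumptions, not part of the sample code above.
# In __init__: hypothetical features that share one embedding table.
# The names and sizes below are assumptions for illustration only.
for feature_name in ['user_id', 'item_id']:
    self.generate_embedding_feature_column(feature_name,
                                           hash_bucket_size=1000,
                                           dimension=16,
                                           combiner="sum",
                                           is_share_embedding=True,
                                           shared_embedding_name="user_item_emb")

# In embedding_layer(): look up only the columns that were configured.
# self.features is the dictionary of input tensors keyed by feature name.
self.layer_dict['dnn'] = layers.input_from_feature_columns(
    self.features,
    self.feature_columns_from_column_names(['user_id', 'item_id']),
    scope=scope)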
API operations
To prevent incompatibility issues with online models, OpenSearch provides encapsulated API operations related to feature columns. We recommend that you directly use the following encapsulated API operations:
# Generates an embedding column.
self.generate_embedding_feature_column(
    feature_name,
    hash_bucket_size,
    dimension,
    initializer=tf.zeros_initializer,
    combiner="sum",
    is_share_embedding=False,
    shared_embedding_name=None
)

# Generates a real valued column.
self.generate_real_valued_feature_column(
    feature_name,
    dtype="Float",  # Only Float and Int are supported.
    value_dimension=1
)

# Generates a sparse column.
self.generate_sparse_id_feature_column(
    feature_name,
    hash_bucket_size,
    combiner="sum"
)

# Queries the feature columns that are configured.
self.feature_columns_from_column_names(
    feature_list
)
Model specifications
To ensure compatibility with the online services, make sure that your model meets the following specifications:
CustomModel: When you initialize the CustomModel class, use the following code to call the parent class: super(CustomModel, self).__init__(config, name).
logits: Pass logits to self.logits. OpenSearch uses the sigmoid function to convert logits to the final score. If you want to use other functions to calculate scores, you must rewrite the predictions_op method, as shown in the sketch after this list.
loss: Pass loss to self.loss.
reg_loss: Pass reg_loss to self.reg_loss.
metrics_op: When you call the metrics_op method, use the following code to call the parent class: super(CustomModel, self).metrics_op(). OpenSearch monitors some common system metrics.
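A minimal sketch of a predictions_op override is shown below. It assumes that the score tensor is exposed through a self.predictions attribute; check the BaseModel implementation for the attribute that mark_output actually expects.
def predictions_op(self):
    # Hedged sketch: replaces the default sigmoid score with the raw logits.
    # The attribute name self.predictions is an assumption, not confirmed by BaseModel.
    with tf.name_scope("{}_Predictions_Op".format(self.name)):
        self.predictions = tf.identity(self.logits, name="score")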
We recommend that you do not implement the following methods:
build_placeholder, mark_output, and trace_sample_op. Use the default logic of the framework.
Notes
Variables and weights
If you use tf.Variable to create other variables, or you use functions other than those in the contrib module, add the variables to the MODEL_VARIABLES collection. Weights are loaded online based on that collection. Add only variables whose weights must be loaded online. You do not need to add the global_step variable to that collection.
Example:
from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope as vs

# The name, shape, dtype, and initializer values below are placeholders from
# the original snippet; replace them with your own.
self._weights = vs.get_variable(
    _WEIGHTS_VARIABLE_NAME, [total_arg_size, output_size],
    dtype=dtype,
    initializer=kernel_initializer,
    collections=[ops.GraphKeys.GLOBAL_VARIABLES, ops.GraphKeys.MODEL_VARIABLES])
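If a variable was created without specifying collections, for example with a plain tf.Variable, the following sketch adds it to the collection afterwards. It assumes only standard TensorFlow 1.x collection APIs; the variable name is illustrative.
import tensorflow as tf
from tensorflow.python.framework import ops

# Hypothetical variable created elsewhere without explicit collections.
my_var = tf.Variable(tf.zeros([16]), name="my_extra_weight")

# Register it so that its weights are loaded online.
tf.add_to_collection(ops.GraphKeys.MODEL_VARIABLES, my_var)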