すべてのプロダクト
Search
ドキュメントセンター

OpenSearch:カスタムソートモデルの開発者ガイド

最終更新日:Dec 28, 2024

このトピックでは、JSONファイルでカスタムソートモデルを設定する方法について説明し、カスタム実装を追加できるサンプルコードを提供します。

概要

このトピックでは、JSONファイルでカスタムソートモデルを設定する方法について説明し、カスタム実装を追加できるサンプルコードを提供します。

はじめに

BaseModelクラスを継承するCustomModelクラスを実装する必要があります。

フレームワークは、build関数を呼び出すことによってグラフを作成します。次のメソッドが呼び出されます。

def build(self):

    self.build_placeholder()
    self.build_model()
    self.setup_global_step()
    self.reg_loss()
    self.loss_op()
    self.update_op()
    self.training_op()
    self.predictions_op()
    self.mark_output()
    self.metrics_op()
    self.summary_op()
    self.trace_sample_op()

以下のメソッドを実装する必要があります。

def build_model(self):
    pass

def update_op(self):
    pass

def reg_loss(self):
    pass

def training_op(self):
    pass

def loss_op(self):
    pass

CustomModelクラスのサンプルコード

from collections import OrderedDict
import tensorflow as tf
from tensorflow.contrib import layers
from tensorflow.contrib.framework.python.ops import arg_scope
from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope
from model_ops.tflog import tflogger as logging

import model_ops.optimizer_ops as myopt
from model.base_model import BaseModel

from model_ops import ops as base_ops
from model_ops import utils


class CustomModel(BaseModel):
    def __init__(self,
                 config,
                 name="CTR"):
        super(CustomModel, self).__init__(config,name)

        # モデル変数コレクションを定義する
        self.collections_dnn_hidden_layer = "{}_dnn_hidden_layer".format(self.name)
        self.collections_dnn_hidden_output = "{}_dnn_hidden_output".format(self.name)

        self.layer_dict = OrderedDict()

        self.embedding_columns = ['feature1','feature2']
        for feature_name in self.embedding_columns:
            self.generate_embedding_feature_column(feature_name,hash_bucket_size=1000,dimension=16,initializer=tf.zeros_initializer,combiner="sum",is_share_embedding=False,shared_embedding_name=None)

        """ 
        
        self.real_valued_columns = ['feature3','feature4']
        for feature_name in self.real_valued_columns:
            self.generate_real_valued_feature_column(feature_name,dtype="Float",value_dimension=1)

        self.sparse_id_columns = ['feature5','feature6']
        for feature_name in self.sparse_id_columns:
            self.generate_sparse_id_feature_column(feature_name,hash_bucket_size=1000,dimension=16,combiner="sum",is_share_embedding=False,shared_embedding_name=None)
        """

        self.embedding_partitino_size = 4 * 1024 * 1024
        self.dnn_partition_size = 64 * 1024
        self.dnn_l2_reg = 1e-6
        self.clip_gradients = 5.0
        self.dnn_hidden_units = [1024, 512, 256]


    def build_placeholder(self):
        try:
            self.is_training = tf.get_default_graph().get_tensor_by_name("training:0")
        except KeyError:
            self.is_training = tf.placeholder(tf.bool, name="training")

    def setup_global_step(self):
        global_step = tf.Variable(
            initial_value=0,
            name="global_step",
            trainable=False,
            dtype=tf.int64,
            collections=[tf.GraphKeys.GLOBAL_STEP, tf.GraphKeys.GLOBAL_VARIABLES])
        self.global_step = global_step


    def embedding_layer(self):
        with tf.variable_scope(name_or_scope="Embedding_Layer",
                               partitioner=base_ops.partitioner(self.config.ps_num,
                                                                self.embedding_partitino_size),
                               reuse=tf.AUTO_REUSE) as scope:
            logging.info('ps num: {}, embedding prtition size: {} \n scope :{}'.format(self.config.ps_num,self.embedding_partitino_size,scope))
            self.layer_dict['dnn'] = layers.input_from_feature_columns(self.features,
                                                                                self.feature_columns_from_column_names(
                                                                                    self.embedding_columns),
                                                                                scope=scope)


    def dnn_layer(self):
        dnn_layer = []
        dnn_layer.append(self.layer_dict['dnn'])
        with tf.variable_scope(name_or_scope="{}_Score_Network".format(self.name),
                               partitioner=base_ops.partitioner(self.config.ps_num,
                                                                self.dnn_partition_size)):
            self.dnn_net = tf.concat(values=dnn_layer, axis=1)
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                for layer_id, num_hidden_units in enumerate(self.dnn_hidden_units):
                    with variable_scope.variable_scope("hiddenlayer_{}".format(layer_id)) as dnn_hidden_layer_scope:
                        tf.contrib.layers.apply_regularization(
                            regularizer=tf.contrib.layers.l2_regularizer(float(self.dnn_l2_reg)),
                            weights_list=[self.dnn_net])
                        self.dnn_net = layers.fully_connected(
                            self.dnn_net,
                            num_hidden_units,
                            utils.getActivationFunctionOp("llrelu"),
                            scope=dnn_hidden_layer_scope,
                            variables_collections=[self.collections_dnn_hidden_layer],
                            outputs_collections=[self.collections_dnn_hidden_output],
                            normalizer_fn=layers.batch_norm,
                            normalizer_params={"scale": True, "is_training": self.is_training})



    def logits_layer(self):
        with tf.variable_scope(name_or_scope="{}_Logits".format(self.name),
                               partitioner=base_ops.partitioner(self.config.ps_num,
                                                                self.dnn_partition_size)) as dnn_logits_scope:
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                self.logits = layers.linear(
                    self.dnn_net,
                    1,
                    scope=dnn_logits_scope,
                    variables_collections=[self.collections_dnn_hidden_layer],
                    outputs_collections=[self.collections_dnn_hidden_output])

    def build_model(self):
        self.embedding_layer()
        self.dnn_layer()
        self.logits_layer()

    def update_op(self):
        update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
        self.update_ops = []
        for update_op in update_ops:
            if update_op.name.startswith(self.name):
                self.update_ops.append(update_op)

    def reg_loss(self):
        reg_losses = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
        self.reg_losses = []
        for reg_loss in reg_losses:
            if reg_loss.name.startswith(self.name):
                self.reg_losses.append(reg_loss)
        self.reg_loss = tf.reduce_sum(self.reg_losses)

    def training_op(self):
        if self.config.predict:
            self.train_op = None
            return
        with tf.variable_scope(name_or_scope="Optimize_Layer",
                               reuse=tf.AUTO_REUSE):
            gs = tf.train.get_or_create_global_step()
            logging.info("Global_step:{},{}".format(self.name, str(gs)))
            logging.info("Model_name:{},train_op_final_loss:{}".format(self.name, str(self.loss)))

            self.train_op, _, _ = myopt.optimize_loss(
                loss=self.loss,
                global_step=self.global_step,
                learning_rate=0.01,
                optimizer=tf.train.AdamAsyncOptimizer(learning_rate=0.01, beta1=0.9,
                                                      beta2=0.999, epsilon=1e-8,
                                                      use_locking=False),
                update_ops=self.update_ops,
                clip_gradients=self.clip_gradients,
                variables=ops.get_collection(ops.GraphKeys.TRAINABLE_VARIABLES),
                increment_global_step=True,
                summaries=myopt.OPTIMIZER_SUMMARIES)

    def loss_op(self):
        with tf.name_scope("{}_Loss_Op".format(self.name)):
            label = self.label
            self.loss = tf.reduce_mean(
                tf.nn.sigmoid_cross_entropy_with_logits(
                    logits=self.logits,
                    labels=label))
            self.loss = self.loss + self.reg_loss



    def metrics_op(self):
        super(CustomModel, self).metrics_op()

    def summary_op(self):
        with tf.name_scope("{}_Metrics_Scalar".format(self.name)):
            for key, metric in self.metrics.items():
                tf.summary.scalar(name=key, tensor=metric)

        with tf.name_scope("{}_Layer_Summary".format(self.name)):
            base_ops.add_norm2_summary(self.collections_dnn_hidden_layer)
            base_ops.add_dense_output_summary(self.collections_dnn_hidden_output)
            base_ops.add_weight_summary(self.collections_dnn_hidden_layer)

機能

self.featuresを使用してサンプル入力を入手できます。 keyは、指定した機能名を示します。

重要

機能に埋め込みを実行するには、contrib.layers.input_from_feature_columns関数を使用する必要があります。他の埋め込み関数を使用しないでください。そうしないと、予期しないオンラインの問題が発生します。サポートされている機能列のタイプは、、、、およびのみです。埋め込み列は2回使用できません。この場合は、代わりに共有埋め込み列を使用してください。ハッシュバケット付きスパースカラム埋め込み列実数値列、および共有埋め込み列. 埋め込み列は2回使用できません。この場合は、共有埋め込み列を使用してください。

API操作

オンラインモデルとの非互換性の問題を防ぐため、OpenSearchは機能列に関連するカプセル化されたAPI操作を提供します。以下のカプセル化されたAPI操作を直接使用することをお勧めします。

# 埋め込み列を生成します。
self.generate_embedding_feature_column(
    feature_name,
    hash_bucket_size,
    dimension,
    initializer=tf.zeros_initializer,
    combiner="sum",
    is_share_embedding=False,
    shared_embedding_name=None
)

# 実数値列を生成します。
self.generate_real_valued_feature_column(
    feature_name,
    dtype="Float", # FloatとIntのみがサポートされています。
    value_dimension=1
)

# スパース列を生成します。
self.generate_sparse_id_feature_column(
    feature_name,
    hash_bucket_size,
    combiner="sum"
)

# 設定されている機能列をクエリします。
self.feature_columns_from_column_names(
    feature_list
)

モデルの仕様

オンラインサービスとの互換性を確保するために、モデルが以下の仕様を満たしていることを確認してください。

CustomModel:CustomModelクラスを初期化するときは、次のコードを使用して親クラスを呼び出します。super(CustomModel, self).__init__(config,name)。

logits:logitsをself.logitsに渡します。OpenSearchはシグモイド関数を使用してlogitsを最終スコアに変換します。他の関数を使用してスコアを計算する場合は、predictions_opメソッドを書き直す必要があります。

loss:lossをself.lossに渡します。

reg_loss:reg_lossをself.reg_lossに渡します。

metrics_op:metrics_opメソッドを呼び出すときは、次のコードを使用して親クラスを呼び出します。super(CustomModel, self).metrics_op()。OpenSearchはいくつかの一般的なシステムメトリックを監視します。

以下のメソッドは実装しないことをお勧めします。

build_placeholde、rmark_output、およびtrace_sample_op。フレームワークのデフォルトロジックを使用してください。

注意事項

変数と重み

tf.Variableを使用して他の変数を作成するか、contribモジュールに加えてオンライン関数を使用する必要がある場合。変数をMODEL_VARIABLESコレクションに追加します。重みはそのコレクションに基づいてロードされます。オンラインでロードされる重みを持つ変数のみを追加する必要があります。global_step変数をそのコレクションに追加する必要はありません。

例:

from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope as vs

self._weights = vs.get_variable(
          _WEIGHTS_VARIABLE_NAME, [total_arg_size, output_size],
          dtype=dtype,
          initializer=kernel_initializer,
          collections=[ops.GraphKeys.GLOBAL_VARIABLES, ops.GraphKeys.MODEL_VARIABLES])