使用EmbeddingVariable進行超大規模訓練,不僅可以保證模型特徵無損,而且可以節約記憶體資源。
公用雲GPU伺服器即將過保下線,您可以繼續提交CPU版本的TensorFlow任務。如需使用GPU進行模型訓練,請前往DLC提交任務,具體操作請參見建立訓練任務。
背景資訊
Embedding已成為深度學習領域處理Word和ID類特徵的有效途徑。作為一種“函數映射”,Embedding通常將高維稀疏特徵映射為低維稠密向量,再進行模型端到端訓練。在TensorFlow中,使用Variable定義模型或節點狀態,其實現依賴於資料結構Tensor。Tensor是TensorFlow的抽象資料類型,包括標量、向量、矩陣及更高維的資料結構。Tensor作為資料載體在各運算元間流通,任何支援Tensor作為輸入和輸出的運算元,都可以加入Graph的計算過程中。因為Tensor採用連續儲存,所以定義Variable時,必須指定類型(Type)和空間大小(Shape),且該空間大小不支援修改。
TensorFlow通過Variable的方式實現Embedding機制,用於儲存Embedding的Variable空間大小為[vocabulary_size, embedding_dimension]。在大規模稀疏特徵情境下,存在以下弊端:
vocabulary_size由ID空間決定,隨著線上學習情境的ID不斷增加,會導致vocabulary_size難以估計。
ID通常為字串類型且規模龐大,進行Embedding之前,需要將其Hash到vocabulary_size範圍內:
如果vocabulary_size過小,則導致Hash衝突率增加,不同特徵可能尋找到相同的Embedding,即特徵減少。
如果vocabulary_size過大,則導致Variable儲存永遠不會被尋找的Embedding,即記憶體冗餘。
Embedding變數過大是導致模型增大的主要原因,即使通過正則手段降低某些特徵的Embedding對整個模型效果的影響,也無法從模型中去除該Embedding。
為解決上述問題,PAI-TF推出動態Embedding語義的EmbeddingVariable,在特徵無損訓練的條件下,以經濟的方式使用記憶體資源,從而實現超大規模特徵的離線訓練和模型上線。PAI-TF提供EmbeddingVariable(3.1)及Feature_Column(3.3)API,推薦使用Feature_Column API,它可以自動增加字串的Feature ID化流程。
EmbeddingVariable特性
動態Embedding。
無需指定Vocabulary規模,只需指定Embedding維度,PAI-TF就可以根據訓練,動態地調整詞典大小。適用於線上學習情境,同時省略了TensorFlow模型的資料預先處理流程。
Group Lasso正則。
通常經過深度學習的Embedding變數規模超大,如果將其部署為線上服務,則會造成伺服器壓力。使用Group Lasso正則處理Embedding,可以減少模型部署的壓力。
支援將原始特徵值傳入Embedding Lookup,省略了Hash等ID化操作,從而實現特徵的無損訓練。
支援Graph的Inference、Back Propagation及變數的匯入匯出,模型訓練中通過Optimizer自動更新EmbeddingVariable。
tf.get_embedding_variable介面說明
tf.get_embedding_variable介面返回一個已有的或建立的EmbeddingVariable變數,介面定義如下。
get_embedding_variable(
name,
embedding_dim,
key_dtype=dtypes.int64,
value_dtype=None,
initializer=None,
trainable=True,
collections=None,
partitioner=None,
custom_getter=None,
steps_to_live=None,
filter_options=variables.CounterFilterOptions()
)name:Embedding Variable變數名稱。
embedding_dim:Embedding的維度。例如8或64。
key_dtype:Lookup時key的類型,預設值為int64。
value_dtype: EmbeddingVariable變數類型,僅支援float類型。
initializer:EmbeddingVariable變數初始值。
trainable:是否被添加到GraphKeys.TRAINABLE_VARIABLES的Collection。
collections:記錄了collection的keys的列表,預設為[GraphKeys.GLOBAL_VARIABLES]。該Variable會被加入到列表中的collection。
partitioner:分區函數。
custom_getter:可調用對象,將true getter作為它的第一個參數傳入,並允許覆蓋內部的
get_variable方法。應符合def custom_getter(getter,*args,**kwargs)的形式,也可以通過def custom_getter(getter,name,*args,**kwargs)的形式直接存取get_variable中的所有參數。steps_to_live:全域步長,用於自動淘汰到期特徵。系統會刪除全域步數超過該參數值的未更新特徵。
filter_options:准入策略,支援根據特徵頻次進行准入。
EmbeddingVariable
EmbeddingVariable的結構如下。
class EmbeddingVariable(ResourceVariable)
def total_count():
# 返回Embedding當前的total_count,[rowCount,EmbeddingDim]。
def read_value():
raise NotImplementedError("...")
def assign():
raise NotImplementedError("...")
def assign_add():
raise NotImplementedError("...")
def assign_sub():
raise NotImplementedError("...")支援讀取稀疏資料的
sparse_read()方法。如果查詢的key不存在,則返回EmbeddingVariable初始化時,該key對應的initializer。支援查詢EmbeddingVariable詞表總數的
total_count()方法,該方法返回EmbeddingVariable的動態Shape值。不支援全量讀取EmbeddingVariable的
read_value()方法。不支援EmbeddingVariable的賦值方法,包括
assign()、assign_add()及assign_sub()。CounterFilterOptions的結構如下。
@tf_export("CounterFilterOptions") class CounterFilterOptions(object): def __init__(self, filter_freq=0): pass指定準入頻次,預設值為0。
使用feature_column介面構建EmbeddingVariable
def tf.contrib.layers.sparse_column_with_embedding(column_name=column_name,
dtype=tf.string,
partition_num=None,
steps_to_live=None,
# 1120版本不支援以下兩個參數,僅140lite版本支援。
steps_to_live_l2reg=None,
l2reg_theta=None)
# column_name: column name
# dtype: type, default is tf.string樣本
使用底層tf.get_embedding_variable介面構建包含EmbeddingVariable的TensorFlow Graph。
#!/usr/bin/python import tensorflow as tf var = tf.get_embedding_variable("var_0", embedding_dim=3, initializer=tf.ones_initializer(tf.float32), partitioner=tf.fixed_size_partitioner(num_shards=4)) shape = [var1.total_count() for var1 in var] emb = tf.nn.embedding_lookup(var, tf.cast([0,1,2,5,6,7], tf.int64)) fun = tf.multiply(emb, 2.0, name='multiply') loss = tf.reduce_sum(fun, name='reduce_sum') opt = tf.train.FtrlOptimizer(0.1, l1_regularization_strength=2.0, l2_regularization_strength=0.00001) g_v = opt.compute_gradients(loss) train_op = opt.apply_gradients(g_v) init = tf.global_variables_initializer() sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) with tf.Session(config=sess_config) as sess: sess.run([init]) print(sess.run([emb, train_op, loss])) print(sess.run([emb, train_op, loss])) print(sess.run([emb, train_op, loss])) print(sess.run([shape]))將EmbeddingVariable儲存為Checkpoint。
#!/usr/bin/python import tensorflow as tf var = tf.get_embedding_variable("var_0", embedding_dim=3, initializer=tf.ones_initializer(tf.float32), partitioner=tf.fixed_size_partitioner(num_shards=4)) emb = tf.nn.embedding_lookup(var, tf.cast([0,1,2,5,6,7], tf.int64)) init = tf.global_variables_initializer() saver = tf.train.Saver(sharded=True) print("GLOBAL_VARIABLES: ", tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)) print("SAVEABLE_OBJECTS: ", tf.get_collection(tf.GraphKeys.SAVEABLE_OBJECTS)) checkpointDir = "/tmp/model_dir" sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) with tf.Session(config=sess_config) as sess: sess.run([init]) print(sess.run([emb])) save_path = saver.save(sess, checkpointDir + "/model.ckpt", global_step=666) tf.train.write_graph(sess.graph_def, checkpointDir, 'train.pbtxt') print("save_path", save_path) print("list_variables", tf.contrib.framework.list_variables(checkpointDir))從Checkpoint中恢複EmbeddingVariable變數。
#!/usr/bin/python import tensorflow as tf var = tf.get_embedding_variable("var_0", embedding_dim=3, initializer=tf.ones_initializer(tf.float32), partitioner=tf.fixed_size_partitioner(num_shards=4)) emb = tf.nn.embedding_lookup(var, tf.cast([0,1,2,5,6,7], tf.int64)) init = tf.global_variables_initializer() saver = tf.train.Saver(sharded=True) print("GLOBAL_VARIABLES: ", tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)) print("SAVEABLE_OBJECTS: ", tf.get_collection(tf.GraphKeys.SAVEABLE_OBJECTS)) checkpointDir = "/tmp/model_dir" sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) with tf.Session(config=sess_config) as sess: print("list_variables", tf.contrib.framework.list_variables(checkpointDir)) saver.restore(sess, checkpointDir + "/model.ckpt-666") print(sess.run([emb]))使用feature_column介面構建包含EmbeddingVariable的TensorFlow Graph。
import tensorflow as tf import os columns_list=[] columns_list.append(tf.contrib.layers.sparse_column_with_embedding(column_name="col_emb", dtype=tf.string)) W = tf.contrib.layers.shared_embedding_columns(sparse_id_columns=columns_list, dimension=3, initializer=tf.ones_initializer(tf.float32), shared_embedding_name="xxxxx_shared") ids={} ids["col_emb"] = tf.SparseTensor(indices=[[0,0],[1,0],[2,0],[3,0],[4,0]], values=["aaaa","bbbbb","ccc","4nn","5b"], dense_shape=[5, 5]) emb = tf.contrib.layers.input_from_feature_columns(columns_to_tensors=ids, feature_columns=W) fun = tf.multiply(emb, 2.0, name='multiply') loss = tf.reduce_sum(fun, name='reduce_sum') opt = tf.train.FtrlOptimizer(0.1, l1_regularization_strength=2.0, l2_regularization_strength=0.00001) g_v = opt.compute_gradients(loss) train_op = opt.apply_gradients(g_v) init = tf.global_variables_initializer() init_local = tf.local_variables_initializer() sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) with tf.Session(config=sess_config) as sess: sess.run(init) print("init global done") sess.run(init_local) print("init local done") print(sess.run([emb, train_op,loss])) print(sess.run([emb, train_op,loss])) print(sess.run([emb, train_op,loss])) print(sess.run([emb]))說明EmbeddingVariable僅支援Ftrl Optimizer、Adagrad Optimizer、Adam Optimizer及AdagradDecay Optimizer。