To encrypt or decrypt data in MaxCompute SQL, you can combine UDFs with a KMS instance. This topic describes how to use a KMS instance in MaxCompute UDFs to encrypt and decrypt data.
Solution overview
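In the encryption flow, the UDF initializes the KMS instance client and generates a data key during the UDF initialization phase (setup). During data processing (evaluate), it uses the data key to perform envelope encryption. For large volumes of data, we recommend obtaining the data key through a secret, which reduces the number of data keys that are generated.
In the decryption flow, the UDF initializes the KMS instance client and a data key cache during the initialization phase. During data processing, it decrypts the data key ciphertext and then uses the resulting plaintext data key to decrypt the data field.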
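Encrypted fields are stored as strings in the following format: [length of the data key ciphertext, fixed 4 digits][data key ciphertext][Base64-encoded IV, 16 characters][Base64-encoded ciphertext of the original field, with the GCM authentication tag appended]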
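For illustration, here is a minimal Java sketch that splits a stored value into its parts. It assumes the layout produced by the encryption UDFs in this topic (a 4-digit length prefix, the data key ciphertext, a 16-character Base64 IV, then the Base64 ciphertext with the GCM tag appended) and uses java.util.Base64 instead of commons-codec. The class name EncryptedFieldLayout is hypothetical.

```java
import java.util.Base64;

// Hypothetical helper: splits a stored value laid out as
// [4-digit key ciphertext length][data key ciphertext][Base64 IV][Base64 ciphertext + GCM tag]
final class EncryptedFieldLayout {
    static final int LENGTH_PREFIX = 4;      // matches the "%04d" length prefix
    static final int IV_BASE64_LENGTH = 16;  // Base64 of a 12-byte GCM IV

    final String dataKeyCiphertext;
    final byte[] iv;
    final byte[] ciphertextWithTag;

    private EncryptedFieldLayout(String dataKeyCiphertext, byte[] iv, byte[] ciphertextWithTag) {
        this.dataKeyCiphertext = dataKeyCiphertext;
        this.iv = iv;
        this.ciphertextWithTag = ciphertextWithTag;
    }

    static EncryptedFieldLayout parse(String stored) {
        if (stored.length() <= LENGTH_PREFIX) {
            throw new IllegalArgumentException("value too short");
        }
        int keyLength = Integer.parseInt(stored.substring(0, LENGTH_PREFIX));
        int ivStart = LENGTH_PREFIX + keyLength;
        int ivEnd = ivStart + IV_BASE64_LENGTH;
        if (stored.length() <= ivEnd) {
            throw new IllegalArgumentException("value too short for the declared key length");
        }
        String keyCiphertext = stored.substring(LENGTH_PREFIX, ivStart);
        byte[] iv = Base64.getDecoder().decode(stored.substring(ivStart, ivEnd));
        byte[] body = Base64.getDecoder().decode(stored.substring(ivEnd));
        return new EncryptedFieldLayout(keyCiphertext, iv, body);
    }

    public static void main(String[] args) {
        String stored = String.format("%04d", 3) + "KEY"
                + Base64.getEncoder().encodeToString(new byte[12])
                + Base64.getEncoder().encodeToString(new byte[20]);
        EncryptedFieldLayout parsed = parse(stored);
        System.out.println(parsed.dataKeyCiphertext + " " + parsed.iv.length + " " + parsed.ciphertextWithTag.length); // KEY 12 20
    }
}
```

Because the prefix counts the characters of the data key ciphertext, the IV always starts at index 4 + length, which is also where DecryptUDF slices the string.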
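Because MaxCompute UDFs run with very high concurrency, we do not recommend encrypting the target data directly with the KMS key inside the UDF. Use envelope encryption instead: generate a data key through the KMS instance and encrypt the target data with that data key. For more information about envelope encryption, see Use KMS keys for envelope encryption.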
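The envelope pattern can be illustrated without KMS. In this minimal Java sketch, a locally generated AES-256 key stands in for the plaintext data key that GenerateDataKey would return; in the UDFs, only the data key ciphertext is stored next to the data. The cipher settings (AES/GCM/NoPadding, 12-byte IV, 16-byte tag) match UDFConstant. The class name EnvelopeSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

final class EnvelopeSketch {
    private static final SecureRandom RANDOM = new SecureRandom();
    static final int IV_LENGTH = 12;     // same value as UDFConstant.GCM_IV_LENGTH
    static final int TAG_BITS = 16 * 8;  // same value as UDFConstant.GCM_TAG_LENGTH * 8

    // Stand-in for the plaintext data key returned by KMS GenerateDataKey.
    static SecretKey newLocalDataKey() throws Exception {
        KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(256);
        return generator.generateKey();
    }

    // Returns IV || ciphertext || tag.
    static byte[] encrypt(SecretKey dataKey, String plaintext) throws Exception {
        byte[] iv = new byte[IV_LENGTH];
        RANDOM.nextBytes(iv); // fresh IV per value; an IV must never repeat under the same key
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, dataKey, new GCMParameterSpec(TAG_BITS, iv));
        byte[] body = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
        byte[] out = new byte[IV_LENGTH + body.length];
        System.arraycopy(iv, 0, out, 0, IV_LENGTH);
        System.arraycopy(body, 0, out, IV_LENGTH, body.length);
        return out;
    }

    static String decrypt(SecretKey dataKey, byte[] ivAndBody) throws Exception {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, dataKey, new GCMParameterSpec(TAG_BITS, ivAndBody, 0, IV_LENGTH));
        byte[] plain = cipher.doFinal(ivAndBody, IV_LENGTH, ivAndBody.length - IV_LENGTH);
        return new String(plain, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        SecretKey key = newLocalDataKey();
        System.out.println(decrypt(key, encrypt(key, "hello world"))); // hello world
    }
}
```

GCM also authenticates the data: changing any byte of the stored value makes decryption fail instead of returning wrong plaintext.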
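Prerequisites
Purchase a KMS software key management instance or hardware key management instance in advance. For details, see Purchase and enable a KMS instance.
Create a customer master key (CMK). Because envelope encryption supports only the GCM encryption mode, you must select a symmetric key as the key type. For details about key specifications and encryption modes, see Key management types and key specifications.
Create an application access point (AAP), save the ClientKey, and obtain the CA certificate of the instance. For details, see Create an application access point and Obtain the CA certificate of a KMS instance.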
Code examples
If performance matters, we recommend the SynchronousKeyEncryptUDF class in the Java example.
Java example
1. Install dependencies
<dependencies>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-udf</artifactId>
<version>0.48.6-public</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>alibabacloud-kms-java-sdk</artifactId>
<version>1.2.5</version>
<exclusions>
<exclusion>
<groupId>com.aliyun</groupId>
<artifactId>tea</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
2. Define the utility class UDFUtils.java
package com.aliyun.kms.sample;
import com.aliyun.tea.utils.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class UDFUtils {
private UDFUtils() {
// do nothing
}
public static boolean isNull(String value) {
return StringUtils.isEmpty(value) || "\\N".equals(value);
}
public static String getSynchronousKeyVersion(String udfIdentify, String keyIdentify) {
UUID uuid = null;
byte[] bytes = null;
if (StringUtils.isEmpty(keyIdentify)) {
bytes = udfIdentify.getBytes(StandardCharsets.UTF_8);
uuid = UUID.nameUUIDFromBytes(bytes);
return uuid.toString();
}
bytes = (udfIdentify + "/" + keyIdentify).getBytes(StandardCharsets.UTF_8);
uuid = UUID.nameUUIDFromBytes(bytes);
return uuid.toString();
}
public static <K, V> Map<K, V> getLRUMap(int capacity) {
return new LRUMap<K, V>(capacity);
}
private static class LRUMap<K, V> extends LinkedHashMap<K, V> {
private final int capacity;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
public LRUMap(int capacity) {
super(capacity, 0.75f, true);
this.capacity = capacity;
}
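@Override
public boolean containsKey(Object key) {
readLock.lock();
try {
return super.containsKey(key);
} finally {
readLock.unlock();
}
}
@Override
public V get(Object key) {
// accessOrder = true makes LinkedHashMap.get() reorder entries (a structural change),
// so get() must hold the exclusive write lock rather than the shared read lock
writeLock.lock();
try {
return super.get(key);
} finally {
writeLock.unlock();
}
}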
@Override
public V put(K key, V value) {
writeLock.lock();
try {
return super.put(key, value);
} finally {
writeLock.unlock();
}
}
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return size() > capacity;
}
}
}
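UDFUtils.LRUMap extends a LinkedHashMap created with accessOrder = true. The LinkedHashMap Javadoc states that in access-ordered maps, merely querying the map with get is a structural modification, because get() relinks the entry it reads at the tail of the internal list. Concurrent get() calls therefore need exclusive access, not a shared read lock. The small JDK-only demo below (the class name AccessOrderDemo is hypothetical) shows the reordering.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AccessOrderDemo {
    // Builds an access-ordered map (third constructor argument, as in UDFUtils.LRUMap),
    // reads one key, and returns the resulting iteration order.
    static List<String> orderAfterGet(String keyToRead) {
        Map<String, Integer> map = new LinkedHashMap<>(16, 0.75f, true);
        map.put("a", 1);
        map.put("b", 2);
        map.put("c", 3);
        map.get(keyToRead); // a "read" that moves the entry to the tail
        return new ArrayList<>(map.keySet());
    }

    public static void main(String[] args) {
        System.out.println(orderAfterGet("a")); // [b, c, a]
    }
}
```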
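Methods
Method | Description |
isNull | Checks whether a string is null: empty, or the literal \N. |
getSynchronousKeyVersion | Derives the version ID (a name-based UUID) from the UDF identifier and the version argument passed to the function. |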
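Because getSynchronousKeyVersion builds a name-based UUID with UUID.nameUUIDFromBytes (version 3), every worker that runs the UDF in the same project with the same first argument derives the same version ID, which is what allows them to share one data key. The sketch below re-implements the derivation with only the JDK; a plain null/empty check replaces StringUtils.isEmpty, and the class name KeyVersionSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

final class KeyVersionSketch {
    // Same derivation as UDFUtils.getSynchronousKeyVersion, without the tea StringUtils dependency.
    static String keyVersion(String udfIdentify, String keyIdentify) {
        String name = (keyIdentify == null || keyIdentify.isEmpty())
                ? udfIdentify
                : udfIdentify + "/" + keyIdentify;
        // Name-based UUID: identical input bytes always produce the identical UUID.
        return UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        System.out.println(keyVersion("my_project", null));
        System.out.println(keyVersion("my_project", "2024"));
    }
}
```

Passing null or an empty string as the first argument yields the same version, derived from the project name alone.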
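3. Define the constants class UDFConstant.java
The constants class defines the names of the configuration file parameters. For the parameter values, see 7. Configuration variables.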
package com.aliyun.kms.sample;
public class UDFConstant {
private UDFConstant() {
// do nothing
}
public static final String KMS_UDF_CONF_NAME = "kms_udf_conf.properties";
public static final String UDF_ENCRYPT_KEY_ID_KEY = "udf.kms.keyId";
public static final String KMS_CLIENT_KEY_FILE_KEY = "udf.kms.clientkey.file";
public static final String KMS_CLIENT_KEY_PASSWORD_KEY = "udf.kms.clientkey.password";
public static final String KMS_INSTANCE_CA_FILE_KEY = "udf.kms.instance.ca.file";
public static final String KMS_INSTANCE_ENDPOINT_KEY = "udf.kms.instance.endpoint";
public static final String KMS_SYNCHRONOUS_SECRET_NAME_KEY = "udf.synchronous.secret.name";
public static final String KMS_RAM_SECRET_NAME_KEY = "udf.ram.secret.name";
public static final String KMS_ENDPOINT_KEY = "udf.kms.endpoint";
public static final String ENCRYPT_DATA_KEY_FORMAT = "%04d";
public static final int GCM_IV_LENGTH = 12;
public static final int GCM_IV_BASE64_LENGTH = 16;
public static final int GCM_TAG_LENGTH = 16;
}
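Parameters
Parameter | Description |
KMS_UDF_CONF_NAME | Configuration file name |
UDF_ENCRYPT_KEY_ID_KEY | KMS key ID |
KMS_CLIENT_KEY_FILE_KEY | File name of the ClientKey credential |
KMS_CLIENT_KEY_PASSWORD_KEY | Password of the ClientKey credential |
KMS_INSTANCE_CA_FILE_KEY | File name of the KMS instance CA certificate |
KMS_INSTANCE_ENDPOINT_KEY | Endpoint of the KMS instance |
KMS_SYNCHRONOUS_SECRET_NAME_KEY | Name of the synchronous secret |
KMS_RAM_SECRET_NAME_KEY | Name of the RAM secret |
KMS_ENDPOINT_KEY | Endpoint of the KMS shared gateway |
ENCRYPT_DATA_KEY_FORMAT | Format of the 4-digit data key ciphertext length prefix |
GCM_IV_LENGTH | GCM IV length in bytes (12) |
GCM_IV_BASE64_LENGTH | Length of the Base64-encoded IV in characters (16) |
GCM_TAG_LENGTH | GCM authentication tag length in bytes (16) |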
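The numeric constants are tied to the storage format. This short JDK-only check (class and method names are hypothetical) shows two consequences: the "%04d" prefix can only represent data key ciphertexts of 0 to 9999 characters, because %04d pads but never truncates, and a 12-byte IV always encodes to 16 Base64 characters, which is the value of GCM_IV_BASE64_LENGTH.

```java
import java.util.Base64;

final class LayoutConstantsCheck {
    // Builds the fixed 4-character header; longer ciphertexts would silently widen it to 5+ characters.
    static String lengthPrefix(int ciphertextLength) {
        if (ciphertextLength < 0 || ciphertextLength > 9999) {
            throw new IllegalArgumentException("length does not fit in 4 digits: " + ciphertextLength);
        }
        return String.format("%04d", ciphertextLength);
    }

    static int base64Length(int rawBytes) {
        return Base64.getEncoder().encodeToString(new byte[rawBytes]).length();
    }

    public static void main(String[] args) {
        System.out.println(lengthPrefix(152)); // 0152
        System.out.println(base64Length(12));  // 16
    }
}
```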
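4. Encryption UDF
For tables with large data volumes, we recommend using the com.aliyun.kms.sample.SynchronousKeyEncryptUDF class as the high-performance encryption UDF. For smaller tables, where UDF parallelism is low, you can use the com.aliyun.kms.sample.EncryptUDF class as the encryption UDF. EncryptUDF calls GenerateDataKey once in setup for every UDF instance, so high parallelism produces many data keys.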
SynchronousKeyEncryptUDF example
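SynchronousKeyEncryptUDF is designed for large data volumes and high parallelism. It uses KMS secrets management to store each version and its data key in KMS as a secret version, so that one version never ends up with more than one data key. This reduces the number of data keys generated and improves performance. A data key is obtained as follows: the UDF derives a version ID from the project name and the function's first argument, then calls GetSecretValue for that version. If the version does not exist, the UDF calls GenerateDataKey and stores the result with PutSecretValue. If another worker has already stored the same version, PutSecretValue is rejected and the UDF reads the stored version instead.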
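The get-or-create logic can be sketched without any KMS dependency. In this minimal Java sketch, a ConcurrentHashMap stands in for the versions of the secret, a random 32-byte value stands in for GenerateDataKey, and putIfAbsent stands in for PutSecretValue: putIfAbsent returning an existing value plays the role of the Rejected.ResourceExist error. The class SecretVersionStore is hypothetical.

```java
import java.security.SecureRandom;
import java.util.Base64;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the synchronous secret: versionId -> stored data key.
final class SecretVersionStore {
    private final Map<String, String> versions = new ConcurrentHashMap<>();
    private final SecureRandom random = new SecureRandom();

    String getOrCreateDataKey(String versionId) {
        String stored = versions.get(versionId);                    // like GetSecretValue
        if (stored != null) {
            return stored;
        }
        String generated = newDataKey();                            // like GenerateDataKey
        String winner = versions.putIfAbsent(versionId, generated); // like PutSecretValue
        // Non-null winner: another worker stored first, so use its key and discard ours.
        return winner != null ? winner : generated;
    }

    private String newDataKey() {
        byte[] key = new byte[32];
        random.nextBytes(key);
        return Base64.getEncoder().encodeToString(key);
    }

    public static void main(String[] args) throws InterruptedException {
        SecretVersionStore store = new SecretVersionStore();
        Set<String> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[8];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> seen.add(store.getOrCreateDataKey("project-version")));
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(seen.size()); // 1: every worker ends up with the same data key
    }
}
```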
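Create the secret xxxxSynchronousSecret
In the KMS console or by calling the KMS API, create a generic secret named xxxxSynchronousSecret. The secret value can be anything. The UDF uses this secret as the storage container for data keys; it corresponds to the UDF configuration item udf.synchronous.secret.name. For more information about creating secrets, see Manage and use generic secrets.
Create the RAM secret xxxxRamSecret
In the KMS console or by calling the KMS API, create a RAM secret whose value is the AccessKey pair (AK/SK) of the account that calls KMS through the shared gateway. The UDF uses it as the identity credential for calling OpenAPI; it corresponds to the UDF configuration item udf.ram.secret.name. For more information about creating RAM secrets and creating an AccessKey pair for the KMS service account, see Manage and use RAM secrets and Create an AccessKey pair. Example secret value:
{ "AccessKeyId":"LTAI****************", "AccessKeySecret":"yourAccessKeySecret" }
Define the encryption implementation class SynchronousKeyEncryptUDF.java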
package com.aliyun.kms.sample;
import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.*;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.tea.TeaException;
import com.aliyun.tea.utils.StringUtils;
import com.google.gson.Gson;
import org.apache.commons.codec.binary.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
public class SynchronousKeyEncryptUDF extends UDF {
private static Gson gson = new Gson();
private static Client instanceClient;
private static com.aliyun.kms.kms20160120.Client shareClient;
private static Map<String, DataKeyObject> dataKeyObjectMap;
private static Base64 base64 = new Base64();
private String runTaskIdentify;
private String keySecretName;
private String keyId;
private Properties properties = new Properties();
@Override
public void setup(ExecutionContext ctx) throws UDFException, IOException {
super.setup(ctx);
try {
InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
properties.load(in);
// Granularity of data key generation: use getRunningProject for one key per project, or getTableInfo for one key per table
runTaskIdentify = ctx.getRunningProject();
dataKeyObjectMap = new ConcurrentHashMap<>();
keySecretName = properties.getProperty(UDFConstant.KMS_SYNCHRONOUS_SECRET_NAME_KEY);
keyId = properties.getProperty(UDFConstant.UDF_ENCRYPT_KEY_ID_KEY);
if (StringUtils.isEmpty(keySecretName)) {
throw new UDFException("the secret name is null");
}
buildInstanceClient(ctx);
checkSecretExist();
buildShareClient();
} catch (Exception e) {
throw new UDFException(e);
}
}
public String evaluate(String keyIdentify, String data) {
if (UDFUtils.isNull(data)) {
return data;
}
byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
byte[] iv = null;
byte[] cipherTextBytes = null;
try {
String keyVersion = UDFUtils.getSynchronousKeyVersion(runTaskIdentify, keyIdentify);
if (!dataKeyObjectMap.containsKey(keyVersion)) {
dataKeyObjectMap.put(keyVersion, getDataKeyObject(keyVersion));
}
DataKeyObject dataKeyObject = dataKeyObjectMap.get(keyVersion);
iv = new byte[UDFConstant.GCM_IV_LENGTH];
SecureRandom random = new SecureRandom();
random.nextBytes(iv);
Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
SecretKeySpec keySpec = new SecretKeySpec(dataKeyObject.plaintextBytes, "AES");
GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
cipher.init(Cipher.ENCRYPT_MODE, keySpec, gcmParameterSpec);
cipherTextBytes = cipher.doFinal(dataBytes);
return dataKeyObject.encryptedDataKeyPart + base64.encodeAsString(iv) + base64.encodeAsString(cipherTextBytes);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
private DataKeyObject getDataKeyObject(String keyVersion) throws Exception {
try {
return getDataKeyObjectByGetSecretValue(keyVersion);
} catch (TeaException e) {
if ("Forbidden.ResourceNotFound".equals(e.code)) {
return buildDataKeyObject(keyVersion);
}
// Rethrow any other error instead of returning null, which would make ConcurrentHashMap.put throw a NullPointerException
throw e;
}
}
private DataKeyObject getDataKeyObjectByGetSecretValue(String keyVersion) throws Exception {
String secretData = getSecretValue(keyVersion);
DataKeyPersistentObject dataKeyPersistentObject = gson.fromJson(secretData, DataKeyPersistentObject.class);
byte[] plaintextBytes = base64.decode(dataKeyPersistentObject.plaintext);
return new DataKeyObject(plaintextBytes, dataKeyPersistentObject.encryptedDataKeyPart);
}
private DataKeyObject buildDataKeyObject(String keyVersion) throws Exception {
GenerateDataKeyRequest request = new GenerateDataKeyRequest();
request.setKeyId(keyId);
KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
GenerateDataKeyResponse response = instanceClient.generateDataKeyWithOptions(request, runtimeOptions);
String plaintext = response.getBody().plaintext;
byte[] plaintextBytes = base64.decode(plaintext);
String encryptedDataKeyPart = String.format(UDFConstant.ENCRYPT_DATA_KEY_FORMAT, response.getBody().ciphertextBlob.length()) + response.getBody().ciphertextBlob;
DataKeyObject dataKeyObject = new DataKeyObject(plaintextBytes, encryptedDataKeyPart);
DataKeyPersistentObject dataKeyPersistentObject = new DataKeyPersistentObject(plaintext, encryptedDataKeyPart);
PutSecretValueRequest putSecretValueRequest = new PutSecretValueRequest();
putSecretValueRequest.secretData = dataKeyPersistentObject.toJsonString();
putSecretValueRequest.secretName = keySecretName;
putSecretValueRequest.versionId = keyVersion;
try {
shareClient.putSecretValue(putSecretValueRequest);
} catch (TeaException e) {
if ("Rejected.ResourceExist".equals(e.code)) {
// Another worker stored this version first: use its data key
return getDataKeyObjectByGetSecretValue(keyVersion);
}
// Rethrow other errors so that a data key that was never stored is not used
throw e;
}
return dataKeyObject;
}
@Override
public void close() throws UDFException, IOException {
super.close();
}
private void buildInstanceClient(ExecutionContext ctx) throws Exception {
try {
KmsConfig config = new KmsConfig();
String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
byte[] caFileContent = ctx.readResourceFile(caFileName);
config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
instanceClient = new TransferClient(config);
} catch (Exception e) {
throw new UDFException(e);
}
}
private void checkSecretExist() throws Exception {
getSecretValue(null);
}
private String getSecretValue(String versionId) throws Exception {
GetSecretValueRequest getSecretValueRequest = new GetSecretValueRequest();
getSecretValueRequest.setSecretName(keySecretName);
if (!StringUtils.isEmpty(versionId)) {
getSecretValueRequest.setVersionId(versionId);
}
KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
GetSecretValueResponse getSecretValueResponse = instanceClient.getSecretValueWithOptions(getSecretValueRequest, runtimeOptions);
return getSecretValueResponse.body.secretData;
}
private void buildShareClient() throws Exception {
String ramSecretName = properties.getProperty(UDFConstant.KMS_RAM_SECRET_NAME_KEY);
GetSecretValueRequest getSecretValueRequest = new GetSecretValueRequest();
getSecretValueRequest.setSecretName(ramSecretName);
KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
GetSecretValueResponse getSecretValueResponse = instanceClient.getSecretValueWithOptions(getSecretValueRequest, runtimeOptions);
AccessKeyObject accessKeyObject = gson.fromJson(getSecretValueResponse.body.secretData, AccessKeyObject.class);
com.aliyun.teaopenapi.models.Config shareConfig = new com.aliyun.teaopenapi.models.Config()
.setAccessKeyId(accessKeyObject.AccessKeyId)
.setAccessKeySecret(accessKeyObject.AccessKeySecret)
.setEndpoint(properties.getProperty(UDFConstant.KMS_ENDPOINT_KEY));
shareClient = new com.aliyun.kms.kms20160120.Client(shareConfig);
}
private static class AccessKeyObject implements Serializable {
private String AccessKeyId;
private String AccessKeySecret;
}
private final static class DataKeyObject {
private byte[] plaintextBytes;
private String encryptedDataKeyPart;
public DataKeyObject() {
}
public DataKeyObject(byte[] plaintextBytes, String encryptedDataKeyPart) {
this.plaintextBytes = plaintextBytes;
this.encryptedDataKeyPart = encryptedDataKeyPart;
}
}
private final static class DataKeyPersistentObject implements Serializable {
private String plaintext;
private String encryptedDataKeyPart;
public DataKeyPersistentObject() {
}
public DataKeyPersistentObject(String plaintext, String encryptedDataKeyPart) {
this.plaintext = plaintext;
this.encryptedDataKeyPart = encryptedDataKeyPart;
}
public String toJsonString() {
return gson.toJson(this);
}
}
}
Main methods
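Method | Description |
setup | Reads the configuration file, initializes the KMS instance SDK and the Alibaba Cloud SDK, and checks that the secret used to store data keys exists. |
buildInstanceClient | Initializes the KMS instance SDK. |
buildShareClient | Reads the AccessKey pair stored in the RAM secret and uses it to initialize the Alibaba Cloud SDK. |
getDataKeyObjectByGetSecretValue | Retrieves the data key stored in the secret. |
buildDataKeyObject | Generates a new data key and stores it in the secret. |
evaluate | Encrypts the data and returns the encrypted result. |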
Note: The following KMS APIs are used:
Instance APIs: GenerateDataKey (generate a data key) and GetSecretValue (retrieve a secret value).
OpenAPI: PutSecretValue (store a secret value).
EncryptUDF example
Define the encryption UDF implementation class EncryptUDF.java
package com.aliyun.kms.sample;
import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.GenerateDataKeyRequest;
import com.aliyun.kms20160120.models.GenerateDataKeyResponse;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import org.apache.commons.codec.binary.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Properties;
public class EncryptUDF extends UDF {
private static Client client;
private static String plaintext;
private static byte[] plaintextBytes;
private static String encryptedDataKeyPart;
private static Base64 base64 = new Base64();
@Override
public void setup(ExecutionContext ctx) throws UDFException, IOException {
super.setup(ctx);
try {
InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
Properties properties = new Properties();
properties.load(in);
KmsConfig config = new KmsConfig();
String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
byte[] caFileContent = ctx.readResourceFile(caFileName);
config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
String keyId = properties.getProperty(UDFConstant.UDF_ENCRYPT_KEY_ID_KEY);
client = new TransferClient(config);
GenerateDataKeyRequest request = new GenerateDataKeyRequest();
request.setKeyId(keyId);
KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
GenerateDataKeyResponse response = client.generateDataKeyWithOptions(request, runtimeOptions);
plaintext = response.getBody().plaintext;
plaintextBytes = base64.decode(plaintext);
encryptedDataKeyPart = String.format(UDFConstant.ENCRYPT_DATA_KEY_FORMAT, response.getBody().ciphertextBlob.length()) + response.getBody().ciphertextBlob;
} catch (Exception e) {
throw new UDFException(e);
}
}
public String evaluate(String data) {
if (UDFUtils.isNull(data)) {
return data;
}
byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
byte[] iv = null;
byte[] cipherTextBytes = null;
try {
iv = new byte[UDFConstant.GCM_IV_LENGTH];
SecureRandom random = new SecureRandom();
random.nextBytes(iv);
Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
SecretKeySpec keySpec = new SecretKeySpec(plaintextBytes, "AES");
GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
cipher.init(Cipher.ENCRYPT_MODE, keySpec, gcmParameterSpec);
cipherTextBytes = cipher.doFinal(dataBytes);
return encryptedDataKeyPart + base64.encodeAsString(iv) + base64.encodeAsString(cipherTextBytes);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
@Override
public void close() throws UDFException, IOException {
super.close();
}
}
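Main methods
Method | Description |
setup | Reads the configuration file, initializes the KMS instance SDK, and generates a data key, keeping its ciphertext for the output prefix. |
evaluate | Encrypts the data and returns the encrypted result. |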
5. Decryption UDF
Define the decryption implementation class DecryptUDF.java
package com.aliyun.kms.sample;
import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.DecryptRequest;
import com.aliyun.kms20160120.models.DecryptResponse;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import org.apache.commons.codec.binary.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
public class DecryptUDF extends UDF {
private static Client client;
private static Map<String, byte[]> dataKeyMap;
private static final int BASE_STEP=4;
private static Base64 base64 = new Base64();
@Override
public void setup(ExecutionContext ctx) throws UDFException, IOException {
super.setup(ctx);
try {
InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
Properties properties = new Properties();
properties.load(in);
KmsConfig config = new KmsConfig();
String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
byte[] caFileContent = ctx.readResourceFile(caFileName);
config.setCa(new String(caFileContent,StandardCharsets.UTF_8));
config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
dataKeyMap = UDFUtils.getLRUMap(1000);
client = new TransferClient(config);
} catch (Exception e) {
throw new UDFException(e);
}
}
public String evaluate(String data) {
if (UDFUtils.isNull(data)) {
return data;
}
if (data.length() <= BASE_STEP) {
return data;
}
int dataLength = Integer.valueOf(data.substring(0, 4));
if (data.length() <= BASE_STEP + dataLength) {
return data;
}
String dataKeyCiphertext = data.substring(4, 4 + dataLength);
try {
byte[] dataKeyBytes = getDataKeyPlaintext(dataKeyCiphertext);
Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
SecretKeySpec keySpec = new SecretKeySpec(dataKeyBytes, "AES");
byte[] iv = base64.decode(data.substring(4 + dataLength, 4 + dataLength + UDFConstant.GCM_IV_BASE64_LENGTH));
byte[] cipherText = base64.decode(data.substring(4 + dataLength + UDFConstant.GCM_IV_BASE64_LENGTH));
GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
cipher.init(Cipher.DECRYPT_MODE, keySpec, gcmParameterSpec);
byte[] decryptedData = cipher.doFinal(cipherText);
return new String(decryptedData, "UTF-8");
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
private byte[] getDataKeyPlaintext(String dataKeyCiphertext) throws Exception {
// Read once so that an LRU eviction between a containsKey check and get cannot return null
byte[] dataKey = dataKeyMap.get(dataKeyCiphertext);
if (dataKey != null) {
return dataKey;
}
synchronized (dataKeyCiphertext.intern()) {
// Re-check inside the lock so that only one thread calls KMS Decrypt for the same ciphertext
dataKey = dataKeyMap.get(dataKeyCiphertext);
if (dataKey == null) {
DecryptRequest request = new DecryptRequest();
request.ciphertextBlob = dataKeyCiphertext;
KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
DecryptResponse response = client.decryptWithOptions(request, runtimeOptions);
dataKey = base64.decode(response.getBody().plaintext);
dataKeyMap.put(dataKeyCiphertext, dataKey);
}
return dataKey;
}
}
@Override
public void close() throws UDFException, IOException {
super.close();
}
}
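Main methods
Method | Description |
setup | Reads the configuration file and initializes the KMS instance SDK and the data key cache. |
getDataKeyPlaintext | Gets the plaintext data key, from the cache or by calling KMS Decrypt. |
evaluate | Decrypts the data with the plaintext data key and returns the decrypted result. |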
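The caching idea behind getDataKeyPlaintext, a per-ciphertext lock so that concurrent rows sharing a data key do not all call Decrypt, can be exercised locally. This sketch assumes a Function<String, byte[]> in place of the KMS Decrypt call, re-checks the cache inside the lock before calling it, and wraps an access-ordered LinkedHashMap with Collections.synchronizedMap in place of UDFUtils.getLRUMap. DataKeyCacheSketch is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class DataKeyCacheSketch {
    private final Map<String, byte[]> cache;
    private final Function<String, byte[]> decryptCall; // stand-in for the KMS Decrypt API

    DataKeyCacheSketch(int capacity, Function<String, byte[]> decryptCall) {
        // Access-ordered map that evicts its least recently used entry; synchronizedMap
        // serializes every call, including the reordering done by get().
        this.cache = Collections.synchronizedMap(new LinkedHashMap<String, byte[]>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, byte[]> eldest) {
                return size() > capacity;
            }
        });
        this.decryptCall = decryptCall;
    }

    byte[] getDataKeyPlaintext(String dataKeyCiphertext) {
        byte[] key = cache.get(dataKeyCiphertext); // fast path without the per-key lock
        if (key != null) {
            return key;
        }
        synchronized (dataKeyCiphertext.intern()) {
            key = cache.get(dataKeyCiphertext);    // re-check before paying for a remote call
            if (key == null) {
                key = decryptCall.apply(dataKeyCiphertext);
                cache.put(dataKeyCiphertext, key);
            }
            return key;
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        DataKeyCacheSketch sketch = new DataKeyCacheSketch(1000, ciphertext -> {
            calls.incrementAndGet();
            return ciphertext.getBytes(StandardCharsets.UTF_8); // fake plaintext for the demo
        });
        for (int i = 0; i < 10_000; i++) {
            sketch.getDataKeyPlaintext("same-data-key-ciphertext");
        }
        System.out.println(calls.get()); // 1
    }
}
```

Since every row encrypted by the same UDF instance carries the same data key ciphertext, the number of Decrypt calls grows with the number of distinct data keys, not with the number of rows.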
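6. Create the UDFs with SQL
Before creating the UDFs, package the UDF implementation classes into JAR files and add them to the MaxCompute project as resources.
This topic creates three functions: the encryption function encrypt_udf, the high-performance encryption function sync_key_encrypt_udf, and the decryption function decrypt_udf. You can omit any function you do not need by deleting its ADD JAR and CREATE FUNCTION statements.
The statements are as follows: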
# Add the JAR files built from the UDF implementation classes to the MaxCompute project as resources
ADD JAR encrypt_udf.jar;
ADD JAR sync_key_encrypt_udf.jar;
ADD JAR decrypt_udf.jar;
# Add the configuration file to the MaxCompute project as a resource
ADD FILE kms_udf_conf.properties;
# Add the ClientKey file of the KMS instance to the MaxCompute project as a resource
ADD FILE clientkey_xxxx.json;
# Add the CA certificate file of the KMS instance to the MaxCompute project as a resource
ADD FILE PrivateKmsCA_xxxx.pem;
# Create the encryption function encrypt_udf
CREATE FUNCTION encrypt_udf AS 'com.aliyun.kms.sample.EncryptUDF'
USING 'encrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
# Create the high-performance encryption function sync_key_encrypt_udf
CREATE FUNCTION sync_key_encrypt_udf AS 'com.aliyun.kms.sample.SynchronousKeyEncryptUDF'
USING 'sync_key_encrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
# Create the decryption function decrypt_udf
CREATE FUNCTION decrypt_udf AS 'com.aliyun.kms.sample.DecryptUDF'
USING 'decrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
Parameters
Implementation class | JAR file | UDF name | Description |
EncryptUDF.java | encrypt_udf.jar | encrypt_udf | Encryption function |
SynchronousKeyEncryptUDF.java | sync_key_encrypt_udf.jar | sync_key_encrypt_udf | High-performance encryption function |
DecryptUDF.java | decrypt_udf.jar | decrypt_udf | Decryption function |
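7. Configuration variables
All variables required by the UDF implementation classes are stored in the configuration file kms_udf_conf.properties. The high-performance function requires additional secret-related settings.
High-performance configuration file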
# Variables
# Key ID
udf.kms.keyId=key-xxxxxxxxxxxx
# ClientKey file name
udf.kms.clientkey.file=clientkey_xxxx.json
# ClientKey password
udf.kms.clientkey.password=xxxxxxxx
# CA certificate file of the instance
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Endpoint of the instance
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
# Name of the synchronous secret
udf.synchronous.secret.name=xxxxSynchronousSecretName
# Name of the RAM secret
udf.ram.secret.name=xxxxRamSecret
# Endpoint of the shared gateway
udf.kms.endpoint=kms-vpc.cn-xxx.aliyuncs.com
Compared with the standard configuration file, this file adds three secret-related settings: the synchronous secret name, the RAM secret name, and the shared gateway endpoint.
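Before uploading kms_udf_conf.properties as a resource, it can help to check that every key the UDFs read is present. This minimal sketch uses java.util.Properties, which treats lines starting with # or ! as comments. The class name UdfConfigCheck and the split into base keys and high-performance keys are assumptions based on the two configuration files in this topic.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class UdfConfigCheck {
    // Keys present in both configuration files in this topic (names from UDFConstant).
    static final String[] BASE_KEYS = {
        "udf.kms.keyId", "udf.kms.clientkey.file", "udf.kms.clientkey.password",
        "udf.kms.instance.ca.file", "udf.kms.instance.endpoint"
    };
    // Extra keys that only SynchronousKeyEncryptUDF reads.
    static final String[] SYNC_KEYS = {
        "udf.synchronous.secret.name", "udf.ram.secret.name", "udf.kms.endpoint"
    };

    static List<String> missingKeys(Reader config, boolean highPerformance) throws IOException {
        Properties properties = new Properties();
        properties.load(config); // '#' and '!' lines are skipped as comments
        List<String> missing = new ArrayList<>();
        addMissing(properties, BASE_KEYS, missing);
        if (highPerformance) {
            addMissing(properties, SYNC_KEYS, missing);
        }
        return missing;
    }

    private static void addMissing(Properties properties, String[] keys, List<String> missing) {
        for (String key : keys) {
            String value = properties.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        String standard = String.join("\n",
            "# Variables",
            "udf.kms.keyId=key-xxxxxxxxxxxx",
            "udf.kms.clientkey.file=clientkey_xxxx.json",
            "udf.kms.clientkey.password=xxxxxxxx",
            "udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem",
            "udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com");
        System.out.println(missingKeys(new StringReader(standard), false)); // []
        System.out.println(missingKeys(new StringReader(standard), true));
    }
}
```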
Standard configuration file
# Variables
# Key ID
udf.kms.keyId=key-xxxxxxxxxxxx
# ClientKey file name
udf.kms.clientkey.file=clientkey_xxxx.json
# ClientKey password
udf.kms.clientkey.password=xxxxxxxx
# CA certificate file of the instance
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Endpoint of the instance
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
8. Use the functions
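-- Encrypt data with the UDF
SELECT encrypt_udf(column_name) FROM my_table;
-- Encrypt data with the high-performance UDF. version is a key identifier that, combined with the project name, determines the secret version holding the data key; pass null to derive the version from the project name alone
SELECT sync_key_encrypt_udf(version,column_name) FROM my_table;
-- Decrypt data with the UDF
SELECT decrypt_udf(column_name) FROM my_table;
Python example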
1. Install dependencies (requirements.txt)
pyodps==0.11.6.2
alibabacloud-kms-python-sdk==1.1.3
2. Define the utility module kms_udf_utils.py
# coding=utf-8
import uuid
from collections import OrderedDict
def is_null(value):
    # SQL NULL reaches a Python UDF as None; '\N' is the textual NULL marker
    return value is None or len(value) == 0 or value == '\\N'
def generate_uuid(key_version):
    return uuid.uuid5(uuid.NAMESPACE_DNS, key_version)
def get_config_value(config_dict, dict_key):
    if dict_key not in config_dict:
        raise ValueError("can not find key[{}]".format(dict_key))
    return config_dict[dict_key]
def properties_to_dict(cache_file):
    config_dict = {}
    for line in cache_file:
        line = line.strip()
        # Skip blank lines and the comment lines in kms_udf_conf.properties
        if not line or line.startswith("#") or line.startswith("!"):
            continue
        # Split on the first '=' only, so that values may contain '='
        k, v = line.split("=", 1)
        config_dict[k.strip()] = v.strip()
    return config_dict
def object_to_dict(obj):
    return obj.__dict__
class LRUCache(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()
    def get(self, key):
        if key not in self.cache:
            return -1
        value = self.cache.pop(key)  # Remove the item to update the order
        self.cache[key] = value  # Re-insert it to the end
        return value
    def put(self, key, value):
        if key in self.cache:
            self.cache.pop(key)  # Remove the existing entry to update the order
        elif len(self.cache) == self.capacity:
            self.cache.popitem(last=False)  # Remove the least recently used item
        self.cache[key] = value
    def contains_key(self, key):
        return key in self.cache
3. Define the constants module kms_udf_constant.py
# coding=utf-8
UDF_CONFIG_CACHE_FILE = "kms_udf_conf.properties"
KMS_CLIENT_KEY_FILE_KEY = "udf.kms.clientkey.file"
KMS_CLIENT_KEY_PASSWORD_KEY = "udf.kms.clientkey.password"
KMS_INSTANCE_ENDPOINT_KEY = "udf.kms.instance.endpoint"
KMS_INSTANCE_CA_FILE_KEY = "udf.kms.instance.ca.file"
KMS_SYNCHRONOUS_SECRET_NAME_KEY = "udf.synchronous.secret.name"
KMS_RAM_SECRET_NAME_KEY = "udf.ram.secret.name"
KMS_REGION_KEY = "udf.kms.regionId"
UDF_ENCRYPT_KEY_ID_KEY = "udf.kms.keyId"
ENCRYPT_DATA_KEY_FORMAT = "%04d"
GCM_IV_LENGTH = 12
GCM_TAG_LENGTH = 16
DEFAULT_NUMBER_OF_BYTES = 32
Parameters
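Parameter | Description |
UDF_CONFIG_CACHE_FILE | Configuration file name |
KMS_CLIENT_KEY_FILE_KEY | File name of the ClientKey credential |
KMS_CLIENT_KEY_PASSWORD_KEY | Password of the ClientKey credential |
KMS_INSTANCE_ENDPOINT_KEY | Endpoint of the KMS instance |
KMS_INSTANCE_CA_FILE_KEY | File name of the KMS instance CA certificate |
KMS_SYNCHRONOUS_SECRET_NAME_KEY | Name of the synchronous secret |
KMS_RAM_SECRET_NAME_KEY | Name of the RAM secret |
KMS_REGION_KEY | KMS region ID (not read by the Python samples in this topic) |
UDF_ENCRYPT_KEY_ID_KEY | KMS key ID |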
4. Encryption UDF
Define the encryption implementation class encrypt_udf.py
# coding=utf-8
import base64
import os
from alibabacloud_kms20160120.models import GenerateDataKeyRequest
from alibabacloud_kms_kms20160120.models import KmsConfig, KmsRuntimeOptions
from alibabacloud_kms_kms20160120.transfer_client import TransferClient
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from odps.distcache import get_cache_file
from odps.udf import annotate
import kms_udf_constant
import kms_udf_utils
# MaxCompute Python UDFs declare their signature with annotate: one STRING in, one STRING out
@annotate("string->string")
class EncryptUDF(object):
    def __init__(self):
        cache_file = get_cache_file(kms_udf_constant.UDF_CONFIG_CACHE_FILE)
        self.config_dict = kms_udf_utils.properties_to_dict(cache_file)
        cache_file.close()
        kms_config = KmsConfig(
            protocol='https',
            client_key_file=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_FILE_KEY),
            password=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_PASSWORD_KEY),
            endpoint=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_ENDPOINT_KEY),
        )
        self.ca_file_path = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_CA_FILE_KEY)
        self.keyId = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.UDF_ENCRYPT_KEY_ID_KEY)
        self.instance_client = TransferClient(kms_config=kms_config)
        resp = self.generate_data_key()
        self.encrypted_data_key_part = (kms_udf_constant.ENCRYPT_DATA_KEY_FORMAT %
                                        len(resp.body.ciphertext_blob) + resp.body.ciphertext_blob)
        self.plaintext = resp.body.plaintext
        self.ciphertext_blob = resp.body.ciphertext_blob
    def evaluate(self, data):
        if kms_udf_utils.is_null(data):
            return data
        # 12 random bytes from the OS CSPRNG; the random module must not be used for IVs
        iv = os.urandom(kms_udf_constant.GCM_IV_LENGTH)
        data_bytes = data.encode("utf-8")
        encryptor = Cipher(
            algorithms.AES(base64.b64decode(self.plaintext)),
            modes.GCM(iv),
        ).encryptor()
        ciphertext = encryptor.update(data_bytes) + encryptor.finalize()
        # Same layout as the Java UDFs: key part + Base64(IV) + Base64(ciphertext + tag)
        return self.encrypted_data_key_part + str(base64.b64encode(iv), "utf-8") + str(
            base64.b64encode(ciphertext + encryptor.tag), "utf-8")
    def generate_data_key(self):
        request = GenerateDataKeyRequest()
        request.key_id = self.keyId
        request.number_of_bytes = kms_udf_constant.DEFAULT_NUMBER_OF_BYTES
        runtime = KmsRuntimeOptions(
            ca=self.ca_file_path
        )
        return self.instance_client.generate_data_key_with_options(request, runtime)
5. Decryption UDF
Define the decryption implementation class decrypt_udf.py
# coding=utf-8
import base64
from alibabacloud_kms20160120 import models as kms_20160120_models
from alibabacloud_kms_kms20160120.models import KmsConfig, KmsRuntimeOptions
from alibabacloud_kms_kms20160120.transfer_client import TransferClient
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from odps.distcache import get_cache_file
from odps.udf import annotate
import kms_udf_constant
import kms_udf_utils
from kms_udf_utils import LRUCache
@annotate("string->string")
class DecryptUDF(object):
    def __init__(self):
        cache_file = get_cache_file(kms_udf_constant.UDF_CONFIG_CACHE_FILE)
        self.config_dict = kms_udf_utils.properties_to_dict(cache_file)
        cache_file.close()
        kms_config = KmsConfig(
            protocol='https',
            client_key_file=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_FILE_KEY),
            password=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_PASSWORD_KEY),
            endpoint=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_ENDPOINT_KEY),
        )
        self.ca_file_path = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_CA_FILE_KEY)
        self.keyId = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.UDF_ENCRYPT_KEY_ID_KEY)
        self.instance_client = TransferClient(kms_config=kms_config)
        self.data_key_cache = LRUCache(1000)
    def evaluate(self, data):
        if kms_udf_utils.is_null(data):
            return data
        # Same guards as the Java DecryptUDF: values too short for the layout are returned unchanged
        if len(data) <= 4:
            return data
        data_key_len = int(data[0:4])
        if len(data) <= 4 + data_key_len:
            return data
        data_key_ciphertext = data[4:4 + data_key_len]
        if not self.data_key_cache.contains_key(data_key_ciphertext):
            decrypt_request = kms_20160120_models.DecryptRequest(
                ciphertext_blob=data_key_ciphertext
            )
            decrypt_runtime = KmsRuntimeOptions(
                ca=self.ca_file_path
            )
            decrypt_response = self.instance_client.decrypt_with_options(decrypt_request, decrypt_runtime)
            self.data_key_cache.put(data_key_ciphertext, base64.b64decode(decrypt_response.body.plaintext))
        data_key = self.data_key_cache.get(data_key_ciphertext)
        envelope_cipher_bytes = base64.b64decode(data[4 + data_key_len:])
        iv = envelope_cipher_bytes[0:kms_udf_constant.GCM_IV_LENGTH]
        cipher_bytes = envelope_cipher_bytes[
            kms_udf_constant.GCM_IV_LENGTH:len(envelope_cipher_bytes) - kms_udf_constant.GCM_TAG_LENGTH]
        tag = envelope_cipher_bytes[len(envelope_cipher_bytes) - kms_udf_constant.GCM_TAG_LENGTH:]
        decryptor = Cipher(algorithms.AES(data_key), modes.GCM(iv, tag)).decryptor()
        # Decode to text; str(bytes) would return the "b'...'" representation in Python 3
        return (decryptor.update(cipher_bytes) + decryptor.finalize()).decode("utf-8")
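decrypt_udf.py decodes everything after the data key ciphertext with a single b64decode call, while the Java DecryptUDF decodes the IV and the ciphertext separately. Both can read the same layout because a 12-byte IV encodes to exactly 16 Base64 characters with no = padding, so the two Base64 strings concatenate into one valid Base64 string. The hypothetical Java check below demonstrates this with java.util.Base64, and also shows that with a 13-byte IV (which would be padded) the single-decode approach fails for java.util.Base64.

```java
import java.util.Arrays;
import java.util.Base64;

final class ConcatenatedBase64Check {
    // Decodes Base64(IV) + Base64(ciphertext + tag) in one call, as decrypt_udf.py does.
    static byte[] decodeTail(String tail) {
        return Base64.getDecoder().decode(tail);
    }

    public static void main(String[] args) {
        byte[] iv = new byte[12];
        byte[] body = {1, 2, 3, 4, 5};
        String tail = Base64.getEncoder().encodeToString(iv) + Base64.getEncoder().encodeToString(body);
        byte[] joined = decodeTail(tail);
        System.out.println(joined.length); // 17
        System.out.println(Arrays.equals(Arrays.copyOfRange(joined, 12, 17), body)); // true
    }
}
```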
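6. Create the UDFs with SQL
This topic creates two functions: the encryption function encrypt_udf and the decryption function decrypt_udf. You can omit a function you do not need by deleting its ADD PY and CREATE FUNCTION statements; keep kms_udf_utils.py and kms_udf_constant.py, which both functions import.
# Add the programs above and the dependencies listed in requirements.txt as resources
ADD PY kms_udf_utils.py;
ADD PY kms_udf_constant.py;
ADD PY decrypt_udf.py;
ADD PY encrypt_udf.py;
# Add the configuration file to the MaxCompute project as a resource
ADD FILE kms_udf_conf.properties;
# Add the ClientKey file of the KMS instance to the MaxCompute project as a resource
ADD FILE clientkey_xxxx.json;
# Add the CA certificate file of the KMS instance to the MaxCompute project as a resource
ADD FILE PrivateKmsCA_xxxx.pem;
# Create the encryption UDF (Python UDFs are referenced as module.ClassName)
CREATE FUNCTION encrypt_udf AS 'encrypt_udf.EncryptUDF'
USING 'encrypt_udf.py,kms_udf_utils.py,kms_udf_constant.py,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
# Create the decryption UDF
CREATE FUNCTION decrypt_udf AS 'decrypt_udf.DecryptUDF'
USING 'decrypt_udf.py,kms_udf_utils.py,kms_udf_constant.py,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
Parameters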
Resource | UDF name | Description |
encrypt_udf.py | encrypt_udf | Encryption function |
decrypt_udf.py | decrypt_udf | Decryption function |
7. Configuration variables
All variables required by the UDF implementation classes are stored in the configuration file kms_udf_conf.properties.
# Variables
# Key ID
udf.kms.keyId=key-xxxxxxxxxxxx
# ClientKey file name
udf.kms.clientkey.file=clientkey_xxxx.json
# ClientKey password
udf.kms.clientkey.password=xxxxxxxx
# CA certificate file of the instance
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Endpoint of the instance
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
8. Use the functions
-- Encrypt data with the UDF
SELECT encrypt_udf(column_name) FROM my_table;
-- Decrypt data with the UDF
SELECT decrypt_udf(column_name) FROM my_table;