If you want to encrypt and decrypt user data in MaxCompute SQL, you can use user-defined functions (UDFs) together with Key Management Service (KMS) instances. This topic describes how to use KMS instances to encrypt and decrypt data in MaxCompute UDFs.
Overview
In the encryption process, a KMS instance client is initialized and a data key is generated during UDF initialization, and the data key is used for envelope encryption during data processing. If you want to encrypt a large amount of data, we recommend that you use a secret to obtain a data key. This prevents excessive data keys from being generated.
In the decryption process, a KMS instance client, a thread pool, and the cache are initialized during UDF initialization, and the data key ciphertext is decrypted during data processing. Then, data is decrypted by using the plaintext data key.
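The fields that store the encrypted data are of the string type. The data format is [Data key length|Fixed length 4][Data key ciphertext][Ciphertext of the data in the field]. In the sample code in this topic, the last part consists of the Base64-encoded initialization vector (IV) followed by the Base64-encoded ciphertext.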
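As a rough illustration of this layout, the following Python sketch splits a stored value into its parts. It assumes, based on the sample UDFs in this topic, that the field ciphertext is a 16-character Base64-encoded IV followed by the Base64-encoded GCM ciphertext. The function name split_encrypted_field and the sample values are hypothetical and are not part of any SDK.

```python
import base64
from collections import namedtuple

# Hypothetical container for the parts of a stored value.
EncryptedField = namedtuple("EncryptedField", "data_key_ciphertext iv ciphertext")

LENGTH_PREFIX_SIZE = 4   # [Data key length|Fixed length 4]
IV_BASE64_LENGTH = 16    # a 12-byte IV is 16 characters in Base64


def split_encrypted_field(value):
    """Split a stored string into data key ciphertext, IV, and ciphertext."""
    key_length = int(value[:LENGTH_PREFIX_SIZE])
    key_end = LENGTH_PREFIX_SIZE + key_length
    iv_end = key_end + IV_BASE64_LENGTH
    if len(value) <= iv_end:
        raise ValueError("value is too short to be an encrypted field")
    return EncryptedField(
        data_key_ciphertext=value[LENGTH_PREFIX_SIZE:key_end],
        iv=base64.b64decode(value[key_end:iv_end]),
        ciphertext=base64.b64decode(value[iv_end:]),
    )


# Build a fake value. The data key ciphertext and the payload are placeholders.
fake_key = "ZmFrZS1kYXRhLWtleQ=="
fake_iv = base64.b64encode(b"\x00" * 12).decode("ascii")
fake_payload = base64.b64encode(b"ciphertext-and-tag").decode("ascii")
parts = split_encrypted_field("%04d" % len(fake_key) + fake_key + fake_iv + fake_payload)
print(parts.data_key_ciphertext, len(parts.iv), parts.ciphertext)
```

The sketch only separates the parts. Decrypting the data key ciphertext still requires a call to the KMS instance.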
The concurrency of MaxCompute UDFs is high. We recommend that you implement envelope encryption, in which a data key is generated from a KMS instance and used to encrypt data, instead of directly calling KMS to encrypt data with a key in UDFs. For more information about envelope encryption, see Use envelope encryption.
Prerequisites
A KMS instance of the software key management type or a KMS instance of the hardware key management type is purchased. For more information, see Purchase and enable a KMS instance.
A customer master key (CMK) is created. Envelope encryption supports only Galois/Counter Mode (GCM). Therefore, you must select Symmetric Key for the Key Type parameter when you create a CMK. For more information about key specifications and encryption modes, see Key types and specifications.
An application access point (AAP) is created, a client key is saved, and the certification authority (CA) certificate of the KMS instance is obtained. For more information, see Create an AAP and Obtain the instance CA certificate.
Sample code
If you require high performance, we recommend that you use SynchronousKeyEncryptUDF in Java.
Sample code in Java
1. Install dependencies
<dependencies>
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-sdk-udf</artifactId>
        <version>0.48.6-public</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>alibabacloud-kms-java-sdk</artifactId>
        <version>1.2.5</version>
        <exclusions>
            <exclusion>
                <groupId>com.aliyun</groupId>
                <artifactId>tea</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>tea</artifactId>
        <version>1.2.3</version>
    </dependency>
</dependencies>
2. Define the UDFUtils.java class
package com.aliyun.kms.sample;
import com.aliyun.tea.utils.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class UDFUtils {
private UDFUtils() {
// do nothing
}
public static boolean isNull(String value) {
return StringUtils.isEmpty(value) || "\\N".equals(value);
}
public static String getSynchronousKeyVersion(String udfIdentify, String keyIdentify) {
UUID uuid = null;
byte[] bytes = null;
if (StringUtils.isEmpty(keyIdentify)) {
bytes = udfIdentify.getBytes(StandardCharsets.UTF_8);
uuid = UUID.nameUUIDFromBytes(bytes);
return uuid.toString();
}
bytes = (udfIdentify + "/" + keyIdentify).getBytes(StandardCharsets.UTF_8);
uuid = UUID.nameUUIDFromBytes(bytes);
return uuid.toString();
}
public static <K, V> Map<K, V> getLRUMap(int capacity) {
return new LRUMap<K, V>(capacity);
}
private static class LRUMap<K, V> extends LinkedHashMap<K, V> {
private final int capacity;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
public LRUMap(int capacity) {
super(capacity, 0.75f, true);
this.capacity = capacity;
}
@Override
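public V get(Object key) {
// In access-order mode, get() reorders the entries of the LinkedHashMap,
// so it must hold the write lock instead of the read lock.
writeLock.lock();
try {
return super.get(key);
} finally {
writeLock.unlock();
}
}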
@Override
public V put(K key, V value) {
writeLock.lock();
try {
return super.put(key, value);
} finally {
writeLock.unlock();
}
}
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return size() > capacity;
}
}
}
Method description
Method | Description |
isNull | Checks whether a string is null, an empty string, or the \N null marker. |
getSynchronousKeyVersion | Obtains the version number by combining the UDF ID and the Version input parameter of the UDF. |
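To make the version derivation concrete, the following Python sketch mirrors what getSynchronousKeyVersion does in the Java sample: it builds a name-based (MD5, version 3) UUID from the UDF identifier and the optional key identifier. The function name synchronous_key_version and the sample inputs are hypothetical. The equivalence with java.util.UUID.nameUUIDFromBytes is based on how both libraries set the version and variant bits, so treat it as an illustration rather than a compatibility guarantee.

```python
import hashlib
import uuid


def synchronous_key_version(udf_identify, key_identify=None):
    """Derive a deterministic version ID from the UDF ID and an optional key ID."""
    name = udf_identify if not key_identify else udf_identify + "/" + key_identify
    digest = hashlib.md5(name.encode("utf-8")).digest()
    # uuid.UUID(..., version=3) overwrites the version and variant bits of the digest.
    return str(uuid.UUID(bytes=digest, version=3))


v_default = synchronous_key_version("my_project")
v_named = synchronous_key_version("my_project", "2024")
print(v_default)
print(v_named)
```

Because the result depends only on the inputs, every UDF worker that runs in the same project with the same Version parameter derives the same version ID. Note that the generate_uuid helper in the Python sample of this topic uses uuid.uuid5, which produces different identifiers from the same input.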
3. Define the UDFConstant.java class
The UDFConstant.java class defines the parameter names in configuration files. For more information about parameter values, see Configure variables.
package com.aliyun.kms.sample;
public class UDFConstant {
private UDFConstant() {
// do nothing
}
public static final String KMS_UDF_CONF_NAME = "kms_udf_conf.properties";
public static final String UDF_ENCRYPT_KEY_ID_KEY = "udf.kms.keyId";
public static final String KMS_CLIENT_KEY_FILE_KEY = "udf.kms.clientkey.file";
public static final String KMS_CLIENT_KEY_PASSWORD_KEY = "udf.kms.clientkey.password";
public static final String KMS_INSTANCE_CA_FILE_KEY = "udf.kms.instance.ca.file";
public static final String KMS_INSTANCE_ENDPOINT_KEY = "udf.kms.instance.endpoint";
public static final String KMS_SYNCHRONOUS_SECRET_NAME_KEY = "udf.synchronous.secret.name";
public static final String KMS_RAM_SECRET_NAME_KEY = "udf.ram.secret.name";
public static final String KMS_ENDPOINT_KEY = "udf.kms.endpoint";
public static final String ENCRYPT_DATA_KEY_FORMAT = "%04d";
public static final int GCM_IV_LENGTH = 12;
public static final int GCM_IV_BASE64_LENGTH = 16;
public static final int GCM_TAG_LENGTH = 16;
}
Parameter description
Parameter | Description |
KMS_UDF_CONF_NAME | The name of the configuration file. |
UDF_ENCRYPT_KEY_ID_KEY | The ID of the key that is managed by KMS. |
KMS_CLIENT_KEY_FILE_KEY | The name of the client key file. |
KMS_CLIENT_KEY_PASSWORD_KEY | The password of the client key. |
KMS_INSTANCE_CA_FILE_KEY | The name of the CA certificate file of the KMS instance. |
KMS_INSTANCE_ENDPOINT_KEY | The endpoint of the KMS instance. |
KMS_SYNCHRONOUS_SECRET_NAME_KEY | The name of the secret that is synchronized. |
KMS_RAM_SECRET_NAME_KEY | The name of the Resource Access Management (RAM) secret. |
KMS_ENDPOINT_KEY | The endpoint of the shared gateway. |
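The table does not cover the format and length constants. The following Python sketch shows how GCM_IV_LENGTH, GCM_IV_BASE64_LENGTH, and GCM_TAG_LENGTH relate to each other. It only uses the values from UDFConstant.java and the standard library, and is meant as an explanation rather than as part of the UDF.

```python
import base64
import os

GCM_IV_LENGTH = 12          # IV length in bytes
GCM_IV_BASE64_LENGTH = 16   # length of the Base64-encoded IV in characters
GCM_TAG_LENGTH = 16         # authentication tag length in bytes

iv = os.urandom(GCM_IV_LENGTH)
encoded_iv = base64.b64encode(iv).decode("ascii")

# 12 bytes are a multiple of 3, so Base64 needs no padding: 12 / 3 * 4 = 16 characters.
print(len(encoded_iv) == GCM_IV_BASE64_LENGTH)  # True

# The samples pass the tag length to the cipher in bits.
tag_length_bits = GCM_TAG_LENGTH * 8
print(tag_length_bits)  # 128
```

This is why the decryption UDF can cut the IV out of the stored string at a fixed offset of 16 characters after the data key ciphertext.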
4. Create an encryption UDF
We recommend that you use the com.aliyun.kms.sample.SynchronousKeyEncryptUDF class as a high-performance encryption UDF for tables that contain a large amount of data. If your tables contain a small amount of data and the UDF concurrency is low, you can use the com.aliyun.kms.sample.EncryptUDF class as an encryption UDF.
SynchronousKeyEncryptUDF example
SynchronousKeyEncryptUDF is suitable for scenarios that involve a large amount of data and high concurrency. SynchronousKeyEncryptUDF uses the secret management capabilities of KMS to store data keys and their version information as secrets in KMS. This prevents multiple data keys from being generated for the same version, reduces the number of data keys that are generated, and improves performance. The following figure shows the flowchart for generating a data key.
Create a secret named xxxxSynchronousSecret
Create a generic secret in the KMS console or by calling KMS API operations. The secret name is xxxxSynchronousSecret, and the secret value is a random value. The generic secret is used to store your data key. You can specify the udf.synchronous.secret.name parameter in the UDF to specify the secret. For more information about how to create a secret, see Manage and use generic secrets.
Create a RAM secret named xxxxRamSecret
Create a RAM secret in the KMS console or by calling KMS API operations. The value of the RAM secret is the AccessKey pair of the KMS account that is used to call the shared gateway. The RAM secret is retrieved for identity authentication when the UDF calls API operations. You can specify the udf.ram.secret.name parameter in the UDF to specify the secret. For more information about how to create a RAM secret and create an AccessKey pair for a KMS account, see Manage and use RAM secrets and Create an AccessKey pair.
Example of the secret value:
{ "AccessKeyId":"****", "AccessKeySecret":"****" }
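The UDF reads both fields from this JSON value before it initializes the client for the shared gateway. The following Python sketch shows one way to parse and check such a value. The function name parse_ram_secret is hypothetical, and the field check is an added safeguard that the Java sample does not perform.

```python
import json

REQUIRED_FIELDS = ("AccessKeyId", "AccessKeySecret")


def parse_ram_secret(secret_data):
    """Parse the RAM secret value and check that both fields are present."""
    value = json.loads(secret_data)
    missing = [name for name in REQUIRED_FIELDS if not value.get(name)]
    if missing:
        raise ValueError("RAM secret is missing fields: " + ", ".join(missing))
    return value["AccessKeyId"], value["AccessKeySecret"]


access_key_id, access_key_secret = parse_ram_secret(
    '{ "AccessKeyId":"****", "AccessKeySecret":"****" }'
)
```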
Define the SynchronousKeyEncryptUDF.java class
package com.aliyun.kms.sample;

import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.*;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.tea.TeaException;
import com.aliyun.tea.utils.StringUtils;
import com.google.gson.Gson;
import org.apache.commons.codec.binary.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

public class SynchronousKeyEncryptUDF extends UDF {
    private static Gson gson = new Gson();
    private static Client instanceClient;
    private static com.aliyun.kms.kms20160120.Client shareClient;
    private static Map<String, DataKeyObject> dataKeyObjectMap;
    private static Base64 base64 = new Base64();
    private String runTaskIdentify;
    private String keySecretName;
    private String keyId;
    private Properties properties = new Properties();

    @Override
    public void setup(ExecutionContext ctx) throws UDFException, IOException {
        super.setup(ctx);
        try {
            InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
            properties.load(in);
            // The dimension for generating a data key. If a data key is generated by project, you must use the getRunningProject method. If a data key is generated by table, you must use the getTableInfo method.
            runTaskIdentify = ctx.getRunningProject();
            dataKeyObjectMap = new ConcurrentHashMap<>();
            keySecretName = properties.getProperty(UDFConstant.KMS_SYNCHRONOUS_SECRET_NAME_KEY);
            keyId = properties.getProperty(UDFConstant.UDF_ENCRYPT_KEY_ID_KEY);
            if (StringUtils.isEmpty(keySecretName)) {
                throw new UDFException("the secret name is null");
            }
            buildInstanceClient(ctx);
            checkSecretExist();
            buildShareClient();
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }

    public String evaluate(String keyIdentify, String data) {
        if (UDFUtils.isNull(data)) {
            return data;
        }
        byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
        byte[] iv = null;
        byte[] cipherTextBytes = null;
        try {
            String keyVersion = UDFUtils.getSynchronousKeyVersion(runTaskIdentify, keyIdentify);
            if (!dataKeyObjectMap.containsKey(keyVersion)) {
                dataKeyObjectMap.put(keyVersion, getDataKeyObject(keyVersion));
            }
            DataKeyObject dataKeyObject = dataKeyObjectMap.get(keyVersion);
            // Use a new random IV for each value that is encrypted.
            iv = new byte[UDFConstant.GCM_IV_LENGTH];
            SecureRandom random = new SecureRandom();
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(dataKeyObject.plaintextBytes, "AES");
            GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
            cipher.init(Cipher.ENCRYPT_MODE, keySpec, gcmParameterSpec);
            cipherTextBytes = cipher.doFinal(dataBytes);
            return dataKeyObject.encryptedDataKeyPart + base64.encodeAsString(iv) + base64.encodeAsString(cipherTextBytes);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    private DataKeyObject getDataKeyObject(String keyVersion) throws Exception {
        try {
            return getDataKeyObjectByGetSecretValue(keyVersion);
        } catch (TeaException e) {
            if ("Forbidden.ResourceNotFound".equals(e.code)) {
                return buildDataKeyObject(keyVersion);
            }
            // Rethrow other errors instead of returning null, which would fail later with a NullPointerException.
            throw e;
        }
    }

    private DataKeyObject getDataKeyObjectByGetSecretValue(String keyVersion) throws Exception {
        String secretData = getSecretValue(keyVersion);
        DataKeyPersistentObject dataKeyPersistentObject = gson.fromJson(secretData, DataKeyPersistentObject.class);
        byte[] plaintextBytes = base64.decode(dataKeyPersistentObject.plaintext);
        return new DataKeyObject(plaintextBytes, dataKeyPersistentObject.encryptedDataKeyPart);
    }

    private DataKeyObject buildDataKeyObject(String keyVersion) throws Exception {
        GenerateDataKeyRequest request = new GenerateDataKeyRequest();
        request.setKeyId(keyId);
        KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
        GenerateDataKeyResponse response = instanceClient.generateDataKeyWithOptions(request, runtimeOptions);
        String plaintext = response.getBody().plaintext;
        byte[] plaintextBytes = base64.decode(plaintext);
        String encryptedDataKeyPart = String.format(UDFConstant.ENCRYPT_DATA_KEY_FORMAT, response.getBody().ciphertextBlob.length()) + response.getBody().ciphertextBlob;
        DataKeyObject dataKeyObject = new DataKeyObject(plaintextBytes, encryptedDataKeyPart);
        DataKeyPersistentObject dataKeyPersistentObject = new DataKeyPersistentObject(plaintext, encryptedDataKeyPart);
        PutSecretValueRequest putSecretValueRequest = new PutSecretValueRequest();
        putSecretValueRequest.secretData = dataKeyPersistentObject.toJsonString();
        putSecretValueRequest.secretName = keySecretName;
        putSecretValueRequest.versionId = keyVersion;
        try {
            shareClient.putSecretValue(putSecretValueRequest);
        } catch (TeaException e) {
            // Another task has already stored a data key for this version. Use the stored data key.
            if ("Rejected.ResourceExist".equals(e.code)) {
                return getDataKeyObjectByGetSecretValue(keyVersion);
            }
        }
        return dataKeyObject;
    }

    @Override
    public void close() throws UDFException, IOException {
        super.close();
    }

    private void buildInstanceClient(ExecutionContext ctx) throws Exception {
        try {
            KmsConfig config = new KmsConfig();
            String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
            byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
            config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
            String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
            byte[] caFileContent = ctx.readResourceFile(caFileName);
            config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
            config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
            config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
            instanceClient = new TransferClient(config);
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }

    private void checkSecretExist() throws Exception {
        getSecretValue(null);
    }

    private String getSecretValue(String versionId) throws Exception {
        GetSecretValueRequest getSecretValueRequest = new GetSecretValueRequest();
        getSecretValueRequest.setSecretName(keySecretName);
        if (!StringUtils.isEmpty(versionId)) {
            getSecretValueRequest.setVersionId(versionId);
        }
        KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
        GetSecretValueResponse getSecretValueResponse = instanceClient.getSecretValueWithOptions(getSecretValueRequest, runtimeOptions);
        return getSecretValueResponse.body.secretData;
    }

    private void buildShareClient() throws Exception {
        String ramSecretName = properties.getProperty(UDFConstant.KMS_RAM_SECRET_NAME_KEY);
        GetSecretValueRequest getSecretValueRequest = new GetSecretValueRequest();
        getSecretValueRequest.setSecretName(ramSecretName);
        KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
        GetSecretValueResponse getSecretValueResponse = instanceClient.getSecretValueWithOptions(getSecretValueRequest, runtimeOptions);
        AccessKeyObject accessKeyObject = gson.fromJson(getSecretValueResponse.body.secretData, AccessKeyObject.class);
        com.aliyun.teaopenapi.models.Config shareConfig = new com.aliyun.teaopenapi.models.Config()
                .setAccessKeyId(accessKeyObject.AccessKeyId)
                .setAccessKeySecret(accessKeyObject.AccessKeySecret)
                .setEndpoint(properties.getProperty(UDFConstant.KMS_ENDPOINT_KEY));
        shareClient = new com.aliyun.kms.kms20160120.Client(shareConfig);
    }

    private static class AccessKeyObject implements Serializable {
        private String AccessKeyId;
        private String AccessKeySecret;
    }

    private final static class DataKeyObject {
        private byte[] plaintextBytes;
        private String encryptedDataKeyPart;

        public DataKeyObject() {
        }

        public DataKeyObject(byte[] plaintextBytes, String encryptedDataKeyPart) {
            this.plaintextBytes = plaintextBytes;
            this.encryptedDataKeyPart = encryptedDataKeyPart;
        }
    }

    private final static class DataKeyPersistentObject implements Serializable {
        private String plaintext;
        private String encryptedDataKeyPart;

        public DataKeyPersistentObject() {
        }

        public DataKeyPersistentObject(String plaintext, String encryptedDataKeyPart) {
            this.plaintext = plaintext;
            this.encryptedDataKeyPart = encryptedDataKeyPart;
        }

        public String toJsonString() {
            return gson.toJson(this);
        }
    }
}
Main method description
Method | Description |
setup | Reads the configuration file, initializes KMS Instance SDK and Alibaba Cloud SDK, and checks whether the initial secret exists. |
buildInstanceClient | Initializes KMS Instance SDK. |
buildShareClient | Obtains the AccessKey pair that is stored in the RAM secret and initializes Alibaba Cloud SDK. |
getDataKeyObjectByGetSecretValue | Obtains the key that is stored in the secret. |
buildDataKeyObject | Generates a new key and stores the key in the secret. |
evaluate | Encrypts data and returns the encrypted data. |
Note: The following API operations of KMS are used.
KMS Instance API operations: The GenerateDataKey operation is called to generate a data key, and the GetSecretValue operation is called to retrieve a secret value.
KMS API operations: The PutSecretValue operation is called to store a secret value.
For more information about API operations of KMS and how to call the API operations, see API references and SDK references.
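The interplay of getDataKeyObjectByGetSecretValue and buildDataKeyObject is a get-or-create pattern: read the data key for a version, generate one only if none is stored, and fall back to the stored one if another task wins the race. The following Python sketch shows that control flow with an in-memory stand-in for the secret. FakeSecretStore, get_or_create_data_key, and the generate callback are hypothetical names, and no KMS API is called.

```python
class FakeSecretStore(object):
    """In-memory stand-in for a KMS secret with versions (hypothetical)."""

    def __init__(self):
        self.versions = {}

    def get(self, version_id):
        # Stand-in for GetSecretValue. KeyError plays the role of "not found".
        return self.versions[version_id]

    def put(self, version_id, value):
        # Stand-in for PutSecretValue. An existing version is never overwritten.
        if version_id in self.versions:
            raise ValueError("version already exists")
        self.versions[version_id] = value


def get_or_create_data_key(store, version_id, generate):
    """Return the data key for a version and generate it only if none is stored."""
    try:
        return store.get(version_id)
    except KeyError:
        pass
    candidate = generate()  # stand-in for GenerateDataKey
    try:
        store.put(version_id, candidate)
        return candidate
    except ValueError:
        # Another worker stored a key first. Use that one instead.
        return store.get(version_id)


store = FakeSecretStore()
calls = []


def generate():
    calls.append(1)
    return "data-key-%d" % len(calls)


first = get_or_create_data_key(store, "v1", generate)
second = get_or_create_data_key(store, "v1", generate)
print(first == second, len(calls))  # True 1
```

The second call finds the stored value, so no additional data key is generated. This is the property that keeps the number of data keys low when many UDF workers start at the same time.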
EncryptUDF example
Define the EncryptUDF.java class
package com.aliyun.kms.sample;
import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.GenerateDataKeyRequest;
import com.aliyun.kms20160120.models.GenerateDataKeyResponse;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import org.apache.commons.codec.binary.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Properties;
public class EncryptUDF extends UDF {
private static Client client;
private static String plaintext;
private static byte[] plaintextBytes;
private static String encryptedDataKeyPart;
private static Base64 base64 = new Base64();
@Override
public void setup(ExecutionContext ctx) throws UDFException, IOException {
super.setup(ctx);
try {
InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
Properties properties = new Properties();
properties.load(in);
KmsConfig config = new KmsConfig();
String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
byte[] caFileContent = ctx.readResourceFile(caFileName);
config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
String keyId = properties.getProperty(UDFConstant.UDF_ENCRYPT_KEY_ID_KEY);
client = new TransferClient(config);
GenerateDataKeyRequest request = new GenerateDataKeyRequest();
request.setKeyId(keyId);
KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
GenerateDataKeyResponse response = client.generateDataKeyWithOptions(request, runtimeOptions);
plaintext = response.getBody().plaintext;
plaintextBytes = base64.decode(plaintext);
encryptedDataKeyPart = String.format(UDFConstant.ENCRYPT_DATA_KEY_FORMAT, response.getBody().ciphertextBlob.length()) + response.getBody().ciphertextBlob;
} catch (Exception e) {
throw new UDFException(e);
}
}
public String evaluate(String data) {
if (UDFUtils.isNull(data)) {
return data;
}
byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
byte[] iv = null;
byte[] cipherTextBytes = null;
try {
iv = new byte[UDFConstant.GCM_IV_LENGTH];
SecureRandom random = new SecureRandom();
random.nextBytes(iv);
Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
SecretKeySpec keySpec = new SecretKeySpec(plaintextBytes, "AES");
GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
cipher.init(Cipher.ENCRYPT_MODE, keySpec, gcmParameterSpec);
cipherTextBytes = cipher.doFinal(dataBytes);
return encryptedDataKeyPart + base64.encodeAsString(iv) + base64.encodeAsString(cipherTextBytes);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
@Override
public void close() throws UDFException, IOException {
super.close();
}
}
Main method description
Method | Description |
setup | Reads the configuration file, initializes KMS Instance SDK, and generates the data key ciphertext. |
evaluate | Encrypts data and returns the encrypted data. |
The following API operations of KMS are used.
KMS Instance API: The GenerateDataKey operation is called to generate a data key.
For more information about API operations of KMS and how to call the API operations, see API references and SDK references.
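The way setup and evaluate assemble the stored string can be summarized in a few lines. The following Python sketch reproduces only the string handling, with placeholder bytes instead of real ciphertext. The function names are hypothetical, and the length check is an added safeguard that the sample does not contain: a data key ciphertext of 10,000 characters or more would make the "%04d" prefix five digits long and shift every later offset.

```python
import base64

ENCRYPT_DATA_KEY_FORMAT = "%04d"


def build_encrypted_data_key_part(ciphertext_blob):
    """Prefix the data key ciphertext with its length as four decimal digits."""
    if len(ciphertext_blob) > 9999:
        raise ValueError("data key ciphertext is too long for a 4-digit prefix")
    return ENCRYPT_DATA_KEY_FORMAT % len(ciphertext_blob) + ciphertext_blob


def assemble_field(encrypted_data_key_part, iv, cipher_text_bytes):
    """Concatenate the prefix part, the Base64 IV, and the Base64 ciphertext."""
    return (encrypted_data_key_part
            + base64.b64encode(iv).decode("ascii")
            + base64.b64encode(cipher_text_bytes).decode("ascii"))


part = build_encrypted_data_key_part("ZmFrZQ==")
field = assemble_field(part, b"\x01" * 12, b"placeholder-bytes")
print(part)  # 0008ZmFrZQ==
```

Because the data key part is computed once in setup, every value that is encrypted by the same UDF instance starts with the same prefix.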
5. Create a decryption UDF
Define the DecryptUDF.java class
package com.aliyun.kms.sample;
import com.aliyun.dkms.gcs.openapi.models.Config;
import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.DecryptRequest;
import com.aliyun.kms20160120.models.DecryptResponse;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import org.apache.commons.codec.binary.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
public class DecryptUDF extends UDF {
private static Client client;
private static Map<String, byte[]> dataKeyMap;
private static final int BASE_STEP=4;
private static Base64 base64 = new Base64();
@Override
public void setup(ExecutionContext ctx) throws UDFException, IOException {
super.setup(ctx);
try {
InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
Properties properties = new Properties();
properties.load(in);
KmsConfig config = new KmsConfig();
String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
byte[] caFileContent = ctx.readResourceFile(caFileName);
config.setCa(new String(caFileContent,StandardCharsets.UTF_8));
config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
dataKeyMap = UDFUtils.getLRUMap(1000);
client = new TransferClient(config);
} catch (Exception e) {
throw new UDFException(e);
}
}
public String evaluate(String data) {
if (UDFUtils.isNull(data)) {
return data;
}
if (data.length() <= BASE_STEP) {
return data;
}
int dataLength = Integer.valueOf(data.substring(0, 4));
if (data.length() <= BASE_STEP + dataLength) {
return data;
}
String dataKeyCiphertext = data.substring(4, 4 + dataLength);
try {
byte[] dataKeyBytes = getDataKeyPlaintext(dataKeyCiphertext);
Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
SecretKeySpec keySpec = new SecretKeySpec(dataKeyBytes, "AES");
byte[] iv = base64.decode(data.substring(4 + dataLength, 4 + dataLength + UDFConstant.GCM_IV_BASE64_LENGTH));
byte[] cipherText = base64.decode(data.substring(4 + dataLength + UDFConstant.GCM_IV_BASE64_LENGTH));
GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
cipher.init(Cipher.DECRYPT_MODE, keySpec, gcmParameterSpec);
byte[] decryptedData = cipher.doFinal(cipherText);
return new String(decryptedData, "UTF-8");
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
private byte[] getDataKeyPlaintext(String dataKeyCiphertext) throws Exception {
byte[] dataKey = dataKeyMap.get(dataKeyCiphertext);
if (dataKey == null) {
synchronized (dataKeyCiphertext.intern()) {
// Check the cache again inside the lock so that KMS is called only once for each data key.
dataKey = dataKeyMap.get(dataKeyCiphertext);
if (dataKey == null) {
DecryptRequest request = new DecryptRequest();
request.ciphertextBlob = dataKeyCiphertext;
KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
DecryptResponse response = client.decryptWithOptions(request, runtimeOptions);
dataKey = base64.decode(response.getBody().plaintext);
dataKeyMap.put(dataKeyCiphertext, dataKey);
}
}
}
return dataKey;
}
@Override
public void close() throws UDFException, IOException {
super.close();
}
}
Main method description
Method | Description |
setup | Reads the configuration file and initializes KMS Instance SDK. |
getDataKeyPlaintext | Obtains the plaintext data key. |
evaluate | Uses the plaintext data key to decrypt data and returns the decrypted data. |
The following API operations of KMS are used.
KMS Instance API: The Decrypt operation is called to obtain the plaintext data key.
For more information about API operations of KMS and how to call the API operations, see API references and SDK references.
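The decryption UDF stays fast because each distinct data key ciphertext is decrypted by KMS once and then served from an LRU cache. The following Python sketch illustrates that idea with a fake decrypt function that records its calls. DataKeyCache and fake_decrypt are hypothetical names. For simplicity the sketch holds a single lock around the lookup and the call, whereas the Java sample synchronizes on each ciphertext separately.

```python
import threading
from collections import OrderedDict


class DataKeyCache(object):
    """Small LRU cache that calls decrypt_fn only on a cache miss."""

    def __init__(self, decrypt_fn, capacity=1000):
        self._decrypt_fn = decrypt_fn
        self._capacity = capacity
        self._entries = OrderedDict()
        self._lock = threading.Lock()

    def get(self, data_key_ciphertext):
        with self._lock:
            if data_key_ciphertext in self._entries:
                # Move the entry to the end to mark it as recently used.
                value = self._entries.pop(data_key_ciphertext)
                self._entries[data_key_ciphertext] = value
                return value
            # Stand-in for the KMS Decrypt call.
            value = self._decrypt_fn(data_key_ciphertext)
            if len(self._entries) >= self._capacity:
                self._entries.popitem(last=False)  # evict the least recently used entry
            self._entries[data_key_ciphertext] = value
            return value


decrypt_calls = []


def fake_decrypt(ciphertext):
    decrypt_calls.append(ciphertext)
    return b"plaintext-for-" + ciphertext.encode("ascii")


cache = DataKeyCache(fake_decrypt, capacity=2)
for name in ["a", "a", "b", "a", "c", "b"]:
    cache.get(name)
print(decrypt_calls)  # ['a', 'b', 'c', 'b']
```

With a capacity of 2, the entry for "b" is evicted when "c" arrives, so "b" is decrypted a second time. The sample uses a capacity of 1000, which is large enough for most tables that are encrypted with a limited number of data keys.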
6. Create an SQL UDF
Before you create an SQL UDF, you must add the JAR package of the UDF implementation class to your MaxCompute project.
In this example, the encrypt_udf, sync_key_encrypt_udf, and decrypt_udf functions are created. You can omit the UDFs that you do not need based on your business requirements. To omit a UDF, remove the corresponding ADD JAR and CREATE FUNCTION statements.
The following statements are used to create a UDF:
-- Add the JAR package of the UDF implementation class to the MaxCompute project.
ADD JAR encrypt_udf.jar;
ADD JAR sync_key_encrypt_udf.jar;
ADD JAR decrypt_udf.jar;
-- Add the configuration file to the MaxCompute project.
ADD FILE kms_udf_conf.properties;
-- Add the client key file of the KMS instance to the MaxCompute project.
ADD FILE clientkey_xxxx.json;
-- Add the CA certificate file of the KMS instance to the MaxCompute project.
ADD FILE PrivateKmsCA_xxxx.pem;
-- Create an encryption UDF named encrypt_udf.
CREATE FUNCTION encrypt_udf AS 'com.aliyun.kms.sample.EncryptUDF'
USING 'encrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
-- Create a high-performance encryption UDF named sync_key_encrypt_udf.
CREATE FUNCTION sync_key_encrypt_udf AS 'com.aliyun.kms.sample.SynchronousKeyEncryptUDF'
USING 'sync_key_encrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
-- Create a decryption UDF named decrypt_udf.
CREATE FUNCTION decrypt_udf AS 'com.aliyun.kms.sample.DecryptUDF'
USING 'decrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
Parameter description
Implementation class | JAR package | UDF | Description |
EncryptUDF.java | encrypt_udf.jar | encrypt_udf | Encryption UDF |
SynchronousKeyEncryptUDF.java | sync_key_encrypt_udf.jar | sync_key_encrypt_udf | High-performance encryption UDF |
DecryptUDF.java | decrypt_udf.jar | decrypt_udf | Decryption UDF |
7. Configure variables
The variables required by the UDF implementation class are stored in the kms_udf_conf.properties configuration file. Secret-related configurations are added to the configuration file for the high-performance UDF.
Configuration file for the high-performance UDF
# Configure variables.
# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the name of the client key file.
udf.kms.clientkey.file=clientkey_xxxx.json
# Specify the password of the client key.
udf.kms.clientkey.password=xxxxxxxx
# Specify the CA certificate of the KMS instance.
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Specify the endpoint of the KMS instance.
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
# Specify the name of the secret that is synchronized.
udf.synchronous.secret.name=xxxxSynchronousSecret
# Specify the name of the RAM secret.
udf.ram.secret.name=xxxxRamSecret
# Specify the endpoint of the shared gateway.
udf.kms.endpoint=kms-vpc.cn-xxx.aliyuncs.com
The secret-related configurations, such as the name of the secret that is synchronized, the name of the RAM secret, and the endpoint of the shared gateway, are added.
Regular configuration file
# Configure variables.
# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the name of the client key file.
udf.kms.clientkey.file=clientkey_xxxx.json
# Specify the password of the client key.
udf.kms.clientkey.password=xxxxxxxx
# Specify the CA certificate of the KMS instance.
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Specify the endpoint of the KMS instance.
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
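Before you upload the configuration file, it can help to check that all required entries are present. The following Python sketch parses key=value lines, skips blank lines and comment lines that start with #, and reports missing keys. The function names and the inline sample are hypothetical. The required keys are the entries of the regular configuration file shown above.

```python
REQUIRED_KEYS = (
    "udf.kms.keyId",
    "udf.kms.clientkey.file",
    "udf.kms.clientkey.password",
    "udf.kms.instance.ca.file",
    "udf.kms.instance.endpoint",
)


def load_properties(lines):
    """Parse key=value lines and skip blank lines and # comments."""
    config = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split at the first "=" only, so values may contain "=" themselves.
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def missing_keys(config, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty."""
    return [key for key in required if not config.get(key)]


sample = """
# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
udf.kms.clientkey.file=clientkey_xxxx.json
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
""".splitlines()

config = load_properties(sample)
print(missing_keys(config))  # ['udf.kms.clientkey.password', 'udf.kms.instance.ca.file']
```

For the high-performance UDF, the same check can be extended with udf.synchronous.secret.name, udf.ram.secret.name, and udf.kms.endpoint.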
8. Use UDFs
-- Use a UDF to encrypt data.
SELECT encrypt_udf(column_name) FROM my_table;
-- Use a UDF to encrypt data. The value of version is the secret version, and the default value of version is null.
SELECT sync_key_encrypt_udf(version,column_name) FROM my_table;
-- Use a UDF to decrypt data.
SELECT decrypt_udf(column_name) FROM my_table;
Sample code in Python
1. Install the dependencies that are listed in requirements.txt
pyodps==0.11.6.2
alibabacloud-kms-python-sdk==1.1.3
2. Define the kms_udf_utils.py module
# coding=utf-8
import uuid
from collections import OrderedDict


def is_null(value):
    # Treat None, the empty string, and the \N null marker as null.
    return value is None or len(value) == 0 or value == '\\N'


def generate_uuid(key_version):
    return uuid.uuid5(uuid.NAMESPACE_DNS, key_version)


def get_config_value(config_dict, dict_key):
    if dict_key not in config_dict:
        raise ValueError("can not find key[{}]".format(dict_key))
    return config_dict[dict_key]


def properties_to_dict(cache_file):
    config_dict = {}
    for line in cache_file:
        line = line.strip()
        # Skip blank lines and comment lines in the configuration file.
        if not line or line.startswith("#"):
            continue
        # Split at the first "=" only, so that values can contain "=".
        k, v = line.split("=", 1)
        config_dict[k.strip()] = v.strip()
    return config_dict


def object_to_dict(obj):
    return obj.__dict__


class LRUCache(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        if key not in self.cache:
            return -1
        value = self.cache.pop(key)  # Remove the item to update the order
        self.cache[key] = value  # Re-insert it to the end
        return value

    def put(self, key, value):
        if key in self.cache:
            self.cache.pop(key)  # Remove the existing entry to update the order
        elif len(self.cache) == self.capacity:
            self.cache.popitem(last=False)  # Remove the least recently used item
        self.cache[key] = value

    def contains_key(self, key):
        return key in self.cache
3. Define the kms_udf_constant.py module
# coding=utf-8
UDF_CONFIG_CACHE_FILE = "kms_udf_conf.properties"
KMS_CLIENT_KEY_FILE_KEY = "udf.kms.clientkey.file"
KMS_CLIENT_KEY_PASSWORD_KEY = "udf.kms.clientkey.password"
KMS_INSTANCE_ENDPOINT_KEY = "udf.kms.instance.endpoint"
KMS_INSTANCE_CA_FILE_KEY = "udf.kms.instance.ca.file"
KMS_SYNCHRONOUS_SECRET_NAME_KEY = "udf.synchronous.secret.name"
KMS_RAM_SECRET_NAME_KEY = "udf.ram.secret.name"
KMS_REGION_KEY = "udf.kms.regionId"
UDF_ENCRYPT_KEY_ID_KEY = "udf.kms.keyId"
ENCRYPT_DATA_KEY_FORMAT = "%04d"
GCM_IV_LENGTH = 12
GCM_TAG_LENGTH = 16
DEFAULT_NUMBER_OF_BYTES = 32
Parameter description
Parameter | Description |
UDF_CONFIG_CACHE_FILE | The name of the configuration file. |
KMS_CLIENT_KEY_FILE_KEY | The name of the client key file. |
KMS_CLIENT_KEY_PASSWORD_KEY | The password of the client key. |
KMS_INSTANCE_ENDPOINT_KEY | The endpoint of the KMS instance. |
KMS_INSTANCE_CA_FILE_KEY | The CA certificate of the KMS instance. |
KMS_SYNCHRONOUS_SECRET_NAME_KEY | The name of the secret that is synchronized. |
KMS_RAM_SECRET_NAME_KEY | The name of the RAM secret. |
KMS_REGION_KEY | The region ID of KMS. |
UDF_ENCRYPT_KEY_ID_KEY | The ID of the key that is managed by KMS. |
4. Create an encryption UDF
Define the encrypt_udf.py class
# coding=utf-8
import base64
import os

from alibabacloud_kms20160120.models import GenerateDataKeyRequest
from alibabacloud_kms_kms20160120.models import KmsConfig, KmsRuntimeOptions
from alibabacloud_kms_kms20160120.transfer_client import TransferClient
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from odps.distcache import get_cache_file

import kms_udf_constant
import kms_udf_utils


class EncryptUDF(object):
    def __init__(self):
        # Load the configuration file that is added as a MaxCompute resource.
        cache_file = get_cache_file(kms_udf_constant.UDF_CONFIG_CACHE_FILE)
        self.config_dict = kms_udf_utils.properties_to_dict(cache_file)
        cache_file.close()
        kms_config = KmsConfig(
            protocol='https',
            client_key_file=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_FILE_KEY),
            password=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_PASSWORD_KEY),
            endpoint=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_ENDPOINT_KEY),
        )
        self.ca_file_path = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_CA_FILE_KEY)
        self.keyId = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.UDF_ENCRYPT_KEY_ID_KEY)
        self.instance_client = TransferClient(kms_config=kms_config)
        # Generate one data key during initialization and reuse it for every row.
        resp = self.generate_data_key()
        # Prefix of each encrypted value: 4-digit length of the data key ciphertext, then the data key ciphertext.
        self.encrypted_data_key_part = (kms_udf_constant.ENCRYPT_DATA_KEY_FORMAT %
                                        len(resp.body.ciphertext_blob) + resp.body.ciphertext_blob)
        self.plaintext = resp.body.plaintext
        self.ciphertext_blob = resp.body.ciphertext_blob

    def evaluate(self, data):
        if kms_udf_utils.is_null(data):
            return data
        # Generate a fresh random IV for each value from the operating system's secure random source.
        iv = os.urandom(kms_udf_constant.GCM_IV_LENGTH)
        data_bytes = data.encode("utf-8")
        encryptor = Cipher(
            algorithms.AES(base64.b64decode(self.plaintext)),
            modes.GCM(iv),
        ).encryptor()
        ciphertext = encryptor.update(data_bytes) + encryptor.finalize()
        # Output: data key part + Base64 IV + Base64 (ciphertext + GCM tag).
        return self.encrypted_data_key_part + str(base64.b64encode(iv), "utf-8") + str(
            base64.b64encode(ciphertext + encryptor.tag), "utf-8")

    def generate_data_key(self):
        request = GenerateDataKeyRequest()
        request.key_id = self.keyId
        request.number_of_bytes = kms_udf_constant.DEFAULT_NUMBER_OF_BYTES
        runtime = KmsRuntimeOptions(
            ca=self.ca_file_path
        )
        return self.instance_client.generate_data_key_with_options(request, runtime)
The following API operations of KMS are used.
KMS Instance API: The GenerateDataKey operation is called to generate a data key.
For more information about API operations of KMS and how to call the API operations, see API references and SDK references.
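The string that the encryption UDF returns has the layout [4-digit data key length][data key ciphertext][Base64 IV][Base64 ciphertext and tag]. The following minimal sketch illustrates that layout only. It performs no encryption and does not call KMS, and the helper names pack_field and unpack_field are hypothetical. Because the IV is 12 bytes, its Base64 form is exactly 16 characters without padding, so the two Base64 segments can be decoded together as one string.

```python
import base64

KEY_LEN_FORMAT = "%04d"  # same pattern as ENCRYPT_DATA_KEY_FORMAT
IV_LENGTH = 12           # same value as GCM_IV_LENGTH
TAG_LENGTH = 16          # same value as GCM_TAG_LENGTH


def pack_field(data_key_ciphertext, iv, ciphertext_and_tag):
    """Hypothetical helper: build the stored string from its parts."""
    prefix = KEY_LEN_FORMAT % len(data_key_ciphertext)
    return (prefix + data_key_ciphertext
            + base64.b64encode(iv).decode("utf-8")
            + base64.b64encode(ciphertext_and_tag).decode("utf-8"))


def unpack_field(field):
    """Hypothetical helper: split a stored string back into its parts."""
    key_len = int(field[0:4])
    data_key_ciphertext = field[4:4 + key_len]
    # The Base64 IV and the Base64 payload decode as one continuous string.
    payload = base64.b64decode(field[4 + key_len:])
    iv = payload[:IV_LENGTH]
    ciphertext = payload[IV_LENGTH:len(payload) - TAG_LENGTH]
    tag = payload[len(payload) - TAG_LENGTH:]
    return data_key_ciphertext, iv, ciphertext, tag


# Placeholder values; a real field carries a KMS data key ciphertext and AES-GCM output.
example_iv = bytes(range(12))
example_payload = b"hello" + b"\x00" * 16
example_field = pack_field("ZmFrZS1rZXk=", example_iv, example_payload)
example_parts = unpack_field(example_field)
```

The 4-digit length prefix lets the decryption side locate the data key ciphertext without a delimiter, which is why the data key ciphertext must be shorter than 10,000 characters.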
5. Create a decryption UDF
Define the decrypt_udf.py class
# coding=utf-8
import base64

from alibabacloud_kms20160120 import models as kms_20160120_models
from alibabacloud_kms_kms20160120.models import KmsConfig, KmsRuntimeOptions
from alibabacloud_kms_kms20160120.transfer_client import TransferClient
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from odps.distcache import get_cache_file

import kms_udf_constant
import kms_udf_utils
from kms_udf_utils import LRUCache


class DecryptUDF(object):
    def __init__(self):
        # Load the configuration file that is added as a MaxCompute resource.
        cache_file = get_cache_file(kms_udf_constant.UDF_CONFIG_CACHE_FILE)
        self.config_dict = kms_udf_utils.properties_to_dict(cache_file)
        cache_file.close()
        kms_config = KmsConfig(
            protocol='https',
            client_key_file=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_FILE_KEY),
            password=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_PASSWORD_KEY),
            endpoint=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_ENDPOINT_KEY),
        )
        self.ca_file_path = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_CA_FILE_KEY)
        self.keyId = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.UDF_ENCRYPT_KEY_ID_KEY)
        self.instance_client = TransferClient(kms_config=kms_config)
        # Cache plaintext data keys so that KMS is called once per data key ciphertext.
        self.data_key_cache = LRUCache(1000)

    def evaluate(self, data):
        if kms_udf_utils.is_null(data):
            return data
        # Return values that do not start with a 4-digit length prefix unchanged.
        if len(data) < 4 or not data[0:4].isdigit():
            return data
        data_key_len = int(data[0:4])
        if len(data) < 4 + data_key_len:
            return data
        data_key_ciphertext = data[4:4 + data_key_len]
        if not self.data_key_cache.contains_key(data_key_ciphertext):
            decrypt_request = kms_20160120_models.DecryptRequest(
                ciphertext_blob=data_key_ciphertext
            )
            decrypt_runtime = KmsRuntimeOptions(
                ca=self.ca_file_path
            )
            decrypt_response = self.instance_client.decrypt_with_options(decrypt_request, decrypt_runtime)
            self.data_key_cache.put(data_key_ciphertext, base64.b64decode(decrypt_response.body.plaintext))
        data_key = self.data_key_cache.get(data_key_ciphertext)
        # The remaining string decodes to IV + ciphertext + GCM tag.
        envelope_cipher_bytes = base64.b64decode(data[4 + data_key_len:])
        iv = envelope_cipher_bytes[0:kms_udf_constant.GCM_IV_LENGTH]
        cipher_bytes = envelope_cipher_bytes[
                       kms_udf_constant.GCM_IV_LENGTH:len(envelope_cipher_bytes) - kms_udf_constant.GCM_TAG_LENGTH]
        tag = envelope_cipher_bytes[len(envelope_cipher_bytes) - kms_udf_constant.GCM_TAG_LENGTH:]
        decryptor = Cipher(algorithms.AES(data_key), modes.GCM(iv, tag)).decryptor()
        # Decode the plaintext bytes to a string. str() on bytes would return the "b'...'" representation.
        return (decryptor.update(cipher_bytes) + decryptor.finalize()).decode("utf-8")
The following API operations of KMS are used.
KMS Instance API: The Decrypt operation is called to obtain the plaintext data key.
For more information about API operations of KMS and how to call the API operations, see API references and SDK references.
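All rows that are encrypted by the same UDF instance share one data key ciphertext, so the decryption side can cache the plaintext data key and call KMS only on a cache miss. The following minimal sketch shows that pattern without a KMS dependency. DataKeyCache and fake_decrypt are hypothetical names, and fake_decrypt only stands in for the Decrypt call.

```python
from collections import OrderedDict


class DataKeyCache(object):
    """Hypothetical wrapper: an LRU cache in front of a decrypt callable."""

    def __init__(self, decrypt_fn, capacity):
        self.decrypt_fn = decrypt_fn
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, data_key_ciphertext):
        if data_key_ciphertext in self.cache:
            # Cache hit: remove the entry so that it is re-inserted as most recent.
            value = self.cache.pop(data_key_ciphertext)
        else:
            # Cache miss: call the expensive decrypt function once.
            value = self.decrypt_fn(data_key_ciphertext)
            if len(self.cache) >= self.capacity:
                self.cache.popitem(last=False)  # Evict the least recently used entry
        self.cache[data_key_ciphertext] = value
        return value


calls = []


def fake_decrypt(ciphertext):
    # Stand-in for the KMS Decrypt call; records each invocation.
    calls.append(ciphertext)
    return b"plaintext-for-" + ciphertext.encode("utf-8")


cache = DataKeyCache(fake_decrypt, capacity=2)
for blob in ["k1", "k1", "k2", "k1", "k3", "k2"]:
    cache.get(blob)
print(calls)
```

With a capacity of 2, the repeated lookups of k1 are served from the cache, and k2 is decrypted a second time only because k3 evicted it. A larger capacity, such as the 1000 entries used in decrypt_udf.py, makes such evictions unlikely when a table contains few distinct data keys.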
6. Create an SQL UDF
In this example, the encrypt_udf and decrypt_udf functions are created. If you do not need one of the UDFs, omit the ADD PY and CREATE FUNCTION statements for that UDF.
-- Add the preceding programs and the requirements.txt-related dependencies as resources.
ADD PY kms_udf_utils.py;
ADD PY kms_udf_constant.py;
ADD PY decrypt_udf.py;
ADD PY encrypt_udf.py;
-- Add the configuration file to the MaxCompute project.
ADD FILE kms_udf_conf.properties;
-- Add the client key file of the KMS instance to the MaxCompute project.
ADD FILE clientkey_xxxx.json;
-- Add the CA certificate file of the KMS instance to the MaxCompute project.
ADD FILE PrivateKmsCA_xxxx.pem;
-- Create an encryption UDF. For a Python UDF, the class name is in the <script name>.<class name> format.
CREATE FUNCTION encrypt_udf AS 'encrypt_udf.EncryptUDF'
USING 'encrypt_udf.py,kms_udf_utils.py,kms_udf_constant.py,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
-- Create a decryption UDF.
CREATE FUNCTION decrypt_udf AS 'decrypt_udf.DecryptUDF'
USING 'decrypt_udf.py,kms_udf_utils.py,kms_udf_constant.py,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
Parameter description
Dependency name | UDF | Description |
encrypt_udf.py | encrypt_udf | Encryption UDF |
decrypt_udf.py | decrypt_udf | Decryption UDF |
7. Configure variables
The variables required by the UDF implementation class are stored in the kms_udf_conf.properties configuration file.
# Configure variables.
# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the name of the client key file.
udf.kms.clientkey.file=clientkey_xxxx.json
# Specify the password of the client key.
udf.kms.clientkey.password=xxxxxxxx
# Specify the CA certificate of the KMS instance.
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Specify the endpoint of the KMS instance.
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
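The configuration file mixes comment lines that start with # and key=value pairs, so the code that reads it needs to skip comments and blank lines. The following minimal sketch shows one way to parse such a file. The parse_properties name is hypothetical, and the password value abc=123 is a made-up placeholder that shows why the line is split on the first = only.

```python
import io


def parse_properties(lines):
    """Hypothetical parser for a simple key=value properties file."""
    config = {}
    for line in lines:
        line = line.strip()
        # Skip blank lines and comment lines.
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only so that values can contain "=".
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError("malformed line: %r" % line)
        config[key.strip()] = value.strip()
    return config


# In-memory stand-in for kms_udf_conf.properties with placeholder values.
sample = io.StringIO(u"""# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the password of the client key.
udf.kms.clientkey.password=abc=123

""")
config = parse_properties(sample)
```

Raising an error on a line without = surfaces a malformed configuration during UDF initialization instead of at the first lookup of a missing key.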
8. Use UDFs
-- Use a UDF to encrypt data.
SELECT encrypt_udf(column_name) FROM my_table;
-- Use a UDF to decrypt data.
SELECT decrypt_udf(column_name) FROM my_table;