Use Key Management Service (KMS) instances together with user-defined functions (UDFs) to encrypt and decrypt field-level data in MaxCompute SQL. This document provides complete Java and Python implementations using envelope encryption.
How it works
MaxCompute UDFs run with high concurrency. Calling KMS directly to encrypt each field would generate too many API calls and degrade performance. Envelope encryption solves this: KMS generates a data key once per UDF initialization, and that key encrypts field values locally.
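To make the call-volume argument concrete, here is a minimal sketch that only counts API calls. `FakeKms`, `encrypt_locally`, and `EnvelopeWorker` are hypothetical stand-ins and not part of any KMS SDK. The local cipher is replaced by a placeholder so the example stays focused on how often KMS is contacted.

```python
class FakeKms(object):
    """Hypothetical stand-in for a KMS client; it only counts API calls."""

    def __init__(self):
        self.calls = 0

    def encrypt(self, value):
        self.calls += 1                 # one remote call per field value
        return "kms(" + value + ")"

    def generate_data_key(self):
        self.calls += 1                 # one remote call per worker initialization
        return b"k" * 32                # placeholder bytes, not a real data key


def encrypt_locally(data_key, value):
    # Placeholder for the local cipher step; no real encryption happens here.
    return "local(" + value + ")"


class EnvelopeWorker(object):
    """Fetches one data key at initialization and reuses it for every field."""

    def __init__(self, kms):
        self.data_key = kms.generate_data_key()

    def evaluate(self, value):
        return encrypt_locally(self.data_key, value)


def count_calls_per_field(rows):
    kms = FakeKms()
    for value in rows:
        kms.encrypt(value)
    return kms.calls


def count_calls_envelope(rows):
    kms = FakeKms()
    worker = EnvelopeWorker(kms)
    for value in rows:
        worker.evaluate(value)
    return kms.calls
```

With 300 rows, the per-field approach makes 300 calls, while the envelope approach makes a single call regardless of row count.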
Encryption flow:
During UDF initialization, the KMS instance client initializes and generates a data key.
During data processing, the data key encrypts each field value (envelope encryption).
Decryption flow:
During UDF initialization, the KMS instance client, a thread pool, and an LRU cache are initialized.
During data processing, KMS decrypts the data key ciphertext, and the plaintext data key decrypts the field value.
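The caching step in the decryption flow can be sketched as follows. This is an illustration under assumptions, not the UDF implementation: `DataKeyCache` and `decrypt_fn` are hypothetical names, `decrypt_fn` stands in for a call to KMS Decrypt, and Python 3 is assumed for `OrderedDict.move_to_end`.

```python
import threading
from collections import OrderedDict


class DataKeyCache(object):
    """Small LRU cache of plaintext data keys, keyed by data key ciphertext."""

    def __init__(self, decrypt_fn, capacity=1000):
        self._decrypt_fn = decrypt_fn   # hypothetical callable wrapping KMS Decrypt
        self._capacity = capacity
        self._cache = OrderedDict()
        self._lock = threading.Lock()

    def get(self, data_key_ciphertext):
        # The lock is held during the remote call, which keeps the sketch simple
        # at the cost of serializing cache misses.
        with self._lock:
            if data_key_ciphertext in self._cache:
                self._cache.move_to_end(data_key_ciphertext)  # most recently used
                return self._cache[data_key_ciphertext]
            plaintext = self._decrypt_fn(data_key_ciphertext)
            self._cache[data_key_ciphertext] = plaintext
            if len(self._cache) > self._capacity:
                self._cache.popitem(last=False)               # evict least recently used
            return plaintext
```

Because many field values share the same data key ciphertext, most lookups hit the cache and only the first occurrence of each data key triggers a KMS call.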
Encrypted field format:
Encrypted fields are stored as strings with the following layout:
| Segment | Length | Description |
|---|---|---|
| Data key length | 4 characters (fixed) | Zero-padded decimal length of the data key ciphertext (format `%04d`). The decryption UDF uses it to locate the boundary between the data key ciphertext and the rest of the string |
| Data key ciphertext | Variable | The encrypted data key, decrypted by KMS during decryption |
| IV | 16 characters (fixed) | Base64 encoding of the 12-byte GCM initialization vector |
| Field ciphertext | Variable | The Base64-encoded encrypted field value |
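The layout can be illustrated with a short pack and unpack sketch. It follows the string format produced by the sample UDFs (a `%04d` length prefix, the data key ciphertext, a Base64 IV, and a Base64 field ciphertext). The function names `pack_field` and `unpack_field` are hypothetical, and the sketch performs no encryption.

```python
import base64

PREFIX_LENGTH = 4           # width of the "%04d" length prefix
GCM_IV_LENGTH = 12          # IV size in bytes
GCM_IV_BASE64_LENGTH = 16   # Base64 of 12 bytes is always 16 characters


def pack_field(data_key_ciphertext, iv, field_ciphertext):
    """Concatenate the segments into one stored string."""
    if len(data_key_ciphertext) > 9999:
        # A longer value would need five digits and break the fixed-width prefix.
        raise ValueError("data key ciphertext too long for a 4-digit prefix")
    if len(iv) != GCM_IV_LENGTH:
        raise ValueError("IV must be 12 bytes")
    return ("%04d" % len(data_key_ciphertext)
            + data_key_ciphertext
            + base64.b64encode(iv).decode("ascii")
            + base64.b64encode(field_ciphertext).decode("ascii"))


def unpack_field(stored):
    """Split a stored string back into its segments."""
    key_len = int(stored[:PREFIX_LENGTH])
    key_end = PREFIX_LENGTH + key_len
    iv_end = key_end + GCM_IV_BASE64_LENGTH
    data_key_ciphertext = stored[PREFIX_LENGTH:key_end]
    iv = base64.b64decode(stored[key_end:iv_end])
    field_ciphertext = base64.b64decode(stored[iv_end:])
    return data_key_ciphertext, iv, field_ciphertext
```

The fixed-width prefix and fixed-length IV are what allow the decryption side to split the string without any separators.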
Envelope encryption supports only Galois/Counter Mode (GCM). When creating the customer master key (CMK), set Key Type to Symmetric Key.
Choose a UDF
| | SynchronousKeyEncryptUDF | EncryptUDF |
|---|---|---|
| Use when | Large datasets, high concurrency | Small tables, low concurrency |
| Data key reuse | Keys are stored in a KMS secret and reused across runs — reduces API call volume and prevents key proliferation | A new data key is generated each time the UDF initializes |
| Performance | Higher — fewer GenerateDataKey calls | Lower — one GenerateDataKey call per UDF initialization |
| Additional setup | Requires a generic secret and a RAM secret | None |
Without SynchronousKeyEncryptUDF, each UDF worker generates its own data key. At high concurrency, this produces a large number of GenerateDataKey API calls and accumulates many distinct data keys in KMS. SynchronousKeyEncryptUDF avoids this by storing the data key as a KMS secret that all workers share.
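The sharing idea can be sketched as a get-or-create protocol over a versioned secret. Everything here is an assumption made for illustration: `FakeSecretStore` is an in-memory stand-in for the KMS secret, `key_version` mirrors the deterministic ID idea using `uuid.uuid5`, and no real KMS call is made.

```python
import uuid


class FakeSecretStore(object):
    """Hypothetical in-memory stand-in for a KMS secret with named versions."""

    def __init__(self):
        self.versions = {}
        self.generate_calls = 0

    def get(self, version_id):
        return self.versions.get(version_id)    # None when the version is missing

    def put_if_absent(self, version_id, value):
        # Returns False when another worker already stored this version.
        if version_id in self.versions:
            return False
        self.versions[version_id] = value
        return True

    def generate_data_key(self):
        self.generate_calls += 1
        return "data-key-%d" % self.generate_calls


def key_version(project, key_identify=None):
    # Deterministic ID: every worker derives the same version from the same inputs.
    name = project if not key_identify else project + "/" + key_identify
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, name))


def get_or_create_data_key(store, project, key_identify=None):
    version = key_version(project, key_identify)
    existing = store.get(version)
    if existing is not None:
        return existing
    candidate = store.generate_data_key()
    if store.put_if_absent(version, candidate):
        return candidate
    return store.get(version)                   # lost the race: use the stored key
```

Two workers that call `get_or_create_data_key` with the same project and key identifier end up with the same data key, and only the first one triggers key generation.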
Prerequisites
Before you begin, ensure that you have:
A KMS instance of the software key management type or hardware key management type. See Purchase and enable a KMS instance
A CMK with Key Type set to Symmetric Key (GCM mode required). See Key types and specifications
An Application Access Point (AAP) created, a client key saved, and the KMS instance CA certificate obtained. See Create an AAP and Obtain the instance CA certificate
Sample code in Java
Step 1: Install dependencies
<dependencies>
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-sdk-udf</artifactId>
        <version>0.48.6-public</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>alibabacloud-kms-java-sdk</artifactId>
        <version>1.2.5</version>
        <exclusions>
            <exclusion>
                <groupId>com.aliyun</groupId>
                <artifactId>tea</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>tea</artifactId>
        <version>1.2.3</version>
    </dependency>
</dependencies>
Step 2: Define the UDFUtils.java class
package com.aliyun.kms.sample;

import com.aliyun.tea.utils.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class UDFUtils {
    private UDFUtils() {
        // do nothing
    }

    // Treats null, empty strings, and the "\N" null marker as null values.
    public static boolean isNull(String value) {
        return StringUtils.isEmpty(value) || "\\N".equals(value);
    }

    // Derives a deterministic version ID so that all workers resolve the same secret version.
    public static String getSynchronousKeyVersion(String udfIdentify, String keyIdentify) {
        UUID uuid = null;
        byte[] bytes = null;
        if (StringUtils.isEmpty(keyIdentify)) {
            bytes = udfIdentify.getBytes(StandardCharsets.UTF_8);
            uuid = UUID.nameUUIDFromBytes(bytes);
            return uuid.toString();
        }
        bytes = (udfIdentify + "/" + keyIdentify).getBytes(StandardCharsets.UTF_8);
        uuid = UUID.nameUUIDFromBytes(bytes);
        return uuid.toString();
    }

    public static <K, V> Map<K, V> getLRUMap(int capacity) {
        return new LRUMap<K, V>(capacity);
    }

    private static class LRUMap<K, V> extends LinkedHashMap<K, V> {
        private final int capacity;
        private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        private final Lock readLock = readWriteLock.readLock();
        private final Lock writeLock = readWriteLock.writeLock();

        public LRUMap(int capacity) {
            // accessOrder = true: iteration order runs from least to most recently accessed.
            super(capacity, 0.75f, true);
            this.capacity = capacity;
        }

        @Override
        public V get(Object key) {
            // In access-order mode, get() reorders entries, so it needs the write lock.
            writeLock.lock();
            try {
                return super.get(key);
            } finally {
                writeLock.unlock();
            }
        }

        @Override
        public boolean containsKey(Object key) {
            // containsKey() does not reorder entries, so the read lock is sufficient.
            readLock.lock();
            try {
                return super.containsKey(key);
            } finally {
                readLock.unlock();
            }
        }

        @Override
        public V put(K key, V value) {
            writeLock.lock();
            try {
                return super.put(key, value);
            } finally {
                writeLock.unlock();
            }
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            // Evict the least recently used entry once the capacity is exceeded.
            return size() > capacity;
        }
    }
}
Methods:
| Method | Description |
|---|---|
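| isNull | Checks whether a string is null, empty, or the `\N` null marker |
| getSynchronousKeyVersion | Derives a version identifier by combining the UDF ID and the keyIdentify input parameter |
| getLRUMap | Returns a capacity-bounded LRU map used to cache plaintext data keys |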
Step 3: Define the UDFConstant.java class
This class defines the parameter names used in the configuration file. For parameter values, see Step 7: Configure variables.
package com.aliyun.kms.sample;

public class UDFConstant {
    private UDFConstant() {
        // do nothing
    }

    public static final String KMS_UDF_CONF_NAME = "kms_udf_conf.properties";
    public static final String UDF_ENCRYPT_KEY_ID_KEY = "udf.kms.keyId";
    public static final String KMS_CLIENT_KEY_FILE_KEY = "udf.kms.clientkey.file";
    public static final String KMS_CLIENT_KEY_PASSWORD_KEY = "udf.kms.clientkey.password";
    public static final String KMS_INSTANCE_CA_FILE_KEY = "udf.kms.instance.ca.file";
    public static final String KMS_INSTANCE_ENDPOINT_KEY = "udf.kms.instance.endpoint";
    public static final String KMS_SYNCHRONOUS_SECRET_NAME_KEY = "udf.synchronous.secret.name";
    public static final String KMS_RAM_SECRET_NAME_KEY = "udf.ram.secret.name";
    public static final String KMS_ENDPOINT_KEY = "udf.kms.endpoint";
    public static final String ENCRYPT_DATA_KEY_FORMAT = "%04d";
    public static final int GCM_IV_LENGTH = 12;
    public static final int GCM_IV_BASE64_LENGTH = 16;
    public static final int GCM_TAG_LENGTH = 16;
}
Parameters:
| Parameter | Description |
|---|---|
| KMS_UDF_CONF_NAME | Name of the configuration file |
| UDF_ENCRYPT_KEY_ID_KEY | ID of the key managed by KMS |
| KMS_CLIENT_KEY_FILE_KEY | Path to the client key file |
| KMS_CLIENT_KEY_PASSWORD_KEY | Password of the client key |
| KMS_INSTANCE_CA_FILE_KEY | Path to the KMS instance CA certificate |
| KMS_INSTANCE_ENDPOINT_KEY | Endpoint of the KMS instance |
| KMS_SYNCHRONOUS_SECRET_NAME_KEY | Name of the generic secret (SynchronousKeyEncryptUDF only) |
| KMS_RAM_SECRET_NAME_KEY | Name of the RAM secret (SynchronousKeyEncryptUDF only) |
| KMS_ENDPOINT_KEY | Endpoint of the shared gateway (SynchronousKeyEncryptUDF only) |
Step 4: Create an encryption UDF
SynchronousKeyEncryptUDF (recommended for large datasets)
SynchronousKeyEncryptUDF stores data keys as KMS secrets. Multiple UDF workers look up the same key version instead of each generating a new data key. This keeps the number of data keys bounded and reduces GenerateDataKey API calls.
The following diagram shows the data key generation flow:
Before writing the UDF class, create two secrets:
Create a generic secret named `xxxxSynchronousSecret`
Create a generic secret in the KMS console or by calling KMS API operations. The secret stores your data key. Set the secret name to xxxxSynchronousSecret and use a random value as the initial secret value. Specify the secret name via the udf.synchronous.secret.name parameter in the UDF. See Manage and use generic secrets.
Create a RAM secret named `xxxxRamSecret`
Create a RAM secret in the KMS console or by calling KMS API operations. The secret stores the AccessKey pair of the KMS account used to call the shared gateway API. The UDF retrieves it for identity authentication at runtime. Specify the secret name via the udf.ram.secret.name parameter in the UDF. See Manage and use RAM secrets and Create an AccessKey pair.
Secret value format:
{ "AccessKeyId": "LTAI****************", "AccessKeySecret": "yourAccessKeySecret" }
Define the SynchronousKeyEncryptUDF.java class:
This example loads credentials from a properties file for demonstration. In production, store sensitive values (client key content, passwords) outside the JAR and control access via KMS instance permissions.
package com.aliyun.kms.sample;

import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.*;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.tea.TeaException;
import com.aliyun.tea.utils.StringUtils;
import com.google.gson.Gson;
import org.apache.commons.codec.binary.Base64;

import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

public class SynchronousKeyEncryptUDF extends UDF {
    private static Gson gson = new Gson();
    private static Client instanceClient;
    private static com.aliyun.kms.kms20160120.Client shareClient;
    private static Map<String, DataKeyObject> dataKeyObjectMap;
    private static Base64 base64 = new Base64();
    private String runTaskIdentify;
    private String keySecretName;
    private String keyId;
    private Properties properties = new Properties();

    @Override
    public void setup(ExecutionContext ctx) throws UDFException, IOException {
        super.setup(ctx);
        try {
            InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
            properties.load(in);
            // Key dimension: use getRunningProject() to scope keys by project,
            // or getTableInfo() to scope keys by table.
            runTaskIdentify = ctx.getRunningProject();
            dataKeyObjectMap = new ConcurrentHashMap<>();
            keySecretName = properties.getProperty(UDFConstant.KMS_SYNCHRONOUS_SECRET_NAME_KEY);
            keyId = properties.getProperty(UDFConstant.UDF_ENCRYPT_KEY_ID_KEY);
            if (StringUtils.isEmpty(keySecretName)) {
                throw new UDFException("the secret name is null");
            }
            buildInstanceClient(ctx);
            checkSecretExist();
            buildShareClient();
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }

    public String evaluate(String keyIdentify, String data) {
        if (UDFUtils.isNull(data)) {
            return data;
        }
        byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
        byte[] iv = null;
        byte[] cipherTextBytes = null;
        try {
            String keyVersion = UDFUtils.getSynchronousKeyVersion(runTaskIdentify, keyIdentify);
            if (!dataKeyObjectMap.containsKey(keyVersion)) {
                dataKeyObjectMap.put(keyVersion, getDataKeyObject(keyVersion));
            }
            DataKeyObject dataKeyObject = dataKeyObjectMap.get(keyVersion);
            // Use a fresh random IV for every field value.
            iv = new byte[UDFConstant.GCM_IV_LENGTH];
            SecureRandom random = new SecureRandom();
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(dataKeyObject.plaintextBytes, "AES");
            GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
            cipher.init(Cipher.ENCRYPT_MODE, keySpec, gcmParameterSpec);
            cipherTextBytes = cipher.doFinal(dataBytes);
            // Output format: [4-byte key length][data key ciphertext][base64(iv)][base64(ciphertext)]
            return dataKeyObject.encryptedDataKeyPart + base64.encodeAsString(iv) + base64.encodeAsString(cipherTextBytes);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    private DataKeyObject getDataKeyObject(String keyVersion) throws Exception {
        try {
            return getDataKeyObjectByGetSecretValue(keyVersion);
        } catch (TeaException e) {
            if ("Forbidden.ResourceNotFound".equals(e.code)) {
                // The version does not exist yet: generate a data key and store it.
                return buildDataKeyObject(keyVersion);
            }
            // Rethrow any other error instead of silently returning null.
            throw e;
        }
    }

    private DataKeyObject getDataKeyObjectByGetSecretValue(String keyVersion) throws Exception {
        String secretData = getSecretValue(keyVersion);
        DataKeyPersistentObject dataKeyPersistentObject = gson.fromJson(secretData, DataKeyPersistentObject.class);
        byte[] plaintextBytes = base64.decode(dataKeyPersistentObject.plaintext);
        return new DataKeyObject(plaintextBytes, dataKeyPersistentObject.encryptedDataKeyPart);
    }

    private DataKeyObject buildDataKeyObject(String keyVersion) throws Exception {
        GenerateDataKeyRequest request = new GenerateDataKeyRequest();
        request.setKeyId(keyId);
        KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
        GenerateDataKeyResponse response = instanceClient.generateDataKeyWithOptions(request, runtimeOptions);
        String plaintext = response.getBody().plaintext;
        byte[] plaintextBytes = base64.decode(plaintext);
        String encryptedDataKeyPart = String.format(UDFConstant.ENCRYPT_DATA_KEY_FORMAT, response.getBody().ciphertextBlob.length()) + response.getBody().ciphertextBlob;
        DataKeyObject dataKeyObject = new DataKeyObject(plaintextBytes, encryptedDataKeyPart);
        DataKeyPersistentObject dataKeyPersistentObject = new DataKeyPersistentObject(plaintext, encryptedDataKeyPart);
        PutSecretValueRequest putSecretValueRequest = new PutSecretValueRequest();
        putSecretValueRequest.secretData = dataKeyPersistentObject.toJsonString();
        putSecretValueRequest.secretName = keySecretName;
        putSecretValueRequest.versionId = keyVersion;
        try {
            shareClient.putSecretValue(putSecretValueRequest);
        } catch (TeaException e) {
            if ("Rejected.ResourceExist".equals(e.code)) {
                // Another worker stored this version first: use its data key.
                return getDataKeyObjectByGetSecretValue(keyVersion);
            }
            // Do not return a data key that was never persisted.
            throw e;
        }
        return dataKeyObject;
    }

    @Override
    public void close() throws UDFException, IOException {
        super.close();
    }

    private void buildInstanceClient(ExecutionContext ctx) throws Exception {
        try {
            KmsConfig config = new KmsConfig();
            String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
            byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
            config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
            String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
            byte[] caFileContent = ctx.readResourceFile(caFileName);
            config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
            config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
            config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
            instanceClient = new TransferClient(config);
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }

    // Fails fast during setup if the generic secret has not been created.
    private void checkSecretExist() throws Exception {
        getSecretValue(null);
    }

    private String getSecretValue(String versionId) throws Exception {
        GetSecretValueRequest getSecretValueRequest = new GetSecretValueRequest();
        getSecretValueRequest.setSecretName(keySecretName);
        if (!StringUtils.isEmpty(versionId)) {
            getSecretValueRequest.setVersionId(versionId);
        }
        KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
        GetSecretValueResponse getSecretValueResponse = instanceClient.getSecretValueWithOptions(getSecretValueRequest, runtimeOptions);
        return getSecretValueResponse.body.secretData;
    }

    private void buildShareClient() throws Exception {
        String ramSecretName = properties.getProperty(UDFConstant.KMS_RAM_SECRET_NAME_KEY);
        GetSecretValueRequest getSecretValueRequest = new GetSecretValueRequest();
        getSecretValueRequest.setSecretName(ramSecretName);
        KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
        GetSecretValueResponse getSecretValueResponse = instanceClient.getSecretValueWithOptions(getSecretValueRequest, runtimeOptions);
        AccessKeyObject accessKeyObject = gson.fromJson(getSecretValueResponse.body.secretData, AccessKeyObject.class);
        com.aliyun.teaopenapi.models.Config shareConfig = new com.aliyun.teaopenapi.models.Config()
                .setAccessKeyId(accessKeyObject.AccessKeyId)
                .setAccessKeySecret(accessKeyObject.AccessKeySecret)
                .setEndpoint(properties.getProperty(UDFConstant.KMS_ENDPOINT_KEY));
        shareClient = new com.aliyun.kms.kms20160120.Client(shareConfig);
    }

    private static class AccessKeyObject implements Serializable {
        private String AccessKeyId;
        private String AccessKeySecret;
    }

    private final static class DataKeyObject {
        private byte[] plaintextBytes;
        private String encryptedDataKeyPart;

        public DataKeyObject() {
        }

        public DataKeyObject(byte[] plaintextBytes, String encryptedDataKeyPart) {
            this.plaintextBytes = plaintextBytes;
            this.encryptedDataKeyPart = encryptedDataKeyPart;
        }
    }

    private final static class DataKeyPersistentObject implements Serializable {
        private String plaintext;
        private String encryptedDataKeyPart;

        public DataKeyPersistentObject() {
        }

        public DataKeyPersistentObject(String plaintext, String encryptedDataKeyPart) {
            this.plaintext = plaintext;
            this.encryptedDataKeyPart = encryptedDataKeyPart;
        }

        public String toJsonString() {
            return gson.toJson(this);
        }
    }
}
Key methods:
| Method | Description |
|---|---|
| setup | Reads the configuration file, initializes the KMS instance SDK and Alibaba Cloud SDK, and checks whether the initial secret exists |
| buildInstanceClient | Initializes the KMS instance SDK |
| buildShareClient | Retrieves the AccessKey pair from the RAM secret and initializes the Alibaba Cloud SDK |
| getDataKeyObjectByGetSecretValue | Retrieves a data key stored in the generic secret |
| buildDataKeyObject | Generates a new data key and stores it in the secret |
| evaluate | Encrypts a field value and returns the encrypted string |
KMS API operations used:
KMS instance API: GenerateDataKey (generate a data key) and GetSecretValue (retrieve a secret value)
KMS API: PutSecretValue (store a secret value)
For full API and SDK documentation, see API references and SDK references.
EncryptUDF (for small tables with low concurrency)
Define the EncryptUDF.java class:
package com.aliyun.kms.sample;

import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.GenerateDataKeyRequest;
import com.aliyun.kms20160120.models.GenerateDataKeyResponse;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import org.apache.commons.codec.binary.Base64;

import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Properties;

public class EncryptUDF extends UDF {
    private static Client client;
    private static String plaintext;
    private static byte[] plaintextBytes;
    private static String encryptedDataKeyPart;
    private static Base64 base64 = new Base64();

    @Override
    public void setup(ExecutionContext ctx) throws UDFException, IOException {
        super.setup(ctx);
        try {
            InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
            Properties properties = new Properties();
            properties.load(in);
            KmsConfig config = new KmsConfig();
            String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
            byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
            config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
            String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
            byte[] caFileContent = ctx.readResourceFile(caFileName);
            config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
            config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
            config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
            String keyId = properties.getProperty(UDFConstant.UDF_ENCRYPT_KEY_ID_KEY);
            client = new TransferClient(config);
            GenerateDataKeyRequest request = new GenerateDataKeyRequest();
            request.setKeyId(keyId);
            KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
            GenerateDataKeyResponse response = client.generateDataKeyWithOptions(request, runtimeOptions);
            plaintext = response.getBody().plaintext;
            plaintextBytes = base64.decode(plaintext);
            // Output format: [4-byte key length][data key ciphertext]
            encryptedDataKeyPart = String.format(UDFConstant.ENCRYPT_DATA_KEY_FORMAT, response.getBody().ciphertextBlob.length()) + response.getBody().ciphertextBlob;
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }

    public String evaluate(String data) {
        if (UDFUtils.isNull(data)) {
            return data;
        }
        byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
        byte[] iv = null;
        byte[] cipherTextBytes = null;
        try {
            iv = new byte[UDFConstant.GCM_IV_LENGTH];
            SecureRandom random = new SecureRandom();
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(plaintextBytes, "AES");
            GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
            cipher.init(Cipher.ENCRYPT_MODE, keySpec, gcmParameterSpec);
            cipherTextBytes = cipher.doFinal(dataBytes);
            // Output format: [4-byte key length][data key ciphertext][base64(iv)][base64(ciphertext)]
            return encryptedDataKeyPart + base64.encodeAsString(iv) + base64.encodeAsString(cipherTextBytes);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws UDFException, IOException {
        super.close();
    }
}
Key methods:
| Method | Description |
|---|---|
| setup | Reads the configuration file, initializes the KMS instance SDK, and generates the data key ciphertext |
| evaluate | Encrypts a field value and returns the encrypted string |
KMS API operations used:
KMS instance API: GenerateDataKey (generate a data key)
For full API and SDK documentation, see API references and SDK references.
Step 5: Create a decryption UDF
Define the DecryptUDF.java class:
package com.aliyun.kms.sample;

import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.DecryptRequest;
import com.aliyun.kms20160120.models.DecryptResponse;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import org.apache.commons.codec.binary.Base64;

import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;

public class DecryptUDF extends UDF {
    private static Client client;
    private static Map<String, byte[]> dataKeyMap;
    // Offset of the 4-byte data key length prefix
    private static final int BASE_STEP = 4;
    private static Base64 base64 = new Base64();

    @Override
    public void setup(ExecutionContext ctx) throws UDFException, IOException {
        super.setup(ctx);
        try {
            InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
            Properties properties = new Properties();
            properties.load(in);
            KmsConfig config = new KmsConfig();
            String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
            byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
            config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
            String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
            byte[] caFileContent = ctx.readResourceFile(caFileName);
            config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
            config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
            config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
            // Cache up to 1,000 plaintext data keys to avoid repeated KMS Decrypt calls
            dataKeyMap = UDFUtils.getLRUMap(1000);
            client = new TransferClient(config);
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }

    public String evaluate(String data) {
        if (UDFUtils.isNull(data)) {
            return data;
        }
        if (data.length() <= BASE_STEP) {
            return data;
        }
        // Read the 4-byte prefix to determine the length of the data key ciphertext
        int dataLength = Integer.parseInt(data.substring(0, BASE_STEP));
        if (data.length() <= BASE_STEP + dataLength) {
            return data;
        }
        // Extract the data key ciphertext segment
        String dataKeyCiphertext = data.substring(BASE_STEP, BASE_STEP + dataLength);
        try {
            byte[] dataKeyBytes = getDataKeyPlaintext(dataKeyCiphertext);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(dataKeyBytes, "AES");
            // Extract the IV segment (base64-encoded, 16 chars = 12 bytes decoded)
            int ivEnd = BASE_STEP + dataLength + UDFConstant.GCM_IV_BASE64_LENGTH;
            byte[] iv = base64.decode(data.substring(BASE_STEP + dataLength, ivEnd));
            // Extract the field ciphertext segment
            byte[] cipherText = base64.decode(data.substring(ivEnd));
            GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
            cipher.init(Cipher.DECRYPT_MODE, keySpec, gcmParameterSpec);
            byte[] decryptedData = cipher.doFinal(cipherText);
            return new String(decryptedData, StandardCharsets.UTF_8);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    private byte[] getDataKeyPlaintext(String dataKeyCiphertext) throws Exception {
        byte[] dataKey = dataKeyMap.get(dataKeyCiphertext);
        if (dataKey != null) {
            return dataKey;
        }
        synchronized (dataKeyCiphertext.intern()) {
            // Check again inside the lock so that only one thread calls KMS per data key.
            dataKey = dataKeyMap.get(dataKeyCiphertext);
            if (dataKey == null) {
                DecryptRequest request = new DecryptRequest();
                request.ciphertextBlob = dataKeyCiphertext;
                KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
                DecryptResponse response = client.decryptWithOptions(request, runtimeOptions);
                dataKey = base64.decode(response.getBody().plaintext);
                dataKeyMap.put(dataKeyCiphertext, dataKey);
            }
            return dataKey;
        }
    }

    @Override
    public void close() throws UDFException, IOException {
        super.close();
    }
}
Key methods:
| Method | Description |
|---|---|
| setup | Reads the configuration file, initializes the KMS instance SDK, and sets up the LRU cache |
| getDataKeyPlaintext | Retrieves the plaintext data key, using the LRU cache to avoid redundant KMS Decrypt calls |
| evaluate | Decrypts a field value using the plaintext data key |
KMS API operations used:
KMS instance API: Decrypt (decrypt the data key ciphertext)
For full API and SDK documentation, see API references and SDK references.
Step 6: Register the UDFs in MaxCompute
Before creating a UDF, add the JAR package of the UDF implementation class to your MaxCompute project. Remove any UDF (and its JAR package and CREATE statement) that your workflow does not need.
-- Add JAR packages to the MaxCompute project
ADD JAR encrypt_udf.jar;
ADD JAR sync_key_encrypt_udf.jar;
ADD JAR decrypt_udf.jar;
-- Add the configuration file
ADD FILE kms_udf_conf.properties;
-- Add the KMS instance client key file
ADD FILE clientkey_xxxx.json;
-- Add the KMS instance CA certificate
ADD FILE PrivateKmsCA_xxxx.pem;
-- Create the standard encryption UDF
CREATE FUNCTION encrypt_udf AS 'com.aliyun.kms.sample.EncryptUDF'
USING 'encrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
-- Create the high-performance encryption UDF
CREATE FUNCTION sync_key_encrypt_udf AS 'com.aliyun.kms.sample.SynchronousKeyEncryptUDF'
USING 'sync_key_encrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
-- Create the decryption UDF
CREATE FUNCTION decrypt_udf AS 'com.aliyun.kms.sample.DecryptUDF'
USING 'decrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
| Implementation class | JAR package | UDF name | Description |
|---|---|---|---|
| EncryptUDF.java | encrypt_udf.jar | encrypt_udf | Standard encryption UDF |
| SynchronousKeyEncryptUDF.java | sync_key_encrypt_udf.jar | sync_key_encrypt_udf | High-performance encryption UDF |
| DecryptUDF.java | decrypt_udf.jar | decrypt_udf | Decryption UDF |
Step 7: Configure variables
All UDF configuration is stored in kms_udf_conf.properties.
Configuration file for the high-performance UDF
# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the name of the client key file.
udf.kms.clientkey.file=clientkey_xxxx.json
# Specify the password of the client key.
udf.kms.clientkey.password=xxxxxxxx
# Specify the CA certificate of the KMS instance.
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Specify the endpoint of the KMS instance.
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
# (SynchronousKeyEncryptUDF only) Specify the name of the generic secret.
udf.synchronous.secret.name=xxxxSynchronousSecretName
# (SynchronousKeyEncryptUDF only) Specify the name of the RAM secret.
udf.ram.secret.name=xxxxRamSecret
# (SynchronousKeyEncryptUDF only) Specify the endpoint of the shared gateway.
udf.kms.endpoint=kms-vpc.cn-xxx.aliyuncs.com
Regular configuration file
# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the name of the client key file.
udf.kms.clientkey.file=clientkey_xxxx.json
# Specify the password of the client key.
udf.kms.clientkey.password=xxxxxxxx
# Specify the CA certificate of the KMS instance.
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Specify the endpoint of the KMS instance.
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
Step 8: Use the UDFs
-- Encrypt a column using the standard UDF
SELECT encrypt_udf(column_name) FROM my_table;
-- Encrypt a column using the high-performance UDF
-- version: the secret version (defaults to null)
SELECT sync_key_encrypt_udf(version, column_name) FROM my_table;
-- Decrypt a column
SELECT decrypt_udf(column_name) FROM my_table;
Sample code in Python
Step 1: Install dependencies
pyodps==0.11.6.2
alibabacloud-kms-python-sdk==1.1.3
Step 2: Define the kms_udf_utils.py module
# coding=utf-8
import uuid
from collections import OrderedDict


def is_null(value):
    # Treat None, empty strings, and the "\N" null marker as null values.
    return value is None or len(value) == 0 or value == '\\N'


def generate_uuid(key_version):
    return uuid.uuid5(uuid.NAMESPACE_DNS, key_version)


def get_config_value(config_dict, dict_key):
    if dict_key not in config_dict:
        raise ValueError("can not find key[{}]".format(dict_key))
    return config_dict[dict_key]


def properties_to_dict(cache_file):
    config_dict = {}
    for line in cache_file:
        line = line.strip()
        # Skip blank lines and comment lines such as "# Specify the key ID."
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values may contain "=".
        k, v = line.split("=", 1)
        config_dict[k.strip()] = v.strip()
    return config_dict


def object_to_dict(obj):
    return obj.__dict__


class LRUCache(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        if key not in self.cache:
            return -1
        value = self.cache.pop(key)  # Remove the item to update the order
        self.cache[key] = value  # Re-insert it at the end
        return value

    def put(self, key, value):
        if key in self.cache:
            self.cache.pop(key)  # Remove the existing entry to update the order
        elif len(self.cache) == self.capacity:
            self.cache.popitem(last=False)  # Remove the least recently used item
        self.cache[key] = value

    def contains_key(self, key):
        return key in self.cache
Step 3: Define the kms_udf_constant.py module
# coding=utf-8
UDF_CONFIG_CACHE_FILE = "kms_udf_conf.properties"
KMS_CLIENT_KEY_FILE_KEY = "udf.kms.clientkey.file"
KMS_CLIENT_KEY_PASSWORD_KEY = "udf.kms.clientkey.password"
KMS_INSTANCE_ENDPOINT_KEY = "udf.kms.instance.endpoint"
KMS_INSTANCE_CA_FILE_KEY = "udf.kms.instance.ca.file"
KMS_SYNCHRONOUS_SECRET_NAME_KEY = "udf.synchronous.secret.name"
KMS_RAM_SECRET_NAME_KEY = "udf.ram.secret.name"
KMS_REGION_KEY = "udf.kms.regionId"
UDF_ENCRYPT_KEY_ID_KEY = "udf.kms.keyId"
ENCRYPT_DATA_KEY_FORMAT = "%04d"
GCM_IV_LENGTH = 12
GCM_TAG_LENGTH = 16
DEFAULT_NUMBER_OF_BYTES = 32
Parameters:
| Parameter | Description |
|---|---|
| UDF_CONFIG_CACHE_FILE | Name of the configuration file |
| KMS_CLIENT_KEY_FILE_KEY | Path to the client key file |
| KMS_CLIENT_KEY_PASSWORD_KEY | Password of the client key |
| KMS_INSTANCE_ENDPOINT_KEY | Endpoint of the KMS instance |
| KMS_INSTANCE_CA_FILE_KEY | Path to the KMS instance CA certificate |
| KMS_SYNCHRONOUS_SECRET_NAME_KEY | Name of the generic secret |
| KMS_RAM_SECRET_NAME_KEY | Name of the RAM secret |
| UDF_ENCRYPT_KEY_ID_KEY | ID of the key managed by KMS |
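Each `*_KEY` constant holds the name of a property in kms_udf_conf.properties, and the UDFs look up the value under that name at initialization. The following is a minimal sketch of that lookup using only the standard library. The `parse_properties` helper and the sample values are hypothetical stand-ins for illustration and are not part of the sample project.

```python
import io

# Same property names as in kms_udf_constant.py.
UDF_ENCRYPT_KEY_ID_KEY = "udf.kms.keyId"
KMS_INSTANCE_ENDPOINT_KEY = "udf.kms.instance.endpoint"

# Hypothetical sample content; real values come from your KMS instance.
SAMPLE_PROPERTIES = """\
# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the endpoint of the KMS instance.
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
"""


def parse_properties(lines):
    """Parse key=value lines, skipping blank lines and '#' comments."""
    config = {}
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


config = parse_properties(io.StringIO(SAMPLE_PROPERTIES))
key_id = config[UDF_ENCRYPT_KEY_ID_KEY]
endpoint = config[KMS_INSTANCE_ENDPOINT_KEY]
print(key_id, endpoint)
```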
Step 4: Create an encryption UDF
Define the encrypt_udf.py class:
This example loads credentials from a properties file for demonstration. In production, store sensitive values (client key content, passwords) outside the project and control access via KMS instance permissions.
# coding=utf-8
import base64
import os
from alibabacloud_kms20160120.models import GenerateDataKeyRequest
from alibabacloud_kms_kms20160120.models import KmsConfig, KmsRuntimeOptions
from alibabacloud_kms_kms20160120.transfer_client import TransferClient
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from odps.distcache import get_cache_file
import kms_udf_constant
import kms_udf_utils

class EncryptUDF(object):
    def __init__(self):
        cache_file = get_cache_file(kms_udf_constant.UDF_CONFIG_CACHE_FILE)
        self.config_dict = kms_udf_utils.properties_to_dict(cache_file)
        cache_file.close()
        kms_config = KmsConfig(
            protocol='https',
            client_key_file=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_FILE_KEY),
            password=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_PASSWORD_KEY),
            endpoint=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_ENDPOINT_KEY),
        )
        self.ca_file_path = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_CA_FILE_KEY)
        self.keyId = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.UDF_ENCRYPT_KEY_ID_KEY)
        self.instance_client = TransferClient(kms_config=kms_config)
        # Generate one data key per UDF initialization
        resp = self.generate_data_key()
        # Prefix format: [4-digit zero-padded key length][data key ciphertext]
        self.encrypted_data_key_part = (kms_udf_constant.ENCRYPT_DATA_KEY_FORMAT %
                                        len(resp.body.ciphertext_blob) + resp.body.ciphertext_blob)
        self.plaintext = resp.body.plaintext
        self.ciphertext_blob = resp.body.ciphertext_blob

    def evaluate(self, data):
        if kms_udf_utils.is_null(data):
            return data
        # Use a fresh IV from the OS random source for every value:
        # a GCM IV must not repeat under the same data key
        iv = os.urandom(kms_udf_constant.GCM_IV_LENGTH)
        data_bytes = data.encode("utf-8")
        encryptor = Cipher(
            algorithms.AES(base64.b64decode(self.plaintext)),
            modes.GCM(iv),
        ).encryptor()
        ciphertext = encryptor.update(data_bytes) + encryptor.finalize()
        # Output format: [4-digit key length][data key ciphertext][base64(iv)][base64(ciphertext+tag)]
        return self.encrypted_data_key_part + str(base64.b64encode(iv), "utf-8") + str(
            base64.b64encode(ciphertext + encryptor.tag), "utf-8")

    def generate_data_key(self):
        request = GenerateDataKeyRequest()
        request.key_id = self.keyId
        request.number_of_bytes = kms_udf_constant.DEFAULT_NUMBER_OF_BYTES
        runtime = KmsRuntimeOptions(
            ca=self.ca_file_path
        )
        return self.instance_client.generate_data_key_with_options(request, runtime)
KMS API operations used:
KMS instance API: GenerateDataKey (generate a data key)
For full API and SDK documentation, see API references and SDK references.
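The string returned by the encryption UDF concatenates two separately base64-encoded segments, and the decryption UDF decodes them in a single call. The following sketch shows why that works, using only the standard library. The `assemble_field` helper is hypothetical, and random bytes stand in for the KMS data key ciphertext and the AES-GCM output, so nothing here is actually encrypted.

```python
import base64
import os

GCM_IV_LENGTH = 12   # mirrors kms_udf_constant.GCM_IV_LENGTH
GCM_TAG_LENGTH = 16  # mirrors kms_udf_constant.GCM_TAG_LENGTH


def assemble_field(data_key_ciphertext, iv, ciphertext_and_tag):
    """Concatenate the segments in the order used by EncryptUDF.evaluate."""
    prefix = "%04d" % len(data_key_ciphertext)
    return (prefix + data_key_ciphertext
            + base64.b64encode(iv).decode("utf-8")
            + base64.b64encode(ciphertext_and_tag).decode("utf-8"))


# Placeholders only: these values stand in for KMS and AES-GCM output.
data_key_ciphertext = base64.b64encode(b"example-data-key").decode("utf-8")
iv = os.urandom(GCM_IV_LENGTH)
ciphertext_and_tag = os.urandom(4 + GCM_TAG_LENGTH)

field = assemble_field(data_key_ciphertext, iv, ciphertext_and_tag)

# 12 bytes is a multiple of 3, so base64(iv) is exactly 16 characters with
# no '=' padding. The two base64 segments therefore decode as one string.
tail = base64.b64decode(field[4 + len(data_key_ciphertext):])
recovered_iv = tail[:GCM_IV_LENGTH]
recovered_ciphertext_and_tag = tail[GCM_IV_LENGTH:]
print(field[:4], recovered_iv == iv)
```

If the IV length were not a multiple of 3 bytes, its base64 form would end in padding characters and the combined segment could no longer be decoded in one call.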
Step 5: Create a decryption UDF
Define the decrypt_udf.py class:
# coding=utf-8
import base64
from alibabacloud_kms20160120 import models as kms_20160120_models
from alibabacloud_kms_kms20160120.models import KmsConfig, KmsRuntimeOptions
from alibabacloud_kms_kms20160120.transfer_client import TransferClient
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from odps.distcache import get_cache_file
import kms_udf_constant
import kms_udf_utils
from kms_udf_utils import LRUCache

class DecryptUDF(object):
    def __init__(self):
        cache_file = get_cache_file(kms_udf_constant.UDF_CONFIG_CACHE_FILE)
        self.config_dict = kms_udf_utils.properties_to_dict(cache_file)
        cache_file.close()
        kms_config = KmsConfig(
            protocol='https',
            client_key_file=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_FILE_KEY),
            password=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_PASSWORD_KEY),
            endpoint=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_ENDPOINT_KEY),
        )
        self.ca_file_path = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_CA_FILE_KEY)
        self.keyId = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.UDF_ENCRYPT_KEY_ID_KEY)
        self.instance_client = TransferClient(kms_config=kms_config)
        # Cache up to 1,000 plaintext data keys to avoid repeated KMS Decrypt calls
        self.data_key_cache = LRUCache(1000)

    def evaluate(self, data):
        if kms_udf_utils.is_null(data):
            return data
        if len(data) < 4:
            return data
        # Read the 4-digit prefix to get the data key ciphertext length
        data_key_len = int(data[0:4])
        if len(data) < 4 + data_key_len:
            return data
        # Extract the data key ciphertext segment
        data_key_ciphertext = data[4:4 + data_key_len]
        if not self.data_key_cache.contains_key(data_key_ciphertext):
            decrypt_request = kms_20160120_models.DecryptRequest(
                ciphertext_blob=data_key_ciphertext
            )
            decrypt_runtime = KmsRuntimeOptions(
                ca=self.ca_file_path
            )
            decrypt_response = self.instance_client.decrypt_with_options(decrypt_request, decrypt_runtime)
            self.data_key_cache.put(data_key_ciphertext, base64.b64decode(decrypt_response.body.plaintext))
        data_key = self.data_key_cache.get(data_key_ciphertext)
        # Decode the combined IV + ciphertext + tag segment
        envelope_cipher_bytes = base64.b64decode(data[4 + data_key_len:])
        iv = envelope_cipher_bytes[0:kms_udf_constant.GCM_IV_LENGTH]
        cipher_bytes = envelope_cipher_bytes[
            kms_udf_constant.GCM_IV_LENGTH:len(envelope_cipher_bytes) - kms_udf_constant.GCM_TAG_LENGTH]
        tag = envelope_cipher_bytes[len(envelope_cipher_bytes) - kms_udf_constant.GCM_TAG_LENGTH:]
        decryptor = Cipher(algorithms.AES(data_key), modes.GCM(iv, tag)).decryptor()
        # Decode the plaintext bytes as UTF-8; str() on bytes would return the "b'...'" representation
        return (decryptor.update(cipher_bytes) + decryptor.finalize()).decode("utf-8")
KMS API operations used:
KMS instance API: Decrypt (decrypt the data key ciphertext)
For full API and SDK documentation, see API references and SDK references.
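Because every row encrypted during one UDF initialization carries the same data key ciphertext, the LRU cache lets the decryption UDF call KMS once per distinct data key instead of once per row. The following sketch illustrates that effect with a condensed copy of the cache. `fake_kms_decrypt`, `split_field`, and the sample rows are hypothetical placeholders, and no real KMS call or decryption takes place.

```python
from collections import OrderedDict


class LRUCache(object):
    """Condensed version of kms_udf_utils.LRUCache with the same eviction policy."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        value = self.cache.pop(key)     # raises KeyError on a miss
        self.cache[key] = value         # re-insert as most recently used
        return value

    def put(self, key, value):
        if key in self.cache:
            self.cache.pop(key)
        elif len(self.cache) >= self.capacity:
            self.cache.popitem(last=False)  # evict the least recently used entry
        self.cache[key] = value

    def contains_key(self, key):
        return key in self.cache


kms_calls = []


def fake_kms_decrypt(data_key_ciphertext):
    # Hypothetical stand-in for the KMS Decrypt call; records each invocation.
    kms_calls.append(data_key_ciphertext)
    return "plaintext-for-" + data_key_ciphertext


def split_field(field):
    """Return (data key ciphertext, remaining payload) of an encrypted field."""
    data_key_len = int(field[0:4])
    return field[4:4 + data_key_len], field[4 + data_key_len:]


cache = LRUCache(1000)
# Five rows written under two data keys, "KEYA" and "KEYB" (4 characters each).
rows = ["0004KEYArow1", "0004KEYArow2", "0004KEYBrow3", "0004KEYArow4", "0004KEYBrow5"]
payloads = []
for row in rows:
    data_key_ciphertext, payload = split_field(row)
    if not cache.contains_key(data_key_ciphertext):
        cache.put(data_key_ciphertext, fake_kms_decrypt(data_key_ciphertext))
    data_key = cache.get(data_key_ciphertext)
    payloads.append(payload)

print(len(kms_calls))  # 2: one simulated Decrypt call per distinct data key
```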
Step 6: Register the UDFs in MaxCompute
Remove any UDF (and its source file and CREATE statement) that your workflow does not need.
-- Add source files to the resource
ADD PY decrypt_udf.py;
ADD PY encrypt_udf.py;
-- Add the configuration file
ADD FILE kms_udf_conf.properties;
-- Add the KMS instance client key file
ADD FILE clientkey_xxxx.json;
-- Add the KMS instance CA certificate
ADD FILE PrivateKmsCA_xxxx.pem;
-- Create the encryption UDF
-- For Python UDFs, the class name takes the form <script name>.<class name>
CREATE FUNCTION encrypt_udf AS 'encrypt_udf.EncryptUDF'
USING 'encrypt_udf.py,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
-- Create the decryption UDF
CREATE FUNCTION decrypt_udf AS 'decrypt_udf.DecryptUDF'
USING 'decrypt_udf.py,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
| Dependency | UDF name | Description |
|---|---|---|
| encrypt_udf.py | encrypt_udf | Encryption UDF |
| decrypt_udf.py | decrypt_udf | Decryption UDF |
Step 7: Configure variables
# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the name of the client key file.
udf.kms.clientkey.file=clientkey_xxxx.json
# Specify the password of the client key.
udf.kms.clientkey.password=xxxxxxxx
# Specify the CA certificate of the KMS instance.
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Specify the endpoint of the KMS instance.
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
Step 8: Use the UDFs
-- Encrypt a column
SELECT encrypt_udf(column_name) FROM my_table;
-- Decrypt a column
SELECT decrypt_udf(column_name) FROM my_table;