Key Management Service: Use KMS to build MaxCompute UDFs to encrypt and decrypt data

Last Updated: Mar 31, 2026

Use Key Management Service (KMS) instances together with user-defined functions (UDFs) to encrypt and decrypt field-level data in MaxCompute SQL. This document provides complete Java and Python implementations using envelope encryption.

How it works

MaxCompute UDFs run with high concurrency. Calling KMS directly to encrypt each field would generate too many API calls and degrade performance. Envelope encryption solves this: KMS generates a data key once per UDF initialization, and that key encrypts field values locally.

Encryption flow:

  1. During UDF initialization, the KMS instance client is initialized and generates a data key.

  2. During data processing, the data key encrypts each field value (envelope encryption).

Decryption flow:

  1. During UDF initialization, the KMS instance client, a thread pool, and an LRU cache are initialized.

  2. During data processing, KMS decrypts the data key ciphertext, and the plaintext data key decrypts the field value.

Encrypted field format:

Encrypted fields are stored as strings with the following layout:

| Segment | Length | Description |
| --- | --- | --- |
| Data key length | 4 bytes (fixed) | Length of the data key ciphertext, written as a zero-padded decimal string. The decryption UDF uses it to locate the boundary between the data key ciphertext and the field ciphertext |
| Data key ciphertext | Variable | The encrypted data key, decrypted by KMS during decryption |
| Field ciphertext | Variable | The Base64-encoded 12-byte GCM IV (16 characters), followed by the Base64-encoded encrypted field value |

Envelope encryption supports only Galois/Counter Mode (GCM). When creating the customer master key (CMK), set Key Type to Symmetric Key.
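
The layout above can be sketched as a pair of helpers that assemble and split the stored string. This is a minimal illustration and not part of the sample UDFs: the function names `pack_field` and `unpack_field` are hypothetical, and the field ciphertext is treated as opaque bytes, so no encryption is performed here.

```python
import base64

PREFIX_LENGTH = 4       # zero-padded decimal length of the data key ciphertext
IV_LENGTH = 12          # GCM IV size in bytes
IV_BASE64_LENGTH = 16   # Base64 of 12 bytes is exactly 16 characters, no padding


def pack_field(data_key_ciphertext, iv, field_ciphertext):
    """Assemble the stored string: prefix + data key ciphertext + IV + ciphertext."""
    if len(iv) != IV_LENGTH:
        raise ValueError("IV must be 12 bytes")
    if len(data_key_ciphertext) > 9999:
        raise ValueError("data key ciphertext does not fit a 4-digit prefix")
    return ("%04d" % len(data_key_ciphertext)
            + data_key_ciphertext
            + base64.b64encode(iv).decode("ascii")
            + base64.b64encode(field_ciphertext).decode("ascii"))


def unpack_field(stored):
    """Split a stored string back into its three segments."""
    key_len = int(stored[:PREFIX_LENGTH])
    key_end = PREFIX_LENGTH + key_len
    iv_end = key_end + IV_BASE64_LENGTH
    data_key_ciphertext = stored[PREFIX_LENGTH:key_end]
    iv = base64.b64decode(stored[key_end:iv_end])
    field_ciphertext = base64.b64decode(stored[iv_end:])
    return data_key_ciphertext, iv, field_ciphertext
```

Because the prefix is a fixed four characters and the IV always encodes to 16 characters, the decryption side needs no delimiters between segments.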

Choose a UDF

| | SynchronousKeyEncryptUDF | EncryptUDF |
| --- | --- | --- |
| Use when | Large datasets, high concurrency | Small tables, low concurrency |
| Data key reuse | Keys are stored in a KMS secret and reused across runs, which reduces API call volume and prevents key proliferation | A new data key is generated each time the UDF initializes |
| Performance | Higher: fewer GenerateDataKey calls | Lower: one GenerateDataKey call per UDF initialization |
| Additional setup | Requires a generic secret and a RAM secret | None |

Without SynchronousKeyEncryptUDF, each UDF worker generates its own data key. At high concurrency, this produces a large number of GenerateDataKey API calls and accumulates many distinct data keys in KMS. SynchronousKeyEncryptUDF avoids this by storing the data key as a KMS secret that all workers share.
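
The sharing behaviour described above amounts to a get-or-create pattern with a conflict fallback. The sketch below models it with an in-memory stand-in for the secret store. `InMemorySecretStore`, `SecretNotFound`, `SecretVersionExists`, and `get_or_create_data_key` are hypothetical names for illustration only, not KMS SDK classes.

```python
import os


class SecretNotFound(Exception):
    """Raised by the stand-in store when a version does not exist."""


class SecretVersionExists(Exception):
    """Raised by the stand-in store when a version is written twice."""


class InMemorySecretStore:
    """Stand-in for a secret with immutable, named versions."""

    def __init__(self):
        self._versions = {}

    def get(self, version_id):
        if version_id not in self._versions:
            raise SecretNotFound(version_id)
        return self._versions[version_id]

    def put(self, version_id, value):
        if version_id in self._versions:
            raise SecretVersionExists(version_id)
        self._versions[version_id] = value


def get_or_create_data_key(store, version_id, generate_key):
    """Return the shared key for version_id, generating it only if absent."""
    try:
        return store.get(version_id)
    except SecretNotFound:
        candidate = generate_key()
    try:
        store.put(version_id, candidate)
        return candidate
    except SecretVersionExists:
        # Another worker stored a key first; discard ours and use theirs
        return store.get(version_id)
```

When two workers race, the loser discards its freshly generated key and reads the winner's, so all workers converge on one data key per version.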

Prerequisites

Before you begin, ensure that you have:

Sample code in Java

Step 1: Install dependencies

<dependencies>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>odps-sdk-udf</artifactId>
            <version>0.48.6-public</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>alibabacloud-kms-java-sdk</artifactId>
            <version>1.2.5</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun</groupId>
                    <artifactId>tea</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>tea</artifactId>
            <version>1.2.3</version>
        </dependency>
</dependencies>

Step 2: Define the UDFUtils.java class

package com.aliyun.kms.sample;

import com.aliyun.tea.utils.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class UDFUtils {

    private UDFUtils() {
        // do nothing
    }

    public static boolean isNull(String value) {
        return StringUtils.isEmpty(value) || "\\N".equals(value);
    }

    public static String getSynchronousKeyVersion(String udfIdentify, String keyIdentify) {
        UUID uuid = null;
        byte[] bytes = null;
        if (StringUtils.isEmpty(keyIdentify)) {
            bytes = udfIdentify.getBytes(StandardCharsets.UTF_8);
            uuid = UUID.nameUUIDFromBytes(bytes);
            return uuid.toString();
        }
        bytes = (udfIdentify + "/" + keyIdentify).getBytes(StandardCharsets.UTF_8);
        uuid = UUID.nameUUIDFromBytes(bytes);
        return uuid.toString();
    }

    public static <K, V> Map<K, V> getLRUMap(int capacity) {
        return new LRUMap<K, V>(capacity);
    }

    private static class LRUMap<K, V> extends LinkedHashMap<K, V> {
        private final int capacity;
        private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        private final Lock readLock = readWriteLock.readLock();
        private final Lock writeLock = readWriteLock.writeLock();

        public LRUMap(int capacity) {
            super(capacity, 0.75f, true);
            this.capacity = capacity;
        }

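        @Override
        public V get(Object key) {
            // With accessOrder=true, get() reorders entries, so it needs the write lock.
            writeLock.lock();
            try {
                return super.get(key);
            } finally {
                writeLock.unlock();
            }
        }

        @Override
        public boolean containsKey(Object key) {
            // containsKey() does not reorder entries, so the read lock is enough.
            readLock.lock();
            try {
                return super.containsKey(key);
            } finally {
                readLock.unlock();
            }
        }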

        @Override
        public V put(K key, V value) {
            writeLock.lock();
            try {
                return super.put(key, value);
            } finally {
                writeLock.unlock();
            }
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > capacity;
        }
    }

}

Methods:

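| Method | Description |
| --- | --- |
| isNull | Checks whether a string is null, empty, or the `\N` null marker |
| getSynchronousKeyVersion | Derives a version identifier by combining the UDF ID and the keyIdentify input parameter |
| getLRUMap | Returns a lock-protected LRU map with the given capacity |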
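
The version derivation can be sketched in Python as follows. This is an illustration under the assumption that hashing the name with MD5 and stamping the result as a version 3 UUID matches Java's `UUID.nameUUIDFromBytes`. The function name `synchronous_key_version` is hypothetical.

```python
import hashlib
import uuid


def synchronous_key_version(udf_identify, key_identify=None):
    """Derive a deterministic version ID from the UDF scope and an optional key label."""
    if key_identify:
        name = udf_identify + "/" + key_identify
    else:
        # No key label: the version depends on the UDF scope alone
        name = udf_identify
    digest = hashlib.md5(name.encode("utf-8")).digest()
    # version=3 sets the UUID version and variant bits on the MD5 digest
    return str(uuid.UUID(bytes=digest, version=3))
```

Because the result depends only on the inputs, every worker that passes the same scope and label computes the same secret version ID without any coordination.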

Step 3: Define the UDFConstant.java class

This class defines the parameter names used in the configuration file. For parameter values, see Step 7: Configure variables.

package com.aliyun.kms.sample;

public class UDFConstant {

    private UDFConstant() {
        // do nothing
    }
    public static final String KMS_UDF_CONF_NAME = "kms_udf_conf.properties";
    public static final String UDF_ENCRYPT_KEY_ID_KEY = "udf.kms.keyId";
    public static final String KMS_CLIENT_KEY_FILE_KEY = "udf.kms.clientkey.file";
    public static final String KMS_CLIENT_KEY_PASSWORD_KEY = "udf.kms.clientkey.password";
    public static final String KMS_INSTANCE_CA_FILE_KEY = "udf.kms.instance.ca.file";
    public static final String KMS_INSTANCE_ENDPOINT_KEY = "udf.kms.instance.endpoint";
    public static final String KMS_SYNCHRONOUS_SECRET_NAME_KEY = "udf.synchronous.secret.name";
    public static final String KMS_RAM_SECRET_NAME_KEY = "udf.ram.secret.name";
    public static final String KMS_ENDPOINT_KEY = "udf.kms.endpoint";
    public static final String ENCRYPT_DATA_KEY_FORMAT = "%04d";
    public static final int GCM_IV_LENGTH = 12;
    public static final int GCM_IV_BASE64_LENGTH = 16;
    public static final int GCM_TAG_LENGTH = 16;
}

Parameters:

| Parameter | Description |
| --- | --- |
| KMS_UDF_CONF_NAME | Name of the configuration file |
| UDF_ENCRYPT_KEY_ID_KEY | ID of the key managed by KMS |
| KMS_CLIENT_KEY_FILE_KEY | Path to the client key file |
| KMS_CLIENT_KEY_PASSWORD_KEY | Password of the client key |
| KMS_INSTANCE_CA_FILE_KEY | Path to the KMS instance CA certificate |
| KMS_INSTANCE_ENDPOINT_KEY | Endpoint of the KMS instance |
| KMS_SYNCHRONOUS_SECRET_NAME_KEY | Name of the generic secret (SynchronousKeyEncryptUDF only) |
| KMS_RAM_SECRET_NAME_KEY | Name of the RAM secret (SynchronousKeyEncryptUDF only) |
| KMS_ENDPOINT_KEY | Endpoint of the shared gateway (SynchronousKeyEncryptUDF only) |

Step 4: Create an encryption UDF

SynchronousKeyEncryptUDF (recommended for large datasets)

SynchronousKeyEncryptUDF stores data keys as KMS secrets. Multiple UDF workers look up the same key version instead of each generating a new data key. This keeps the number of data keys bounded and reduces GenerateDataKey API calls.

[Figure: data key generation flow]

Before writing the UDF class, create two secrets:

  1. Create a generic secret named `xxxxSynchronousSecret`.

     Create the generic secret in the KMS console or by calling KMS API operations. The secret stores your data key. Set the secret name to xxxxSynchronousSecret and use a random value as the initial secret value. Specify the secret name in the udf.synchronous.secret.name parameter of the UDF configuration. See Manage and use generic secrets.

  2. Create a RAM secret named `xxxxRamSecret`.

     Create the RAM secret in the KMS console or by calling KMS API operations. The secret stores the AccessKey pair of the KMS account used to call the shared gateway API. The UDF retrieves it at runtime for identity authentication. Specify the secret name in the udf.ram.secret.name parameter of the UDF configuration. See Manage and use RAM secrets and Create an AccessKey pair.

     Secret value format:

    {
      "AccessKeyId": "LTAI****************",
      "AccessKeySecret": "yourAccessKeySecret"
    }

Define the SynchronousKeyEncryptUDF.java class:

Important

This example loads credentials from a properties file for demonstration. In production, store sensitive values (client key content, passwords) outside the JAR and control access via KMS instance permissions.

package com.aliyun.kms.sample;

import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.*;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.tea.TeaException;
import com.aliyun.tea.utils.StringUtils;
import com.google.gson.Gson;
import org.apache.commons.codec.binary.Base64;

import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

public class SynchronousKeyEncryptUDF extends UDF {

    private static Gson gson = new Gson();
    private static Client instanceClient;
    private static com.aliyun.kms.kms20160120.Client shareClient;

    private static Map<String, DataKeyObject> dataKeyObjectMap;
    private static Base64 base64 = new Base64();

    private String runTaskIdentify;
    private String keySecretName;
    private String keyId;
    private Properties properties = new Properties();

    @Override
    public void setup(ExecutionContext ctx) throws UDFException, IOException {
        super.setup(ctx);
        try {
            InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
            properties.load(in);
            // Key dimension: use getRunningProject() to scope keys by project,
            // or getTableInfo() to scope keys by table.
            runTaskIdentify = ctx.getRunningProject();
            dataKeyObjectMap = new ConcurrentHashMap<>();
            keySecretName = properties.getProperty(UDFConstant.KMS_SYNCHRONOUS_SECRET_NAME_KEY);
            keyId = properties.getProperty(UDFConstant.UDF_ENCRYPT_KEY_ID_KEY);

            if (StringUtils.isEmpty(keySecretName)) {
                throw new UDFException("the secret name is null");
            }
            buildInstanceClient(ctx);
            checkSecretExist();
            buildShareClient();
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }

    public String evaluate(String keyIdentify, String data) {
        if (UDFUtils.isNull(data)) {
            return data;
        }
        byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
        byte[] iv = null;
        byte[] cipherTextBytes = null;
        try {
            String keyVersion = UDFUtils.getSynchronousKeyVersion(runTaskIdentify, keyIdentify);
            if (!dataKeyObjectMap.containsKey(keyVersion)) {
                dataKeyObjectMap.put(keyVersion, getDataKeyObject(keyVersion));
            }
            DataKeyObject dataKeyObject = dataKeyObjectMap.get(keyVersion);
            iv = new byte[UDFConstant.GCM_IV_LENGTH];
            SecureRandom random = new SecureRandom();
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(dataKeyObject.plaintextBytes, "AES");
            GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
            cipher.init(Cipher.ENCRYPT_MODE, keySpec, gcmParameterSpec);
            cipherTextBytes = cipher.doFinal(dataBytes);
            // Output format: [4-byte key length][data key ciphertext][base64(iv)][base64(ciphertext)]
            return dataKeyObject.encryptedDataKeyPart + base64.encodeAsString(iv) + base64.encodeAsString(cipherTextBytes);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

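    private DataKeyObject getDataKeyObject(String keyVersion) throws Exception {
        try {
            return getDataKeyObjectByGetSecretValue(keyVersion);
        } catch (TeaException e) {
            // The secret version does not exist yet: generate and store a new data key.
            if ("Forbidden.ResourceNotFound".equals(e.code)) {
                return buildDataKeyObject(keyVersion);
            }
            // Rethrow other errors; returning null would fail later in ConcurrentHashMap.put.
            throw e;
        }
    }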

    private DataKeyObject getDataKeyObjectByGetSecretValue(String keyVersion) throws Exception {
        String secretData = getSecretValue(keyVersion);
        DataKeyPersistentObject dataKeyPersistentObject = gson.fromJson(secretData, DataKeyPersistentObject.class);
        byte[] plaintextBytes = base64.decode(dataKeyPersistentObject.plaintext);
        return new DataKeyObject(plaintextBytes, dataKeyPersistentObject.encryptedDataKeyPart);
    }

    private DataKeyObject buildDataKeyObject(String keyVersion) throws Exception {
        GenerateDataKeyRequest request = new GenerateDataKeyRequest();
        request.setKeyId(keyId);
        KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
        GenerateDataKeyResponse response = instanceClient.generateDataKeyWithOptions(request, runtimeOptions);
        String plaintext = response.getBody().plaintext;
        byte[] plaintextBytes = base64.decode(plaintext);
        String encryptedDataKeyPart = String.format(UDFConstant.ENCRYPT_DATA_KEY_FORMAT, response.getBody().ciphertextBlob.length()) + response.getBody().ciphertextBlob;

        DataKeyObject dataKeyObject = new DataKeyObject(plaintextBytes, encryptedDataKeyPart);
        DataKeyPersistentObject dataKeyPersistentObject = new DataKeyPersistentObject(plaintext, encryptedDataKeyPart);
        PutSecretValueRequest putSecretValueRequest = new PutSecretValueRequest();
        putSecretValueRequest.secretData = dataKeyPersistentObject.toJsonString();
        putSecretValueRequest.secretName = keySecretName;
        putSecretValueRequest.versionId = keyVersion;

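        try {
            shareClient.putSecretValue(putSecretValueRequest);
        } catch (TeaException e) {
            // Another worker stored this version first: reuse its data key.
            if ("Rejected.ResourceExist".equals(e.code)) {
                return getDataKeyObjectByGetSecretValue(keyVersion);
            }
            // Surface other failures so a data key is never used without being shared.
            throw e;
        }
        return dataKeyObject;
    }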

    @Override
    public void close() throws UDFException, IOException {
        super.close();
    }

    private void buildInstanceClient(ExecutionContext ctx) throws Exception {
        try {
            KmsConfig config = new KmsConfig();
            String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
            byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
            config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
            String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
            byte[] caFileContent = ctx.readResourceFile(caFileName);
            config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
            config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
            config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
            instanceClient = new TransferClient(config);
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }

    private void checkSecretExist() throws Exception {
        getSecretValue(null);
    }

    private String getSecretValue(String versionId) throws Exception {
        GetSecretValueRequest getSecretValueRequest = new GetSecretValueRequest();
        getSecretValueRequest.setSecretName(keySecretName);
        if (!StringUtils.isEmpty(versionId)) {
            getSecretValueRequest.setVersionId(versionId);
        }
        KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
        GetSecretValueResponse getSecretValueResponse = instanceClient.getSecretValueWithOptions(getSecretValueRequest, runtimeOptions);
        return getSecretValueResponse.body.secretData;
    }

    private void buildShareClient() throws Exception {
        String ramSecretName = properties.getProperty(UDFConstant.KMS_RAM_SECRET_NAME_KEY);
        GetSecretValueRequest getSecretValueRequest = new GetSecretValueRequest();
        getSecretValueRequest.setSecretName(ramSecretName);
        KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
        GetSecretValueResponse getSecretValueResponse = instanceClient.getSecretValueWithOptions(getSecretValueRequest, runtimeOptions);
        AccessKeyObject accessKeyObject = gson.fromJson(getSecretValueResponse.body.secretData, AccessKeyObject.class);
        com.aliyun.teaopenapi.models.Config shareConfig = new com.aliyun.teaopenapi.models.Config()
                .setAccessKeyId(accessKeyObject.AccessKeyId)
                .setAccessKeySecret(accessKeyObject.AccessKeySecret)
                .setEndpoint(properties.getProperty(UDFConstant.KMS_ENDPOINT_KEY));
        shareClient = new com.aliyun.kms.kms20160120.Client(shareConfig);
    }


    private static class AccessKeyObject implements Serializable {
        private String AccessKeyId;
        private String AccessKeySecret;
    }

    private final static class DataKeyObject {
        private byte[] plaintextBytes;
        private String encryptedDataKeyPart;

        public DataKeyObject() {
        }

        public DataKeyObject(byte[] plaintextBytes, String encryptedDataKeyPart) {
            this.plaintextBytes = plaintextBytes;
            this.encryptedDataKeyPart = encryptedDataKeyPart;
        }
    }

    private final static class DataKeyPersistentObject implements Serializable {
        private String plaintext;
        private String encryptedDataKeyPart;

        public DataKeyPersistentObject() {
        }

        public DataKeyPersistentObject(String plaintext, String encryptedDataKeyPart) {
            this.plaintext = plaintext;
            this.encryptedDataKeyPart = encryptedDataKeyPart;
        }

        public String toJsonString() {
            return gson.toJson(this);
        }
    }

}

Key methods:

| Method | Description |
| --- | --- |
| setup | Reads the configuration file, initializes the KMS instance SDK and Alibaba Cloud SDK, and checks whether the initial secret exists |
| buildInstanceClient | Initializes the KMS instance SDK |
| buildShareClient | Retrieves the AccessKey pair from the RAM secret and initializes the Alibaba Cloud SDK |
| getDataKeyObjectByGetSecretValue | Retrieves a data key stored in the generic secret |
| buildDataKeyObject | Generates a new data key and stores it in the secret |
| evaluate | Encrypts a field value and returns the encrypted string |

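KMS API operations used:

  • KMS instance API: GenerateDataKey (generate the data key), GetSecretValue (read the generic secret and the RAM secret)

  • Shared gateway API: PutSecretValue (store the data key as a new secret version)

For full API and SDK documentation, see API references and SDK references.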

EncryptUDF (for small tables with low concurrency)

Define the EncryptUDF.java class:

package com.aliyun.kms.sample;

import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.GenerateDataKeyRequest;
import com.aliyun.kms20160120.models.GenerateDataKeyResponse;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import org.apache.commons.codec.binary.Base64;

import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Properties;

public class EncryptUDF extends UDF {

    private static Client client;
    private static String plaintext;
    private static byte[] plaintextBytes;
    private static String encryptedDataKeyPart;

    private static Base64 base64 = new Base64();

    @Override
    public void setup(ExecutionContext ctx) throws UDFException, IOException {
        super.setup(ctx);
        try {
            InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
            Properties properties = new Properties();
            properties.load(in);
            KmsConfig config = new KmsConfig();
            String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
            byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
            config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
            String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
            byte[] caFileContent = ctx.readResourceFile(caFileName);
            config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
            config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
            config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
            String keyId = properties.getProperty(UDFConstant.UDF_ENCRYPT_KEY_ID_KEY);
            client = new TransferClient(config);
            GenerateDataKeyRequest request = new GenerateDataKeyRequest();
            request.setKeyId(keyId);
            KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
            GenerateDataKeyResponse response = client.generateDataKeyWithOptions(request, runtimeOptions);
            plaintext = response.getBody().plaintext;
            plaintextBytes = base64.decode(plaintext);
            // Output format: [4-byte key length][data key ciphertext]
            encryptedDataKeyPart = String.format(UDFConstant.ENCRYPT_DATA_KEY_FORMAT, response.getBody().ciphertextBlob.length()) + response.getBody().ciphertextBlob;
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }

    public String evaluate(String data) {
        if (UDFUtils.isNull(data)) {
            return data;
        }
        byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
        byte[] iv = null;
        byte[] cipherTextBytes = null;
        try {
            iv = new byte[UDFConstant.GCM_IV_LENGTH];
            SecureRandom random = new SecureRandom();
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(plaintextBytes, "AES");
            GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
            cipher.init(Cipher.ENCRYPT_MODE, keySpec, gcmParameterSpec);
            cipherTextBytes = cipher.doFinal(dataBytes);
            // Output format: [4-byte key length][data key ciphertext][base64(iv)][base64(ciphertext)]
            return encryptedDataKeyPart + base64.encodeAsString(iv) + base64.encodeAsString(cipherTextBytes);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws UDFException, IOException {
        super.close();
    }

}

Key methods:

| Method | Description |
| --- | --- |
| setup | Reads the configuration file, initializes the KMS instance SDK, and generates the data key ciphertext |
| evaluate | Encrypts a field value and returns the encrypted string |

KMS API operations used:

  • KMS instance API: GenerateDataKey (generate the data key)

For full API and SDK documentation, see API references and SDK references.

Step 5: Create a decryption UDF

Define the DecryptUDF.java class:

package com.aliyun.kms.sample;

import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.DecryptRequest;
import com.aliyun.kms20160120.models.DecryptResponse;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import org.apache.commons.codec.binary.Base64;

import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;

public class DecryptUDF extends UDF {

    private static Client client;
    private static Map<String, byte[]> dataKeyMap;
    // Offset of the 4-byte data key length prefix
    private static final int BASE_STEP = 4;

    private static Base64 base64 = new Base64();

    @Override
    public void setup(ExecutionContext ctx) throws UDFException, IOException {
        super.setup(ctx);
        try {
            InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
            Properties properties = new Properties();
            properties.load(in);
            KmsConfig config = new KmsConfig();
            String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
            byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
            config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
            String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
            byte[] caFileContent = ctx.readResourceFile(caFileName);
            config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
            config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
            config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
            // Cache up to 1,000 plaintext data keys to avoid repeated KMS Decrypt calls
            dataKeyMap = UDFUtils.getLRUMap(1000);
            client = new TransferClient(config);
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }

    public String evaluate(String data) {
        if (UDFUtils.isNull(data)) {
            return data;
        }
        if (data.length() <= BASE_STEP) {
            return data;
        }
        // Read the 4-byte prefix to determine the length of the data key ciphertext
        int dataLength = Integer.valueOf(data.substring(0, 4));
        if (data.length() <= BASE_STEP + dataLength) {
            return data;
        }
        // Extract the data key ciphertext segment
        String dataKeyCiphertext = data.substring(4, 4 + dataLength);
        try {
            byte[] dataKeyBytes = getDataKeyPlaintext(dataKeyCiphertext);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(dataKeyBytes, "AES");
            // Extract the IV segment (base64-encoded, 16 chars = 12 bytes decoded)
            byte[] iv = base64.decode(data.substring(4 + dataLength, 4 + dataLength + UDFConstant.GCM_IV_BASE64_LENGTH));
            // Extract the field ciphertext segment
            byte[] cipherText = base64.decode(data.substring(4 + dataLength + UDFConstant.GCM_IV_BASE64_LENGTH));
            GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
            cipher.init(Cipher.DECRYPT_MODE, keySpec, gcmParameterSpec);
            byte[] decryptedData = cipher.doFinal(cipherText);
            return new String(decryptedData, StandardCharsets.UTF_8);

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    private byte[] getDataKeyPlaintext(String dataKeyCiphertext) throws Exception {
        byte[] cached = dataKeyMap.get(dataKeyCiphertext);
        if (cached != null) {
            return cached;
        }
        synchronized (dataKeyCiphertext.intern()) {
            // Re-check inside the lock so that only one thread calls KMS per data key
            cached = dataKeyMap.get(dataKeyCiphertext);
            if (cached == null) {
                DecryptRequest request = new DecryptRequest();
                request.ciphertextBlob = dataKeyCiphertext;
                KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
                DecryptResponse response = client.decryptWithOptions(request, runtimeOptions);
                cached = base64.decode(response.getBody().plaintext);
                dataKeyMap.put(dataKeyCiphertext, cached);
            }
            return cached;
        }
    }


    @Override
    public void close() throws UDFException, IOException {
        super.close();
    }
}

Key methods:

| Method | Description |
| --- | --- |
| setup | Reads the configuration file, initializes the KMS instance SDK, and sets up the LRU cache |
| getDataKeyPlaintext | Retrieves the plaintext data key, using the LRU cache to avoid redundant KMS Decrypt calls |
| evaluate | Decrypts a field value using the plaintext data key |

KMS API operations used:

  • KMS instance API: Decrypt (decrypt the data key ciphertext)

For full API and SDK documentation, see API references and SDK references.
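
The caching idea behind getDataKeyPlaintext can be sketched in Python as below. This is a simplified model that assumes Python 3, since it relies on `OrderedDict.move_to_end`. The class name `DataKeyCache` and the `decrypt_with_kms` callback are hypothetical, and the callback stands in for the KMS Decrypt call.

```python
import threading
from collections import OrderedDict


class DataKeyCache:
    """LRU cache of plaintext data keys, keyed by data key ciphertext."""

    def __init__(self, capacity, decrypt_with_kms):
        self._capacity = capacity
        self._decrypt = decrypt_with_kms
        self._entries = OrderedDict()
        self._lock = threading.Lock()

    def get(self, data_key_ciphertext):
        with self._lock:
            if data_key_ciphertext in self._entries:
                # Mark the entry as most recently used
                self._entries.move_to_end(data_key_ciphertext)
                return self._entries[data_key_ciphertext]
            # Cache miss: call the decrypt callback once, then remember the result
            plaintext = self._decrypt(data_key_ciphertext)
            self._entries[data_key_ciphertext] = plaintext
            if len(self._entries) > self._capacity:
                # Evict the least recently used entry
                self._entries.popitem(last=False)
            return plaintext
```

Holding a single lock across the decrypt call keeps the sketch short but serializes all cache misses. The Java sample instead locks per ciphertext, so different data keys can be decrypted in parallel.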

Step 6: Register the UDFs in MaxCompute

Important

Before creating a UDF, add the JAR package of the UDF implementation class to your MaxCompute project. Remove any UDF (and its JAR package and CREATE statement) that your workflow does not need.

-- Add JAR packages to the MaxCompute project
ADD JAR encrypt_udf.jar;
ADD JAR sync_key_encrypt_udf.jar;
ADD JAR decrypt_udf.jar;
-- Add the configuration file
ADD FILE kms_udf_conf.properties;
-- Add the KMS instance client key file
ADD FILE clientkey_xxxx.json;
-- Add the KMS instance CA certificate
ADD FILE PrivateKmsCA_xxxx.pem;
-- Create the standard encryption UDF
CREATE FUNCTION encrypt_udf AS 'com.aliyun.kms.sample.EncryptUDF'
USING 'encrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
-- Create the high-performance encryption UDF
CREATE FUNCTION sync_key_encrypt_udf AS 'com.aliyun.kms.sample.SynchronousKeyEncryptUDF'
USING 'sync_key_encrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
-- Create the decryption UDF
CREATE FUNCTION decrypt_udf AS 'com.aliyun.kms.sample.DecryptUDF'
USING 'decrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';

| Implementation class | JAR package | UDF name | Description |
| --- | --- | --- | --- |
| EncryptUDF.java | encrypt_udf.jar | encrypt_udf | Standard encryption UDF |
| SynchronousKeyEncryptUDF.java | sync_key_encrypt_udf.jar | sync_key_encrypt_udf | High-performance encryption UDF |
| DecryptUDF.java | decrypt_udf.jar | decrypt_udf | Decryption UDF |

Step 7: Configure variables

All UDF configuration is stored in kms_udf_conf.properties.

Configuration file for the high-performance UDF

# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the name of the client key file.
udf.kms.clientkey.file=clientkey_xxxx.json
# Specify the password of the client key.
udf.kms.clientkey.password=xxxxxxxx
# Specify the CA certificate of the KMS instance.
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Specify the endpoint of the KMS instance.
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
# (SynchronousKeyEncryptUDF only) Specify the name of the generic secret.
udf.synchronous.secret.name=xxxxSynchronousSecretName
# (SynchronousKeyEncryptUDF only) Specify the name of the RAM secret.
udf.ram.secret.name=xxxxRamSecret
# (SynchronousKeyEncryptUDF only) Specify the endpoint of the shared gateway.
udf.kms.endpoint=kms-vpc.cn-xxx.aliyuncs.com

Regular configuration file

# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the name of the client key file.
udf.kms.clientkey.file=clientkey_xxxx.json
# Specify the password of the client key.
udf.kms.clientkey.password=xxxxxxxx
# Specify the CA certificate of the KMS instance.
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Specify the endpoint of the KMS instance.
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com

Step 8: Use the UDFs

-- Encrypt a column using the standard UDF
SELECT encrypt_udf(column_name) FROM my_table;
-- Encrypt a column using the high-performance UDF
-- version: the secret version (defaults to null)
SELECT sync_key_encrypt_udf(version, column_name) FROM my_table;
-- Decrypt a column
SELECT decrypt_udf(column_name) FROM my_table;

Sample code in Python

Step 1: Install dependencies

pyodps==0.11.6.2
alibabacloud-kms-python-sdk==1.1.3

Step 2: Define the kms_udf_utils.py module

# coding=utf-8
import uuid
from collections import OrderedDict


def is_null(value):
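    # MaxCompute passes SQL NULL to a Python UDF as None, so check for it before calling len()
    return value is None or len(value) == 0 or value == '\\N'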


def generate_uuid(key_version):
    return uuid.uuid5(uuid.NAMESPACE_DNS, key_version)


def get_config_value(config_dict, dict_key):
    if dict_key not in config_dict:
        raise ValueError("can not find key[%s]" % dict_key)
    return config_dict[dict_key]


def properties_to_dict(cache_file):
    config_dict = {}
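    for line in cache_file:
        line = line.strip()
        # Skip blank lines and '#' comment lines
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only so that values may contain '='
        k, v = line.split("=", 1)
        config_dict[k.strip()] = v.strip()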
    return config_dict


def object_to_dict(obj):
    return obj.__dict__


class LRUCache(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        if key not in self.cache:
            return -1
        value = self.cache.pop(key)  # Remove the item to update the order
        self.cache[key] = value  # Re-insert it to the end
        return value

    def put(self, key, value):
        if key in self.cache:
            self.cache.pop(key)  # Remove the existing entry to update the order
        elif len(self.cache) == self.capacity:
            self.cache.popitem(last=False)  # Remove the least recently used item
        self.cache[key] = value

    def contains_key(self, key):
        return key in self.cache
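
To illustrate the eviction order that the decryption UDF depends on, the following sketch repeats a condensed copy of LRUCache so that it runs on its own, then exercises it with a capacity of 2. The key names and values are placeholders, not real data key material.

```python
from collections import OrderedDict


class LRUCache(object):
    """Condensed copy of LRUCache from kms_udf_utils.py, repeated so the sketch is self-contained."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        if key not in self.cache:
            return -1
        value = self.cache.pop(key)
        self.cache[key] = value  # Move to the most recently used position
        return value

    def put(self, key, value):
        if key in self.cache:
            self.cache.pop(key)
        elif len(self.cache) == self.capacity:
            self.cache.popitem(last=False)  # Evict the least recently used entry
        self.cache[key] = value

    def contains_key(self, key):
        return key in self.cache


cache = LRUCache(2)
cache.put("key-ciphertext-a", b"plaintext-a")
cache.put("key-ciphertext-b", b"plaintext-b")
cache.get("key-ciphertext-a")                  # "a" becomes the most recently used entry
cache.put("key-ciphertext-c", b"plaintext-c")  # The cache is full, so "b" is evicted

remaining = list(cache.cache.keys())
evicted_lookup = cache.get("key-ciphertext-b")  # A missing key returns -1, not None
```

Because get returns -1 for a missing key, callers should test contains_key first, as DecryptUDF does.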

Step 3: Define the kms_udf_constant.py module

# coding=utf-8
UDF_CONFIG_CACHE_FILE = "kms_udf_conf.properties"
KMS_CLIENT_KEY_FILE_KEY = "udf.kms.clientkey.file"
KMS_CLIENT_KEY_PASSWORD_KEY = "udf.kms.clientkey.password"
KMS_INSTANCE_ENDPOINT_KEY = "udf.kms.instance.endpoint"
KMS_INSTANCE_CA_FILE_KEY = "udf.kms.instance.ca.file"
KMS_SYNCHRONOUS_SECRET_NAME_KEY = "udf.synchronous.secret.name"
KMS_RAM_SECRET_NAME_KEY = "udf.ram.secret.name"
KMS_REGION_KEY = "udf.kms.regionId"
UDF_ENCRYPT_KEY_ID_KEY = "udf.kms.keyId"
ENCRYPT_DATA_KEY_FORMAT = "%04d"
GCM_IV_LENGTH = 12
GCM_TAG_LENGTH = 16
DEFAULT_NUMBER_OF_BYTES = 32

Parameters:

| Parameter | Description |
| --- | --- |
| UDF_CONFIG_CACHE_FILE | Name of the configuration file |
| KMS_CLIENT_KEY_FILE_KEY | Path to the client key file |
| KMS_CLIENT_KEY_PASSWORD_KEY | Password of the client key |
| KMS_INSTANCE_ENDPOINT_KEY | Endpoint of the KMS instance |
| KMS_INSTANCE_CA_FILE_KEY | Path to the KMS instance CA certificate |
| KMS_SYNCHRONOUS_SECRET_NAME_KEY | Name of the generic secret |
| KMS_RAM_SECRET_NAME_KEY | Name of the RAM secret |
| UDF_ENCRYPT_KEY_ID_KEY | ID of the key managed by KMS |
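
The ENCRYPT_DATA_KEY_FORMAT constant produces the fixed-width length prefix described in the encrypted field format. The following is a minimal sketch of how the prefix is built and read back. It uses a made-up placeholder string in place of a real data key ciphertext and does not call KMS.

```python
ENCRYPT_DATA_KEY_FORMAT = "%04d"  # Same value as in kms_udf_constant.py

# Placeholder only: a real value would be the data key ciphertext returned by KMS
fake_key_ciphertext = "A" * 120

# Zero-pad the length to 4 digits and prepend it to the key ciphertext
prefix = ENCRYPT_DATA_KEY_FORMAT % len(fake_key_ciphertext)
header = prefix + fake_key_ciphertext

# Reading it back: the first 4 characters give the length of the next segment
parsed_len = int(header[0:4])
parsed_key_ciphertext = header[4:4 + parsed_len]
```

Note that "%04d" pads short numbers but does not truncate long ones, so this layout assumes the data key ciphertext is shorter than 10,000 characters.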

Step 4: Create an encryption UDF

Define the encrypt_udf.py class:

Important

This example loads credentials from a properties file for demonstration. In production, store sensitive values (client key content, passwords) outside the project and control access via KMS instance permissions.

# coding=utf-8
import base64

from alibabacloud_kms20160120.models import GenerateDataKeyRequest
from alibabacloud_kms_kms20160120.models import KmsConfig, KmsRuntimeOptions
from alibabacloud_kms_kms20160120.transfer_client import TransferClient
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from odps.distcache import get_cache_file
import random
import string
import kms_udf_constant
import kms_udf_utils


class EncryptUDF(object):

    def __init__(self):
        cache_file = get_cache_file(kms_udf_constant.UDF_CONFIG_CACHE_FILE)
        self.config_dict = kms_udf_utils.properties_to_dict(cache_file)
        cache_file.close()
        kms_config = KmsConfig(
            protocol='https',
            client_key_file=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_FILE_KEY),
            password=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_PASSWORD_KEY),
            endpoint=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_ENDPOINT_KEY),
        )
        self.ca_file_path = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_CA_FILE_KEY)
        self.keyId = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.UDF_ENCRYPT_KEY_ID_KEY)
        self.instance_client = TransferClient(kms_config=kms_config)
        resp = self.generate_data_key()
        # Output format: [4-byte key length][data key ciphertext]
        self.encrypted_data_key_part = (kms_udf_constant.ENCRYPT_DATA_KEY_FORMAT %
                                        len(resp.body.ciphertext_blob) + resp.body.ciphertext_blob)
        self.plaintext = resp.body.plaintext
        self.ciphertext_blob = resp.body.ciphertext_blob

    def evaluate(self, data):
        if kms_udf_utils.is_null(data):
            return data
        # Draw the 12-byte IV from the OS CSPRNG; a GCM IV must never repeat under the same data key
        iv = random.SystemRandom().getrandbits(8 * kms_udf_constant.GCM_IV_LENGTH).to_bytes(
            kms_udf_constant.GCM_IV_LENGTH, "big")
        data_bytes = data.encode("utf-8")
        encryptor = Cipher(
            algorithms.AES(base64.b64decode(self.plaintext)),
            modes.GCM(iv),
        ).encryptor()
        ciphertext = encryptor.update(data_bytes) + encryptor.finalize()
        # Output format: [4-byte key length][data key ciphertext][base64(iv)][base64(ciphertext+tag)]
        return self.encrypted_data_key_part + str(base64.b64encode(iv), "utf-8") + str(
            base64.b64encode(ciphertext + encryptor.tag), "utf-8")

    def generate_data_key(self):
        request = GenerateDataKeyRequest()
        request.key_id = self.keyId
        request.number_of_bytes = kms_udf_constant.DEFAULT_NUMBER_OF_BYTES
        runtime = KmsRuntimeOptions(
            ca=self.ca_file_path
        )
        return self.instance_client.generate_data_key_with_options(request, runtime)

KMS API operations used:

  • KMS instance API: GenerateDataKey (generate the data key and its ciphertext)

For full API and SDK documentation, see API references and SDK references.

Step 5: Create a decryption UDF

Define the decrypt_udf.py class:

# coding=utf-8
import base64

from alibabacloud_kms20160120 import models as kms_20160120_models
from alibabacloud_kms_kms20160120.models import KmsConfig, KmsRuntimeOptions
from alibabacloud_kms_kms20160120.transfer_client import TransferClient
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from odps.distcache import get_cache_file

import kms_udf_constant
import kms_udf_utils
from kms_udf_utils import LRUCache


class DecryptUDF(object):

    def __init__(self):
        cache_file = get_cache_file(kms_udf_constant.UDF_CONFIG_CACHE_FILE)
        self.config_dict = kms_udf_utils.properties_to_dict(cache_file)
        cache_file.close()
        kms_config = KmsConfig(
            protocol='https',
            client_key_file=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_FILE_KEY),
            password=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_PASSWORD_KEY),
            endpoint=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_ENDPOINT_KEY),
        )
        self.ca_file_path = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_CA_FILE_KEY)
        self.keyId = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.UDF_ENCRYPT_KEY_ID_KEY)
        self.instance_client = TransferClient(kms_config=kms_config)
        # Cache up to 1,000 plaintext data keys to avoid repeated KMS Decrypt calls
        self.data_key_cache = LRUCache(1000)

    def evaluate(self, data):
        if kms_udf_utils.is_null(data):
            return data
        # Pass through values that do not start with the 4-digit length prefix
        if len(data) < 4 or not data[0:4].isdigit():
            return data
        # Read the 4-byte prefix to get the data key ciphertext length
        data_key_len = int(data[0:4])
        if len(data) < 4 + data_key_len:
            return data
        # Extract the data key ciphertext segment
        data_key_ciphertext = data[4:4 + data_key_len]
        if not self.data_key_cache.contains_key(data_key_ciphertext):
            decrypt_request = kms_20160120_models.DecryptRequest(
                ciphertext_blob=data_key_ciphertext
            )
            decrypt_runtime = KmsRuntimeOptions(
                ca=self.ca_file_path
            )
            decrypt_response = self.instance_client.decrypt_with_options(decrypt_request, decrypt_runtime)
            self.data_key_cache.put(data_key_ciphertext, base64.b64decode(decrypt_response.body.plaintext))
        data_key = self.data_key_cache.get(data_key_ciphertext)
        # Decode the combined IV + ciphertext + tag segment
        envelope_cipher_bytes = base64.b64decode(data[4 + data_key_len:])
        iv = envelope_cipher_bytes[0:kms_udf_constant.GCM_IV_LENGTH]
        cipher_bytes = envelope_cipher_bytes[
                   kms_udf_constant.GCM_IV_LENGTH:len(envelope_cipher_bytes) - kms_udf_constant.GCM_TAG_LENGTH]
        tag = envelope_cipher_bytes[len(envelope_cipher_bytes) - kms_udf_constant.GCM_TAG_LENGTH:]
        decryptor = Cipher(algorithms.AES(data_key), modes.GCM(iv, tag)).decryptor()
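        # Decode the plaintext bytes as UTF-8; str() on bytes would return the "b'...'" representation
        return (decryptor.update(cipher_bytes) + decryptor.finalize()).decode("utf-8")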

KMS API operations used:

  • KMS instance API: Decrypt (decrypt the data key ciphertext)

For full API and SDK documentation, see API references and SDK references.
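
The slicing in evaluate works because a 12-byte IV encodes to exactly 16 Base64 characters with no padding, so the two Base64 strings written by the encryption UDF can be decoded as a single string. The sketch below demonstrates this with placeholder bytes. It performs no encryption and makes no KMS call, and split_envelope is a hypothetical helper name, not part of the sample.

```python
import base64

GCM_IV_LENGTH = 12   # Same values as in kms_udf_constant.py
GCM_TAG_LENGTH = 16


def split_envelope(field):
    """Split an encrypted field string into (key ciphertext, iv, ciphertext, tag)."""
    key_len = int(field[0:4])
    key_ciphertext = field[4:4 + key_len]
    payload = base64.b64decode(field[4 + key_len:])
    iv = payload[:GCM_IV_LENGTH]
    ciphertext = payload[GCM_IV_LENGTH:len(payload) - GCM_TAG_LENGTH]
    tag = payload[len(payload) - GCM_TAG_LENGTH:]
    return key_ciphertext, iv, ciphertext, tag


# Placeholder bytes standing in for real AES-GCM output
iv = b"\x01" * GCM_IV_LENGTH
ciphertext = b"not-really-encrypted"
tag = b"\x02" * GCM_TAG_LENGTH
key_ciphertext = "FAKEKEYCIPHERTEXT"

# Assemble the field the same way the encryption UDF does
field = ("%04d" % len(key_ciphertext) + key_ciphertext
         + base64.b64encode(iv).decode("utf-8")
         + base64.b64encode(ciphertext + tag).decode("utf-8"))

parts = split_envelope(field)
```

If the IV length were not a multiple of 3 bytes, its Base64 form would end in padding characters and the concatenated string could no longer be decoded in one call.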

Step 6: Register the UDFs in MaxCompute

Important

Remove any UDF (and its source file and CREATE statement) that your workflow does not need.

-- Add the Python source files as resources, including the helper modules that the UDFs import
ADD PY kms_udf_utils.py;
ADD PY kms_udf_constant.py;
ADD PY decrypt_udf.py;
ADD PY encrypt_udf.py;
-- Add the configuration file
ADD FILE kms_udf_conf.properties;
-- Add the KMS instance client key file
ADD FILE clientkey_xxxx.json;
-- Add the KMS instance CA certificate
ADD FILE PrivateKmsCA_xxxx.pem;
-- Create the encryption UDF (a Python UDF is referenced as <script name>.<class name>)
CREATE FUNCTION encrypt_udf AS 'encrypt_udf.EncryptUDF'
USING 'encrypt_udf.py,kms_udf_utils.py,kms_udf_constant.py,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
-- Create the decryption UDF
CREATE FUNCTION decrypt_udf AS 'decrypt_udf.DecryptUDF'
USING 'decrypt_udf.py,kms_udf_utils.py,kms_udf_constant.py,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';

| Dependency | UDF name | Description |
| --- | --- | --- |
| encrypt_udf.py | encrypt_udf | Encryption UDF |
| decrypt_udf.py | decrypt_udf | Decryption UDF |

Step 7: Configure variables

# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the name of the client key file.
udf.kms.clientkey.file=clientkey_xxxx.json
# Specify the password of the client key.
udf.kms.clientkey.password=xxxxxxxx
# Specify the CA certificate of the KMS instance.
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Specify the endpoint of the KMS instance.
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
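
Before uploading kms_udf_conf.properties, it can help to confirm that every key the UDFs read is present. The following is a minimal sketch and not part of the sample: parse_properties, check_config, and the inline sample text are hypothetical. The parser skips # comment lines and splits on the first = only.

```python
REQUIRED_KEYS = [
    "udf.kms.keyId",
    "udf.kms.clientkey.file",
    "udf.kms.clientkey.password",
    "udf.kms.instance.ca.file",
    "udf.kms.instance.endpoint",
]


def parse_properties(text):
    """Parse key=value lines, ignoring blank lines and '#' comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")  # Split on the first '=' only
        config[key.strip()] = value.strip()
    return config


def check_config(text):
    """Return the required keys that are missing or empty."""
    config = parse_properties(text)
    return [k for k in REQUIRED_KEYS if not config.get(k)]


# Hypothetical file content with the endpoint line left out on purpose
sample = """
# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
udf.kms.clientkey.file=clientkey_xxxx.json
udf.kms.clientkey.password=pass=word
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
"""

missing = check_config(sample)
```

Here missing lists udf.kms.instance.endpoint, the one required key absent from the sample text.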

Step 8: Use the UDFs

-- Encrypt a column
SELECT encrypt_udf(column_name) FROM my_table;
-- Decrypt a column
SELECT decrypt_udf(column_name) FROM my_table;

What's next