All Products
Search
Document Center

ApsaraMQ for RabbitMQ:Connecting to ApsaraMQ for RabbitMQ using RRSA in ACK clusters

Last Updated:Mar 07, 2026

RRSA lets you bind an independent RAM role to each pod in an ACK cluster for fine-grained permission isolation. Instead of hard coding an AccessKey, your application uses a mounted OpenID Connect (OIDC) token to request temporary identity credentials from the Security Token Service (STS). This method provides secure access to ApsaraMQ for RabbitMQ and reduces the risk of AccessKey leaks.

How it works

  1. Submit a pod: Deploy a pod with the service account token volume projection feature enabled.

  2. Mount automatically: The ACK cluster automatically creates and mounts a signed OIDC token file for the pod.

  3. Assume a role: The application in the pod reads the token and calls the STS AssumeRoleWithOIDC API. This securely obtains temporary identity credentials for the specified RAM role.

  4. Establish a connection: The application generates a dynamic username and password from the temporary identity credentials. It then establishes a connection to ApsaraMQ for RabbitMQ to send and receive messages and perform metadata operations.

Scope

The RRSA feature is available only for clusters of version 1.22 or later. This includes ACK Managed Cluster Basic Edition, ACK Managed Cluster Pro Edition, ACK Serverless Cluster Basic Edition, ACK Serverless Cluster Pro Edition, and ACK Edge Cluster Pro Edition.

Step 1: Enable the RRSA feature

Enable the RRSA feature for your cluster and inject the following configurations into the pod. For more information, see Pod permission isolation based on RRSA.

Category

Configuration item name

Description

Environment variable

ALIBABA_CLOUD_ROLE_ARN

The ARN of the RAM role to assume.

ALIBABA_CLOUD_OIDC_PROVIDER_ARN

The ARN of the OIDC IdP.

ALIBABA_CLOUD_OIDC_TOKEN_FILE

The file path that contains the OIDC token.

Step 2: Obtain temporary identity credentials

Next, you can obtain the temporary identity credentials based on the environment variables injected into the pod. The following code provides an example:

from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient

def get_sts_credentials(region_id):
    # Get the three authentication-related pieces of information from the environment variables
    oidc_provider_arn = os.getenv('ALIBABA_CLOUD_OIDC_PROVIDER_ARN')
    role_arn = os.getenv('ALIBABA_CLOUD_ROLE_ARN')
    token_file_path = os.getenv('ALIBABA_CLOUD_OIDC_TOKEN_FILE')

    if not oidc_provider_arn:
        raise ValueError("ALIBABA_CLOUD_OIDC_PROVIDER_ARN is not set")
    if not role_arn:
        raise ValueError("ALIBABA_CLOUD_ROLE_ARN is not set")
    if not token_file_path:
        raise ValueError("ALIBABA_CLOUD_OIDC_TOKEN_FILE is not set")

    # Read the content of the token file
    try:
        with open(token_file_path, 'r') as f:
            oidc_token = f.read().strip()
    except Exception as e:
        raise RuntimeError(f"Failed to read OIDC token file: {e}")

    # Call the AssumeRoleWithOIDC API to obtain temporary identity credentials
    # Note: Fill in the endpoint based on your region.
    config = open_api_models.Config(signature_algorithm='v2')
    config.endpoint = 'sts.' + region_id + '.aliyuncs.com'
    client = OpenApiClient(config)

    params = open_api_models.Params(
        action='AssumeRoleWithOIDC',
        version='2015-04-01',
        protocol='HTTPS',
        method='POST',
        auth_type='AK',
        style='RPC',
        pathname='/',
        req_body_type='json',
        body_type='json'
    )

    queries = {
        'OIDCProviderArn': oidc_provider_arn,
        'RoleArn': role_arn,
        'OIDCToken': oidc_token,
        'RoleSessionName': 'test'
    }

    request = open_api_models.OpenApiRequest(
        query=OpenApiUtilClient.query(queries)
    )
    runtime = util_models.RuntimeOptions()

    response = client.call_api(params, request, runtime)

    body = response.get('body')
    if not body:
        raise RuntimeError("Response body is empty")

    credentials = body.get('Credentials')
    if not credentials:
        raise RuntimeError("Credentials not found in response")

    # Print the obtained temporary accessKeyId, accessKeySecret, and securityToken
    security_token = credentials.get('SecurityToken')
    access_key_id = credentials.get('AccessKeyId')
    access_key_secret = credentials.get('AccessKeySecret')

    print("SecurityToken:", security_token)
    print("AccessKeyId:", access_key_id)
    print("AccessKeySecret:", access_key_secret)

    return access_key_id, access_key_secret, security_token

Step 3: Generate a username and password

Next, you can use the temporary identity credentials to generate a username and password for accessing ApsaraMQ for RabbitMQ. The following code provides an example:

def get_user_name(ak, instance_id, sts_token):
    # Concatenate the instance ID, AK, and STS token, and then perform Base64 encoding to get the username
    buf = f"0:{instance_id}:{ak}:{sts_token}"
    return base64.b64encode(buf.encode('utf-8')).decode('utf-8')


def get_password(sk):
    # Get the current timestamp
    timestamp = int(time.time() * 1000)
    # Use HmacSHA1 to hash the SK and the timestamp
    signature = hmac_sha1(sk.encode('utf-8'), str(timestamp).encode('utf-8'))
    # Concatenate the hash result with the time and return the Base64-encoded result
    result = f"{signature}:{timestamp}"
    return base64.b64encode(result.encode('utf-8')).decode('utf-8')


def hmac_sha1(data, key):
    mac = hmac.new(key, data, hashlib.sha1)
    return mac.digest().hex().upper()
Note
  • This username and password are not displayed in the console. They are temporary and have the same lifecycle as the temporary identity credentials.

  • For Java, you can use the com.alibaba.mq.amqp.utils.UserUtils class to generate the username and password. For more information, see Sample code.

  • For other languages, use the same logic as the code above to generate the username and password.

Step 4: Establish a connection

After you generate the username and password, use them to establish a connection. The following code provides an example:

import pika

credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=rabbitmq_host,
        port=port,
        virtual_host=vhost,
        credentials=credentials
    )
)
channel = connection.channel()
print("Connect success.")

Complete code example

The following complete Python code example shows how to:

  1. Obtain temporary identity credentials from the pod's environment variables.

  2. Create a username and password.

  3. Establish a connection to ApsaraMQ for RabbitMQ.

  4. Send and receive one message per second.

  5. Rotate the temporary identity credentials every 15 seconds.

# -*- coding: utf-8 -*-
import os
import sys
import base64
import hmac
import hashlib
import time
import pika
from datetime import datetime

from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient


# ==================== Configuration Parameters ====================
# Modify the following configurations based on your environment

# STS-related configurations
REGION_ID = "cn-hangzhou"  # Alibaba Cloud region ID

# RabbitMQ connection configurations
RABBITMQ_HOST = "xxx.net.mq.amqp.aliyuncs.com"  # RabbitMQ endpoint address
INSTANCE_ID = "rabbitmq-xxx-xxxx"  # RabbitMQ instance ID
VHOST = "test"  # Virtual host name
PORT = 5672  # Connection port (default for AMQP protocol is 5672)

# Queue and exchange configurations
QUEUE_NAME = "test_queue"  # Queue name
EXCHANGE_NAME = ''  # Exchange name, an empty string for the default exchange

# Test run configurations
TOTAL_DURATION = 300  # Total run time: 5 minutes (in seconds)
REFRESH_INTERVAL = 15  # STS credential refresh interval: 15 seconds
MESSAGE_INTERVAL = 1  # Message sending interval: 1 second

# ==================================================


def get_sts_credentials(region_id):
    """
    Obtain temporary STS credentials through OIDC
    
    Args:
        region_id: Alibaba Cloud region ID
        
    Returns:
        tuple: (access_key_id, access_key_secret, security_token)
    """
    # Get the three authentication-related pieces of information from the environment variables
    oidc_provider_arn = os.getenv('ALIBABA_CLOUD_OIDC_PROVIDER_ARN')
    role_arn = os.getenv('ALIBABA_CLOUD_ROLE_ARN')
    token_file_path = os.getenv('ALIBABA_CLOUD_OIDC_TOKEN_FILE')

    if not oidc_provider_arn:
        raise ValueError("ALIBABA_CLOUD_OIDC_PROVIDER_ARN is not set")
    if not role_arn:
        raise ValueError("ALIBABA_CLOUD_ROLE_ARN is not set")
    if not token_file_path:
        raise ValueError("ALIBABA_CLOUD_OIDC_TOKEN_FILE is not set")

    # Read the content of the token file
    try:
        with open(token_file_path, 'r') as f:
            oidc_token = f.read().strip()
    except Exception as e:
        raise RuntimeError(f"Failed to read OIDC token file: {e}")

    # Call the AssumeRoleWithOIDC API to obtain temporary identity credentials
    config = open_api_models.Config(signature_algorithm='v2')
    config.endpoint = 'sts.' + region_id + '.aliyuncs.com'
    client = OpenApiClient(config)

    params = open_api_models.Params(
        action='AssumeRoleWithOIDC',
        version='2015-04-01',
        protocol='HTTPS',
        method='POST',
        auth_type='AK',
        style='RPC',
        pathname='/',
        req_body_type='json',
        body_type='json'
    )

    queries = {
        'OIDCProviderArn': oidc_provider_arn,
        'RoleArn': role_arn,
        'OIDCToken': oidc_token,
        'RoleSessionName': 'test'
    }

    request = open_api_models.OpenApiRequest(
        query=OpenApiUtilClient.query(queries)
    )
    runtime = util_models.RuntimeOptions()

    response = client.call_api(params, request, runtime)

    body = response.get('body')
    if not body:
        raise RuntimeError("Response body is empty")

    credentials = body.get('Credentials')
    if not credentials:
        raise RuntimeError("Credentials not found in response")

    # Get the temporary accessKeyId, accessKeySecret, and securityToken
    security_token = credentials.get('SecurityToken')
    access_key_id = credentials.get('AccessKeyId')
    access_key_secret = credentials.get('AccessKeySecret')

    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] STS Credentials refreshed")
    print(f"  AccessKeyId: {access_key_id[:10]}...")
    print(f"  SecurityToken: {security_token[:20]}...")

    return access_key_id, access_key_secret, security_token

    
def get_user_name(ak, instance_id, sts_token):
    """
    Generate a RabbitMQ username
    
    Args:
        ak: AccessKey ID
        instance_id: RabbitMQ instance ID
        sts_token: Security Token
        
    Returns:
        str: Base64-encoded username
    """
    buf = f"0:{instance_id}:{ak}:{sts_token}"
    return base64.b64encode(buf.encode('utf-8')).decode('utf-8')


def get_password(sk):
    """
    Generate a RabbitMQ password
    
    Args:
        sk: AccessKey Secret
        
    Returns:
        str: Base64-encoded password
    """
    timestamp = int(time.time() * 1000)
    signature = hmac_sha1(sk.encode('utf-8'), str(timestamp).encode('utf-8'))
    result = f"{signature}:{timestamp}"
    return base64.b64encode(result.encode('utf-8')).decode('utf-8')


def hmac_sha1(data, key):
    """
    Calculate the HMAC-SHA1 signature
    
    Args:
        data: The data to sign
        key: The signature key
        
    Returns:
        str: Uppercase hexadecimal signature string
    """
    mac = hmac.new(key, data, hashlib.sha1)
    return mac.digest().hex().upper()


def connect_to_rabbitmq(ak, sk, sts, rabbitmq_host, instance_id, vhost, port):
    """
    Connect to RabbitMQ
    
    Args:
        ak: AccessKey ID
        sk: AccessKey Secret
        sts: Security Token
        rabbitmq_host: RabbitMQ endpoint address
        instance_id: RabbitMQ instance ID
        vhost: Virtual host name
        port: Connection port
        
    Returns:
        tuple: (connection, channel)
    """
    username = get_user_name(ak, instance_id, sts)
    password = get_password(sk)
    
    credentials = pika.PlainCredentials(username, password)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host=rabbitmq_host,
            port=port,
            virtual_host=vhost,
            credentials=credentials,
            heartbeat=600,
            blocked_connection_timeout=300
        )
    )
    channel = connection.channel()
    
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Connected to RabbitMQ successfully")
    print(f"  Host: {rabbitmq_host}:{port}")
    print(f"  Instance: {instance_id}")
    print(f"  VHost: {vhost}")
    
    return connection, channel


def close_connection_safely(connection):
    """
    Safely close the RabbitMQ connection
    
    Args:
        connection: RabbitMQ connection object
    """
    try:
        if connection and connection.is_open:
            connection.close()
            print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Connection closed")
    except Exception as e:
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Error closing connection: {e}")


def send_receive_messages(connection, channel, queue_name, exchange_name=''):
    """
    Send and receive a message
    
    Args:
        connection: RabbitMQ connection object
        channel: RabbitMQ channel object
        queue_name: Queue name
        exchange_name: Exchange name
        
    Returns:
        bool: Whether the operation was successful
    """
    try:
        # Make sure the queue exists
        channel.queue_declare(queue=queue_name, durable=True)
        
        # Send a message
        message = f"Test message at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        channel.basic_publish(
            exchange=exchange_name,
            routing_key=queue_name,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # Make the message persistent
            )
        )
        
        # Receive a message
        method_frame, header_frame, body = channel.basic_get(queue=queue_name, auto_ack=True)
        
        if method_frame:
            print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Sent and received: {body.decode()[:50]}...")
            return True
        else:
            print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Sent message (no message in queue to receive)")
            return True
            
    except Exception as e:
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Error in send/receive: {e}")
        return False


def print_config_summary():
    """Print the configuration summary"""
    print(f"=== Configuration Summary ===")
    print(f"Region: {REGION_ID}")
    print(f"RabbitMQ Host: {RABBITMQ_HOST}:{PORT}")
    print(f"Instance ID: {INSTANCE_ID}")
    print(f"VHost: {VHOST}")
    print(f"Queue: {QUEUE_NAME}")
    print(f"Exchange: {EXCHANGE_NAME if EXCHANGE_NAME else '(default)'}")
    print(f"Total duration: {TOTAL_DURATION} seconds")
    print(f"STS refresh interval: {REFRESH_INTERVAL} seconds")
    print(f"Message interval: {MESSAGE_INTERVAL} second(s)")
    print()


def print_test_summary(message_count, error_count):
    """
    Print the test summary
    
    Args:
        message_count: Number of successful messages
        error_count: Number of errors
    """
    total = message_count + error_count
    success_rate = (message_count / total * 100) if total > 0 else 0
    
    print(f"\n=== Test Summary ===")
    print(f"Total messages: {message_count}")
    print(f"Total errors: {error_count}")
    print(f"Success rate: {success_rate:.2f}%")


def main():
    """Main function: Run RabbitMQ connection tests and message sending/receiving"""
    
    print("=== Starting RabbitMQ Test with STS Credentials ===\n")
    print_config_summary()
    
    start_time = time.time()
    last_refresh_time = 0
    connection = None
    channel = None
    message_count = 0
    error_count = 0
    
    try:
        # Initially obtain STS credentials and establish a connection
        print("=== Initial STS credentials and connection ===")
        ak, sk, sts = get_sts_credentials(REGION_ID)
        connection, channel = connect_to_rabbitmq(
            ak, sk, sts, RABBITMQ_HOST, INSTANCE_ID, VHOST, PORT
        )
        last_refresh_time = time.time()
        
        while True:
            current_time = time.time()
            elapsed_time = current_time - start_time
            
            # Check if the total run time has been exceeded
            if elapsed_time >= TOTAL_DURATION:
                print(f"\n=== Test completed ===")
                print_test_summary(message_count, error_count)
                break
            
            # Check if the STS credentials need to be refreshed
            if current_time - last_refresh_time >= REFRESH_INTERVAL:
                print(f"\n=== Refreshing STS credentials (elapsed: {int(elapsed_time)}s) ===")
                
                # Close the old connection
                close_connection_safely(connection)
                
                try:
                    # Obtain new credentials and re-establish the connection
                    ak, sk, sts = get_sts_credentials(REGION_ID)
                    connection, channel = connect_to_rabbitmq(
                        ak, sk, sts, RABBITMQ_HOST, INSTANCE_ID, VHOST, PORT
                    )
                    last_refresh_time = current_time
                except Exception as e:
                    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Failed to refresh credentials/connection: {e}")
                    error_count += 1
                    time.sleep(1)
                    continue
            
            # Send and receive messages
            try:
                if send_receive_messages(connection, channel, QUEUE_NAME, EXCHANGE_NAME):
                    message_count += 1
                else:
                    error_count += 1
            except Exception as e:
                print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Message operation failed: {e}")
                error_count += 1
            
            # Wait for the specified interval
            time.sleep(MESSAGE_INTERVAL)
            
    except KeyboardInterrupt:
        print(f"\n=== Test interrupted by user ===")
        print_test_summary(message_count, error_count)
    except Exception as e:
        print(f"\n=== Fatal error: {e} ===")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        # Clean up resources
        close_connection_safely(connection)
        print(f"\n=== Resources cleaned up ===")


if __name__ == '__main__':
    main()

The expected output is as follows:

image