RRSA lets you bind an independent RAM role to each pod in an ACK cluster for fine-grained permission isolation. Instead of hard coding an AccessKey, your application uses a mounted OpenID Connect (OIDC) token to request temporary identity credentials from the Security Token Service (STS). This method provides secure access to ApsaraMQ for RabbitMQ and reduces the risk of AccessKey leaks.
How it works
-
Submit a pod: Deploy a pod with the service account token volume projection feature enabled.
-
Mount automatically: The ACK cluster automatically creates and mounts a signed OIDC token file for the pod.
-
Assume a role: The application in the pod reads the token and calls the STS
AssumeRoleWithOIDCAPI. This securely obtains temporary identity credentials for the specified RAM role. -
Establish a connection: The application generates a dynamic username and password from the temporary identity credentials. It then establishes a connection to ApsaraMQ for RabbitMQ to send and receive messages and perform metadata operations.
Scope
The RRSA feature is available only for clusters of version 1.22 or later. This includes ACK Managed Cluster Basic Edition, ACK Managed Cluster Pro Edition, ACK Serverless Cluster Basic Edition, ACK Serverless Cluster Pro Edition, and ACK Edge Cluster Pro Edition.
Step 1: Enable the RRSA feature
Enable the RRSA feature for your cluster and inject the following configurations into the pod. For more information, see Pod permission isolation based on RRSA.
|
Category |
Configuration item name |
Description |
|
Environment variable |
ALIBABA_CLOUD_ROLE_ARN |
The ARN of the RAM role to assume. |
|
ALIBABA_CLOUD_OIDC_PROVIDER_ARN |
The ARN of the OIDC IdP. |
|
|
ALIBABA_CLOUD_OIDC_TOKEN_FILE |
The file path that contains the OIDC token. |
Step 2: Obtain temporary identity credentials
Next, you can obtain the temporary identity credentials based on the environment variables injected into the pod. The following code provides an example:
from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
def get_sts_credentials(region_id):
# Get the three authentication-related pieces of information from the environment variables
oidc_provider_arn = os.getenv('ALIBABA_CLOUD_OIDC_PROVIDER_ARN')
role_arn = os.getenv('ALIBABA_CLOUD_ROLE_ARN')
token_file_path = os.getenv('ALIBABA_CLOUD_OIDC_TOKEN_FILE')
if not oidc_provider_arn:
raise ValueError("ALIBABA_CLOUD_OIDC_PROVIDER_ARN is not set")
if not role_arn:
raise ValueError("ALIBABA_CLOUD_ROLE_ARN is not set")
if not token_file_path:
raise ValueError("ALIBABA_CLOUD_OIDC_TOKEN_FILE is not set")
# Read the content of the token file
try:
with open(token_file_path, 'r') as f:
oidc_token = f.read().strip()
except Exception as e:
raise RuntimeError(f"Failed to read OIDC token file: {e}")
# Call the AssumeRoleWithOIDC API to obtain temporary identity credentials
# Note: Fill in the endpoint based on your region.
config = open_api_models.Config(signature_algorithm='v2')
config.endpoint = 'sts.' + region_id + '.aliyuncs.com'
client = OpenApiClient(config)
params = open_api_models.Params(
action='AssumeRoleWithOIDC',
version='2015-04-01',
protocol='HTTPS',
method='POST',
auth_type='AK',
style='RPC',
pathname='/',
req_body_type='json',
body_type='json'
)
queries = {
'OIDCProviderArn': oidc_provider_arn,
'RoleArn': role_arn,
'OIDCToken': oidc_token,
'RoleSessionName': 'test'
}
request = open_api_models.OpenApiRequest(
query=OpenApiUtilClient.query(queries)
)
runtime = util_models.RuntimeOptions()
response = client.call_api(params, request, runtime)
body = response.get('body')
if not body:
raise RuntimeError("Response body is empty")
credentials = body.get('Credentials')
if not credentials:
raise RuntimeError("Credentials not found in response")
# Print the obtained temporary accessKeyId, accessKeySecret, and securityToken
security_token = credentials.get('SecurityToken')
access_key_id = credentials.get('AccessKeyId')
access_key_secret = credentials.get('AccessKeySecret')
print("SecurityToken:", security_token)
print("AccessKeyId:", access_key_id)
print("AccessKeySecret:", access_key_secret)
return access_key_id, access_key_secret, security_token
Step 3: Generate a username and password
Next, you can use the temporary identity credentials to generate a username and password for accessing ApsaraMQ for RabbitMQ. The following code provides an example:
def get_user_name(ak, instance_id, sts_token):
# Concatenate the instance ID, AK, and STS token, and then perform Base64 encoding to get the username
buf = f"0:{instance_id}:{ak}:{sts_token}"
return base64.b64encode(buf.encode('utf-8')).decode('utf-8')
def get_password(sk):
# Get the current timestamp
timestamp = int(time.time() * 1000)
# Use HmacSHA1 to hash the SK and the timestamp
signature = hmac_sha1(sk.encode('utf-8'), str(timestamp).encode('utf-8'))
# Concatenate the hash result with the time and return the Base64-encoded result
result = f"{signature}:{timestamp}"
return base64.b64encode(result.encode('utf-8')).decode('utf-8')
def hmac_sha1(data, key):
mac = hmac.new(key, data, hashlib.sha1)
return mac.digest().hex().upper()
-
This username and password are not displayed in the console. They are temporary and have the same lifecycle as the temporary identity credentials.
-
For Java, you can use the
com.alibaba.mq.amqp.utils.UserUtilsclass to generate the username and password. For more information, see Sample code. -
For other languages, use the same logic as the code above to generate the username and password.
Step 4: Establish a connection
After you generate the username and password, use them to establish a connection. The following code provides an example:
import pika
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=rabbitmq_host,
port=port,
virtual_host=vhost,
credentials=credentials
)
)
channel = connection.channel()
print("Connect success.")
Complete code example
The following complete Python code example shows how to:
-
Obtain temporary identity credentials from the pod's environment variables.
-
Create a username and password.
-
Establish a connection to ApsaraMQ for RabbitMQ.
-
Send and receive one message per second.
-
Rotate the temporary identity credentials every 15 seconds.
# -*- coding: utf-8 -*-
import os
import sys
import base64
import hmac
import hashlib
import time
import pika
from datetime import datetime
from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
# ==================== Configuration Parameters ====================
# Modify the following configurations based on your environment
# STS-related configurations
REGION_ID = "cn-hangzhou" # Alibaba Cloud region ID
# RabbitMQ connection configurations
RABBITMQ_HOST = "xxx.net.mq.amqp.aliyuncs.com" # RabbitMQ endpoint address
INSTANCE_ID = "rabbitmq-xxx-xxxx" # RabbitMQ instance ID
VHOST = "test" # Virtual host name
PORT = 5672 # Connection port (default for AMQP protocol is 5672)
# Queue and exchange configurations
QUEUE_NAME = "test_queue" # Queue name
EXCHANGE_NAME = '' # Exchange name, an empty string for the default exchange
# Test run configurations
TOTAL_DURATION = 300 # Total run time: 5 minutes (in seconds)
REFRESH_INTERVAL = 15 # STS credential refresh interval: 15 seconds
MESSAGE_INTERVAL = 1 # Message sending interval: 1 second
# ==================================================
def get_sts_credentials(region_id):
"""
Obtain temporary STS credentials through OIDC
Args:
region_id: Alibaba Cloud region ID
Returns:
tuple: (access_key_id, access_key_secret, security_token)
"""
# Get the three authentication-related pieces of information from the environment variables
oidc_provider_arn = os.getenv('ALIBABA_CLOUD_OIDC_PROVIDER_ARN')
role_arn = os.getenv('ALIBABA_CLOUD_ROLE_ARN')
token_file_path = os.getenv('ALIBABA_CLOUD_OIDC_TOKEN_FILE')
if not oidc_provider_arn:
raise ValueError("ALIBABA_CLOUD_OIDC_PROVIDER_ARN is not set")
if not role_arn:
raise ValueError("ALIBABA_CLOUD_ROLE_ARN is not set")
if not token_file_path:
raise ValueError("ALIBABA_CLOUD_OIDC_TOKEN_FILE is not set")
# Read the content of the token file
try:
with open(token_file_path, 'r') as f:
oidc_token = f.read().strip()
except Exception as e:
raise RuntimeError(f"Failed to read OIDC token file: {e}")
# Call the AssumeRoleWithOIDC API to obtain temporary identity credentials
config = open_api_models.Config(signature_algorithm='v2')
config.endpoint = 'sts.' + region_id + '.aliyuncs.com'
client = OpenApiClient(config)
params = open_api_models.Params(
action='AssumeRoleWithOIDC',
version='2015-04-01',
protocol='HTTPS',
method='POST',
auth_type='AK',
style='RPC',
pathname='/',
req_body_type='json',
body_type='json'
)
queries = {
'OIDCProviderArn': oidc_provider_arn,
'RoleArn': role_arn,
'OIDCToken': oidc_token,
'RoleSessionName': 'test'
}
request = open_api_models.OpenApiRequest(
query=OpenApiUtilClient.query(queries)
)
runtime = util_models.RuntimeOptions()
response = client.call_api(params, request, runtime)
body = response.get('body')
if not body:
raise RuntimeError("Response body is empty")
credentials = body.get('Credentials')
if not credentials:
raise RuntimeError("Credentials not found in response")
# Get the temporary accessKeyId, accessKeySecret, and securityToken
security_token = credentials.get('SecurityToken')
access_key_id = credentials.get('AccessKeyId')
access_key_secret = credentials.get('AccessKeySecret')
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] STS Credentials refreshed")
print(f" AccessKeyId: {access_key_id[:10]}...")
print(f" SecurityToken: {security_token[:20]}...")
return access_key_id, access_key_secret, security_token
def get_user_name(ak, instance_id, sts_token):
"""
Generate a RabbitMQ username
Args:
ak: AccessKey ID
instance_id: RabbitMQ instance ID
sts_token: Security Token
Returns:
str: Base64-encoded username
"""
buf = f"0:{instance_id}:{ak}:{sts_token}"
return base64.b64encode(buf.encode('utf-8')).decode('utf-8')
def get_password(sk):
"""
Generate a RabbitMQ password
Args:
sk: AccessKey Secret
Returns:
str: Base64-encoded password
"""
timestamp = int(time.time() * 1000)
signature = hmac_sha1(sk.encode('utf-8'), str(timestamp).encode('utf-8'))
result = f"{signature}:{timestamp}"
return base64.b64encode(result.encode('utf-8')).decode('utf-8')
def hmac_sha1(data, key):
"""
Calculate the HMAC-SHA1 signature
Args:
data: The data to sign
key: The signature key
Returns:
str: Uppercase hexadecimal signature string
"""
mac = hmac.new(key, data, hashlib.sha1)
return mac.digest().hex().upper()
def connect_to_rabbitmq(ak, sk, sts, rabbitmq_host, instance_id, vhost, port):
"""
Connect to RabbitMQ
Args:
ak: AccessKey ID
sk: AccessKey Secret
sts: Security Token
rabbitmq_host: RabbitMQ endpoint address
instance_id: RabbitMQ instance ID
vhost: Virtual host name
port: Connection port
Returns:
tuple: (connection, channel)
"""
username = get_user_name(ak, instance_id, sts)
password = get_password(sk)
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=rabbitmq_host,
port=port,
virtual_host=vhost,
credentials=credentials,
heartbeat=600,
blocked_connection_timeout=300
)
)
channel = connection.channel()
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Connected to RabbitMQ successfully")
print(f" Host: {rabbitmq_host}:{port}")
print(f" Instance: {instance_id}")
print(f" VHost: {vhost}")
return connection, channel
def close_connection_safely(connection):
"""
Safely close the RabbitMQ connection
Args:
connection: RabbitMQ connection object
"""
try:
if connection and connection.is_open:
connection.close()
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Connection closed")
except Exception as e:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Error closing connection: {e}")
def send_receive_messages(connection, channel, queue_name, exchange_name=''):
"""
Send and receive a message
Args:
connection: RabbitMQ connection object
channel: RabbitMQ channel object
queue_name: Queue name
exchange_name: Exchange name
Returns:
bool: Whether the operation was successful
"""
try:
# Make sure the queue exists
channel.queue_declare(queue=queue_name, durable=True)
# Send a message
message = f"Test message at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
channel.basic_publish(
exchange=exchange_name,
routing_key=queue_name,
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # Make the message persistent
)
)
# Receive a message
method_frame, header_frame, body = channel.basic_get(queue=queue_name, auto_ack=True)
if method_frame:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Sent and received: {body.decode()[:50]}...")
return True
else:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Sent message (no message in queue to receive)")
return True
except Exception as e:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Error in send/receive: {e}")
return False
def print_config_summary():
"""Print the configuration summary"""
print(f"=== Configuration Summary ===")
print(f"Region: {REGION_ID}")
print(f"RabbitMQ Host: {RABBITMQ_HOST}:{PORT}")
print(f"Instance ID: {INSTANCE_ID}")
print(f"VHost: {VHOST}")
print(f"Queue: {QUEUE_NAME}")
print(f"Exchange: {EXCHANGE_NAME if EXCHANGE_NAME else '(default)'}")
print(f"Total duration: {TOTAL_DURATION} seconds")
print(f"STS refresh interval: {REFRESH_INTERVAL} seconds")
print(f"Message interval: {MESSAGE_INTERVAL} second(s)")
print()
def print_test_summary(message_count, error_count):
"""
Print the test summary
Args:
message_count: Number of successful messages
error_count: Number of errors
"""
total = message_count + error_count
success_rate = (message_count / total * 100) if total > 0 else 0
print(f"\n=== Test Summary ===")
print(f"Total messages: {message_count}")
print(f"Total errors: {error_count}")
print(f"Success rate: {success_rate:.2f}%")
def main():
"""Main function: Run RabbitMQ connection tests and message sending/receiving"""
print("=== Starting RabbitMQ Test with STS Credentials ===\n")
print_config_summary()
start_time = time.time()
last_refresh_time = 0
connection = None
channel = None
message_count = 0
error_count = 0
try:
# Initially obtain STS credentials and establish a connection
print("=== Initial STS credentials and connection ===")
ak, sk, sts = get_sts_credentials(REGION_ID)
connection, channel = connect_to_rabbitmq(
ak, sk, sts, RABBITMQ_HOST, INSTANCE_ID, VHOST, PORT
)
last_refresh_time = time.time()
while True:
current_time = time.time()
elapsed_time = current_time - start_time
# Check if the total run time has been exceeded
if elapsed_time >= TOTAL_DURATION:
print(f"\n=== Test completed ===")
print_test_summary(message_count, error_count)
break
# Check if the STS credentials need to be refreshed
if current_time - last_refresh_time >= REFRESH_INTERVAL:
print(f"\n=== Refreshing STS credentials (elapsed: {int(elapsed_time)}s) ===")
# Close the old connection
close_connection_safely(connection)
try:
# Obtain new credentials and re-establish the connection
ak, sk, sts = get_sts_credentials(REGION_ID)
connection, channel = connect_to_rabbitmq(
ak, sk, sts, RABBITMQ_HOST, INSTANCE_ID, VHOST, PORT
)
last_refresh_time = current_time
except Exception as e:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Failed to refresh credentials/connection: {e}")
error_count += 1
time.sleep(1)
continue
# Send and receive messages
try:
if send_receive_messages(connection, channel, QUEUE_NAME, EXCHANGE_NAME):
message_count += 1
else:
error_count += 1
except Exception as e:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Message operation failed: {e}")
error_count += 1
# Wait for the specified interval
time.sleep(MESSAGE_INTERVAL)
except KeyboardInterrupt:
print(f"\n=== Test interrupted by user ===")
print_test_summary(message_count, error_count)
except Exception as e:
print(f"\n=== Fatal error: {e} ===")
import traceback
traceback.print_exc()
sys.exit(1)
finally:
# Clean up resources
close_connection_safely(connection)
print(f"\n=== Resources cleaned up ===")
if __name__ == '__main__':
main()
The expected output is as follows:
