OpenSearch: Demo code for pushing behavioral data

Last Updated: Mar 13, 2024

Configure environment variables


  • The AccessKey pair of an Alibaba Cloud account can be used to access all API operations. We recommend that you use a Resource Access Management (RAM) user to call API operations or perform routine O&M. For information about how to use a RAM user, see Create a RAM user.

  • For information about how to create an AccessKey pair, see Create an AccessKey pair.

  • If you use the AccessKey pair of a RAM user, make sure that the required permissions are granted to the AliyunServiceRoleForOpenSearch role by using your Alibaba Cloud account. For more information, see AliyunServiceRoleForOpenSearch and Access authorization rules.

  • We recommend that you do not include your AccessKey pair in materials that are easily accessible to others, such as project code. Otherwise, your AccessKey pair may be leaked, which puts the resources in your account at risk.

  • Linux and macOS

    Run the following commands. Replace <access_key_id> and <access_key_secret> with the AccessKey ID and AccessKey secret of the RAM user that you use.

    export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id> 
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
  • Windows

    1. Create an environment variable file, add the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables to the file, and then set the environment variables to your AccessKey ID and AccessKey secret.

    2. Restart Windows for the AccessKey pair to take effect.
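
Before you run the sample code, it can help to confirm that both variables are visible to the Python process. The following is a minimal sketch that uses only the standard library. The helper name missing_credentials is hypothetical and is not part of any Alibaba Cloud SDK.

```python
import os

# Names of the environment variables that the sample code in this topic reads.
REQUIRED_VARS = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")


def missing_credentials(env=None):
    """Return the names of required variables that are unset or empty.

    Hypothetical helper for illustration; it is not part of any SDK.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_credentials()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("Both AccessKey environment variables are set.")
```

The check only reports variable names and never prints the AccessKey values themselves.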

Add dependencies

pip install alibabacloud_tea_util 
pip install alibabacloud_opensearch_util
pip install alibabacloud_credentials

Sample code:

# -*- coding: utf-8 -*-

import time, os
from typing import Dict, Any

from Tea.exceptions import TeaException
from Tea.request import TeaRequest
from alibabacloud_tea_util import models as util_models

from BaseRequest import Config, Client

class opensearch:
    def __init__(self, config: Config):
        self.Clients = Client(config=config)
        self.runtime = util_models.RuntimeOptions()
        self.header = {}

    def behaviorBulk(self, app_name: str, collections_name: str, doc_content: list) -> Dict[str, Any]:
        # Push a batch of behavioral data records to the specified data collection.
        try:
            response = self.Clients._request(method="POST",
                                             pathname=f'/v3/openapi/app-groups/{app_name}/data-collections/{collections_name}/data-collection-type/BEHAVIOR/actions/bulk',
                                             query={}, headers=self.header,
                                             body=doc_content, runtime=self.runtime)
            return response
        except Exception as e:
            # Report the error and re-raise it so that the caller can handle the failure.
            print(f"Failed to push behavioral data: {e}")
            raise

if __name__ == "__main__":

    # Specify the endpoint of the OpenSearch API.
    endpoint = "<endpoint>"

    # Specify the request protocol. Valid values: HTTPS and HTTP.
    endpoint_protocol = "HTTP"

    # Specify your AccessKey pair.
    # Obtain the AccessKey ID and AccessKey secret from environment variables. 
    # You must configure environment variables before you run this code. For more information, see the "Configure environment variables" section of this topic.
    access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID")
    access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

    # Specify the authentication method. Default value: access_key. A value of sts indicates authentication based on Resource Access Management (RAM) and Security Token Service (STS).
    # Valid values: sts and access_key.
    auth_type = "sts"

    # If you use authentication based on RAM and STS, you must specify the security_token parameter. You can call the AssumeRole operation of Alibaba Cloud RAM to obtain an STS token.
    security_token = "<security_token>"

    # Specify common request parameters.
    Configs = Config(endpoint=endpoint, access_key_id=access_key_id, access_key_secret=access_key_secret,
                     security_token=security_token, type=auth_type, protocol=endpoint_protocol)

    # Create an OpenSearch instance.
    ops = opensearch(Configs)
    app_name = "app_name"

    # --------------- Push behavior logs ---------------

    # item_id: the ID of the primary key returned in search results.
    item_id = "358713"

    # ops_request_misc: the request miscellaneous information returned in search results.
    ops_request_misc = "%7B%22request%5Fid%22%3A%22161777635816780357273903%22%2C%22scm%22%3A%2220140713.130149759..%22%7D"

    # bhv_type: the event type of the behavioral data. Valid values:
    # expose: exposes a commodity.
    # cart: adds a commodity to the shopping cart.
    # collect: adds a commodity to favorites.
    # like: likes a commodity.
    # comment: posts a comment on a commodity.
    # buy: purchases a commodity.
    # click: clicks or views a commodity.
    bhv_type = "click"

    # request_id: the request ID returned in search results.
    request_id = "161777635816780357273903"

    # reach_time: The time when the data is received by the server. The value is a UNIX timestamp that is accurate to the second.
    reach_time = "1709708439"

    # user_id: the unique ID of the user who sent the request. 
    # * In most cases, the ID is that of the user that has logged on. 
    # * If the user sent the request from a PC client and has not logged on, the ID is the cookie ID.
    user_id = "a7a0d37c824b659f36a5b9e3b819fcdd"

    # Both names refer to the same dictionary, so the two documents pushed below are identical.
    behavior_fields1 = behavior_fields2 = {
        "item_id": item_id,
        "sdk_type": "opensearch_sdk",
        "sdk_version": "<sdk_version>", # The version number of the current OpenSearch SDK for Python, which is 3.2.0.
        "trace_id": "ALIBABA", # The service provider.
        "trace_info": ops_request_misc,
        "bhv_type": bhv_type,
        "item_type": "item",
        "rn": request_id,
        "biz_id": "<biz_id>", # The numerical ID that is used by mobile applications or application clients to differentiate business. This parameter can be associated with OpenSearch applications and Artificial Intelligence Recommendation (AIRec) instances.
        "reach_time": reach_time,
        "user_id": user_id,
    }

    behavior_documents = [{"cmd": "add", "fields": behavior_fields1}, {"cmd": "add", "fields": behavior_fields2}]
    res6 = ops.behaviorBulk(app_name=app_name, collections_name=app_name, doc_content=behavior_documents)
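
The ops_request_misc value in the sample code is a percent-encoded JSON string, and the request ID inside it matches the request_id variable. The following minimal sketch decodes the sample value with the Python standard library. Whether every value returned in real search results follows the same layout is an assumption based on this one sample.

```python
import json
from urllib.parse import unquote

# The sample value from this topic: a percent-encoded JSON object.
ops_request_misc = "%7B%22request%5Fid%22%3A%22161777635816780357273903%22%2C%22scm%22%3A%2220140713.130149759..%22%7D"

# Undo the percent-encoding, then parse the resulting JSON text.
decoded = json.loads(unquote(ops_request_misc))

# Assumption: the request ID is stored under the "request_id" key, as in the sample.
request_id = decoded["request_id"]
print(request_id)
```

Reading the request ID from the decoded value keeps rn and trace_info consistent when both come from the same search result.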

For more information, see Push collected data.
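
The sample code pushes the same dictionary twice because behavior_fields1 and behavior_fields2 refer to one object. When events differ, building one dictionary per event avoids accidental sharing. The following sketch assumes the field names used in the sample code. The helper build_behavior_document is hypothetical, and defaulting reach_time to the current time in seconds is an assumption made for illustration.

```python
import time


def build_behavior_document(item_id, bhv_type, user_id, request_id, trace_info,
                            sdk_version="<sdk_version>", biz_id="<biz_id>",
                            reach_time=None):
    """Build one {"cmd": "add", "fields": {...}} entry for a bulk push.

    Hypothetical helper; the field names mirror the sample code in this topic.
    """
    if reach_time is None:
        # Assumption: use the current UNIX time in seconds when none is given.
        reach_time = int(time.time())
    fields = {
        "item_id": item_id,
        "sdk_type": "opensearch_sdk",
        "sdk_version": sdk_version,
        "trace_id": "ALIBABA",
        "trace_info": trace_info,
        "bhv_type": bhv_type,
        "item_type": "item",
        "rn": request_id,
        "biz_id": biz_id,
        "reach_time": str(reach_time),
        "user_id": user_id,
    }
    return {"cmd": "add", "fields": fields}


# One independent document per event type for the same item and user.
documents = [
    build_behavior_document("358713", bhv, "a7a0d37c824b659f36a5b9e3b819fcdd",
                            "161777635816780357273903", "<ops_request_misc>",
                            reach_time=1709708439)
    for bhv in ("expose", "click")
]
```

The resulting list has the same shape as behavior_documents in the sample code, so it can be passed as doc_content to behaviorBulk.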