OpenSearch: Demo code for pushing documents

Last Updated: Mar 01, 2025

Configure environment variables

Configure the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. The sample code in this topic reads your AccessKey pair from these variables.

Important
  • The AccessKey pair of an Alibaba Cloud account can be used to access all API operations. We recommend that you use a Resource Access Management (RAM) user to call API operations or perform routine O&M. For information about how to use a RAM user, see Create a RAM user.

  • For information about how to create an AccessKey pair, see Create an AccessKey pair.

  • If you use the AccessKey pair of a RAM user, make sure that the required permissions are granted to the AliyunServiceRoleForOpenSearch role by using your Alibaba Cloud account. For more information, see AliyunServiceRoleForOpenSearch and Access authorization rules.

  • We recommend that you do not include your AccessKey pair in materials that are easily accessible to others, such as project code. Otherwise, your AccessKey pair may be leaked, which puts the resources in your account at risk.

  • Linux and macOS

    Run the following commands. Replace <access_key_id> and <access_key_secret> with the AccessKey ID and AccessKey secret of the RAM user that you use.

    export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id> 
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
  • Windows

    1. Create an environment variable file, add the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables to the file, and then set the environment variables to your AccessKey ID and AccessKey secret.

    2. Restart Windows for the environment variables to take effect.
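Before you run the sample code, it can help to confirm that both variables are visible to Python. The following is a minimal sketch, not part of the OpenSearch SDK: the helper name load_access_key and the REQUIRED_VARS constant are hypothetical and exist only for this illustration.

```python
import os
from typing import Mapping, Tuple

# The two variable names come from this topic; everything else is illustrative.
REQUIRED_VARS = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")


def load_access_key(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Return (access_key_id, access_key_secret); raise if either is missing or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env[REQUIRED_VARS[0]], env[REQUIRED_VARS[1]]


if __name__ == "__main__":
    try:
        load_access_key()
        print("AccessKey pair found in the environment.")
    except RuntimeError as err:
        print(err)
```

Failing early with the names of the missing variables is easier to diagnose than an authentication error returned later by the API.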

Add dependencies

pip install alibabacloud_tea_util 
pip install alibabacloud_opensearch_util
pip install alibabacloud_credentials

The sample code imports Config and Client from BaseRequest. For information about BaseRequest, see Sample code for the Python client.
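To check that the installation succeeded, you can look up each package with the standard library. This is only a sketch: it assumes that each importable module has the same name as its pip package listed above, and the helper name missing_modules is hypothetical.

```python
import importlib.util
from typing import Iterable, List

# Assumption: the module names match the pip package names listed above.
REQUIRED_MODULES = (
    "alibabacloud_tea_util",
    "alibabacloud_opensearch_util",
    "alibabacloud_credentials",
)


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    if missing:
        print("Not installed: " + ", ".join(missing))
    else:
        print("All dependencies are installed.")
```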

Sample code

# -*- coding: utf-8 -*-
import os
import time
from typing import Any, Dict

from alibabacloud_tea_util import models as util_models

# BaseRequest is the module described in "Sample code for the Python client".
from BaseRequest import Config, Client


class opensearch:
    def __init__(self, config: Config):
        self.Clients = Client(config=config)
        # Timeouts are specified in milliseconds.
        self.runtime = util_models.RuntimeOptions(
            connect_timeout=10000,
            read_timeout=10000,
            autoretry=False,
            ignore_ssl=False,
            max_idle_conns=50,
            max_attempts=3
        )
        self.header = {}

    def docBulk(self, app_name: str, table_name: str, doc_content: list) -> Dict[str, Any]:
        """Push a list of document commands to a table of an application."""
        try:
            response = self.Clients._request(
                method="POST",
                pathname=f'/v3/openapi/apps/{app_name}/{table_name}/actions/bulk',
                query={},
                headers=self.header,
                body=doc_content,
                runtime=self.runtime,
            )
            return response
        except Exception as e:
            # Print the error, then re-raise it so that a failed push
            # is not mistaken for an empty (None) response.
            print(e)
            raise


if __name__ == "__main__":
    # Specify the endpoint of the OpenSearch API. The value does not contain the http:// prefix.
    endpoint = "<endpoint>"
    # Specify the request protocol. Valid values: HTTPS and HTTP.
    endpoint_protocol = "HTTP"
    # Specify your AccessKey pair.
    # Obtain the AccessKey ID and AccessKey secret from environment variables. 
    # You must configure environment variables before you run this code. For more information, see the "Configure environment variables" section of this topic.
    access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID")
    access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    # Specify the authentication method. Valid values: sts and access_key. Default value: access_key.
    # A value of sts indicates authentication based on Resource Access Management (RAM) and Security Token Service (STS).
    auth_type = "sts"
    # If you use authentication based on RAM and STS, you must specify the security_token parameter. You can call the AssumeRole operation of Alibaba Cloud RAM to obtain an STS token.
    security_token = "<security_token>"
    # Specify common request parameters.
    # Note: The security_token and type parameters are required only if you use the SDK as a RAM user.
    Configs = Config(endpoint=endpoint, access_key_id=access_key_id, access_key_secret=access_key_secret,
                     security_token=security_token, type=auth_type, protocol=endpoint_protocol)
    # Create an OpenSearch client.
    ops = opensearch(Configs)
    app_name = "app_name"
    table_name = "table_name"

    # --------------- Push documents ---------------

    # You can specify the timestamp field to customize the order of document updates. OpenSearch uses the values of this field to update documents with the same primary key field in the specified order.
    # If this field is not specified, OpenSearch updates documents based on the order in which OpenSearch receives the documents. 
    document1 = {"cmd": "ADD", "timestamp": int(time.time() * 1000), "fields": {"id": "1", "title": "opensearch"}}
    document2 = {"cmd": "ADD", "fields": {"id": 2, "describe": "123456"}}
    # Delete a document.
    deletedoc = {"cmd": "DELETE", "fields": {"id": 2}}
    # Update a document.
    updatedoc = {"cmd": "UPDATE", "fields": {"id": 2, "describe": "6666", "title": "OpenSearch"}}
    # Only the two ADD commands are pushed here. To delete or update a document,
    # add deletedoc or updatedoc to the list.
    documents = [document1, document2]
    response = ops.docBulk(app_name=app_name, table_name=table_name, doc_content=documents)
    print(response)
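The sample above writes each command as a literal dictionary. The following is a minimal sketch of one way to build the same structures programmatically and split them into batches. The helper names make_command and chunked, and the batch size used below, are hypothetical and are not part of the OpenSearch SDK. The sketch accepts only the three cmd values that appear in the sample.

```python
import time
from typing import Any, Dict, Iterator, List, Optional

# The cmd values that appear in the sample above.
KNOWN_COMMANDS = ("ADD", "UPDATE", "DELETE")


def make_command(cmd: str, fields: Dict[str, Any],
                 timestamp_ms: Optional[int] = None) -> Dict[str, Any]:
    """Build one command dict in the same shape as the sample documents."""
    if cmd not in KNOWN_COMMANDS:
        raise ValueError(f"unsupported cmd: {cmd}")
    command: Dict[str, Any] = {"cmd": cmd, "fields": fields}
    # The timestamp field is optional; add it only when the caller sets it.
    if timestamp_ms is not None:
        command["timestamp"] = timestamp_ms
    return command


def chunked(commands: List[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of at most `size` commands."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(commands), size):
        yield commands[start:start + size]


if __name__ == "__main__":
    now_ms = int(time.time() * 1000)
    # Increasing timestamps state the intended order for the same primary key.
    commands = [
        make_command("ADD", {"id": 1, "title": "opensearch"}, now_ms),
        make_command("UPDATE", {"id": 1, "title": "OpenSearch"}, now_ms + 1),
        make_command("DELETE", {"id": 1}, now_ms + 2),
    ]
    for batch in chunked(commands, size=2):  # size=2 is an arbitrary illustration
        # With the sample above, each batch would be passed as doc_content to ops.docBulk.
        print(batch)
```

Setting the timestamp explicitly, as in this sketch, makes the update order for a given primary key independent of the order in which the requests arrive.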
Note

For more information, see Process data.