All Products
Search
Document Center

OpenSearch:Data push demo

Last Updated:May 13, 2025

This topic provides sample code that shows how to use the OpenSearch Retrieval Engine Edition SDK for Python to synchronize data to an OpenSearch Retrieval Engine Edition instance in real time. The samples show how to upload, update, and delete documents.

Upload a document

from alibabacloud_ha3engine import models
from alibabacloud_ha3engine.client import Client
from alibabacloud_ha3engine.models import Config
from alibabacloud_tea_util import models as util_models
from Tea.exceptions import TeaException, RetryError

# Request runtime options. Raise the timeouts if requests take a long time
# to complete. Timeouts are in milliseconds.
runtime = util_models.RuntimeOptions(
    connect_timeout=5000,
    read_timeout=10000,
    autoretry=False,
    ignore_ssl=False,
    max_idle_conns=50,
)

# Connection settings. The endpoint, username, and password are all shown in
# the API Endpoint section of the Instance Details page.
config = Config(
    endpoint="http://ha-cn-i7*****605.public.ha.aliyuncs.com",
    access_user_name="username",
    access_pass_word="password",
)

# Create the engine client, then attach the runtime options defined above.
client = Client(config)
client._runtime_options = runtime

def pushDoc():
    """Push one ``add`` and one ``update_field`` command in a single request.

    Builds two document operations for the placeholder table, sends them with
    ``client.push_documents``, and prints the response. ``TeaException`` and
    ``RetryError`` raised by the request are caught and printed rather than
    propagated.
    """
    optionsHeaders = {}
    # The table name of the document whose data is to be pushed.
    tableName = "<table_name>"
    # The primary key field of the document whose data is to be pushed.
    pkField = "<field_pk>"

    # Upload a document.
    # If the document already exists, the existing document is deleted, and
    # the specified document is uploaded.
    # =====================================================
    add2Document = {
        # The content of the document. Keys must be paired with values.
        "fields": {
            # The primary key entry is keyed by pkField so that the field
            # name in the document always matches the one sent with the
            # request.
            pkField: "<field_pk_value>",
            "<field_map_key_1>": "<field_map_value_1>",
            "<field_map_key_2>": "<field_map_value_2>",
            # Multi-value attribute types supported by OpenSearch Retrieval
            # Engine Edition are passed as a list.
            # Set multi_value to true in the index.
            "<multi_value_key>": ["multi_value_1", "multi_value_2"],
        },
        # Run the add command to upload the document.
        "cmd": "add",
    }

    # Update a document. Only attribute fields can be updated.
    # =====================================================
    update2Document = {
        # The content of the document. Keys must be paired with values.
        "fields": {
            pkField: "<field_pk_value>",
            "<field_map_key_1>": "<field_map_value_1>",
            "<field_map_key_2>": "<field_map_value_2>",
            # Set both multi_value and updatable_multi_value to true in the
            # index. This list belongs to the update command's own fields;
            # it must not be written into the add document's fields.
            "<multi_value_key>": ["multi_value_1", "multi_value_2"],
        },
        # Run the update_field command to update the document.
        "cmd": "update_field",
    }

    # The outer structure that is used to push document data. You can specify
    # one or more document operations in the structure.
    documentArrayList = [add2Document, update2Document]

    try:
        pushDocumentsRequestModel = models.PushDocumentsRequestModel(optionsHeaders, documentArrayList)
        # By default, whether the primary key field exists is checked when data is pushed. To disable the check, set the request header X-Opensearch-Validate-Data to false.
        # pushDocumentsRequestModel.headers = {"X-Opensearch-Validate-Data": "false"}

        # No per-request runtime options are passed to this call.
        responseModel = client.push_documents(tableName, pkField, pushDocumentsRequestModel)
        print(responseModel)
    except TeaException as e:
        print(f"send request with TeaException : {e}")
    except RetryError as e:
        print(f"send request with Connection Exception  : {e}")

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    pushDoc()

Delete a document

from alibabacloud_ha3engine import models, client
from alibabacloud_ha3engine.models import Config
from alibabacloud_ha3engine.client import Client
from Tea.exceptions import TeaException, RetryError

# Connection settings. The endpoint, username, and password are all shown in
# the API Endpoint section of the Instance Details page; the password can be
# modified there as well.
config = Config(
    endpoint="http://ha-cn-i7*****605.public.ha.aliyuncs.com",
    access_user_name="username",
    access_pass_word="password",
)

# Create the engine client from the settings above.
client = Client(config)

def pushDoc():
    """Push a single ``delete`` command for one document and print the response.

    ``TeaException`` and ``RetryError`` raised while sending the request are
    caught and printed rather than propagated.
    """
    headers = {}
    # Name of the table that holds the document.
    table = "<table_name>"
    # Name of the table's primary key field.
    primary_key = "<field_pk>"

    try:
        # A delete command must identify the document by its primary key
        # field. If multi-level hash partitioning is performed to build an
        # index, the primary key field of each partition level must be
        # specified as well.
        # The key used here must be the same field name as primary_key.
        delete_command = dict(
            fields={"<field_pk>": "<field_pk_value>"},
            cmd="delete",
        )

        # The request body is a list that may carry one or more document
        # operations; this one carries only the delete command.
        commands = [delete_command]
        request = models.PushDocumentsRequestModel(headers, commands)

        # No per-request runtime options are passed to this call.
        response = client.push_documents(table, primary_key, request)
        print(response)
    except TeaException as e:
        print(f"send request with TeaException : {e}")
    except RetryError as e:
        print(f"send request with Connection Exception  : {e}")

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    pushDoc()