全部产品
Search
文档中心

OpenSearch:Contoh penggunaan OpenSearch SDK untuk Python

更新时间:Jun 26, 2025

Klien Python contoh

OpenSearch menggunakan layanan gateway internal untuk verifikasi tanda tangan dan bergantung pada Alibaba Cloud TeaDSL SDK untuk penandatanganan. Sebelum menggunakan OpenSearch SDK untuk Python, Anda harus menambahkan dependensi berikut:

pip install alibabacloud_tea_util 
pip install alibabacloud_opensearch_util
pip install alibabacloud_credentials

Contoh Permintaan: Kode contoh tersedia dalam file BaseRequest.py.

# -*- coding: utf-8 -*-

import time
from typing import Dict, Any

from Tea.core import TeaCore
from Tea.exceptions import TeaException, UnretryableException
from Tea.model import TeaModel
from Tea.request import TeaRequest
from alibabacloud_credentials import models as credential_models
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_opensearch_util.opensearch_util import OpensearchUtil
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Config(TeaModel):
    """
 Config
 Konfigurasi yang terkait dengan lingkungan.
 """
    def __init__(
            self,
            endpoint: str = None,
            protocol: str = None,
            type: str = None,
            security_token: str = None,
            access_key_id: str = None,
            access_key_secret: str = None,
            user_agent: str = "",
    ):
        self.endpoint = endpoint
        self.protocol = protocol
        self.type = type
        self.security_token = security_token
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.user_agent = user_agent


class Client:
    """
    OpensearchClient
    Klien OpenSearch yang digunakan untuk menentukan parameter permintaan dan mengirim permintaan.
    """
    _endpoint: str = None
    _protocol: str = None
    _user_agent: str = None
    _credential: CredentialClient = None

    def __init__(
            self,
            config: Config,
    ):
        if UtilClient.is_unset(config):
            raise TeaException({
                'name': 'ParameterMissing',
                'message': "'config' tidak boleh kosong"
            })
        if UtilClient.empty(config.type):
            config.type = 'access_key'
        credential_config = credential_models.Config(
            access_key_id=config.access_key_id,
            type=config.type,
            access_key_secret=config.access_key_secret,
            security_token=config.security_token
        )
        self._credential = CredentialClient(credential_config)
        self._endpoint = config.endpoint
        self._protocol = config.protocol
        self._user_agent = config.user_agent

    def _request(
            self,
            method: str,
            pathname: str,
            query: Dict[str, Any],
            headers: Dict[str, str],
            body: Any,
            runtime: util_models.RuntimeOptions,
    ) -> Dict[str, Any]:
        """
        Memproses permintaan.
        :param request: TeaRequest
        :param runtime: util_models.RuntimeOptions
        :return: Dict[str, Any]
        """
        runtime.validate()
        _runtime = {
            'timeouted': 'retry',
            'readTimeout': runtime.read_timeout,
            'connectTimeout': runtime.connect_timeout,
            'httpProxy': runtime.http_proxy,
            'httpsProxy': runtime.https_proxy,
            'noProxy': runtime.no_proxy,
            'maxIdleConns': runtime.max_idle_conns,
            'retry': {
                'retryable': runtime.autoretry,
                'maxAttempts': UtilClient.default_number(runtime.max_attempts, 3)
            },
            'backoff': {
                'policy': UtilClient.default_string(runtime.backoff_policy, 'no'),
                'period': UtilClient.default_number(runtime.backoff_period, 1)
            },
            'ignoreSSL': runtime.ignore_ssl
        }
        _last_request = None
        _last_exception = None
        _now = time.time()
        _retry_times = 0
        while TeaCore.allow_retry(_runtime.get('retry'), _retry_times, _now):
            if _retry_times > 0:
                _backoff_time = TeaCore.get_backoff_time(_runtime.get('backoff'), _retry_times)
                if _backoff_time > 0:
                    TeaCore.sleep(_backoff_time)
            _retry_times = _retry_times + 1
            try:
                _request = TeaRequest()
                accesskey_id = self._credential.get_access_key_id()
                access_key_secret = self._credential.get_access_key_secret()
                security_token = self._credential.get_security_token()
                _request.protocol = UtilClient.default_string(self._protocol, 'HTTP')
                _request.method = method
                _request.pathname = pathname
                _request.headers = TeaCore.merge({
                    'user-agent': UtilClient.get_user_agent(self._user_agent),
                    'Content-Type': 'application/json',
                    'Date': OpensearchUtil.get_date(),
                    'host': UtilClient.default_string(self._endpoint, f'opensearch-cn-hangzhou.aliyuncs.com'),
                    'X-Opensearch-Nonce': UtilClient.get_nonce()
                }, headers)
                if not UtilClient.is_unset(query):
                    _request.query = UtilClient.stringify_map_value(query)
                if not UtilClient.is_unset(body):
                    req_body = UtilClient.to_jsonstring(body)
                    _request.headers['Content-MD5'] = OpensearchUtil.get_content_md5(req_body)
                    _request.body = req_body
                if not UtilClient.is_unset(security_token):
                    _request.headers["X-Opensearch-Security-Token"] = security_token
                _request.headers['Authorization'] = OpensearchUtil.get_signature(_request, accesskey_id,
                                                                                 access_key_secret)
                _last_request = _request
                _response = TeaCore.do_action(_request, _runtime)
                obj_str = UtilClient.read_as_string(_response.body)
                if UtilClient.is_4xx(_response.status_code) or UtilClient.is_5xx(_response.status_code):
                    raise TeaException({
                        'message': _response.status_message,
                        'data': obj_str,
                        'code': _response.status_code
                    })
                obj = UtilClient.parse_json(obj_str)
                res = UtilClient.assert_as_map(obj)
                return {
                    'body': res,
                    'headers': _response.headers
                }
            except TeaException as e:
                if TeaCore.is_retryable(e):
                    _last_exception = e
                    continue
                raise e
        raise UnretryableException(_last_request, _last_exception)