すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:OSS を使用してチェックポイントとセーブポイントを保存する

最終更新日:Jan 11, 2025

このトピックでは、Container Service for Kubernetes (ACK) クラスタにデプロイされた Flink クラスタの Flink ジョブのチェックポイントとセーブポイントを保存するために、オブジェクトストレージサービス (OSS) を構成する方法について説明します。

前提条件

新しい EMR コンソールの EMR on ACK ページで Flink クラスタが作成されます。 詳細については、「はじめに」をご参照ください。

手順

デフォルトでは、OSS データの読み取りと書き込みに使用される依存関係は、Flink クラスタのデフォルトイメージで構成されています。 このトピックで説明されているパラメータのみを構成する必要があります。

  1. kubectl を使用して Alibaba Cloud Container Service for Kubernetes (ACK) クラスタに接続します。 詳細については、「クラスタの kubeconfig ファイルを取得し、kubectl を使用してクラスタに接続する」をご参照ください。

    API オペレーションを呼び出すことで、ACK クラスタに接続することもできます。 詳細については、「Kubernetes API を使用する」をご参照ください。

  2. basic-emr-oss-example.yaml という名前のファイルを作成します。 ファイルには次の情報が含まれています。

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: basic-emr-oss-example
    spec:
      flinkVersion: v1_13
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
        state.savepoints.dir: oss://xxxxx
        state.checkpoints.dir: oss://xxxxx
        fs.oss.endpoint: <endpoint, e.g. oss-cn-hangzhou-internal.aliyuncs.com>  // エンドポイント。例:oss-cn-hangzhou-internal.aliyuncs.com
        fs.oss.accessKeyId: <accessKeyId> // アクセスキー ID
        fs.oss.accessKeySecret: <accessKeySecret> // アクセスキーシークレット
      serviceAccount: flink
      podTemplate:
        spec:
          serviceAccount: flink
          containers:
            - name: flink-main-container
              volumeMounts:
                - mountPath: /flink-data
                  name: flink-volume
          volumes:
            - name: flink-volume
              emptyDir: {}
    
      jobManager:
        replicas: 1
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
    
      job:
        jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
        parallelism: 2
        upgradeMode: stateless
    説明
    • ファイルの名前は変更できます。 この例では、basic-emr-oss-example.yaml が使用されています。

    • この例では、Flink 1.13 が使用されています。 別のバージョンの Flink を使用する場合は、ビジネス要件に基づいて flinkVersion パラメータを構成します。

    ビジネス要件に基づいて、次のパラメータの構成を実際の値に置き換える必要があります。

    パラメータ

    説明

    state.savepoints.dir

    セーブポイントが保存されるディレクトリ。

    state.checkpoints.dir

    チェックポイントが保存されるディレクトリ。

    fs.oss.endpoint

    OSS のエンドポイント。 例:oss-cn-***-internal.aliyuncs.com。

    fs.oss.accessKeyId

    OSS へのアクセスに使用する AccessKey ID。

    fs.oss.accessKeySecret

    OSS へのアクセスに使用する AccessKey シークレット。

  3. 次のコマンドを実行してジョブを送信します。

    kubectl apply -f basic-emr-oss-example.yaml

    ジョブが送信された後、OSS または Flink の Web UI を使用して、チェックポイントを表示、使用、および更新できます。 Flink の Web UI へのアクセス方法の詳細については、「Flink ジョブの Web UI にアクセスする」をご参照ください。