すべてのプロダクト
Search
ドキュメントセンター

DataHub:OGG for Big Data (Kafka)

最終更新日:Jan 12, 2025

Oracle GoldenGate(OGG)for Big Dataは、データベースデータをビッグデータシステムにリアルタイムでストリーミングするためのOracleが提供するツールです。 OGG for Big Dataは、Oracle Database 19c以前のバージョンに適用できます。 OGG for Big DataはKafkaにデータを書き込むことができ、DataHubはKafka Producer/Consumerプロトコルと互換性があります。 したがって、DataHubプラグインに加えて、OGG for Big Dataを使用して、DataHubのKafkaインターフェース経由でOracleデータをDataHubに書き込むことができます。

1. 環境要件

  • Oracleデータベース: 最新バージョンのOGGでサポートされているバージョンです。

  • ソースデータベースのOGG: Oracleデータベースのバージョン以降のバージョンです。 最新バージョンのOGGを使用することをお勧めします。

  • デスティネーションデータベースのOGG for Big Data: ソースデータベースのOGGのバージョン以降のバージョンです。 最新バージョンのOGG for Big Dataを使用することをお勧めします。

  • OGGの公式ダウンロードリンク

2. インストール

以下のセクションでは、Oracleの設定方法、およびOGGのインストールと設定方法について説明します。 このトピックで使用されているパラメーター設定は参考用です。 実際の業務運用では、O&Mエンジニアが提供する設定を使用してください。

ソースデータベースでのOGGの設定

ソースデータベースへのOGGのインストール方法の詳細については、Oracle GoldenGateのインストールを参照してください。 ソースデータベースのバージョンがOracle Database 11gの場合は、OGG for Oracleも参照できます。

デスティネーションデータベースでのOGG for Big Dataの設定

1. デスティネーションデータベースにOGG for Big Dataをインストールする

デスティネーションデータベースのOGGはOGG for Big Dataであり、手動でインストールする必要はありません。 インストールパッケージを解凍するだけで済みます。 解凍後、Oracle GoldenGate Software Command Interface(GGSCI)を起動し、create subdirsコマンドを入力して、必要なディレクトリを作成する必要があります。 create subdirsコマンドを実行すると、OGGディレクトリにdirで始まる名前の複数のディレクトリが作成されます。

2. Kafka関連のパラメーターを設定する

a. custom_kafka_producer.propertiesファイルを設定します。

custom_kafka_producer.propertiesファイルは、Kafka Producerパラメーターを設定するために使用されます。 次のファイルの内容は、パラメーターの設定方法の例を示しています。 詳細については、「ドキュメント」をご参照ください。

dirprmディレクトリにあるcustom_kafka_producer.propertiesファイルを編集します。 ファイルが存在しない場合は、作成します。

# Kafka のエンドポイント。次の例では、中国 (杭州) リージョンのエンドポイントが使用されています。必要に応じて値を変更できます。
bootstrap.servers=dh-cn-hangzhou.aliyuncs.com:9092

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

# パラメーター設定を調整します。必要に応じてパラメーターを変更するか、デフォルト値を使用できます。
# compression.type=lz4
# batch.size=1024000
# max.request.size=41943040
# message.max.bytes=41943040
# linger.ms=5000
# reconnect.backoff.ms=1000

# デフォルトでは、DataHub の Kafka インターフェースは、データ転送に SASL_SSL プロトコルを使用します。必要な構成を以下に示します。
security.protocol=SASL_SSL
sasl.mechanism=PLAIN

b. kafka_client_producer_jaas.confファイルを設定します。

kafka_client_producer_jaas.confファイルは、DataHubにアクセスするためのAccessKeyペアを設定するために使用されます。 dirprmディレクトリにkafka_client_producer_jaas.confファイルを作成し、ファイルを編集します。

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="accessId"
    password="accessKey";
};

c. kafka.propsファイルを設定します。

kafka.propsファイルは、データを書き込むKafkaトピック、データ形式、およびログ設定を構成するために使用されます。 次のファイルの内容は、ファイルの設定方法の例を示しています。 詳細については、「Oracle GoldenGate for Big Dataの使用」をご参照ください。

dirrpmディレクトリにあるkafka.propsファイルを編集します。 ファイルが存在しない場合は、作成します。

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
# データの書き込み先の Kafka トピック。
gg.handler.kafkahandler.TopicName =kafka_topic_name
gg.handler.kafkahandler.format =json
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format.includeCurrentTimestamp=false
gg.handler.kafkahandler.BlockingSend =true
# トークンには、行 ID に関する情報と、指定したトークン情報が含まれています。
gg.handler.kafkahandler.includeTokens=true

# UPDATE ステートメントを使用して主キーを更新します。
gg.handler.kafkahandler.format.pkUpdateHandling =update
#gg.handler.kafkahandler.format.pkUpdateHandling =delete-insert

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
goldengate.userexit.nochkpt=FALSE

gg.log=log4j
gg.log.level=INFO
gg.report.time=120sec

###Kafka クラスパスの設定 ###
gg.classpath=/xxx/lib/kafka/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Djava.security.auth.login.config=dirprm/kafka_client_producer_jaas.conf

注:

  • DataHubはKafka Consumerプロトコルをサポートしていません。 gg.handler.kafkahandler.SchemaTopicNameパラメーターを設定する場合、Kafka Consumerプロトコルが必要です。 したがって、このパラメーターを設定しないでください。

  • java.security.auth.login.configパラメーターは必須です。 このパラメーターを設定しないと、OGG for Big Dataを起動できません。

3. デスティネーションデータベースのManagerプロセスを設定する

edit params mgrコマンドを実行して、Managerプロセスを設定します。

PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,USECHECKPOINTS, MINKEEPDAYS 3

start mgrコマンドを実行して、Managerプロセスを起動します。

4. デスティネーションデータベースのReplicatプロセスを設定する

GGSCIで、edit params mqkafkaコマンドを実行して、Replicatプロセスを設定します。

コマンドのmqkafkaは、Replicatプロセスの名前です。 必要に応じて名前をカスタマイズできます。

REPLICAT mqkafka
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
--MAP QASOURCE.*, TARGET QASOURCE.*;
MAP ogg_test.*, TARGET ogg_test.*;

mqkafkaプロセスを追加して起動します。

Kafkaインターフェースを使用してDataHubトピックを作成することはできません。 Replicatプロセスを開始する前に、DataHubトピックが作成されていることを確認してください。 BLOBタイプのトピックを使用することをお勧めします。 詳細については、「Kafkaとの互換性」をご参照ください。

# mqkafka プロセスを追加します。
add replicat mqkafka, exttrail ./dirdat/st

# mqkafka プロセスを開始します。
start mqkafka

3. 例

Kafkaインターフェースを使用してDataHubに書き込まれたデータにはスキーマがありません。 データがMaxComputeに同期された後、ユーザー定義関数(UDF)を開発してデータを解析できます。 以下のセクションでは、JSON形式でDataHubに書き込まれたデータについて説明します。

1. Oracleテーブルのスキーマ

  SQL> desc orders
   Name                                      Null?    Type
   ----------------------------------------- -------- ----------------------------
   OID                                                NUMBER(38)
   PID                                                VARCHAR2(511)
   NUM                                                VARCHAR2(511)

2. DataHubトピックのスキーマ

データのデモを容易にするために、STRINGタイプの2つのフィールドを含むTUPLEタイプのトピックが使用されます。 BLOBタイプのトピックを使用することをお勧めします。 データにバイナリコンテンツが含まれている場合、TUPLEタイプのトピックを使用するとエラーが発生する可能性があります。

{
    "fields":[
        {
            "name":"key",
            "type":"STRING"
        },
        {
            "name":"val",
            "type":"STRING"
        }
    ]
}

3. データの書き込み

a. SQLスクリプトを実行します。

declare
i number;
op_num number;
begin
          op_num := 1;
          for i in 1..op_num loop
        insert into orders(oid,pid,num) values(i,i+1,i+2);
      end loop;

      for i in 1..op_num loop
        update orders set pid=i*2+1 where oid=i;
      end loop;

      for i in 1..op_num loop
        delete from orders where oid=i;
      end loop;
          commit;
end;

b. ロギングを開始します。

対応するテーブルが設定され、デスティネーションデータベースでReplicatプロセスが開始されると、dirrpt/MQKAFKA_info_log4j.logファイルに次のステートメントが記録されます。 このステートメントは、OGG_TEST.ORDERSテーブルのJSONスキーマが生成されたことを示しています。

INFO 2020-05-29 20:23:55,069 [main] Creating JSON schema for table OGG_TEST.ORDERS in file ./dirdef/OGG_TEST.ORDERS.schema.json

dirdef/OGG_TEST.ORDERS.schema.jsonファイルには、次の内容が含まれています。

$cat dirdef/OGG_TEST.ORDERS.schema.json

{
    "$schema":"http://json-schema.org/draft-04/schema#",
    "title":"OGG_TEST.ORDERS",
    "description":"JSON schema for table OGG_TEST.ORDERS",
    "definitions":{
        "row":{
            "type":"object",
            "properties":{
                "OID":{
                    "type":[
                        "string",
                        "null"
                    ]
                },
                "PID":{
                    "type":[
                        "string",
                        "null"
                    ]
                },
                "NUM":{
                    "type":[
                        "string",
                        "null"
                    ]
                }
            },
            "additionalProperties":false
        },
        "tokens":{
            "type":"object",
            "description":"Token keys and values are free form key value pairs.",
            "properties":{
            },
            "additionalProperties":true
        }
    },
    "type":"object",
    "properties":{
        "table":{
            "description":"The fully qualified table name",
            "type":"string"
        },
        "op_type":{
            "description":"The operation type",
            "type":"string"
        },
        "op_ts":{
            "description":"The operation timestamp",
            "type":"string"
        },
        "current_ts":{
            "description":"The current processing timestamp",
            "type":"string"
        },
        "pos":{
            "description":"The position of the operation in the data source",
            "type":"string"
        },
        "tokens":{
            "$ref":"#/definitions/tokens"
        },
        "before":{
            "$ref":"#/definitions/row"
        },
        "after":{
            "$ref":"#/definitions/row"
        }
    },
    "required":[
        "table",
        "op_type",
        "op_ts",
        "current_ts",
        "pos"
    ],
    "additionalProperties":false
}

c. データをサンプリングします。

SQLスクリプトを実行すると、データはDataHubに書き込まれます。 その後、DataHubコンソールでデータをサンプリングして表示できます。 3つのデータが書き込まれます。 次のセクションは、サンプリングされたデータを示しています。

Shard ID    システム時刻    key (STRING)    val (STRING)
0    2020年 5月 29日 18:01:38    OGG_TEST.ORDERS    {"table":"OGG_TEST.ORDERS","op_type":"I","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.062000","pos":"00000002790849451348","after":{"OID":"1","PID":"2","NUM":"3"}}
0    2020年 5月 29日 18:01:38    OGG_TEST.ORDERS    {"table":"OGG_TEST.ORDERS","op_type":"U","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.064000","pos":"00000002790849451514","before":{"OID":"1","PID":"2","NUM":"3"},"after":{"OID":"1","PID":"3","NUM":"3"}}
0    2020年 5月 29日 18:01:38    OGG_TEST.ORDERS    {"table":"OGG_TEST.ORDERS","op_type":"D","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.064001","pos":"00000002790849451685","before":{"OID":"1","PID":"3","NUM":"3"}}