Oracle GoldenGate(OGG)for Big Dataは、データベースデータをビッグデータシステムにリアルタイムでストリーミングするためのOracleが提供するツールです。 OGG for Big Dataは、Oracle Database 19c以前のバージョンに適用できます。 OGG for Big DataはKafkaにデータを書き込むことができ、DataHubはKafka Producer/Consumerプロトコルと互換性があります。 したがって、DataHubプラグインに加えて、OGG for Big Dataを使用して、DataHubのKafkaインターフェース経由でOracleデータをDataHubに書き込むことができます。
1. 環境要件
Oracleデータベース: 最新バージョンのOGGでサポートされているバージョンです。
ソースデータベースのOGG: Oracleデータベースのバージョン以降のバージョンです。 最新バージョンのOGGを使用することをお勧めします。
デスティネーションデータベースのOGG for Big Data: ソースデータベースのOGGのバージョン以降のバージョンです。 最新バージョンのOGG for Big Dataを使用することをお勧めします。
2. インストール
以下のセクションでは、Oracleの設定方法、およびOGGのインストールと設定方法について説明します。 このトピックで使用されているパラメーター設定は参考用です。 実際の業務運用では、O&Mエンジニアが提供する設定を使用してください。
ソースデータベースでのOGGの設定
ソースデータベースへのOGGのインストール方法の詳細については、Oracle GoldenGateのインストールを参照してください。 ソースデータベースのバージョンがOracle Database 11gの場合は、OGG for Oracleも参照できます。
デスティネーションデータベースでのOGG for Big Dataの設定
1. デスティネーションデータベースにOGG for Big Dataをインストールする
デスティネーションデータベースのOGGはOGG for Big Dataであり、手動でインストールする必要はありません。 インストールパッケージを解凍するだけで済みます。 解凍後、Oracle GoldenGate Software Command Interface(GGSCI)を起動し、create subdirsコマンドを入力して、必要なディレクトリを作成する必要があります。 create subdirsコマンドを実行すると、OGGディレクトリにdirで始まる名前の複数のディレクトリが作成されます。
2. Kafka関連のパラメーターを設定する
a. custom_kafka_producer.propertiesファイルを設定します。
custom_kafka_producer.propertiesファイルは、Kafka Producerパラメーターを設定するために使用されます。 次のファイルの内容は、パラメーターの設定方法の例を示しています。 詳細については、「ドキュメント」をご参照ください。
dirprmディレクトリにあるcustom_kafka_producer.propertiesファイルを編集します。 ファイルが存在しない場合は、作成します。
# Kafka のエンドポイント。次の例では、中国 (杭州) リージョンのエンドポイントが使用されています。必要に応じて値を変更できます。
bootstrap.servers=dh-cn-hangzhou.aliyuncs.com:9092
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# パラメーター設定を調整します。必要に応じてパラメーターを変更するか、デフォルト値を使用できます。
# compression.type=lz4
# batch.size=1024000
# max.request.size=41943040
# message.max.bytes=41943040
# linger.ms=5000
# reconnect.backoff.ms=1000
# デフォルトでは、DataHub の Kafka インターフェースは、データ転送に SASL_SSL プロトコルを使用します。必要な構成を以下に示します。
security.protocol=SASL_SSL
sasl.mechanism=PLAINb. kafka_client_producer_jaas.confファイルを設定します。
kafka_client_producer_jaas.confファイルは、DataHubにアクセスするためのAccessKeyペアを設定するために使用されます。 dirprmディレクトリにkafka_client_producer_jaas.confファイルを作成し、ファイルを編集します。
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="accessId"
password="accessKey";
};c. kafka.propsファイルを設定します。
kafka.propsファイルは、データを書き込むKafkaトピック、データ形式、およびログ設定を構成するために使用されます。 次のファイルの内容は、ファイルの設定方法の例を示しています。 詳細については、「Oracle GoldenGate for Big Dataの使用」をご参照ください。
dirrpmディレクトリにあるkafka.propsファイルを編集します。 ファイルが存在しない場合は、作成します。
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
# データの書き込み先の Kafka トピック。
gg.handler.kafkahandler.TopicName =kafka_topic_name
gg.handler.kafkahandler.format =json
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format.includeCurrentTimestamp=false
gg.handler.kafkahandler.BlockingSend =true
# トークンには、行 ID に関する情報と、指定したトークン情報が含まれています。
gg.handler.kafkahandler.includeTokens=true
# UPDATE ステートメントを使用して主キーを更新します。
gg.handler.kafkahandler.format.pkUpdateHandling =update
#gg.handler.kafkahandler.format.pkUpdateHandling =delete-insert
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
goldengate.userexit.nochkpt=FALSE
gg.log=log4j
gg.log.level=INFO
gg.report.time=120sec
###Kafka クラスパスの設定 ###
gg.classpath=/xxx/lib/kafka/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Djava.security.auth.login.config=dirprm/kafka_client_producer_jaas.conf注:
DataHubはKafka Consumerプロトコルをサポートしていません。
gg.handler.kafkahandler.SchemaTopicNameパラメーターを設定する場合、Kafka Consumerプロトコルが必要です。 したがって、このパラメーターを設定しないでください。java.security.auth.login.configパラメーターは必須です。 このパラメーターを設定しないと、OGG for Big Dataを起動できません。
3. デスティネーションデータベースのManagerプロセスを設定する
edit params mgrコマンドを実行して、Managerプロセスを設定します。
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,USECHECKPOINTS, MINKEEPDAYS 3start mgrコマンドを実行して、Managerプロセスを起動します。
4. デスティネーションデータベースのReplicatプロセスを設定する
GGSCIで、edit params mqkafkaコマンドを実行して、Replicatプロセスを設定します。
コマンドのmqkafkaは、Replicatプロセスの名前です。 必要に応じて名前をカスタマイズできます。
REPLICAT mqkafka
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
--MAP QASOURCE.*, TARGET QASOURCE.*;
MAP ogg_test.*, TARGET ogg_test.*;mqkafkaプロセスを追加して起動します。
Kafkaインターフェースを使用してDataHubトピックを作成することはできません。 Replicatプロセスを開始する前に、DataHubトピックが作成されていることを確認してください。 BLOBタイプのトピックを使用することをお勧めします。 詳細については、「Kafkaとの互換性」をご参照ください。
# mqkafka プロセスを追加します。
add replicat mqkafka, exttrail ./dirdat/st
# mqkafka プロセスを開始します。
start mqkafka3. 例
Kafkaインターフェースを使用してDataHubに書き込まれたデータにはスキーマがありません。 データがMaxComputeに同期された後、ユーザー定義関数(UDF)を開発してデータを解析できます。 以下のセクションでは、JSON形式でDataHubに書き込まれたデータについて説明します。
1. Oracleテーブルのスキーマ
SQL> desc orders
Name Null? Type
----------------------------------------- -------- ----------------------------
OID NUMBER(38)
PID VARCHAR2(511)
NUM VARCHAR2(511)2. DataHubトピックのスキーマ
データのデモを容易にするために、STRINGタイプの2つのフィールドを含むTUPLEタイプのトピックが使用されます。 BLOBタイプのトピックを使用することをお勧めします。 データにバイナリコンテンツが含まれている場合、TUPLEタイプのトピックを使用するとエラーが発生する可能性があります。
{
"fields":[
{
"name":"key",
"type":"STRING"
},
{
"name":"val",
"type":"STRING"
}
]
}3. データの書き込み
a. SQLスクリプトを実行します。
declare
i number;
op_num number;
begin
op_num := 1;
for i in 1..op_num loop
insert into orders(oid,pid,num) values(i,i+1,i+2);
end loop;
for i in 1..op_num loop
update orders set pid=i*2+1 where oid=i;
end loop;
for i in 1..op_num loop
delete from orders where oid=i;
end loop;
commit;
end;b. ロギングを開始します。
対応するテーブルが設定され、デスティネーションデータベースでReplicatプロセスが開始されると、dirrpt/MQKAFKA_info_log4j.logファイルに次のステートメントが記録されます。 このステートメントは、OGG_TEST.ORDERSテーブルのJSONスキーマが生成されたことを示しています。
INFO 2020-05-29 20:23:55,069 [main] Creating JSON schema for table OGG_TEST.ORDERS in file ./dirdef/OGG_TEST.ORDERS.schema.jsondirdef/OGG_TEST.ORDERS.schema.jsonファイルには、次の内容が含まれています。
$cat dirdef/OGG_TEST.ORDERS.schema.json
{
"$schema":"http://json-schema.org/draft-04/schema#",
"title":"OGG_TEST.ORDERS",
"description":"JSON schema for table OGG_TEST.ORDERS",
"definitions":{
"row":{
"type":"object",
"properties":{
"OID":{
"type":[
"string",
"null"
]
},
"PID":{
"type":[
"string",
"null"
]
},
"NUM":{
"type":[
"string",
"null"
]
}
},
"additionalProperties":false
},
"tokens":{
"type":"object",
"description":"Token keys and values are free form key value pairs.",
"properties":{
},
"additionalProperties":true
}
},
"type":"object",
"properties":{
"table":{
"description":"The fully qualified table name",
"type":"string"
},
"op_type":{
"description":"The operation type",
"type":"string"
},
"op_ts":{
"description":"The operation timestamp",
"type":"string"
},
"current_ts":{
"description":"The current processing timestamp",
"type":"string"
},
"pos":{
"description":"The position of the operation in the data source",
"type":"string"
},
"tokens":{
"$ref":"#/definitions/tokens"
},
"before":{
"$ref":"#/definitions/row"
},
"after":{
"$ref":"#/definitions/row"
}
},
"required":[
"table",
"op_type",
"op_ts",
"current_ts",
"pos"
],
"additionalProperties":false
}c. データをサンプリングします。
SQLスクリプトを実行すると、データはDataHubに書き込まれます。 その後、DataHubコンソールでデータをサンプリングして表示できます。 3つのデータが書き込まれます。 次のセクションは、サンプリングされたデータを示しています。
Shard ID システム時刻 key (STRING) val (STRING)
0 2020年 5月 29日 18:01:38 OGG_TEST.ORDERS {"table":"OGG_TEST.ORDERS","op_type":"I","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.062000","pos":"00000002790849451348","after":{"OID":"1","PID":"2","NUM":"3"}}
0 2020年 5月 29日 18:01:38 OGG_TEST.ORDERS {"table":"OGG_TEST.ORDERS","op_type":"U","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.064000","pos":"00000002790849451514","before":{"OID":"1","PID":"2","NUM":"3"},"after":{"OID":"1","PID":"3","NUM":"3"}}
0 2020年 5月 29日 18:01:38 OGG_TEST.ORDERS {"table":"OGG_TEST.ORDERS","op_type":"D","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.064001","pos":"00000002790849451685","before":{"OID":"1","PID":"3","NUM":"3"}}