DataHub:OGG for Big Data (Kafka)

Last Updated: Oct 26, 2021

Oracle GoldenGate (OGG) for Big Data is a tool provided by Oracle for streaming database data to big data systems in real time. It is applicable to Oracle Database 19c and earlier and can write data to Kafka. Because DataHub is compatible with the Kafka Producer protocol, you can use OGG for Big Data to write Oracle data to DataHub over the Kafka interface of DataHub, in addition to using DataHub plug-ins.
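As a quick illustration of this compatibility, the following minimal sketch writes one record to DataHub through its Kafka endpoint. It assumes the kafka-python client, which is not part of the OGG setup described in this topic; the endpoint, topic name, key, value, and AccessKey values are placeholders.

from kafka import KafkaProducer

# Connect to the DataHub Kafka endpoint with SASL_SSL and the PLAIN mechanism.
producer = KafkaProducer(
    bootstrap_servers="dh-cn-hangzhou.aliyuncs.com:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="accessId",   # AccessKey ID
    sasl_plain_password="accessKey",  # AccessKey secret
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: v.encode("utf-8"),
)

# Write one record to the DataHub topic that the Kafka topic name maps to.
producer.send("kafka_topic_name", key="test_key", value='{"hello":"datahub"}')
producer.flush()

The same SASL_SSL and PLAIN settings appear in the custom_kafka_producer.properties file that OGG for Big Data uses later in this topic.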

1. Environment requirements

  • Oracle database: The version must be supported by the latest version of OGG.

  • OGG in the source database: The version must be later than or equal to that of the Oracle database. We recommend that you use the latest version of OGG.

  • OGG for Big Data in the destination database: The version must be later than or equal to that of OGG in the source database. We recommend that you use the latest version of OGG for Big Data.

  • Official download link of OGG.

2. Installation

The following sections describe how to configure Oracle and how to install and configure OGG. The parameter configurations in this topic are for reference only. For actual business operations, use the configurations provided by your O&M engineers.

Configure OGG in the source database

For more information about how to install OGG in the source database, see Installing Oracle GoldenGate. If the version of the source database is Oracle Database 11g, you can also refer to OGG for Oracle.

Configure OGG for Big Data in the destination database

1. Install OGG for Big Data in the destination database

OGG in the destination database is OGG for Big Data and does not need to be installed; you only need to decompress the installation package. After decompression, start the Oracle GoldenGate Software Command Interface (GGSCI) and run the create subdirs command to create the required directories. After the command is run, multiple directories whose names start with dir are created in the OGG directory.
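For example, assuming that the installation package has been copied to a directory such as /opt/ogg_bigdata (the path and file name are placeholders), the steps are as follows:

# Decompress the OGG for Big Data installation package. The file name varies by version.
cd /opt/ogg_bigdata
tar -xf OGG_BigData_Linux_x64_xxx.tar

# Start GGSCI and create the required directories. The second command is run at the GGSCI prompt.
./ggsci
create subdirs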

2. Set Kafka-related parameters

a. Configure the custom_kafka_producer.properties file.

The custom_kafka_producer.properties file is used to set Kafka Producer parameters. The following file content provides an example on how to set the parameters. For more information, see Documentation.

Edit the custom_kafka_producer.properties file in the dirprm directory. If the file does not exist, create it.

# The endpoint of Kafka. In the following example, the endpoint in the China (Hangzhou) region is used. You can modify the value as required.
bootstrap.servers=dh-cn-hangzhou.aliyuncs.com:9092

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

# Tune the parameter settings. You can modify the parameters as required or use the default values.
# compression.type=lz4
# batch.size=1024000
# max.request.size=41943040
# message.max.bytes=41943040
# linger.ms=5000
# reconnect.backoff.ms=1000

# By default, the Kafka interface of DataHub uses the SASL_SSL protocol for data transfer. The following code shows required configurations:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN

b. Configure the kafka_client_producer_jaas.conf file.

The kafka_client_producer_jaas.conf file is used to configure the AccessKey pair for accessing DataHub. Create a kafka_client_producer_jaas.conf file in the dirprm directory, and then edit the file.

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="accessId"
    password="accessKey";
};

c. Configure the kafka.props file.

The kafka.props file is used to configure the Kafka topic to which data is written, the data format, and log settings. The following file content provides an example on how to configure the file. For more information, see Using Oracle GoldenGate for Big Data.

Edit the kafka.props file in the dirprm directory. If the file does not exist, create it.

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
# The Kafka topic to which data is to be written.
gg.handler.kafkahandler.TopicName =kafka_topic_name
gg.handler.kafkahandler.format =json
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format.includeCurrentTimestamp=false
gg.handler.kafkahandler.BlockingSend =true
# Include token data in the output, such as row ID information and tokens that you specify.
gg.handler.kafkahandler.includeTokens=true

# Handle primary key updates as UPDATE operations (the alternative is delete-insert).
gg.handler.kafkahandler.format.pkUpdateHandling =update
#gg.handler.kafkahandler.format.pkUpdateHandling =delete-insert

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
goldengate.userexit.nochkpt=FALSE

gg.log=log4j
gg.log.level=INFO
gg.report.time=120sec

###Kafka Classpath settings ###
gg.classpath=/xxx/lib/kafka/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Djava.security.auth.login.config=dirprm/kafka_client_producer_jaas.conf

Note:

  • DataHub does not support the Kafka Consumer protocol. The Kafka Consumer protocol is required if you set the gg.handler.kafkahandler.SchemaTopicName parameter. Therefore, do not set this parameter.

  • The java.security.auth.login.config parameter is required. If you do not set this parameter, OGG for Big Data cannot be started.

3. Configure the Manager process in the destination database

Run the edit params mgr command to configure the Manager process.

PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,USECHECKPOINTS, MINKEEPDAYS 3

Run the start mgr command to start the Manager process.
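In GGSCI, the sequence is as follows. The info mgr command is optional and only verifies that the Manager process is running.

# Configure the Manager process with the preceding parameters.
edit params mgr

# Start the Manager process.
start mgr

# Optional: check that the Manager process is in the RUNNING state.
info mgr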

4. Configure the Replicat process in the destination database

In GGSCI, run the edit params mqkafka command to configure the Replicat process.

In the command, mqkafka is the name of the Replicat process. You can customize the name as required.

REPLICAT mqkafka
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
--MAP QASOURCE.*, TARGET QASOURCE.*;
MAP ogg_test.*, TARGET ogg_test.*;

Add and start the mqkafka process.

You cannot use the Kafka interface to create a DataHub topic. Before you start the Replicat process, make sure that a DataHub topic is created. We recommend that you use a topic of the BLOB type. For more information, see Compatibility with Kafka.
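You can create the topic in the DataHub console or programmatically. The following is a hedged sketch that assumes the DataHub Python SDK (pydatahub); the project name, topic name, shard count, and lifecycle values are placeholders, and the method signature should be verified against the SDK documentation.

from datahub import DataHub

# Use the DataHub REST endpoint, not the Kafka endpoint on port 9092.
dh = DataHub('accessId', 'accessKey', 'https://dh-cn-hangzhou.aliyuncs.com')

# Create a BLOB topic: project name, topic name, shard count, lifecycle in days, comment.
dh.create_blob_topic('ogg_project', 'kafka_topic_name', 1, 7, 'topic for OGG data')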

# Add the mqkafka process.
add replicat mqkafka, exttrail ./dirdat/st

# Start the mqkafka process.
start mqkafka
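After the process is started, you can check its status in GGSCI, for example:

# Check the status of the Replicat process.
info replicat mqkafka

# View the process report for troubleshooting if the status is not RUNNING.
view report mqkafka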

3. Examples

Data written to DataHub by using the Kafka interface has no schema. After the data is synchronized to MaxCompute, you can develop user-defined functions (UDFs) to parse it; a parsing sketch is provided after the sampled data at the end of this section. The following part describes the data that is written to DataHub in the JSON format.

1. Schema of an Oracle table

  SQL> desc orders
   Name                                      Null?    Type
   ----------------------------------------- -------- ----------------------------
   OID                                                NUMBER(38)
   PID                                                VARCHAR2(511)
   NUM                                                VARCHAR2(511)

2. Schema of a DataHub topic

For demonstration purposes, a topic of the TUPLE type that contains two fields of the STRING type is used. However, we recommend that you use a topic of the BLOB type, because errors may occur with a topic of the TUPLE type if the data contains binary content.

{
    "fields":[
        {
            "name":"key",
            "type":"STRING"
        },
        {
            "name":"val",
            "type":"STRING"
        }
    ]
}

3. Write data

a. Run an SQL script.

declare
  i number;
  op_num number;
begin
  op_num := 1;
  for i in 1..op_num loop
    insert into orders(oid, pid, num) values(i, i + 1, i + 2);
  end loop;

  for i in 1..op_num loop
    update orders set pid = i * 2 + 1 where oid = i;
  end loop;

  for i in 1..op_num loop
    delete from orders where oid = i;
  end loop;
  commit;
end;
/

b. Start logging.

After the corresponding table is configured and the Replicat process is started in the destination database, the following message is recorded in the dirrpt/MQKAFKA_info_log4j.log file. The message indicates that the JSON schema of the OGG_TEST.ORDERS table has been generated.

INFO 2020-05-29 20:23:55,069 [main] Creating JSON schema for table OGG_TEST.ORDERS in file ./dirdef/OGG_TEST.ORDERS.schema.json

The dirdef/OGG_TEST.ORDERS.schema.json file contains the following content:

$ cat dirdef/OGG_TEST.ORDERS.schema.json

{
    "$schema":"http://json-schema.org/draft-04/schema#",
    "title":"OGG_TEST.ORDERS",
    "description":"JSON schema for table OGG_TEST.ORDERS",
    "definitions":{
        "row":{
            "type":"object",
            "properties":{
                "OID":{
                    "type":[
                        "string",
                        "null"
                    ]
                },
                "PID":{
                    "type":[
                        "string",
                        "null"
                    ]
                },
                "NUM":{
                    "type":[
                        "string",
                        "null"
                    ]
                }
            },
            "additionalProperties":false
        },
        "tokens":{
            "type":"object",
            "description":"Token keys and values are free form key value pairs.",
            "properties":{
            },
            "additionalProperties":true
        }
    },
    "type":"object",
    "properties":{
        "table":{
            "description":"The fully qualified table name",
            "type":"string"
        },
        "op_type":{
            "description":"The operation type",
            "type":"string"
        },
        "op_ts":{
            "description":"The operation timestamp",
            "type":"string"
        },
        "current_ts":{
            "description":"The current processing timestamp",
            "type":"string"
        },
        "pos":{
            "description":"The position of the operation in the data source",
            "type":"string"
        },
        "tokens":{
            "$ref":"#/definitions/tokens"
        },
        "before":{
            "$ref":"#/definitions/row"
        },
        "after":{
            "$ref":"#/definitions/row"
        }
    },
    "required":[
        "table",
        "op_type",
        "op_ts",
        "current_ts",
        "pos"
    ],
    "additionalProperties":false
}

c. Sample the data.

After you run the SQL script, the data is written to DataHub. You can then sample and view the data in the DataHub console. Three records are written. The following shows the sampled data:

Shard ID    System Time    key (STRING)    val (STRING)
0    May 29, 2020 18:01:38    OGG_TEST.ORDERS    {"table":"OGG_TEST.ORDERS","op_type":"I","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.062000","pos":"00000002790849451348","after":{"OID":"1","PID":"2","NUM":"3"}}
0    May 29, 2020 18:01:38    OGG_TEST.ORDERS    {"table":"OGG_TEST.ORDERS","op_type":"U","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.064000","pos":"00000002790849451514","before":{"OID":"1","PID":"2","NUM":"3"},"after":{"OID":"1","PID":"3","NUM":"3"}}
0    May 29, 2020 18:01:38    OGG_TEST.ORDERS    {"table":"OGG_TEST.ORDERS","op_type":"D","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.064001","pos":"00000002790849451685","before":{"OID":"1","PID":"3","NUM":"3"}}
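The val field carries the full OGG JSON record. As mentioned at the beginning of this section, you can parse it after the data is synchronized to MaxCompute, for example in a UDF. The following minimal Python sketch shows how such parsing could extract the operation type and row images from the UPDATE record sampled above; it is plain Python, not a ready-to-use MaxCompute UDF.

import json

# The val value of the UPDATE record from the sampled data above.
raw = ('{"table":"OGG_TEST.ORDERS","op_type":"U",'
       '"op_ts":"2020-05-29 10:01:27.000067",'
       '"current_ts":"2020-05-29T18:01:33.064000",'
       '"pos":"00000002790849451514",'
       '"before":{"OID":"1","PID":"2","NUM":"3"},'
       '"after":{"OID":"1","PID":"3","NUM":"3"}}')

def parse_ogg_record(value):
    """Extract the table name, operation type, and row images from an OGG JSON record."""
    record = json.loads(value)
    return {
        "table": record["table"],
        "op_type": record["op_type"],    # I = insert, U = update, D = delete
        "before": record.get("before"),  # not present for inserts
        "after": record.get("after"),    # not present for deletes
    }

print(parse_ogg_record(raw))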