This topic describes how to run a Flink DataStream job to read data from Alibaba Cloud DataHub and write data to ApsaraDB for HBase.
Prerequisites
- Java Development Kit (JDK) 8 is installed on your machine.
- Maven 3.X is installed on your machine.
- An integrated development environment (IDE) for Java or Scala is installed on your machine. We recommend that you use IntelliJ IDEA. The JDK and Maven are configured in the IDE.
- A topic is created in DataHub, and test data exists in the topic.
Note The test data must contain three fields, whose data types are BOOLEAN, STRING, and STRING in sequence.
- An ApsaraDB for HBase instance is created. The ApsaraDB for HBase instance resides in the same region and the same virtual private cloud (VPC) as your Realtime Compute for Apache Flink cluster in exclusive mode. A table with several column families is created in the ApsaraDB for HBase instance. For more information about how to use HBase Shell to access ApsaraDB for HBase, see Use HBase Shell to access ApsaraDB for HBase.
Note
- ApsaraDB for HBase Standard Edition is used in this topic.
- You must add the IP address of your Realtime Compute for Apache Flink cluster to a whitelist of ApsaraDB for HBase.
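The test data format required above (three fields whose types are BOOLEAN, STRING, and STRING in sequence) can be pictured as a small Java type. The following is only a sketch for illustration: the class name TestRecord, its field names, and the fromFields helper are hypothetical and are not part of DataHub, ApsaraDB for HBase, or the demo code.

```java
/**
 * Hypothetical holder for one test record.
 * The only fact taken from this topic is the field order: BOOLEAN, STRING, STRING.
 */
public final class TestRecord {
    private final boolean first;   // first field, BOOLEAN
    private final String second;   // second field, STRING
    private final String third;    // third field, STRING

    private TestRecord(boolean first, String second, String third) {
        this.first = first;
        this.second = second;
        this.third = third;
    }

    /** Builds a record from raw field values and rejects any other shape. */
    public static TestRecord fromFields(Object... fields) {
        if (fields == null || fields.length != 3) {
            throw new IllegalArgumentException("expected exactly 3 fields");
        }
        if (!(fields[0] instanceof Boolean)
                || !(fields[1] instanceof String)
                || !(fields[2] instanceof String)) {
            throw new IllegalArgumentException(
                    "expected BOOLEAN, STRING, STRING in sequence");
        }
        return new TestRecord((Boolean) fields[0], (String) fields[1], (String) fields[2]);
    }

    public boolean getFirst() { return first; }
    public String getSecond() { return second; }
    public String getThird() { return third; }
}
```

A check of this kind can help you confirm that the test data in the topic has the expected shape before you run the job.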
Background information
Notice Only Blink 3.X supports this demo.
Develop a job
Publish a job
For more information about how to publish a job, see Publish a job.
Note Before you publish the job, set the Parallelism parameter for the source table on the Configurations tab of the Development page. The parallelism of the source table cannot be greater than the number of shards in the source table. Otherwise, a JobManager error occurs when the job starts.
The following example shows the job content:
-- Required. The full name of the main class.
blink.main.class=Hbase_Demo.HbaseDemo
-- The name of the job.
blink.job.name=datahub_demo
-- The resource name of the JAR package that contains the main class. You must specify this parameter only if multiple JAR packages exist.
--blink.main.jar=Hbase_Demo-1.0-snapshot.jar
-- The default state backend configuration. The configuration takes effect only if the job code does not explicitly configure a state backend.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
-- The default checkpoint configuration. The configuration takes effect only if the job code does not explicitly configure checkpointing.
blink.checkpoint.interval.ms=180000
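The two millisecond values in the preceding configuration are easier to review when converted to hours and minutes. The following sketch uses only the standard java.time.Duration class. The class name JobConfigDurations and the describeMillis helper are hypothetical names chosen for this illustration.

```java
import java.time.Duration;

/** Hypothetical helper that renders millisecond settings in a readable form. */
public final class JobConfigDurations {

    /** Formats a millisecond value as hours, minutes, and seconds. */
    public static String describeMillis(long millis) {
        Duration d = Duration.ofMillis(millis);
        long hours = d.toHours();
        long minutes = d.toMinutes() % 60;   // minutes left after whole hours
        long seconds = d.getSeconds() % 60;  // seconds left after whole minutes
        return hours + "h " + minutes + "m " + seconds + "s";
    }

    public static void main(String[] args) {
        // Values copied from the job content in this topic.
        System.out.println("state.backend.niagara.ttl.ms = " + describeMillis(129600000L));
        System.out.println("blink.checkpoint.interval.ms = " + describeMillis(180000L));
    }
}
```

With the values used in this topic, the state time-to-live is 36 hours and the checkpoint interval is 3 minutes.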
Note You can configure custom parameters. For more information, see Set custom parameters.
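The parallelism rule stated earlier in this section (the parallelism of the source table cannot be greater than the number of shards) can be expressed as a simple check. The following is a minimal sketch under stated assumptions: the class and method names are hypothetical, and the shard count is assumed to be a value that you look up yourself, for example in the DataHub console. The sketch does not call any Realtime Compute for Apache Flink or DataHub API.

```java
/** Hypothetical pre-publish check for the source table parallelism. */
public final class SourceParallelismCheck {

    /** Returns true if the requested parallelism does not exceed the shard count. */
    public static boolean isValid(int requestedParallelism, int shardCount) {
        requirePositive(requestedParallelism, "requestedParallelism");
        requirePositive(shardCount, "shardCount");
        return requestedParallelism <= shardCount;
    }

    /** Caps the requested parallelism at the shard count. */
    public static int effectiveParallelism(int requestedParallelism, int shardCount) {
        requirePositive(requestedParallelism, "requestedParallelism");
        requirePositive(shardCount, "shardCount");
        return Math.min(requestedParallelism, shardCount);
    }

    private static void requirePositive(int value, String name) {
        if (value < 1) {
            throw new IllegalArgumentException(name + " must be at least 1");
        }
    }
}
```

For example, if the topic has 4 shards, a requested parallelism of 8 fails the check, and the capped value is 4.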
Verify the test result
FAQ
Q: What do I do if an error similar to the following one appears when a job is running? The error indicates a JAR package conflict.
java.lang.AbstractMethodError:com.alibaba.fastjson.support.jaxrs.FastJsonAutoDiscoverable.configure(Lcom/alibaba/blink/shaded/datahub/javax/ws/rs/core/FeatureContext;)

A: We recommend that you use the relocation feature of maven-shade-plugin to resolve the JAR package conflict. The following example shows the relocation configuration:
<relocations combine.self="override">
    <relocation>
        <pattern>org.glassfish.jersey</pattern>
        <shadedPattern>com.alibaba.blink.shaded.datahub.org.glassfish.jersey</shadedPattern>
    </relocation>
</relocations>