如果您通过DataStream的方式读写Hologres数据,则需要使用Hologres DataStream Connector连接Flink全托管。本文为您介绍如何在Flink全托管控制台上使用Hologres DataStream Connector来读写Hologres数据。

背景信息

Hologres DataStream Connector的连接配置与对应Hologres SQL Connector配置完全相同,SQL配置详情请参见Hologres源表WITH参数Hologres结果表WITH参数Hologres维表WITH参数

Maven中央库中已经放置了VVR Connector,以供您在作业开发时直接使用。您可以通过以下任何一种方式来使用DataStream Connector:

(推荐)直接将Connector作为项目依赖打进作业JAR包

  1. 在Maven项目的pom.xml文件中添加以下配置以引用SNAPSHOT仓库。
    <repositories>
      <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <url>http://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
      <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
    </repositories>
  2. 检查您的settings.xml配置文件中是否存在<mirrorOf>*</mirrorOf>配置。

    如果存在<mirrorOf>*</mirrorOf>配置,则需要将此配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

    修改的目的是为了避免SNAPSHOT仓库被覆盖,因为mirrorOf中只使用星号(*)会导致第一步中配置的两个repository被覆盖。

  3. 在作业的Maven POM文件中添加您需要的Connector作为项目依赖。
    <dependency>
       <groupId>com.alibaba.ververica</groupId>
       <artifactId>ververica-connector-hologres</artifactId>
       <version>${connector.version}</version>
    </dependency>
    每个Connector版本对应的Connector类型可能不同,建议您使用最新版本。Connector版本、VVR/Flink版本和Connector类型的对应关系请参见Connector列表。完整的依赖信息请参见Hologres示例代码中的pom.xml文件。
    注意
    • 您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本,在Maven中央库(search.maven.org)上会查找不到。
    • 在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。
      <transformers>
        <!-- The service transformer is needed to merge META-INF/services files -->
        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
          <projectName>Apache Flink</projectName>
          <encoding>UTF-8</encoding>
        </transformer>
      </transformers>
  4. 修改Hologres连接配置和Schema信息。
    类别 说明
    Hologres连接配置 替换成您Hologres的连接配置信息。与对应SQL配置完全相同,详情请参见Hologres源表WITH参数Hologres结果表WITH参数Hologres维表WITH参数
    Schema信息 表的模式信息。表的类型映射与SQL中表的映射关系完全一致,详情请参见Hologres源表类型映射Hologres结果表类型映射Hologres维表类型映射
    • Hologres源表
      VVR提供了RichInputFormat的实现类HologresBulkreadInputFormat来读取Hologres表数据。以下为构建Hologres Source读取表数据的示例。
      // 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
      TableSchema schema = TableSchema.builder()
          .field("a", DataTypes.INT())
          .build();
      
      // Hologres的相关参数,具体参数含义请参见SQL档。
      Configuration config = new Configuration();
      config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
      config.setString(HologresConfigs.USERNAME, "yourUserName");
      config.setString(HologresConfigs.PASSWORD, "yourPassword");
      config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
      config.setString(HologresConfigs.TABLE, "yourTableName");
      
      // 构建JDBC Options。
      JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
      
      String query = JDBCUtils.getSimpleSelectFromStatement(
          jdbcOptions.getTable(), schema.getFieldNames());
       
      // 构建HologresBulkreadInputFormat读取表数据。
      HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query);
      
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
      env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
          .print();
      env.execute();
    • Hologres Binlog源表
      VVR提供了Source的实现类HologresBinlogSource来读取Hologres Binlog数据。以下为构建Hologres Binlog Source的示例。
      // 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
      TableSchema schema = TableSchema.builder()
          .field("a", DataTypes.INT())
          .build();
        
      // Hologres的相关参数,具体参数含义请参见SQL文档。
      Configuration config = new Configuration();
      config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
      config.setString(HologresConfigs.USERNAME, "yourUserName");
      config.setString(HologresConfigs.PASSWORD, "yourPassword");
      config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
      config.setString(HologresConfigs.TABLE, "yourTableName");
      config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
      
      // 构建JDBC Options。
      JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
      jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
      
      RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);
      
      // 构建Hologres Binlog Source。
      long startTimeMs = 0;
      HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
          schema,
          config,
          jdbcOptions,
          recordConverter,
          startTimeMs);
      
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
      env.execute();
      说明
      • 方法buildRecordConverter不在VVR Connector依赖中,是示例代码中提供的方法。
      • Hologres Binlog注意事项和实现原理等详情,请参见Flink实时消费Hologres
    • Hologres结果表
      VVR提供了OutputFormatSinkFunction的实现类HologresSinkFunction来写入数据。以下为构建Hologres Sink的示例。
      // 初始化读取的表的Schema。
      TableSchema schema = TableSchema.builder()
          .field("a", DataTypes.INT())
        .field("b", DataTypes.STRING())
          .build();
      
      // Hologres的相关参数,具体参数含义请参见SQL文档。
      Configuration config = new Configuration();
      config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
      config.setString(HologresConfigs.USERNAME, "yourUserName");
      config.setString(HologresConfigs.PASSWORD, "yourPassword");
      config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
      config.setString(HologresConfigs.TABLE, "yourTableName");
      config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
      HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
      
      // 构建Hologres Writer,以RowData的方式写入数据。
      AbstractHologresWriter<RowData> hologresWriter = 
          buildHologresWriter(schema, config, hologresConnectionParam);
      
      // 构建Hologres Sink。
      HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
      
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
      
      int offset = (int) System.currentTimeMillis();
      env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
          .addSink(sinkFunction);
      
      env.execute();
      说明 方法buildHologresWriter不在VVR Connector依赖中,是示例代码中提供的方法。

上传Connector JAR包到Flink全托管开发控制台后,填写配置信息

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击资源上传
  4. 单击上传资源,选择您要上传的目标Connector的JAR包。

    您可以上传您自己开发的Connector,也可以上传Flink全托管产品提供的Connector。Flink全托管产品提供的Connector官方JAR包的下载地址,请参见Connector列表

  5. 在目标作业开发页面附加依赖文件项,选择目标Connector的JAR包。