
-- MySQL side: create the database and the table that the mysql-cdc connector reads from.
CREATE DATABASE `cdc_database`;

CREATE TABLE `cdc_database`.`cdc_source` (
    `id`    BIGINT,
    `class` VARCHAR(128),
    `score` INT,
    PRIMARY KEY (`id`)
);
-- Target side: create the database and the two tc_iceberg tables.
--   cdc_sink    receives the rows replicated from the source table.
--   cdc_compute receives one aggregated row per class.
CREATE DATABASE cdc_database;

CREATE TABLE cdc_database.cdc_sink (
    id    LONG,
    class STRING,
    score INT,
    PRIMARY KEY (id)
) USING tc_iceberg;

CREATE TABLE cdc_database.cdc_compute (
    class     STRING,
    avg_score INT,
    PRIMARY KEY (class)
) USING tc_iceberg;
-- Flink SQL job 1: replicate the MySQL table into the tc_iceberg sink table.
-- Each option sits on its own line so that a trailing `--` comment only
-- comments out its own line and never the rest of the statement.

-- Register the catalog that exposes the tc_iceberg tables.
CREATE CATALOG tc_iceberg_catalog WITH (
    'type' = 'mixed_iceberg',
    'catalog-type' = 'hive',
    'uri' = 'thrift://xxx:xxx',          -- Catalog access address exposed by DLC for external access
    'table-formats' = 'MIXED_ICEBERG'
);

-- Flink-side view of the MySQL table, read through the mysql-cdc connector.
CREATE TABLE `mysql_cdc_source` (
    `id`    BIGINT,
    `class` STRING,
    `score` INT,
    -- If the synchronized database table defines a primary key, it must be declared here too.
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',           -- fixed value 'mysql-cdc'
    'hostname' = 'xxx',                  -- database IP
    'port' = 'xxx',                      -- database access port
    'username' = 'xxx',                  -- database user; needs SHOW DATABASES, REPLICATION SLAVE,
                                         -- REPLICATION CLIENT, SELECT and RELOAD privileges
    'password' = 'xxx',                  -- database password
    'database-name' = 'cdc_database',    -- database to synchronize
    'table-name' = 'cdc_source'          -- table to synchronize
);

-- Explicit column list instead of SELECT * so the mapping onto cdc_sink
-- does not silently change if the source definition is edited.
INSERT INTO `tc_iceberg_catalog`.`cdc_database`.`cdc_sink`
SELECT
    `id`,
    `class`,
    `score`
FROM `mysql_cdc_source`;
-- Flink SQL job 2: maintain the per-class average score in cdc_compute.
-- Each option sits on its own line so that a trailing `--` comment only
-- comments out its own line and never the rest of the statement.

-- Register the catalog that exposes the tc_iceberg tables.
CREATE CATALOG tc_iceberg_catalog WITH (
    'type' = 'mixed_iceberg',
    'catalog-type' = 'hive',
    'uri' = 'thrift://xxx:xxx',          -- Catalog access address exposed by DLC for external access
    'table-formats' = 'MIXED_ICEBERG'
);

-- NOTE(review): the query reads `cdc_source` from the iceberg catalog, but the
-- tc_iceberg tables created in this document are `cdc_sink` and `cdc_compute`;
-- presumably `cdc_sink` is intended here. Confirm before running.
INSERT INTO `tc_iceberg_catalog`.`cdc_database`.`cdc_compute`
SELECT
    `class`,
    AVG(`score`) AS `avg_score`
FROM `tc_iceberg_catalog`.`cdc_database`.`cdc_source`
GROUP BY `class`;
# Submit the packaged demo jar to Flink with FlinkSQLDemo as the entry class.
./flink run \
    --class com.tencent.dlc.tciceberg.flink.FlinkSQLDemo \
    /data/jars/flink-demo-1.0-SNAPSHOT.jar
-- MySQL side: generate insert, delete and update change events on the source table.
-- Column lists are explicit so the statements do not depend on the table's column order.
INSERT INTO `cdc_database`.`cdc_source` (`id`, `class`, `score`) VALUES (1, 'class1', 80);
INSERT INTO `cdc_database`.`cdc_source` (`id`, `class`, `score`) VALUES (2, 'class1', 85);
INSERT INTO `cdc_database`.`cdc_source` (`id`, `class`, `score`) VALUES (3, 'class2', 85);
INSERT INTO `cdc_database`.`cdc_source` (`id`, `class`, `score`) VALUES (4, 'class2', 90);

-- Remove the row with id 1.
DELETE FROM `cdc_database`.`cdc_source`
WHERE `id` = 1;

-- Change the score of the row with id 3.
UPDATE `cdc_database`.`cdc_source`
SET `score` = 100
WHERE `id` = 3;
-- Inspect the replicated rows.
SELECT *
FROM cdc_database.cdc_sink;

-- Inspect the aggregated rows.
SELECT *
FROM cdc_database.cdc_compute;
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Hadoop configuration: file system implementations and DLC credentials for COS access. -->
<configuration>
    <property>
        <name>fs.lakefs.impl</name>
        <value>org.apache.hadoop.fs.lakefs.CosFileSystem</value>
    </property>
    <property>
        <name>fs.cosn.impl</name>
        <value>org.apache.hadoop.fs.CosFileSystem</value>
    </property>
    <!-- Set the correct region. -->
    <property>
        <name>fs.cosn.bucket.region</name>
        <value>ap-xxx</value>
    </property>
    <property>
        <name>fs.cosn.posix_bucket.fs.impl</name>
        <value>org.apache.hadoop.fs.CosFileSystem</value>
    </property>
    <property>
        <name>fs.cosn.credentials.provider</name>
        <value>org.apache.hadoop.fs.auth.DlcCloudCredentialsProvider</value>
    </property>
    <property>
        <name>qcloud.dlc.endpoint</name>
        <value>dlc.tencentcloudapi.com</value>
    </property>
    <!-- NOTE(review): this region key holds a credentials-provider class name as its value;
         presumably a region such as ap-xxx is intended. Confirm. -->
    <property>
        <name>fs.cosn.posix_bucket.fs.userinfo.region</name>
        <value>org.apache.hadoop.fs.auth.DlcCloudCredentialsProvider</value>
    </property>
    <!-- Set the user's Secret ID. -->
    <property>
        <name>fs.cosn.posix_bucket.fs.userinfo.secretId</name>
        <value>xxx</value>
    </property>
    <!-- Set the user's Secret KEY. -->
    <property>
        <name>fs.cosn.posix_bucket.fs.userinfo.secretKey</name>
        <value>xxx</value>
    </property>
</configuration>
<!-- Maven POM fragment: Flink dependencies for the demo job. All share one version
     property and use the provided scope. -->
<properties>
    <flink.version>1.16.3</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
public class FlinkSQLDemo {public static void main(String[] args) {// 创建执行环境 和 配置checkpointStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(60000);env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");env.getCheckpointConfig().setCheckpointTimeout(60000);env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 替换为要执行的SQLString sql = "SQL to be excuted...";tEnv.executeSql(sourceSql);}}
文档反馈