实时数据集成的完美搭档:CDC技术与Kafka集成的解决方案

[复制链接]
5774|0
 楼主| ETLCloud 发表于 2023-7-18 18:34 | 显示全部楼层 |阅读模式
一、实时数据同步
随着企业数据不断增长,如何高效地捕获、同步和处理数据成为了业务发展的关键。在这个数字化时代,CDC技术与Kafka集成为企业提供了一种无缝的数据管理方案,为数据的流动和实时处理打开了全新的大门。
CDC技术与Kafka集成能够实现快速、可靠的实时数据同步。CDC技术可以捕获数据库事务日志中的数据变更,并将其转化为可靠的数据流。这些数据流通过Kafka的高吞吐量消息队列进行传输,确保数据的实时性和一致性。无论是从源数据库到目标数据库的同步,还是跨不同数据存储系统的数据传输,CDC技术与Kafka集成提供了高效且无缝的解决方案。
二、可靠的数据传输
Kafka作为一个分布式、可扩展的消息队列系统,提供了高度可靠的数据传输机制。借助Kafka的持久性存储和数据复制机制,数据不会丢失或损坏。即使在高并发的情况下,Kafka集成能够保证数据的完整性和可靠性。这为企业提供了强大的数据传输基础,确保数据在各个环节的安全传输。
三、灵活的数据处理
CDC技术与Kafka集成不仅提供了实时数据同步,还为企业提供了灵活的数据处理能力。Kafka的消息队列和流处理特性使得企业可以在数据传输的同时进行实时的数据处理和分析。借助Kafka的消费者应用程序,企业可以对数据流进行转换、过滤、聚合等操作,实现实时数据的清洗、加工和分析。这种实时数据处理能力为企业提供了即时的洞察力,帮助其做出快速而准确的决策。
四、解耦数据系统
CDC技术与Kafka集成还能帮助企业解耦数据系统。通过将CDC技术与Kafka作为中间层,不同的数据源和目标系统可以独立操作,彼此之间解除了紧耦合的依赖关系。这种解耦带来了极大的灵活性,使得企业能够更加容易地添加、移除或升级数据源和目标系统,而无需对整个数据流程进行重构。
CDC技术与Kafka集成为企业带来了数据管理的全新体验。它提供了高效、可靠的数据同步和实时处理,帮助企业实现数据驱动的成功。无论是数据同步、实时处理还是数据系统的解耦,CDC技术与Kafka集成为企业提供了强大而灵活的解决方案。
五、主流免费CDC工具介绍
介绍两款能够快速且免费实现CDC技术与Kafka集成的主流工具:Flink CDC和ETLCloud CDC。
测试前的环境准备:JDK8以上、Mysql数据库(开启BinLog日志)、kafka
六、Flink CDC安装使用步骤:
下载安装包
进入Flink官网,下载1.13.3版本安装包 flink-1.13.3-bin-scala_2.11.tgz。(Flink1.13.3支持flink cdc2.x版本,为兼容flink cdc)
解压
在服务器上创建安装目录/home/flink,将 flink 安装包放在该目录下,并执行解压命令,解压至当前目录。tar -zxvf flink-1.13.3-bin-scala_2.11.tgz
启动
进入解压后的flink/lib目录,上传mysql和sql-connector驱动包。
6250964b65e0a12aa3.png
进入flink/bin目录,执行启动命令:./start-cluster.sh
9360264b65e17a9238.png
进入Flink可视化界面查看http://ip:8081
382864b65e1b7c4ed.png
测试
下面我们来新建一个maven工程做CDC数据监听的同步测试。
POM依赖
<!--    Flink CDC  -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
新建Flink_CDC2Kafka类
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class Flink_CDC2Kafka {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //1.1 设置 CK&状态后端
        //略
        //2.通过 FlinkCDC 构建 SourceFunction 并读取数据
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("ip")  //数据库IP
                .port(3306)        //数据库端口
                .username("admin")  //数据库用户名
                .password("pass")   //数据库密码
                .databaseList("test")   //这个注释,就是多库同步
                .tableList("test.admin") //这个注释,就是多表同步
                .deserializer(new CustomerDeserialization()) //这里需要自定义序列化格式
//                .deserializer(new StringDebeziumDeserializationSchema()) //默认是这个序列化格式
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
        //3.打印数据并将数据写入 Kafka
        streamSource.print();
        String sinkTopic = "test";
        streamSource.addSink(getKafkaProducer("ip:9092",sinkTopic));
        //4.启动任务
        env.execute("FlinkCDC");
    }
    //kafka 生产者
    public static FlinkKafkaProducer<String> getKafkaProducer(String brokers,String topic) {
        return new FlinkKafkaProducer<String>(brokers,topic,
                new SimpleStringSchema());
    }
}
自定义序列化类
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.ArrayList;
import java.util.List;
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //1.创建 JSON 对象用于存储最终数据
        JSONObject result = new JSONObject();
        //2.获取库名&表名放入 source
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        JSONObject source = new JSONObject();
        source.put("database",database);
        source.put("table",tableName);
        Struct value = (Struct) sourceRecord.value();
        //3.获取"before"数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }
        //4.获取"after"数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }
        //5.获取操作类型 CREATE UPDATE DELETE 进行符合 Debezium-op 的字母
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("insert".equals(type)) {
            type = "c";
        }
        if ("update".equals(type)) {
            type = "u";
        }
        if ("delete".equals(type)) {
            type = "d";
        }
        if ("create".equals(type)) {
            type = "c";
        }
        //6.将字段写入 JSON 对象
        result.put("source", source);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("op", type);
        //7.输出数据
        collector.collect(result.toJSONString());
    }
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
开启CDC监听
4057264b65e3e85adb.png
Mysql中新增一条人员数据
8557564b65e436e013.png
控制台捕获到增量数据
1120464b65e488e252.png
增量数据也成功推送到kafka中
7951564b65e4c908ee.png
至此通过Flink CDC监听数据库增量数据推送到kafka的过程已经完成,可以看到整个过程需要一些编码能力,对于业务人员的使用比较痛苦。
下面我们来介绍ETLCloud这款产品如何通过可视化配置,快速实现上述的场景内容。
七、ETL CDC安装使用步骤
下载安装包
ETLCloud提供了一键快捷部署包,只需运行启动脚本即可完成安装产品部署。部署包下载可以登录ETLCloud官网自行下载。
安装
官网下载linux一键部署包,把一键部署包放到一个目录下解压并进入该目录。
对脚本文件进行赋权
chmod +x restcloud_install.sh
执行脚本
./restcloud_install.sh
4530564b65e5096801.png
等待tomcat启动,当出现这个界面,则restcloud证明启动成功
46064b65e53627e4.png
数据源配置
新增MySql数据源信息
1426664b65e57338c9.png
新增Kafka数据源信息
2347264b65e5bda528.png
测试数据源
9118864b65e5e7579b.png
监听器配置
新增数据库监听器
5195364b65e641024a.png
监听器配置
9595364b65e676dcc6.png
接收端配置(数据传输类型选择kafka)
4967964b65e6a8977e.png
高级配置(默认参数)
2653264b65e6dbe511.png
启动监听
4846064b65e717c893.png
监听成功
3590964b65e7448cf8.png
测试
打开Navicat可视化工具新增并修改一条人员信息
3078164b65e7742185.png
实时数据中可以动态捕捉实时传输数据
6350264b65e7a6fccf.png
Kafka中查看新增数据
3851264b65e7eb675f.png
Kafka中查看修改数据
7184064b65e8447607.png
八、写在最后
上面我们通过两个CDC工具均实现了实时数据同步到kafka的功能,但通过对比Flink CDC和ETLCloud CDC,可以看出ETLCloud CDC提供了可视化配置的方式,使配置过程更加简单快捷,不需要编码能力。而Flink CDC则需要进行编码,对于业务人员可能会有一定的学习成本。
无论选择哪种工具,都可以实现CDC技术与Kafka集成,实时捕获数据库的增量数据变化,提供了方便和高效的数据同步和传输方法。ETLCloud介绍
ETLCloud是一款零代码ETL工具,可以快速对接上百种数据源和应用系统,无需编码即可快速完成数据同步和传输,企业IT人员只需简单几步即可快速完成各种数据抽取同步并配合BI工具实现数据的统计分析。
5278564b65e93d05e8.png
ETLCloud可视化流程同步界面)
ETLCloud社区版本永久免费下载使用https://www.etlcloud.cn
关于RestCloud
谷云科技是一家专注于以链接+数据+AI为核心的技术公司,致力于帮助企业实现各种应用、SaaS、数据和设备之间的无缝连接,构建高效协同的业务环境。致力于为全球大型头部企业及中国500强、世界500强企业提供更高质量、更智能化的数字化解决方案。2022年完成数千万*币Pre-A轮融资,由SIG海纳亚洲创投基金独家投资。
RestCloud产品矩阵
1438064b65e97662ad.png
目前,RestCloud服务超300+大型头部企业客户,产品应用于快销、制造业、通讯业、金融业、军工业、教育及政府机构等各类组织,并与烽火科技、明道云、衡石科技、中数通、航天信息、中国系统、中软国际、中国软件等合作伙伴建立深度合作,持续助力企业数字化转型。
161464b65e9a98155.png

cada8b1ebed14217c1e9d3228640bfb7
41010f73e43d41e5275f3c3b7dc00954
8e6296a400b3f1b768ee55ce3d47d112
8643ac5cf7375be3c8f3fc07632798c2
b1181ea0d4079271e377e174014b7634
e126e25dd95267cfa9cf12fece1b3e8a
ae1871dcd52aaaae7af1e94a1bfb2c8c
490deb714d3de772e168655c2f82d303
d18b5fba35d5b3ee3ea9977a754906b6
3b7fd18fa630ce81d1bfa7b834117d5f
24c03892be4f6fb213211847718d0efd
2dc27902563ae207e6a2a5b6eea9c85b
8d2de310f319fc07282e2d9f4b3eccd5
3052c88409db5ba4a948e681e8670d9b
30d4fd686ef113f7a5b8b23c02c25aad
9a88d6557b8151a43307fe6e39a50bb4
df8b32ec7cd329d809805f2a7b71abf6
450a9fa7186b8977932c28708247c1f0
e7853d67c1088ec6773c993e8f9d670c
e0deacf631badd865b83810b29bad484
47746ae7ca697091115ad8ef57982933
dae0faa7cfad75cb569c696c9f5a0da0
您需要登录后才可以回帖 登录 | 注册

本版积分规则

217

主题

217

帖子

0

粉丝
快速回复 在线客服 返回列表 返回顶部