一、实时数据同步 随着企业数据不断增长,如何高效地捕获、同步和处理数据成为了业务发展的关键。在这个数字化时代,CDC技术与Kafka集成为企业提供了一种无缝的数据管理方案,为数据的流动和实时处理打开了全新的大门。 CDC技术与Kafka集成能够实现快速、可靠的实时数据同步。CDC技术可以捕获数据库事务日志中的数据变更,并将其转化为可靠的数据流。这些数据流通过Kafka的高吞吐量消息队列进行传输,确保数据的实时性和一致性。无论是从源数据库到目标数据库的同步,还是跨不同数据存储系统的数据传输,CDC技术与Kafka集成提供了高效且无缝的解决方案。 二、可靠的数据传输Kafka作为一个分布式、可扩展的消息队列系统,提供了高度可靠的数据传输机制。借助Kafka的持久性存储和数据复制机制,数据不会丢失或损坏。即使在高并发的情况下,Kafka集成能够保证数据的完整性和可靠性。这为企业提供了强大的数据传输基础,确保数据在各个环节的安全传输。 三、灵活的数据处理CDC技术与Kafka集成不仅提供了实时数据同步,还为企业提供了灵活的数据处理能力。Kafka的消息队列和流处理特性使得企业可以在数据传输的同时进行实时的数据处理和分析。借助Kafka的消费者应用程序,企业可以对数据流进行转换、过滤、聚合等操作,实现实时数据的清洗、加工和分析。这种实时数据处理能力为企业提供了即时的洞察力,帮助其做出快速而准确的决策。 四、解耦数据系统CDC技术与Kafka集成还能帮助企业解耦数据系统。通过将CDC技术与Kafka作为中间层,不同的数据源和目标系统可以独立操作,彼此之间解除了紧耦合的依赖关系。这种解耦带来了极大的灵活性,使得企业能够更加容易地添加、移除或升级数据源和目标系统,而无需对整个数据流程进行重构。 CDC技术与Kafka集成为企业带来了数据管理的全新体验。它提供了高效、可靠的数据同步和实时处理,帮助企业实现数据驱动的成功。无论是数据同步、实时处理还是数据系统的解耦,CDC技术与Kafka集成为企业提供了强大而灵活的解决方案。 五、主流免费CDC工具介绍介绍两款能够快速且免费实现CDC技术与Kafka集成的主流工具:Flink CDC和ETLCloud CDC。 测试前的环境准备:JDK8以上、Mysql数据库(开启BinLog日志)、kafka 六、Flink CDC安装使用步骤:下载安装包 进入Flink官网,下载1.13.3版本安装包 flink-1.13.3-bin-scala_2.11.tgz。(Flink1.13.3支持flink cdc2.x版本,为兼容flink cdc) 解压 在服务器上创建安装目录/home/flink,将 flink 安装包放在该目录下,并执行解压命令,解压至当前目录。tar -zxvf flink-1.13.3-bin-scala_2.11.tgz 启动 进入解压后的flink/lib目录,上传mysql和sql-connector驱动包。 进入flink/bin目录,执行启动命令:./start-cluster.sh 进入Flink可视化界面查看http://ip:8081 测试 下面我们来新建一个maven工程做CDC数据监听的同步测试。 POM依赖 <!-- Flink CDC -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.0</version>
</dependency> 新建Flink_CDC2Kafka类
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class Flink_CDC2Kafka {
public static void main(String[] args) throws Exception {
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//1.1 设置 CK&状态后端
//略
//2.通过 FlinkCDC 构建 SourceFunction 并读取数据
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname("ip") //数据库IP
.port(3306) //数据库端口
.username("admin") //数据库用户名
.password("pass") //数据库密码
.databaseList("test") //这个注释,就是多库同步
.tableList("test.admin") //这个注释,就是多表同步
.deserializer(new CustomerDeserialization()) //这里需要自定义序列化格式
// .deserializer(new StringDebeziumDeserializationSchema()) //默认是这个序列化格式
.startupOptions(StartupOptions.latest())
.build();
DataStreamSource<String> streamSource = env.addSource(sourceFunction);
//3.打印数据并将数据写入 Kafka
streamSource.print();
String sinkTopic = "test";
streamSource.addSink(getKafkaProducer("ip:9092",sinkTopic));
//4.启动任务
env.execute("FlinkCDC");
}
//kafka 生产者
public static FlinkKafkaProducer<String> getKafkaProducer(String brokers,String topic) {
return new FlinkKafkaProducer<String>(brokers,topic,
new SimpleStringSchema());
}
} 自定义序列化类 import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.ArrayList;
import java.util.List;
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1.创建 JSON 对象用于存储最终数据
JSONObject result = new JSONObject();
//2.获取库名&表名放入 source
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
JSONObject source = new JSONObject();
source.put("database",database);
source.put("table",tableName);
Struct value = (Struct) sourceRecord.value();
//3.获取"before"数据
Struct before = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4.获取"after"数据
Struct after = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5.获取操作类型 CREATE UPDATE DELETE 进行符合 Debezium-op 的字母
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("insert".equals(type)) {
type = "c";
}
if ("update".equals(type)) {
type = "u";
}
if ("delete".equals(type)) {
type = "d";
}
if ("create".equals(type)) {
type = "c";
}
//6.将字段写入 JSON 对象
result.put("source", source);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("op", type);
//7.输出数据
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
} 开启CDC监听 Mysql中新增一条人员数据 控制台捕获到增量数据 增量数据也成功推送到kafka中 至此通过Flink CDC监听数据库增量数据推送到kafka的过程已经完成,可以看到整个过程需要一些编码能力,对于业务人员的使用比较痛苦。 下面我们来介绍ETLCloud这款产品如何通过可视化配置,快速实现上述的场景内容。 七、ETL CDC安装使用步骤下载安装包 ETLCloud提供了一键快捷部署包,只需运行启动脚本即可完成安装产品部署。部署包下载可以登录ETLCloud官网自行下载。 安装 官网下载linux一键部署包,把一键部署包放到一个目录下解压并进入该目录。 对脚本文件进行赋权 chmod +x restcloud_install.sh 执行脚本 ./restcloud_install.sh 等待tomcat启动,当出现这个界面,则restcloud证明启动成功 数据源配置 新增MySql数据源信息 新增Kafka数据源信息 测试数据源 监听器配置 新增数据库监听器 监听器配置 接收端配置(数据传输类型选择kafka) 高级配置(默认参数) 启动监听 监听成功 测试 打开Navicat可视化工具新增并修改一条人员信息 实时数据中可以动态捕捉实时传输数据 Kafka中查看新增数据 Kafka中查看修改数据 八、写在最后上面我们通过两个CDC工具均实现了实时数据同步到kafka的功能,但通过对比Flink CDC和ETLCloud CDC,可以看出ETLCloud CDC提供了可视化配置的方式,使配置过程更加简单快捷,不需要编码能力。而Flink CDC则需要进行编码,对于业务人员可能会有一定的学习成本。 无论选择哪种工具,都可以实现CDC技术与Kafka集成,实时捕获数据库的增量数据变化,提供了方便和高效的数据同步和传输方法。ETLCloud介绍 ETLCloud是一款零代码ETL工具,可以快速对接上百种数据源和应用系统,无需编码即可快速完成数据同步和传输,企业IT人员只需简单几步即可快速完成各种数据抽取同步并配合BI工具实现数据的统计分析。 (ETLCloud可视化流程同步界面) ETLCloud社区版本永久免费下载使用https://www.etlcloud.cn 关于RestCloud 谷云科技是一家专注于以链接+数据+AI为核心的技术公司,致力于帮助企业实现各种应用、SaaS、数据和设备之间的无缝连接,构建高效协同的业务环境。致力于为全球大型头部企业及中国500强、世界500强企业提供更高质量、更智能化的数字化解决方案。2022年完成数千万*币Pre-A轮融资,由SIG海纳亚洲创投基金独家投资。 RestCloud产品矩阵 目前,RestCloud服务超300+大型头部企业客户,产品应用于快销、制造业、通讯业、金融业、军工业、教育及政府机构等各类组织,并与烽火科技、明道云、衡石科技、中数通、航天信息、中国系统、中软国际、中国软件等合作伙伴建立深度合作,持续助力企业数字化转型。
|