1.5 Canal 数据同步工具详细教程

欢迎来到我的博客,很高兴能够在这里和您见面!欢迎订阅相关专栏:

⭐️ 全网最全IT互联网公司面试宝典:收集整理全网各大IT互联网公司技术、项目、HR面试真题.
⭐️ AIGC时代的创新与未来:详细讲解AIGC的概念、核心技术、应用领域等内容。
⭐️ 全流程数据技术实战指南:全面讲解从数据采集到数据可视化的整个过程,掌握构建现代化数据平台和数据仓库的核心技术和方法。

文章目录

      • Canal概述
      • 架构
      • 基本工作流程
      • 使用场景
      • 优缺点
      • 部署安装
      • 使用案例
        • 实时数据同步
      • 性能优化
      • 总结

Canal概述

Canal是一款由阿里巴巴开源的、用于MySQL数据库binlog增量订阅和消费的中间件。它的设计灵感来源于MySQL主从复制机制,通过模拟MySQL Slave与Master进行交互,从而解析并获取数据库的实时变更数据。Canal可以将这些变更数据实时推送到其他系统,从而实现数据同步、数据监控等功能。

架构

Canal的架构主要包括以下几个组件:

  1. Canal Server:核心组件,负责与MySQL进行交互,解析binlog日志。
  2. Canal Client:消费者,订阅并消费Canal Server推送的binlog数据。
  3. Zookeeper:用于管理Canal Server的集群状态及分布式协调。

架构图如下:

+---------------+     +-------------+
|               |     |             |
|  MySQL Server |<--->| Canal Server|
|               |     |             |
+---------------+     +-------------+
                          |
                          v
                   +-------------+
                   | Canal Client|
                   +-------------+
                          |
                          v
                   +-------------+
                   | Other System|
                   +-------------+

基本工作流程

  1. 连接MySQL:Canal Server以MySQL Slave的身份连接到MySQL Master,获取binlog位置信息。
  2. 拉取binlog:Canal Server从MySQL Master拉取binlog日志。
  3. 解析binlog:Canal Server解析binlog日志,提取数据库变更事件。
  4. 推送数据:Canal Server将解析后的变更事件推送给Canal Client。
  5. 处理数据:Canal Client消费变更事件,并根据需要将数据同步到其他系统。

使用场景

  1. 数据同步:将MySQL数据实时同步到其他数据库或大数据平台,如Elasticsearch、Hadoop等。
  2. 数据监控:实时监控MySQL数据库的变更,进行数据统计、报警等。
  3. 缓存更新:数据库变更后,实时更新缓存数据,确保数据一致性。

优缺点

优点

  1. 实时性强:能够实时获取MySQL数据库的变更数据。
  2. 高效:直接读取binlog日志,性能开销小。
  3. 灵活性高:支持自定义数据处理逻辑,适用于多种使用场景。

缺点

  1. 复杂度高:需要对MySQL binlog机制有一定了解,配置相对复杂。
  2. 依赖性强:依赖于MySQL主从复制机制,MySQL版本不兼容可能会带来问题。

部署安装

  1. 下载Canal:从Canal GitHub下载最新版本。
  2. 配置Canal Server:修改conf目录下的配置文件,配置MySQL连接信息、binlog位置信息等。
  3. 启动Canal Server:通过命令bin/startup.sh启动Canal Server。
  4. 配置Canal Client:编写Canal Client代码,订阅Canal Server的变更事件。

使用案例

实时数据同步

假设我们要将MySQL数据库的订单数据实时同步到Elasticsearch。首先,我们需要在MySQL中配置binlog,并启动Canal Server。

MySQL配置(my.cnf)

[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW

Canal Server配置(example/instance.properties)

canal.instance.mysql.slaveId=1234
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=yourpassword
canal.instance.connectionCharset=UTF-8
canal.instance.tsdb.enable=true

Canal Client代码

我们使用Java编写一个Canal Client,将MySQL数据同步到Elasticsearch。

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    private static final String INDEX_NAME = "orders";

    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        RestHighLevelClient esClient = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")));

        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();

            while (true) {
                Message message = connector.getWithoutAck(1000);
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();

                if (batchId != -1 && entries.size() > 0) {
                    processEntries(entries, esClient);
                }

                connector.ack(batchId);
            }
        } finally {
            connector.disconnect();
            try {
                esClient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private static void processEntries(List<CanalEntry.Entry> entries, RestHighLevelClient esClient) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error, data:" + entry.toString(), e);
                }

                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
                        handleInsert(rowData, esClient);
                    } else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
                        handleUpdate(rowData, esClient);
                    } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
                        handleDelete(rowData, esClient);
                    }
                }
            }
        }
    }

    private static void handleInsert(CanalEntry.RowData rowData, RestHighLevelClient esClient) {
        // Assuming the table has columns id, order_id, and amount
        String id = "";
        String orderId = "";
        String amount = "";

        for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
            switch (column.getName()) {
                case "id":
                    id = column.getValue();
                    break;
                case "order_id":
                    orderId = column.getValue();
                    break;
                case "amount":
                    amount = column.getValue();
                    break;
            }
        }

        IndexRequest request = new IndexRequest(INDEX_NAME).id(id).source(
                "{ \"order_id\": \"" + orderId + "\", \"amount\": \"" + amount + "\" }", XContentType.JSON);

        try {
            esClient.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void handleUpdate(CanalEntry.RowData rowData, RestHighLevelClient esClient) {
        // Handle update similarly to insert, adjusting for changes
        handleInsert(rowData, esClient);
    }

    private static void handleDelete(CanalEntry.RowData rowData, RestHighLevelClient esClient) {
        String id = "";

        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
            if (column.getName().equals("id")) {
                id = column.getValue();
                break;
            }
        }

        DeleteRequest request = new DeleteRequest(INDEX_NAME, id);

        try {
            esClient.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

性能优化

  1. 增加Canal Server实例:通过增加Canal Server实例,提高数据处理能力。
  2. 优化binlog解析:定期清理无用的binlog文件,减少解析时间。
  3. 合理配置内存和线程:根据业务需求,合理配置Canal Server的内存和线程数,提高并发处理能力。

总结

Canal是一款强大的MySQL binlog增量订阅和消费中间件,通过模拟MySQL Slave与Master的交互,实现实时数据同步和监控。它具有高效、实时的优点,适用于多种数据同步和监控场景。然而,Canal的配置和使用相对复杂,用户需要对MySQL binlog机制有一定了解。通过合理的配置和性能优化,可以充分发挥Canal的优势,实现高效的数据处理和同步。


💗💗💗 如果觉得这篇文对您有帮助,请给个点赞、关注、收藏吧,谢谢!💗💗💗

👇扫👇 码👇+ V👇获取👇更多👇福利👇
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/762455.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【你也能从零基础学会网站开发】关系型数据库中的表(Table)设计结构以及核心组成部分

&#x1f680; 个人主页 极客小俊 ✍&#x1f3fb; 作者简介&#xff1a;程序猿、设计师、技术分享 &#x1f40b; 希望大家多多支持, 我们一起学习和进步&#xff01; &#x1f3c5; 欢迎评论 ❤️点赞&#x1f4ac;评论 &#x1f4c2;收藏 &#x1f4c2;加关注 关系型数据库中…

FTP 文件传输协议:概念、工作原理;上传下载操作步骤

目录 FTP 概念 工作原理 匿名用户 授权用户 FTP软件包 匿名用户上传下载实验步骤 环境配置 下载 上传 wget 授权用户上传下载步骤 root用户登录FTP步骤 监听 设置端口号范围 修改用户家目录 匿名用户 授权用户 FTP 概念 FTP&#xff08;File Transfer Prot…

C语言之线程的学习

线程属于某一个进程 共同点&#xff1a;都能并发 线程共享变量&#xff0c;进程不共享。 多线程任务中&#xff0c;其中某一个线程调用了exit了&#xff0c;其他线程会跟着一起退出 如果是特定的线程就调用pthread_exit 失败返回的是错误号 下面也是

VSCode无法识别 node、npm

一、前提 电脑新安装了node.js&#xff0c;在cmd查看node和npm版本没有问题&#xff0c;但是在VSCode无法识别 1.cmd查看版本&#xff1a; 2.VSCode报错信息&#xff1a; 无法将“npm”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。请检查名称的拼写&#xff0c;如果…

C#——Property属性详情

属性 属性&#xff08;Property&#xff09;是类&#xff08;class&#xff09;、结构体&#xff08;structure&#xff09;和接口&#xff08;interface&#xff09;的成员&#xff0c;类或结构体中的成员变量称为字段&#xff0c;属性是字段的扩展&#xff0c;使用访问器&am…

视频转音频:怎样提取视频中的音频?6个提取音频的小技巧(建议收藏)

怎样提取视频中的音频&#xff1f;当我们想从视频中提取出声音时&#xff0c;通常会遇到很多问题。无论是想单独提取出视频里的音频&#xff0c;还是把它转成方便储存或者分享的音频格式&#xff0c;这都会涉及到视频转音频的一个需求。因此&#xff0c;在这篇指南里&#xff0…

如何编写高质量更优雅的代码(Java)

1、函数式接口—FunctionalInterface 好处&#xff1a;高逼格、代码收拢、解藕、统一处理 适用范围&#xff1a;具有共性的接口调用代码 举个栗子&#xff1a; 在我们平时的微服务开发中&#xff0c;调用其他服务的接口&#xff0c;通常要把接口调用部分做异常处理(try catch…

为何交易价格可能超出预期?

当你尝试执行订单时&#xff0c;如果收到“报价超出”的提示&#xff0c;这通常意味着交易无法按你的预期价格成交。对于某些交易者来说&#xff0c;这可能会带来一些困扰&#xff0c;但在外汇等流动性极高的市场中&#xff0c;这种情况是相当常见的。 外汇市场之所以吸引众多…

Python系统教程01

Python 是一门解释性语言&#xff0c;相对更简单、易学&#xff0c;它可以用于解决数学问题、获取与分 析数据、爬虫爬取网络数据、实现复制数学算法等等。 1、print()函数&#xff1a; print()书写时注意所有的符号都是英文符号。print()输出内容时&#xff0c;若要输出字符…

A股低开高走,近3000点,行情要启动了吗?

A股低开高走&#xff0c;近3000点&#xff0c;行情要启动了吗&#xff1f; 今天的A股&#xff0c;让人瞪目结舌了&#xff0c;你们知道是为什么吗&#xff1f;盘面上出现2个重要信号&#xff0c;一起来看看&#xff1a; 1、今天两市低开高走&#xff0c;银行板块护盘指数&…

如何使用AI学习一门编程语言?

无论你是软件开发新手还是拥有几十年的丰富经验&#xff0c;总是需要学习新知识。TIOBE Index追踪50种最受欢迎的编程语言&#xff0c;许多生态系统为职业发展和横向转型提供了机会。鉴于现有技术具有的广度&#xff0c;抽空学习一项新技能并有效运用技能可能困难重重。 最近我…

Linux启动elasticsearch,提示权限不够

Linux启动elasticsearch&#xff0c;提示权限不够&#xff0c;如下图所示&#xff1a; 解决办法&#xff1a; 设置文件所有者&#xff0c;即使用户由权限访问文件 sudo chown -R 用户名[:新组] ./elasticsearch-8.10.4 //切换到elasticsearch-8.10.4目录同级 chown详细格式…

关于vue创建项目失败报错(镜像过期)的解决方案

在新建vue项目时出现以下错误&#xff1a; 原因&#xff1a; npm.taobao.org和registry.npm.taobao.org旧域名于2021年官方公告域名更换事件&#xff0c;已于2022年05月31日零时起停止服务&#xff0c;域名HTTPS证书于2024年1月22日正式到期&#xff0c;不可再用。 解决方案:…

昇思MindSpore学习总结七——模型训练

1、模型训练 模型训练一般分为四个步骤&#xff1a; 构建数据集。定义神经网络模型。定义超参、损失函数及优化器。输入数据集进行训练与评估。 现在我们有了数据集和模型后&#xff0c;可以进行模型的训练与评估。 2、构建数据集 首先从数据集 Dataset加载代码&#xff0…

gdb及其使用

gdb调试一&#xff1a; 首先进入gdb&#xff0c;确定好进程&#xff0c;输入进程号 确定要调试哪个文件&#xff0c;然后输入&#xff1a;&#xff08;b为打断点&#xff09; (gdb) b serialization_protobuffer.h:write<ros::serialization::OStream>(ros::serializat…

python自动化办公之shutil

目录 1复制文件&#xff0c;此时存在2份相同文件 2移动文件&#xff0c;此时仅有1份文件 3删除文件&#xff0c;此时0份文件 用到的库&#xff1a;shutil&#xff0c;os 实现的效果&#xff1a;复制文件&#xff0c;移动文件&#xff0c;删除文件 代码&#xff1a; 1复制…

【机器学习】FFmpeg+Whisper:二阶段法视频理解(video-to-text)大模型实战

目录 一、引言 二、FFmpeg工具介绍 2.1 什么是FFmpeg 2.2 FFmpeg核心原理 2.3 FFmpeg使用示例 三、FFmpegWhisper二阶段法视频理解实战 3.1 FFmpeg安装 3.2 Whisper模型下载 3.3 FFmpeg抽取视频的音频 3.3.1 方案一&#xff1a;命令行方式使用ffmpeg 3.3.2 方案二&a…

Error: A JNl error has occurred, please check your installation and try again.

Eclipse 运行main方法的时候报错&#xff1a;Error: A JNl error has occurred, please check your installation and try again. 一、问题分析 导致这个问题&#xff0c;主要原因&#xff0c;我认为是在新版本中&#xff0c;默认的JDK编译版本与我们配置的JDK版本不一致导致的…

【Linux】:命令行参数

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关Linux命令行参数的相关知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入…

代码随想录算法训练营第59天:动态[1]

代码随想录算法训练营第59天&#xff1a;动态 两个字符串的删除操作 力扣题目链接(opens new window) 给定两个单词 word1 和 word2&#xff0c;找到使得 word1 和 word2 相同所需的最小步数&#xff0c;每步可以删除任意一个字符串中的一个字符。 示例&#xff1a; 输入: …