View on GitHub

个人笔记

SongPinru 的小仓库

原理

1)Master主库将改变记录写到二进制日志(binary log)中;

2)Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log);

3)Slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

Canal的工作原理很简单,就是把自己伪装成Slave,假装从Master复制数据。

安装

Binlog的分类设置

修改MySQL的/usr/my.cnf配置文件。

[mysqld]

#开启binlog
log_bin = mysql-bin
#binlog日志类型
binlog_format = row
#MySQL服务器唯一id
server_id = 1
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';

MySQL Binlog的格式,那就是有三种,分别是STATEMENT,MIXED,ROW。

区别:

1)statement

​ 语句级,binlog会记录每次一执行写操作的语句。

​ 相对row模式节省空间,但是可能产生不一致性,比如1

update tt set create_date=now()

​ 如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。

​ 优点:节省空间

​ 缺点:有可能造成数据不一致。

2)row

​ 行级,binlog会记录每次操作后每行记录的变化。

​ 优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。

​ 缺点:占用较大空间。

3)mixed

​ statement的升级版,一定程度上解决了,因为一些情况而造成的statement模式不一致问题

​ 在某些情况下譬如:

​ 当函数中包含 UUID() 时;

​ 包含 AUTO_INCREMENT 字段的表被更新时;

​ 执行 INSERT DELAYED 语句时;

​ 用 UDF 时;

​ 会按照 ROW的方式进行处理

​ 优点:节省空间,同时兼顾了一定的一致性。

缺点:还有些极个别情况依旧会造成不一致,

另外statement和mixed对于需要对binlog的监控的情况都不方便。

Canal部署

解压

修改配置

#canal的配置
vim conf/canal.properties

canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# 当前server上部署的instance列表
canal.destinations=
# instance配置,即每个应用的具体配置
vim conf/example/instance.properties

# 不能与mysql的server-id相同
canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info,数据库信息
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =test
# enable druid Decrypt database password
canal.instance.enableDruid=false

参数配置

启动

./bin/startup.sh

API

对象名称 介绍 包含内容
message 一次canal从日志中抓取的信息,一个message包含多个sql(event) 包含 一个Entry集合
entry 对应一个sql命令,一个sql可能会对多行记录造成影响。 序列化的数据内容storeValue
rowchange 是把entry中的storeValue反序列化的对象。 1 rowdatalist 行集
2 eventType(数据的变化类型: insert update delete create alter drop)
RowData 出现变化的数据行信息 1 afterColumnList (修改后)
2 beforeColumnList(修改前)
column 一个RowData里包含了多个column,每个column包含了 name和 value 1 columnName
2 columnValue

POM:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.2</version>
</dependency>
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.utils.MyKafkaSender;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CanalClient {
    public static void main(String[] args){
        //1、获得单连接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop102", 11111), "example", "", "");
        //2、长轮询
        while (true){
            canalConnector.connect();
            canalConnector.subscribe("gmall.*");
            Message message = canalConnector.get(100);
            List<CanalEntry.Entry> entries = message.getEntries();
            if(entries.size()<=0){
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else {
                for (CanalEntry.Entry entry : entries) {
                    if (CanalEntry.EntryType.ROWDATA.equals(entry.getEntryType())  ) {
                            try {
                                ByteString storeValue = entry.getStoreValue();
                                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                                List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                                String tableName = entry.getHeader().getTableName();
                                CanalEntry.EventType eventType = rowChange.getEventType();
                                if ("order_info"==tableName && CanalEntry.EventType.INSERT.equals(eventType)) {
                                    handler(GmallConstants.GMALL_ORDER_INFO,rowDatasList);
                                }else if("order_detail"==tableName && CanalEntry.EventType.INSERT.equals(eventType)){
                                    handler(GmallConstants.GMALL_ORDER_DETAIL,rowDatasList);
                                }else if("user_info"==tableName && (CanalEntry.EventType.INSERT.equals(eventType) ||CanalEntry.EventType.UPDATE.equals(eventType)) ){
                                    handler(GmallConstants.GMALL_USER_INFO,rowDatasList);
                                }
                            } catch (InvalidProtocolBufferException e) {
                                e.printStackTrace();
                            }
                    }
                }
            }
        }
    }

    private static void handler(String topic, List<CanalEntry.RowData> rowDatasList) {
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                Map<String, String> map = new HashMap<>();
//                JSONObject map = new JSONObject();
                for (CanalEntry.Column column : afterColumnsList) {
                    map.put(column.getName(),column.getValue());
                }
                MyKafkaSender.send(topic, JSON.toJSONString(map));
            }
    }
}