作者:姚维
这篇文章不是讲 TiDB Binlog 组件的源码,而是讲 TiDB 在执行 DML/DDL 语句过程中,如何将 Binlog 数据 发送给 TiDB Binlog 集群的 Pump 组件。目前 TiDB 在 DML 上的 Binlog 用的类似 Row-based 的格式。具体 Binlog 具体的架构细节可以参考这篇 文章。
这里只描述 TiDB 中的代码实现。
TiDB 采用 protobuf 来编码 binlog,具体的格式可以见 binlog.proto。这里讨论 TiDB 写 Binlog 的机制,以及 Binlog 对 TiDB 写入的影响。
TiDB 会在 DML 语句提交,以及 DDL 语句完成的时候,向 pump 输出 Binlog。
DML 语句包括 Insert/Replace、Update、Delete,这里挑 Insert 语句来阐述,其他的语句行为都类似。首先在 Insert 语句执行完插入(未提交)之前,会把自己新增的数据记录在 binlog.TableMutation
结构体中。
// TableMutation 存储表中数据的变化
message TableMutation {
// 表的 id,唯一标识一个表
optional int64 table_id = 1 [(gogoproto.nullable) = false];
// 保存插入的每行数据
repeated bytes inserted_rows = 2;
// 保存修改前和修改后的每行的数据
repeated bytes updated_rows = 3;
// 已废弃
repeated int64 deleted_ids = 4;
// 已废弃
repeated bytes deleted_pks = 5;
// 删除行的数据
repeated bytes deleted_rows = 6;
// 记录数据变更的顺序
repeated MutationType sequence = 7;
}
这个结构体保存于跟每个 Session 链接相关的事务上下文结构体中 TxnState.mutations
。 一张表对应一个 TableMutation
对象,TableMutation
里面保存了这个事务对这张表的所有变更数据。Insert 会把当前语句插入的行,根据 RowID
+ Row-value
的格式编码之后,追加到 TableMutation.InsertedRows
中:
func (t *Table) addInsertBinlog(ctx context.Context, h int64, row []types.Datum, colIDs []int64) error {
mutation := t.getMutation(ctx)
pk, err := codec.EncodeValue(ctx.GetSessionVars().StmtCtx, nil, types.NewIntDatum(h))
if err != nil {
return errors.Trace(err)
}
value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs, nil, nil)
if err != nil {
return errors.Trace(err)
}
bin := append(pk, value...)
mutation.InsertedRows = append(mutation.InsertedRows, bin)
mutation.Sequence = append(mutation.Sequence, binlog.MutationType_Insert)
return nil
}
等到所有的语句都执行完之后,在 TxnState.mutations
中就保存了当前事务对所有表的变更数据。
对于 DML 而言,TiDB 的事务采用 2-phase-commit 算法,一次事务提交会分为 Prewrite 阶段,以及 Commit 阶段。这里分两个阶段来看看 TiDB 具体的行为。
在 session.doCommit
函数中,TiDB 会构造 binlog.PrewriteValue
:
message PrewriteValue {
optional int64 schema_version = 1 [(gogoproto.nullable) = false];
repeated TableMutation mutations = 2 [(gogoproto.nullable) = false];
}
这个 PrewriteValue
中包含了跟这次变动相关的所有行数据,TiDB 会填充一个类型为 binlog.BinlogType_Prewrite
的 Binlog:
info := &binloginfo.BinlogInfo{
Data: &binlog.Binlog{
Tp: binlog.BinlogType_Prewrite,
PrewriteValue: prewriteData,
},
Client: s.sessionVars.BinlogClient.(binlog.PumpClient),
}
TiDB 这里用一个事务的 Option kv.BinlogInfo
来把 BinlogInfo
绑定到当前要提交的 transaction 对象中:
s.txn.SetOption(kv.BinlogInfo, info)
在 twoPhaseCommitter.execute
中,在把数据 prewrite 到 TiKV 的同时,会调用 twoPhaseCommitter.prewriteBinlog
,这里会把关联的 binloginfo.BinlogInfo
取出来,把 Binlog 的 binlog.PrewriteValue
输出到 Pump。
binlogChan := c.prewriteBinlog()
err := c.prewriteKeys(NewBackoffer(prewriteMaxBackoff, ctx), c.keys)
if binlogChan != nil {
binlogErr := <-binlogChan // 等待 write prewrite binlog 完成
if binlogErr != nil {
return errors.Trace(binlogErr)
}
}
这里值得注意的是,在 prewrite 阶段,是需要等待 write prewrite binlog 完成之后,才能继续做接下去的提交的,这里是为了保证 TiDB 成功提交的事务,Pump 至少一定能收到 Prewrite Binlog。
在 twoPhaseCommitter.execute
事务提交结束之后,事务可能提交成功,也可能提交失败。TiDB 需要把这个状态告知 Pump:
err = committer.execute(ctx)
if err != nil {
committer.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
return errors.Trace(err)
}
committer.writeFinishBinlog(binlog.BinlogType_Commit, int64(committer.commitTS))
如果发生了 error,那么输出的 Binlog 类型就为 binlog.BinlogType_Rollback
,如果成功提交,那么输出的 Binlog 类型就为 binlog.BinlogType_Commit
。
func (c *twoPhaseCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int64) {
if !c.shouldWriteBinlog() {
return
}
binInfo := c.txn.us.GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo)
binInfo.Data.Tp = tp
binInfo.Data.CommitTs = commitTS
go func() {
err := binInfo.WriteBinlog(c.store.clusterID)
if err != nil {
log.Errorf("failed to write binlog: %v", err)
}
}()
}
值得注意的是,这里 WriteBinlog 是单独启动 goroutine 异步完成的,也就是 Commit 阶段,是不再需要等待写 binlog 完成的。这里可以节省一点 commit 的等待时间,这里不需要等待是因为 Pump 即使接收不到这个 Commit Binlog,在超过 timeout 时间后,Pump 会自行根据 Prewrite Binlog 到 TiKV 中确认当条事务的提交状态。
一个 DDL 有如下几个状态:
const (
JobStateNone JobState = 0
JobStateRunning JobState = 1
JobStateRollingback JobState = 2
JobStateRollbackDone JobState = 3
JobStateDone JobState = 4
JobStateSynced JobState = 6
JobStateCancelling JobState = 7
)
这些状态代表了一个 DDL 任务所处的状态:
JobStateNone
,代表 DDL 任务还在处理队列,TiDB 还没有开始做这个 DDL。
JobStateRunning
,当 DDL Owner 开始处理这个任务的时候,会把状态设置为 JobStateRunning
,之后 DDL 会开始变更,TiDB 的 Schema 可能会涉及多个状态的变更,这中间不会改变 DDL job 的状态,只会变更 Schema 的状态。
JobStateDone
, 当 TiDB 完成自己所有的 Schema 状态变更之后,会把 Job 的状态改为 Done。
JobStateSynced
,当 TiDB 每做一次 schema 状态变更,就会需要跟集群中的其他 TiDB 做一次同步,但是当 Job 状态为 JobStateDone
之后,在 TiDB 等到所有的 TiDB 节点同步之后,会将状态修改为 JobStateSynced
。
JobStateCancelling
,TiDB 提供语法 ADMIN CANCEL DDL JOBS job_ids
用于取消某个正在执行或者还未执行的 DDL 任务,当成功执行这个命令之后,DDL 任务的状态会变为 JobStateCancelling
。
JobStateRollingback
,当 DDL Owner 发现 Job 的状态变为 JobStateCancelling
之后,它会将 job 的状态改变为 JobStateRollingback
,以示已经开始处理 cancel 请求。
JobStateRollbackDone
,在做 cancel 的过程,也会涉及 Schema 状态的变更,也需要经历 Schema 的同步,等到状态回滚已经做完了,TiDB 会将 Job 的状态设置为 JobStateRollbackDone
。
对于 Binlog 而言,DDL 的 Binlog 输出机制,跟 DML 语句也是类似的,只有开始处理事务提交阶段,才会开始写 Binlog 出去。那么对于 DDL 来说,跟 DML 不一样,DML 有事务的概念,对于 DDL 来说,SQL 的事务是不影响 DDL 语句的。但是 DDL 里面,上面提到的 Job 的状态变更,是作为一个事务来提交的(保证状态一致性)。所以在每个状态变更,都会有一个事务与之对应,但是上面提到的中间状态,DDL 并不会往外写 Binlog,只有 JobStateRollbackDone
以及 JobStateDone
这两种状态,TiDB 会认为 DDL 语句已经完成,会对外发送 Binlog,发送之前,会把 Job 的状态从 JobStateDone
修改为 JobStateSynced
,这次修改,也涉及一次事务提交。这块逻辑的代码如下:
worker.handleDDLJobQueue():
if job.IsDone() || job.IsRollbackDone() {
binloginfo.SetDDLBinlog(d.binlogCli, txn, job.ID, job.Query)
if !job.IsRollbackDone() {
job.State = model.JobStateSynced
}
err = w.finishDDLJob(t, job)
return errors.Trace(err)
}
type Binlog struct {
DdlQuery []byte
DdlJobId int64
}
DdlQuery
会设置为原始的 DDL 语句,DdlJobId
会设置为 DDL 的任务 ID。
对于最后一次 Job 状态的提交,会有两条 Binlog 与之对应,这里有几种情况:
如果事务提交成功,类型分别为 binlog.BinlogType_Prewrite
和 binlog.BinlogType_Commit
。
如果事务提交失败,类型分别为 binlog.BinlogType_Prewrite
和 binlog.BinlogType_Rollback
。
所以,Pumps 收到的 DDL Binlog,如果类型为 binlog.BinlogType_Rollback
应该只认为如下状态是合法的:
JobStateDone
(因为修改为 JobStateSynced
还未成功)
JobStateRollbackDone
如果类型为 binlog.BinlogType_Commit
,应该只认为如下状态是合法的:
JobStateSynced
JobStateRollbackDone
当 TiDB 在提交最后一个 Job 状态的时候,如果事务提交失败了,那么 TiDB Owner 会尝试继续修改这个 Job,直到成功。也就是对于同一个 DdlJobId
,后续还可能会有多次 Binlog,直到出现 binlog.BinlogType_Commit
。