阻塞式重试; 2PC、3PC 传统事务; 使用队列,后台异步处理; TCC 补偿事务; 本地消息表(异步确保); MQ 事务。
阻塞式重试
m := db.Insert(sql)
err := request(B-Service,m)
func request(url string,body interface{}){
for i:=; i<3; i ++ {
result, err = request.POST(url,body)
if err == nil {
break
}else {
log.Print()
}
}
}
调用 B 服务成功,但由于网络超时原因,当前服务认为其失败了,继续重试,这样 B 服务会产生 2 条一样的数据。 调用 B 服务失败,由于 B 服务不可用,重试 3 次依然失败,当前服务在前面代码中插入到 DB 的一条记录,就变成了脏数据。 重试会增加上游对本次调用的延迟,如果下游负载较大,重试会放大下游服务的压力。
异步队列
m := db.Insert(sql)
err := mq.Publish("B-Service-topic",m)
TCC 补偿事务
阶段一、Try 操作:对业务资源做检测、资源预留,比如对库存的检查、预扣。 阶段二、Confirm 操作:提交确认 Try 操作的资源预留。比如把库存预扣更新为扣除。 阶段二、Cancel 操作:Try 操作失败后,释放其预扣的资源。比如把库存预扣的加回去。
m := db.Insert(sql)
aResult, aErr := A.Try(m)
bResult, bErr := B.Try(m)
cResult, cErr := C.Try(m)
if cErr != nil {
A.Cancel()
B.Cancel()
C.Cancel()
} else {
A.Confirm()
B.Confirm()
C.Confirm()
}
空释放
时序
调用失败
阻塞式重试。但有同样的问题,比如宕机、一直失败的情况。 写入日志、队列,然后有单独的异步服务自动或人工介入处理。但一样会有问题,写日志或队列时,会存在失败的情况。
本地消息表
配合MQ
messageTx := tc.NewTransaction("order")
messageTxSql := tx.TryPlan("content")
m,err := db.InsertTx(sql,messageTxSql)
if err!=nil {
return err
}
aErr := mq.Publish("B-Service-topic",m)
if aErr!=nil { // 推送到 MQ 失败
messageTx.Confirm() // 更新消息的状态为 confirm
}else {
messageTx.Cancel() // 删除消息
}
// 异步处理 confirm 的消息,继续推送
func OnMessage(task *Task){
err := mq.Publish("B-Service-topic", task.Value())
if err==nil {
messageTx.Cancel()
}
}
上面代码中其 messageTxSql 是插入本地消息表的一段 SQL :
insert into `tcc_async_task` (`uid`,`name`,`value`,`status`) values ('?','?','?','?')
confirm
。本地消息表中 status
有 2 种状态 try
、confirm
, 无论哪种状态在 OnMessage
都可以监听到,从而发起重试。配合服务调用
messageTx := tc.NewTransaction("order")
messageTxSql := tx.TryPlan("content")
body,err := db.InsertTx(sql,messageTxSql)
if err!=nil {
return err
}
aErr := request.POST("B-Service",body)
if aErr!=nil { // 调用 B-Service 失败
messageTx.Confirm() // 更新消息的状态为 confirm
}else {
messageTx.Cancel() // 删除消息
}
// 异步处理 confirm 或 try 的消息,继续调用 B-Service
func OnMessage(task *Task){
// request.POST("B-Service",body)
}
OnMessage
中处理:messageTx := tc.NewTransaction("order")
messageTx := tx.Try("content")
aErr := request.POST("B-Service",body)
// ....
消息过期
Try
和 Confirm
消息的处理器:TCC.SetTryHandler(OnTryMessage())
TCC.SetConfirmHandler(OnConfirmMessage())
func OnConfirmMessage(task *tcc.Task) {
if time.Now().Sub(task.CreatedAt) > time.Hour {
err := task.Cancel() // 删除该消息,停止重试。
// doSomeThing() 告警,人工介入
return
}
}
Try
处理函数中,还要单独判断当前消息任务是否存在过短,因为 Try
状态的消息,可能才刚刚创建,还没被确认提交或删除。这会和正常业务逻辑的执行重复,意味着成功的调用,也会被重试;为尽量避免这种情况,可以检测消息的创建时间是否很短,短的话可以跳过。独立消息服务
err := request.POST("Message-Service",body)
if err!=nil {
return err
}
aErr := request.POST("B-Service",body)
if aErr!=nil {
return aErr
}
try
和 confirm
,消息服务在前面多了一种状态 prepare
。MQ 事务
Confirm
确认提交消息,失败则Cancel
删除消息。MQ 事务也会存在 prepare
状态,需要 MQ 的消费处理逻辑来确认业务是否成功。