本文共 10070 字,大约阅读时间需要 33 分钟。
本文是使用 golang 实现 redis 系列的第八篇, 将介绍如何在分布式缓存中使用 Try-Commit-Catch 方式来解决分布式一致性问题。
godis 集群的源码在
在上一篇文章中我们使用一致性 hash 算法将缓存中的 key 分散到不同的服务器节点中,从而实现了分布式缓存。随之而来的问题是:一条指令(比如 MSET)可能需要多个节点同时执行,可能有些节点成功而另一部分节点失败。
对于使用者而言这种部分成功部分失败的情况非常难以处理,所以我们需要保证 MSET 操作要么全部成功要么全部失败。
于是问题来了 DEL、MSET 等命令所涉及的 key 可能分布在不同的节点中,在集群模式下实现这类涉及多个 key 的命令最简单的方式当然是 For-Each 遍历 key 并向它们所在的节点发送相应的操作指令。 以 MGET 命令的实现为例:
func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) < 2 { return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command") } // 从参数列表中取出要读取的 key keys := make([]string, len(args)-1) for i := 1; i < len(args); i++ { keys[i-1] = string(args[i]) } resultMap := make(map[string][]byte) // 计算每个 key 所在的节点,并按照节点分组 groupMap := cluster.groupBy(keys) // groupMap 的类型为 map[string][]string,key 是节点的地址,value 是 keys 中属于该节点的 key 列表 for peer, group := range groupMap { // 向每个节点发送 mget 指令,读取分布在它上面的 key resp := cluster.Relay(peer, c, makeArgs("MGET", group...)) if reply.IsErrorReply(resp) { errReply := resp.(reply.ErrorReply) return reply.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", group[0], errReply.Error())) } arrReply, _ := resp.(*reply.MultiBulkReply) // 将每个节点上的结果 merge 到 map 中 for i, v := range arrReply.Args { key := group[i] resultMap[key] = v } } result := make([][]byte, len(keys)) for i, k := range keys { result[i] = resultMap[k] } return reply.MakeMultiBulkReply(result)}// 计算 key 所属的节点,并按节点分组func (cluster *Cluster) groupBy(keys []string) map[string][]string { result := make(map[string][]string) for _, key := range keys { // 使用一致性 hash 计算所属节点 peer := cluster.peerPicker.Get(key) // 将 key 加入到相应节点的分组中 group, ok := result[peer] if !ok { group = make([]string, 0) } group = append(group, key) result[peer] = group } return result}
那么 MSET 命令的实现能否如法炮制呢?答案是否定的。在上面的代码中我们注意到,在向各个节点发送指令时若某个节点读取失败则会直接退出整个 MGET 执行过程。
若在执行 MSET 指令时遇到部分节点失败或超时,则会出现部分 key 设置成功而另一份设置失败的情况。对于缓存使用者而言这种部分成功部分失败的情况非常难以处理,所以我们需要保证 MSET 操作要么全部成功要么全部失败。
两阶段提交(2-Phase Commit, 2PC)算法是解决我们遇到的一致性问题最简单的算法。在 2PC 算法中写操作被分为两个阶段来执行:
2PC是一种简单的一致性协议,它存在一些问题:
首先我们定义事务的描述结构:
type Transaction struct { id string // 事务 ID, 由 snowflake 算法生成 args [][]byte // 命令参数 cluster *Cluster conn redis.Connection keys []string // 事务中涉及的 key undoLog map[string][]byte // 每个 key 在事务执行前的值,用于回滚事务}
先看事务参与者 prepare 阶段的操作:
// prepare 命令的格式是: PrepareMSet TxID key1, key2 ...// TxID 是事务 ID,由协调者决定func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) < 3 { return reply.MakeErrReply("ERR wrong number of arguments for 'preparemset' command") } txId := string(args[1]) size := (len(args) - 2) / 2 keys := make([]string, size) for i := 0; i < size; i++ { keys[i] = string(args[2*i+2]) } txArgs := [][]byte{ []byte("MSet"), } // actual args for cluster.db txArgs = append(txArgs, args[2:]...) tx := NewTransaction(cluster, c, txId, txArgs, keys) // 创建新事务 cluster.transactions.Put(txId, tx) // 存储到节点的事务列表中 err := tx.prepare() // 准备事务 if err != nil { return reply.MakeErrReply(err.Error()) } return &reply.OkReply{}}
实际的准备操作在 tx.prepare() 中:
func (tx *Transaction) prepare() error { // 锁定相关 key tx.cluster.db.Locks(tx.keys...) // 准备 undo log tx.undoLog = make(map[string][]byte) for _, key := range tx.keys { entity, ok := tx.cluster.db.Get(key) if ok { blob, err := gob.Marshal(entity) // 将修改之前的状态序列化之后存储作为 undo log if err != nil { return err } tx.undoLog[key] = blob } else { // 若事务执行前 key 是空的,在回滚时应删除它 tx.undoLog[key] = []byte{} } } tx.status = PreparedStatus return nil}
看看协调者在做什么:
func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { // 解析参数 argCount := len(args) - 1 if argCount%2 != 0 || argCount < 1 { return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command") } size := argCount / 2 keys := make([]string, size) valueMap := make(map[string]string) for i := 0; i < size; i++ { keys[i] = string(args[2*i+1]) valueMap[keys[i]] = string(args[2*i+2]) } // 找到所属的节点 groupMap := cluster.groupBy(keys) if len(groupMap) == 1 { // do fast // 若所有的 key 都在同一个节点直接执行,不使用较慢的 2pc 算法 for peer := range groupMap { return cluster.Relay(peer, c, args) } } // 开始准备阶段 var errReply redis.Reply txId := cluster.idGenerator.NextId() // 使用 snowflake 算法决定事务 ID txIdStr := strconv.FormatInt(txId, 10) rollback := false // 向所有参与者发送 prepare 请求 for peer, group := range groupMap { peerArgs := []string{txIdStr} for _, k := range group { peerArgs = append(peerArgs, k, valueMap[k]) } var resp redis.Reply if peer == cluster.self { resp = PrepareMSet(cluster, c, makeArgs("PrepareMSet", peerArgs...)) } else { resp = cluster.Relay(peer, c, makeArgs("PrepareMSet", peerArgs...)) } if reply.IsErrorReply(resp) { errReply = resp rollback = true break } } if rollback { // 若 prepare 过程出错则执行回滚 RequestRollback(cluster, c, txId, groupMap) } else { _, errReply = RequestCommit(cluster, c, txId, groupMap) rollback = errReply != nil } if !rollback { return &reply.OkReply{} } return errReply}
事务参与者提交本地事务:
func Commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) != 2 { return reply.MakeErrReply("ERR wrong number of arguments for 'commit' command") } // 读取事务信息 txId := string(args[1]) raw, ok := cluster.transactions.Get(txId) if !ok { return reply.MakeIntReply(0) } tx, _ := raw.(*Transaction) // 在提交成功后解锁 key defer func() { cluster.db.UnLocks(tx.keys...) tx.status = CommitedStatus //cluster.transactions.Remove(tx.id) // cannot remove, may rollback after commit }() cmd := strings.ToLower(string(tx.args[0])) var result redis.Reply if cmd == "del" { result = CommitDel(cluster, c, tx) } else if cmd == "mset" { result = CommitMSet(cluster, c, tx) } // 提交失败 if reply.IsErrorReply(result) { err2 := tx.rollback() return reply.MakeErrReply(fmt.Sprintf("err occurs when rollback: %v, origin err: %s", err2, result)) } return result}// 执行操作func CommitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply { size := len(tx.args) / 2 keys := make([]string, size) values := make([][]byte, size) for i := 0; i < size; i++ { keys[i] = string(tx.args[2*i+1]) values[i] = tx.args[2*i+2] } for i, key := range keys { value := values[i] cluster.db.Put(key, &db.DataEntity{Data: value}) } cluster.db.AddAof(reply.MakeMultiBulkReply(tx.args)) return &reply.OkReply{}}
协调者的逻辑也很简单:
func RequestCommit(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) { var errReply reply.ErrorReply txIdStr := strconv.FormatInt(txId, 10) respList := make([]redis.Reply, 0, len(peers)) for peer := range peers { var resp redis.Reply if peer == cluster.self { resp = Commit(cluster, c, makeArgs("commit", txIdStr)) } else { resp = cluster.Relay(peer, c, makeArgs("commit", txIdStr)) } if reply.IsErrorReply(resp) { errReply = resp.(reply.ErrorReply) break } respList = append(respList, resp) } if errReply != nil { RequestRollback(cluster, c, txId, peers) return nil, errReply } return respList, nil}
回滚本地事务:
func Rollback(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) != 2 { return reply.MakeErrReply("ERR wrong number of arguments for 'rollback' command") } txId := string(args[1]) raw, ok := cluster.transactions.Get(txId) if !ok { return reply.MakeIntReply(0) } tx, _ := raw.(*Transaction) err := tx.rollback() if err != nil { return reply.MakeErrReply(err.Error()) } return reply.MakeIntReply(1)}func (tx *Transaction) rollback() error { for key, blob := range tx.undoLog { if len(blob) > 0 { entity := &db.DataEntity{} err := gob.UnMarshal(blob, entity) // 反序列化事务前的快照 if err != nil { return err } tx.cluster.db.Put(key, entity) // 写入事务前的数据 } else { tx.cluster.db.Remove(key) // 若事务开始之前 key 不存在则将其删除 } } if tx.status != CommitedStatus { tx.cluster.db.UnLocks(tx.keys...) } tx.status = RollbackedStatus return nil}
协调者的逻辑与 commit 类似:
func RequestRollback(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) { txIdStr := strconv.FormatInt(txId, 10) for peer := range peers { if peer == cluster.self { Rollback(cluster, c, makeArgs("rollback", txIdStr)) } else { cluster.Relay(peer, c, makeArgs("rollback", txIdStr)) } }}
转载地址:http://sxqzz.baihongyu.com/