本文共 3500 字,大约阅读时间需要 11 分钟。
在分布式系统中,缓存操作往往面临一致性问题。尤其是在 Redis 集群环境中,像 DEL、MSET 等命令涉及的 key 可能分布在多个节点上,如何保证一致性操作的原子性便成为一个关键问题。本文将介绍一种基于两阶段提交(2PC)协议的解决方案。
在 Redis 集群中,MSET 命令涉及的 key 可能分布在多个节点上。为了确保操作的原子性,最简单的方法是逐个节点执行操作。例如,MGET 命令的实现方式如下:
func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { ... // 计算每个 key 所在的节点,并按照节点分组 groupMap := cluster.groupBy(keys) for peer, group := range groupMap { resp := cluster.Relay(peer, c, makeArgs("MGET", group...)) ... }} 然而,这种方法存在一个问题:如果某个节点操作失败,整个操作会立即终止,导致部分 key 未能正确设置。这对使用者来说,难以处理。因此,我们需要一种方式来保证所有操作要么全部成功要么全部失败。
两阶段提交(2PC)是一种简单且直观的分布式一致性协议。该协议将写操作分为两个阶段:
Prepare 阶段
协调者向所有参与者发送事务内容,询问是否可以执行事务操作。具体步骤如下:Commit 阶段
协调者向所有参与者发送提交请求,参与者执行操作并释放锁。若所有参与者成功提交,协调者认为事务完成。Rollback 阶段
如果协调者在第一阶段未收到所有参与者的确认,或者在第二阶段出现异常,协调者将向所有参与者发送回滚请求。我们定义了一个事务描述结构 Transaction,用于存储事务的相关信息:
type Transaction struct { id string // 事务 ID,由 snowflake 算法生成 args [][]byte // 命令参数 cluster *Cluster conn redis.Connection keys []string // 事务中涉及的 key undoLog map[string][]byte // 每个 key 在事务执行前的值,用于回滚事务} 在 Prepare 阶段,参与者执行以下操作:
func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { ... tx := NewTransaction(cluster, c, txId, txArgs, keys) cluster.transactions.Put(txId, tx) err := tx.prepare() ...} 事务准备逻辑如下:
func (tx *Transaction) prepare() error { // 锁定相关 key tx.cluster.db.Locks(tx.keys...) // 准备 undo log tx.undoLog = make(map[string][]byte) for _, key := range tx.keys { entity, ok := tx.cluster.db.Get(key) if ok { blob, err := gob.Marshal(entity) if err != nil { return err } tx.undoLog[key] = blob } else { tx.undoLog[key] = []byte{} } } tx.status = PreparedStatus return nil} 在 Commit 阶段,参与者执行以下操作:
func CommitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply { ... for i, key := range keys { value := values[i] cluster.db.Put(key, &db.DataEntity{Data: value}) } ...} 协调者的逻辑如下:
func RequestCommit(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) { ... for peer := range peers { if peer == cluster.self { resp = Commit(cluster, c, makeArgs("commit", txIdStr)) } else { resp = cluster.Relay(peer, c, makeArgs("commit", txIdStr)) } ... } ...} 在 Rollback 阶段,参与者执行以下操作:
func Rollback(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { ... for key, blob := range tx.undoLog { if len(blob) > 0 { entity := &db.DataEntity{} err := gob.UnMarshal(blob, entity) ... } else { cluster.db.Remove(key) } } ...} 协调者的逻辑如下:
func RequestRollback(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) { ... for peer := range peers { if peer == cluster.self { Rollback(cluster, c, makeArgs("rollback", txIdStr)) } else { cluster.Relay(peer, c, makeArgs("rollback", txIdStr)) } }} 两阶段提交(2PC)协议是一种简单有效的分布式一致性解决方案。它通过锁定资源和准备回滚日志,确保事务要么全部成功要么全部失败。在实际实现中,我们需要注意以下几点:
通过上述方法,我们可以在分布式 Redis 集群中实现高可靠的 MSET 操作,确保缓存一致性。
转载地址:http://sxqzz.baihongyu.com/