|
作者:kylinkzhang,腾讯CSIG后台开发工程师一致性Hash算法是解决分布式缓存等问题的一种算法,本文介绍了一致性Hash算法的原理,并给出了一种实现和实际运用的案例;一致性Hash算法背景考虑这么一种场景:我们有三台缓存服务器编号node0、node1、node2,现在有3000万个key,希望可以将这些个key均匀的缓存到三台机器上,你会想到什么方案呢?我们可能首先想到的方案是:取模算法hash(key)%N,即:对key进行hash运算后取模,N是机器的数量;这样,对key进行hash后的结果对3取模,得到的结果一定是0、1或者2,正好对应服务器node0、node1、node2,存取数据直接找对应的服务器即可,简单粗暴,完全可以解决上述的问题;取模算法虽然使用简单,但对机器数量取模,在集群扩容和收缩时却有一定的局限性:因为在生产环境中根据业务量的大小,调整服务器数量是常有的事;而服务器数量N发生变化后hash(key)%N计算的结果也会随之变化!比如:一个服务器节点挂了,计算公式从hash(key)%3变成了hash(key)%2,结果会发生变化,此时想要访问一个key,这个key的缓存位置大概率会发生改变,那么之前缓存key的数据也会失去作用与意义;大量缓存在同一时间失效,造成缓存的雪崩,进而导致整个缓存系统的不可用,这基本上是不能接受的;为了解决优化上述情况,一致性hash算法应运而生~一致性Hash算法详述算法原理一致性哈希算法在1997年由麻省理工学院提出,是一种特殊的哈希算法,在移除或者添加一个服务器时,能够尽可能小地改变已存在的服务请求与处理请求服务器之间的映射关系;一致性哈希解决了简单哈希算法在分布式哈希表(DistributedHashTable,DHT)中存在的动态伸缩等问题;一致性hash算法本质上也是一种取模算法;不过,不同于上边按服务器数量取模,一致性hash是对固定值2^32取模;IPv4的地址是4组8位2进制数组成,所以用2^32可以保证每个IP地址会有唯一的映射;①hash环我们可以将这2^32个值抽象成一个圆环⭕️,圆环的正上方的点代表0,顺时针排列,以此类推:1、2、3…直到2^32-1,而这个由2的32次方个点组成的圆环统称为hash环;②服务器映射到hash环在对服务器进行映射时,使用hash(服务器ip)%2^32,即:使用服务器IP地址进行hash计算,用哈希后的结果对2^32取模,结果一定是一个0到2^32-1之间的整数;而这个整数映射在hash环上的位置代表了一个服务器,依次将node0、node1、node2三个缓存服务器映射到hash环上;③对象key映射到服务器在对对应的Key映射到具体的服务器时,需要首先计算Key的Hash值:hash(key)%2^32;注:此处的Hash函数可以和之前计算服务器映射至Hash环的函数不同,只要保证取值范围和Hash环的范围相同即可(即:2^32);将Key映射至服务器遵循下面的逻辑:从缓存对象key的位置开始,沿顺时针方向遇到的第一个服务器,便是当前对象将要缓存到的服务器;假设我们有"semlinker"、"kakuqo"、"lolo"、"fer"四个对象,分别简写为o1、o2、o3和o4;首先,使用哈希函数计算这个对象的hash值,值的范围是[0,2^32-1]:图中对象的映射关系如下:hash(o1)=k1;hash(o2)=k2;hash(o3)=k3;hash(o4)=k4;同时3台缓存服务器,分别为CS1、CS2和CS3:则可知,各对象和服务器的映射关系如下:K1 => CS1K4 => CS3K2 => CS2K3 => CS1即:以上便是一致性Hash的工作原理;可以看到,一致性Hash就是:将原本单个点的Hash映射,转变为了在一个环上的某个片段上的映射!下面我们来看几种服务器扩缩容的场景;服务器扩缩容场景①服务器减少假设CS3服务器出现故障导致服务下线,这时原本存储于CS3服务器的对象o4,需要被重新分配至CS2服务器,其它对象仍存储在原有的机器上:此时受影响的数据只有CS2和CS3服务器之间的部分数据!②服务器增加假如业务量激增,我们需要增加一台服务器CS4,经过同样的hash运算,该服务器最终落于t1和t2服务器之间,具体如下图所示:此时,只有t1和t2服务器之间的部分对象需要重新分配;在以上示例中只有o3对象需要重新分配,即它被重新到CS4服务器;在前面我们已经说过:如果使用简单的取模方法,当新添加服务器时可能会导致大部分缓存失效,而使用一致性哈希算法后,这种情况得到了较大的改善,因为只有少部分对象需要重新分配!数据偏斜&服务器性能平衡问题引出问题在上面给出的例子中,各个服务器几乎是平均被均摊到Hash环上;但是在实际场景中很难选取到一个Hash函数这么完美的将各个服务器散列到Hash环上;此时,在服务器节点数量太少的情况下,很容易因为节点分布不均匀而造成数据倾斜问题;如下图被缓存的对象大部分缓存在node-4服务器上,导致其他节点资源浪费,系统压力大部分集中在node-4节点上,这样的集群是非常不健康的:同时,还有另一个问题:在上面新增服务器CS4时,CS4只分担了CS1服务器的负载,服务器CS2和CS3并没有因为CS4服务器的加入而减少负载压力;如果CS4服务器的性能与原有服务器的性能一致甚至可能更高,那么这种结果并不是我们所期望的;虚拟节点针对上面的问题,我们可以通过:引入虚拟节点来解决负载不均衡的问题:即将每台物理服务器虚拟为一组虚拟服务器,将虚拟服务器放置到哈希环上,如果要确定对象的服务器,需先确定对象的虚拟服务器,再由虚拟服务器确定物理服务器;如下图所示:在图中:o1和o2表示对象,v1~v6表示虚拟服务器,s1~s3表示实际的物理服务器;虚拟节点的计算虚拟节点的hash计算通常可以采用:对应节点的IP地址加数字编号后缀hash(10.24.23.227#1)的方式;举个例子,node-1节点IP为10.24.23.227,正常计算node-1的hash值:hash(10.24.23.227#1)%2^32假设我们给node-1设置三个虚拟节点,node-1#1、node-1#2、node-1#3,对它们进行hash后取模:hash(10.24.23.227#1)%2^32hash(10.24.23.227#2)%2^32hash(10.24.23.227#3)%2^32注意:分配的虚拟节点个数越多,映射在hash环上才会越趋于均匀,节点太少的话很难看出效果;引入虚拟节点的同时也增加了新的问题,要做虚拟节点和真实节点间的映射,对象key->虚拟节点->实际节点之间的转换;使用场景一致性hash在分布式系统中应该是实现负载均衡的首选算法,它的实现比较灵活,既可以在客户端实现,也可以在中间件上实现,比如日常使用较多的缓存中间件memcached和redis集群都有用到它;memcached的集群比较特殊,严格来说它只能算是伪集群,因为它的服务器之间不能通信,请求的分发路由完全靠客户端来的计算出缓存对象应该落在哪个服务器上,而它的路由算法用的就是一致性hash;还有redis集群中hash槽的概念,虽然实现不尽相同,但思想万变不离其宗,看完本篇的一致性hash,你再去理解redis槽位就轻松多了;其它的应用场景还有很多:RPC框架Dubbo用来选择服务提供者分布式关系数据库分库分表:数据与节点的映射关系LVS负载均衡调度器……一致性Hash算法实现下面我们根据上面的讲述,使用Golang实现一个一致性Hash算法,这个算法具有一些下面的功能特性:一致性Hash核心算法;支持自定义Hash算法;支持自定义虚拟节点个数;具体源代码见:https://github.com/JasonkayZK/consistent-hashing-demo下面开始实现吧!结构体、错误以及常量定义①结构体定义首先定义每一台缓存服务器的数据结构:core/host.gotype Host struct { // the host id: ip:port Name string // the load bound of the host LoadBound int64}其中:Name:缓存服务器的Ip地址+端口,如:127.0.0.1:8000LoadBound:缓存服务器当前处理的“请求”缓存数,这个字段在后文含有负载边界值的一致性Hash中会用到;其次,定义一致性Hash的结构:core/algorithm.go// Consistent is an implementation of consistent-hashing-algorithmtype Consistent struct { // the number of replicas replicaNum int // the total loads of all replicas totalLoad int64 // the hash function for keys hashFunc func(key string) uint64 // the map of virtual nodes to hosts hostMap map[string]*Host // the map of hashed virtual nodes to host name replicaHostMap map[uint64]string // the hash ring sortedHostsHashSet []uint64 // the hash ring lock sync.RWMutex}其中:replicaNum:表示每个真实的缓存服务器在Hash环中存在的虚拟节点数;totalLoad:所有物理服务器对应的总缓存“请求”数(这个字段在后文含有负载边界值的一致性Hash中会用到);hashFunc:计算Hash环映射以及Key映射的散列函数;hostMap:物理服务器名称对应的Host结构体映射;replicaHostMap:Hash环中虚拟节点对应真实缓存服务器名称的映射;sortedHostsHashSet:Hash环;sync.RWMutex:操作Hash环时用到的读写锁;大概的结构如上所示,下面我们来看一些常量和错误的定义;②常量和错误定义常量的定义如下:core/algorithm.goconst ( // The format of the host replica name hostReplicaFormat = `%s%d`)var ( // the default number of replicas defaultReplicaNum = 10 // the load bound factor // ref: https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html loadBoundFactor = 0.25 // the default Hash function for keys defaultHashFunc = func(key string) uint64 { out := sha512.Sum512([]byte(key)) return binary.LittleEndian.Uint64(out[:]) })分别表示:defaultReplicaNum:默认情况下,每个真实的物理服务器在Hash环中虚拟节点的个数;loadBoundFactor:负载边界因数(这个字段在后文含有负载边界值的一致性Hash中会用到);defaultHashFunc:默认的散列函数,这里用到的是SHA512算法,并取的是unsignedint64,这一点和上面介绍的0~2^32-1有所区别!hostReplicaFormat:虚拟节点名称格式,这里的虚拟节点的格式为:%s%d,和上文提到的10.24.23.227#1的格式有所区别,但是道理是一样的!还有一些错误的定义:core/error.govar ( ErrHostAlreadyExists = errors.New("host already exists") ErrHostNotFound = errors.New("host not found"))分别表示服务器已经注册,以及缓存服务器未找到;下面来看具体的方法实现!注册/注销缓存服务器①注册缓存服务器注册缓存服务器的代码如下:core/algorithm.gofunc (c *Consistent) RegisterHost(hostName string) error { c.Lock() defer c.Unlock() if _, ok := c.hostMap[hostName]; ok { return ErrHostAlreadyExists } c.hostMap[hostName] = &Host{ Name: hostName, LoadBound: 0, } for i := 0; i val { r = m - 1 } } if idx != -1 { c.sortedHostsHashSet = append(c.sortedHostsHashSet[:idx], c.sortedHostsHashSet[idx+1:]...) }}和注册缓存服务器相反,将服务器在Map映射以及Hash环中去除即完成了注销;这里的逻辑和上面注册的逻辑极为类似,这里不再赘述!查询Key(核心)查询Key是整个一致性Hash算法的核心,但是实现起来也并不复杂;代码如下:core/algorithm.gofunc (c *Consistent) GetKey(key string) (string, error) { hashedKey := c.hashFunc(key) idx := c.searchKey(hashedKey) return c.replicaHostMap[c.sortedHostsHashSet[idx]], nil}func (c *Consistent) searchKey(key uint64) int { idx := sort.Search(len(c.sortedHostsHashSet), func(i int) bool { return c.sortedHostsHashSet[i] >= key }) if idx >= len(c.sortedHostsHashSet) { // make search as a ring idx = 0 } return idx}代码首先计算key的散列值;随后,在Hash环上“顺时针”寻找可以缓存的第一台缓存服务器:idx := sort.Search(len(c.sortedHostsHashSet), func(i int) bool { return c.sortedHostsHashSet[i] >= key})注意到,如果key比当前Hash环中最大的虚拟节点的hash值还大,则选择当前Hash环中hash值最小的一个节点(即“环形”的逻辑):if idx >= len(c.sortedHostsHashSet) { // make search as a ring idx = 0}searchKey返回了虚拟节点在Hash环数组中的index;随后,我们使用map返回index对应的缓存服务器的名称即可;至此,一致性Hash算法基本实现,接下来我们来验证一下。一致性Hash算法实践与检验算法验证前准备①缓存服务器准备在验证算法之前,我们还需要准备几台缓存服务器;为了简单起见,这里使用了HTTP服务器作为缓存服务器,具体代码如下所示:server/main.gopackage mainimport ( "flag" "fmt" "net/http" "sync" "time")type CachedMap struct { KvMap sync.Map Lock sync.RWMutex}var ( cache = CachedMap{KvMap: sync.Map{}} port = flag.String("p", "8080", "port") regHost = "http://localhost:18888" expireTime = 10)func main() { flag.Parse() stopChan := make(chan interface{}) startServer(*port) = len(c.replicaHostMap) { i = 0 } }}func (c *Consistent) checkLoadCapacity(host string) (bool, error) { // a safety check if someone performed c.Done more than needed if c.totalLoad
|
|