|
ES分片均衡策略分析与改进
2024年01月10日 09:00
416 从故障说起ES分片均衡策略概念均衡策略触发条件如何改进分片数量调整自研分片均衡工具未来展望总结一下参考文献从故障说起某日早高峰收到 Elasticsearch 大量查询超时告警,不同于以往,查看 Elasticsearch 查询队列监控后发现,仅123节点存在大量查询请求堆积。各节点查询队列堆积情况查看节点监控发现,123节点的 IO 占用远高于其他节点。节点间IO占用对比最终查明原因是,某些高负载(高读写 qps、大数据量、复杂查询)的索引分片相对集中在123节点上,导致整体集群节点间负载不均衡,严重时甚至引发故障。那么这种现象是普遍存在的吗?在观察了其他集群后发现,各个集群均有不同程度的“负载不均衡”现象,同为数据节点,有的 IO 比其他节点高出一倍以上,有的 CPU 占用大幅超过其他节点。我们知道 Elasticsearch 的读写速度遵循木桶原理,即某台节点如果表现不佳的话会拖慢整个请求的 RT。严重时甚至出现线上故障。因此,尽可能的保持节点间负载的均衡,既能保证集群性能处于最优状态,又能最大限度利用硬件资源,保障集群稳定性。各节点IO占用情况各节点CPU占用情况然而,其实 Elasticsearch 自己是有集群分配自动均衡能力的,但是为什么还会出现上述情况呢?这里需要我们深入分析一下。ES分片均衡策略概念首先来看下 LLM 对 Elasticsearch 分片均衡的解释:Elasticsearch 的分片均衡是指将索引的分片均匀地分布在集群中的各个节点上,以实现负载均衡和高可用性。在 Elasticsearch 中,索引被分为多个分片,每个分片可以在不同的节点上进行存储和处理。分片均衡的目标是确保每个节点上的分片数量大致相等,避免某些节点负载过重,而其他节点负载较轻的情况。似乎只提到了“确保每个节点上的分片数量大致相等”,事实是这样吗?不多说,直接翻源码一探究竟。均衡策略这里展示了核心的分片权重计算部分代码,indexBalance 和 shardBalance 分别表示索引维度权重因子和分片维度权重因子。node.numShards() 表示该节点目前总共的分片数。balancer.avgShardsPerNode() 表示总分片数/总结点数,也就是理想状态下单个节点应该被分配的分片数。node.numShards(index) 表示该索引在当前节点的分片数。balancer.avgShardsPerNode(index) 表示该索引总分片数/总结点数,也就是理想状态下单个节点该索引应该被分配的分片数。 WeightFunction(float indexBalance, float shardBalance) { float sum = indexBalance + shardBalance; if (sum 0 but was: " + sum); } theta0 = shardBalance / sum; theta1 = indexBalance / sum; this.indexBalance = indexBalance; this.shardBalance = shardBalance; } float weight(Balancer balancer, ModelNode node, String index) { final float weightShard = node.numShards() - balancer.avgShardsPerNode(); final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); return theta0 * weightShard + theta1 * weightIndex; }即权重公式为:分片权重因子 * (当前节点分片数 - 理想平均分片数) + 索引权重因子 * (当前节点该索引分片数 - 理想该索引平均分片数) // get the delta between the weights of the node we are checking and the node that holds the shard float currentDelta = absDelta(nodeWeight, currentWeight); // checks if the weight delta is above a certain threshold; if it is not above a certain threshold, // then even though the node we are examining has a better weight and may make the cluster balance // more even, it doesn't make sense to execute the heavyweight operation of relocating a shard unless // the gains make it worth it, as defined by the threshold boolean deltaAboveThreshold = lessThan(currentDelta, threshold) == false;当我们得到节点当前的权重后,下一步需要和其他节点权重相比较,若权重差值超过 threshold ,则认为当前分片分布不均匀,需要重新均衡。相关配置上述逻辑涉及到以下几个配置:cluster.routing.allocation.balance.shard: 0.45f(默认)分片权重因子 越高节点层面分片数量的均衡权重越高cluster.routing.allocation.balance.index: 0.55f(默认)索引权重因子 越高索引层面分片数量的均衡权重越高cluster.routing.allocation.balance.threshold: 1.0f(默认)重新分配分片阈值 越高触发自均衡的条件越苛刻除此之外还有以下配置涉及分片分布、均衡:cluster.routing.allocation.total_shards_per_node: -1(默认)单节点最大允许的分片数量,默认不限制cluster.routing.allocation.cluster_concurrent_rebalance: 2(默认)集群内因“不均衡”而触发的最大分片迁移并发数量。cluster.routing.allocation.node_concurrent_recoveries: 2(默认)单个节点因“不均衡”而触发的最大分片迁移并发数量。举个栗子分片均衡前假设当前集群有两个节点,两个索引(A索引4分片,B索引2分片,C索引2分片),分布状态如上图。此时Node 1的权重为 0.45 * (4 - 3) + 0.55 * (2 - 1.5) = 0.725,Node 2的权重为0.45 * (2 - 3) + 0.55 * (1 - 1.5) = -0.725。Node 1和Node 2的权重差为 1.45 超过了阈值 1,此时就可能出发索引分片迁移。将分配从高权重节点挪到低权重节点。分片均衡后触发条件在实际生产环境中,有时尽管满足了上述的“权重公式”,但却没有发生分配自动均衡。这是因为除了满足“权重公式”以外,还需要满足以下条件。均衡范围cluster.routing.rebalance.enable: all public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { if (allocation.ignoreDisable()) { return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of rebalancing"); } Settings indexSettings = allocation.metadata().getIndexSafe(shardRouting.index()).getSettings(); final Rebalance enable; final boolean usedIndexSetting; if (INDEX_ROUTING_REBALANCE_ENABLE_SETTING.exists(indexSettings)) { enable = INDEX_ROUTING_REBALANCE_ENABLE_SETTING.get(indexSettings); usedIndexSetting = true; } else { enable = this.enableRebalance; usedIndexSetting = false; } switch (enable) { case ALL: return allocation.decision(Decision.YES, NAME, "all rebalancing is allowed"); case NONE: return allocation.decision(Decision.NO, NAME, "no rebalancing is allowed due to %s", setting(enable, usedIndexSetting)); case RIMARIES: if (shardRouting.primary()) { return allocation.decision(Decision.YES, NAME, "primary rebalancing is allowed"); } else { return allocation.decision( Decision.NO, NAME, "replica rebalancing is forbidden due to %s", setting(enable, usedIndexSetting) ); } case REPLICAS: if (shardRouting.primary() == false) { return allocation.decision(Decision.YES, NAME, "replica rebalancing is allowed"); } else { return allocation.decision( Decision.NO, NAME, "primary rebalancing is forbidden due to %s", setting(enable, usedIndexSetting) ); } default: throw new IllegalStateException("Unknown rebalance option"); } }all:允许所有分片自动均衡(默认)primaries:只允许主分片自动均衡replicas:只允许副本分片自动均衡none:不允许自动均衡分片状态不健康的分片状态,可能会导致分片均衡失败,甚至因失败而丢失数据。cluster.routing.allocation.allow_rebalance: indices_all_active public Decision canRebalance(RoutingAllocation allocation) { final RoutingNodes routingNodes = allocation.routingNodes(); switch (type) { case INDICES_PRIMARIES_ACTIVE: // check if there are unassigned primaries. if (routingNodes.hasUnassignedPrimaries()) { return NO_UNASSIGNED_PRIMARIES; } // check if there are initializing primaries that don't have a relocatingNodeId entry. if (routingNodes.hasInactivePrimaries()) { return NO_INACTIVE_PRIMARIES; } return YES_ALL_PRIMARIES_ACTIVE; case INDICES_ALL_ACTIVE: // check if there are unassigned shards. if (routingNodes.hasUnassignedShards()) { return NO_UNASSIGNED_SHARDS; } // in case all indices are assigned, are there initializing shards which // are not relocating? if (routingNodes.hasInactiveShards()) { return NO_INACTIVE_SHARDS; } // fall-through default: // all shards active from above or type == Type.ALWAYS return YES_ALL_SHARDS_ACTIVE; } }always:无视分片状态,始终允许自动均衡indices_primaries_active:仅在所有主分片可用时indices_all_active:仅当所有分片都激活时(默认)磁盘水位如果分片均衡的目标节点磁盘使用率过高,此时不应该继续往改节点分配分片。cluster.routing.allocation.disk.threshold_enabled: true(默认)启用基于磁盘的分发策略cluster.routing.allocation.disk.watermark.low: "85%"(默认)硬盘使用率高于这个值的节点,则不会分配分片cluster.routing.allocation.disk.watermark.high: "90%"(默认)如果硬盘使用率高于这个值,则会重新分片该节点的分片到别的节点 public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { ... ... if (freeDiskPercentage
|
|