限流需要我们根据不同的硬件条件做好压测,压测出一个接口或者一个服务在某种硬件配置下最大能承受的 QPS,根据这个结果配置限流规则,并且在后期需求的不断叠加,也需要对接口重新做压测,或者根据线上的实际表现不断调整限流的阈值。因此,限流可能很少使用,或者限流的阈值都会配置的比压测结果略大,这时就需要结合熔断降级做兜底。
Sentinel 支持对同一个资源配置多个相同类型或不同类型的规则,在配置了限流规则的基础上,我们还可以为同一资源配置熔断降级规则。当接口的 QPS 未达限流阈值却已经有很多请求超时的情况下,就可能达到熔断降级规则的阈值从而触发熔断,这就能很好地保护服务自身。
DegradeRule 规则类声明的字段如下:
public class DegradeRule extends AbstractRule {
// 可配置字段
private double count;
private int timeWindow;
private int grade = RuleConstant.DEGRADE_GRADE_RT;
private int rtSlowRequestAmount = RuleConstant.DEGRADE_DEFAULT_SLOW_REQUEST_AMOUNT;
private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;
// 非配置字段
private AtomicLong passCount = new AtomicLong(0);
private final AtomicBoolean cut = new AtomicBoolean(false);
}
DegradeSlot 是实现熔断降级的切入点,它作为 ProcessorSlot 插入到 ProcessorSlotChain 链表中,在 entry 方法中调用 Checker 去判断是否熔断当前请求,如果熔断则抛出 Block 异常。
Checker 并不是一个接口,而是一种检测行为,限流的 ckeck 由 FlowRuleChecker 实现,而熔断的 check 行为则由 DegradeRuleManager 负责,真正 check 逻辑判断由 DegradeRule 实现,流程如下图所示。
当 DegradeSlot#entry 方法被调用时,由 DegradeSlot 调用 DegradeRuleManager#checkDegrade 方法检查当前请求是否满足某个熔断降级规则。熔断规则配置由 DegradeRuleManager 加载,所以 DegradeSlot 将 check 逻辑交给 DegradeRuleManager 去完成,checkDegrade 方法的源码如下:
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
throws BlockException {
// 因为我们可以对同一个资源配置多个熔断降级规则,所以返回的将是一个集合。
Set<DegradeRule> rules = degradeRules.get(resource.getName());
if (rules == null) {
return;
}
for (DegradeRule rule : rules) {
if (!rule.passCheck(context, node, count)) {
throw new DegradeException(rule.getLimitApp(), rule);
}
}
}
DegradeRuleManager 首先根据资源名称获取配置的熔断降级规则,然后遍历熔断降级规则,调用 DegradeRule#passCheck 方法将检查是否需要触发熔断的逻辑交给 DegradeRule 完成。如果对一个资源配置多个熔断降级规则,那么只要有一个熔断降级规则满足条件,就会触发熔断。
DegradeRule#passCheck 方法源码如下:
@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
if (cut.get()) {
return false;
}
// (1)
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}
// (2)
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
}
// (3)
else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
double exception = clusterNode.exceptionQps();
double success = clusterNode.successQps();
double total = clusterNode.totalQps();
if (total < minRequestAmount) {
return true;
}
double realSuccess = success - exception;
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}
if (exception / success < count) {
return true;
}
}
// (4)
else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
double exception = clusterNode.totalException();
if (exception < count) {
return true;
}
}
// (5)
if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
// 熔断
return false;
}
官方文档在介绍 DEGRADE_GRADE_EXCEPTION_COUNT 策略的地方加了使用注意说明,内容为:
注意由于统计时间窗口是分钟级别的,若 timeWindow 小于 60s,则结束熔断状态后仍可能再进入熔断状态。
这句话并不难理解,因为调用 ClusterNode#totalException 方法获取的是一分钟内的总异常数。StatisticNode 的 totalException 源码如下:
// 数组大小为 60,窗口时间长度为 1000 毫秒
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
@Override
public long totalException() {
// 获取 1 分钟的总异常数
return rollingCounterInMinute.exception();
}
也因如此,DEGRADE_GRADE_EXCEPTION_COUNT 这个熔断降级策略似乎使用场景不多,笔者也未曾使用过。
timeWindow、passCount、cut 是作者出于性能考虑而添加的,在配置熔断规则时,建议不要将 timeWindow 配置为 0 或者小于 0,可将 timeWindow 配置为 1000 毫秒,一个窗口时间长度,能减少一点计算就能降低一点 Sentinel 对性能的影响。
系统自适应限流就是在系统负载过高的情况下,自动切断后续请求,以保证服务的稳定运行。系统自适应限流也属于熔断降级的一种实现,而非限流降级,它与熔断降级都有一个共性,在保证服务稳定运行的情况下尽最大可能处理更多请求,一旦系统负载达到阈值就熔断请求。
SystemSlot 是实现系统自适应限流的切入点。DegradeSlot 在 ProcessorSlotChain 链表中被放在 FlowSlot 的后面,作为限流的兜底解决方案,而 SystemSlot 在 ProcessorSlotChain 链表中被放在 FlowSlot 的前面,强制优先考虑系统目前的情况能否处理当前请求,让系统尽可能跑在最大吞吐量的同时保证系统的稳定性。
系统自适应限流规则针对所有流量类型为 IN 的资源生效,因此不需要配置规则的资源名称。SystemRule 定义的字段如下:
public class SystemRule extends AbstractRule {
private double highestSystemLoad = -1;
private double highestCpuUsage = -1;
private double qps = -1;
private long avgRt = -1;
private long maxThread = -1;
}
如果配置了多个 SystemRule,则每个配置项只取最小值。例如三个 SystemRule 都配置了 qps,则取这三个规则中最小的 qps 作为限流阈值,这在调用 SystemRuleManager#loadRules 方法加载规则时完成。
public static void loadSystemConf(SystemRule rule) {
// 是否开启系统自适应限流判断功能
boolean checkStatus = false;
// highestSystemLoad
if (rule.getHighestSystemLoad() >= 0) {
// 多个规则都配置则取最小值
highestSystemLoad = Math.min(highestSystemLoad, rule.getHighestSystemLoad());
highestSystemLoadIsSet = true;
// 开启系统自适应限流检查功能
checkStatus = true;
}
// highestCpuUsage
if (rule.getHighestCpuUsage() >= 0) {
if (rule.getHighestCpuUsage() > 1) {}
// [0,1)
else {
// 多个规则都配置则取最小值
highestCpuUsage = Math.min(highestCpuUsage, rule.getHighestCpuUsage());
highestCpuUsageIsSet = true;
checkStatus = true;
}
}
// avgRt
if (rule.getAvgRt() >= 0) {
// 多个规则都配置则取最小值
maxRt = Math.min(maxRt, rule.getAvgRt());
maxRtIsSet = true;
checkStatus = true;
}
// maxThread
if (rule.getMaxThread() >= 0) {
// 多个规则都配置则取最小值
maxThread = Math.min(maxThread, rule.getMaxThread());
maxThreadIsSet = true;
checkStatus = true;
}
// qps
if (rule.getQps() >= 0) {
// 多个规则都配置则取最小值
qps = Math.min(qps, rule.getQps());
qpsIsSet = true;
checkStatus = true;
}
checkSystemStatus.set(checkStatus);
}
当 SystemSlot#entry 方法被调用时,由 SystemSlot 调用 SystemRuleManager#checkSystem 方法判断是否需要限流,流程如下图所示:
SystemRuleManager#checkSystem 方法从全局的资源指标数据统计节点 Constans.ENTRY_NODE 读取当前时间窗口的指标数据,判断总的 QPS、平均耗时这些指标数据是否达到阈值,或者总占用的线程数是否达到阈值,如果达到阈值则抛出 Block 异常(SystemBlockException)。除此之外,checkSystem 方法还实现了根据系统当前 Load 和 CPU 使用率限流。
SystemRuleManager#checkSystem 方法源码如下:
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
if (resourceWrapper == null) {
return;
}
// 如果有配置 SystemRule,则 checkSystemStatus 为 true
if (!checkSystemStatus.get()) {
return;
}
// 只限流类型为 IN 的流量
if (resourceWrapper.getEntryType() != EntryType.IN) {
return;
}
// qps 限流
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// 占用线程数限流
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
// 平均耗时限流
double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
// 系统平均负载限流
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
// cpu 使用率限流
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
使用 TOP 命令可查看系统的平均负载(Load)和 CPU 使用率,如下图所示:
Sentinel 通过定时任务每秒钟使用 OperatingSystemMXBean API 获取这两个指标数据的值,代码如下:
@Override
public void run() {
try {
OperatingSystemMXBean osBean = ManagementFactory
.getPlatformMXBean(OperatingSystemMXBean.class);
// getSystemLoadAverage
currentLoad = osBean.getSystemLoadAverage();
// getSystemCpuLoad
currentCpuUsage = osBean.getSystemCpuLoad();
if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {
writeSystemStatusLog();
}
} catch (Throwable e) {
RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);
}
}
private static boolean checkBbr(int currentThread) {
if (currentThread > 1 &&
currentThread >
Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
return false;
}
return true;
}
假设某接口的最大 QPS 为 800,处理一次请求的最小耗时为 5ms,那么至少需要并行的线程数与 Min RT 和 Max QPS 的关系为:
Max QPS = Threads * (1000/Min Rt)
推出:
Threads = Max QPS/(1000/Min Rt) = Max QPS * Min Rt/1000
替换 Min Rt 为 5ms、Max QPS 为 800,计算结果:
Threads = 800 * 5/1000 = 4
所以,checkBbr 方法中,(minRt/1000) 是将最小耗时的单位由毫秒转为秒,表示系统处理最多请求时的最小耗时,maxSuccessQps * (minRt/1000) 表示至少需要每秒多少个线程并行才能达到 maxSuccessQps。在系统负载比较高的情况下,只要并行占用的线程数超过该值就限流。但如果 Load 高不是由当前进程引起的,checkBbr 的效果就不明显。
参考文献: