随着微服务架构的流行,服务与服务之间交互的稳定性越来越重要。
当生产服务挂掉的时候,我们首先想到的是:激增流量、被其它服务拖垮、异常没处理。总之就是缺乏高可用防护和容错机制,尤其是针对流量的防护。
Sentinel是面向分布式服务架构的轻量级流量控制组件,以流量为切入点,从流量控制、熔断降级、自适应的系统保护等多个维度来保证系统的稳定性。
客户端和服务端进行通信时,两方的速率并不一定相等,如果客户端某一时间内发来的请求过多,会导致服务端处理不过来。
这时,服务端只能把处理不过来的数据放到缓存区里。如果缓存区满了,客户端还在发送数据,服务端没法存放只能把收到的数据包丢掉,而大量的丢包造成大量的网络资源浪费。
所以,需要控制客户端的发送速率,让发送和接收处于一定的平衡。这种控制手段,也就是流量控制。
在股市交易中的熔断是,当行情波动过大,终止交易15分钟,如果开盘后继续大幅波动,触发熔断机制,当天终止交易。
而程序中的熔断,是因为某个服务故障或者异常引起的,类似于股市熔断,当触达某个异常条件,直接熔断整个服务,而不是一直等到这个服务的响应。也是为了防止连锁反应造成雪崩,而采用的一种保护措施。
一般在接口响应速度很慢、超时的时候,触发熔断。
当服务器压力剧增的时候(系统压力过载),根据当前业务情况及流量,对一些服务和页面进行有策略的降级。以此来缓解服务器资源的的压力,保证核心业务的正常运行。“弃车保帅”的思想。
自动降级:1)通过配置的请求超时时间;2)不稳定API调用次数达到一定数量;3)所调用的远程服务出现故障
人工降级:1)秒杀场景;2)双十一大促场景
对系统维度进行保护,当系统负载很高时,如果仍持续让请求进入,可能会导致系统崩溃,无法响应。
即使是集群环境,当把压力过载的服务机器上的流量转发到别的机器上,如果别的机器也处于过载状态,则有可能把这台机器直接搞崩溃,最后导致整个集群不可用。
Sentinel提供的保护机制,让系统的入口流量和系统的负载达到一个平衡,进而保证系统在能力范围之内处理最多的请求。
Hystrix主要是以隔离和熔断为主的容错机制,超时或被熔断的调用将会快速失败,并可以提供 fallback 机制。用线程池隔离的方式,对资源进行完全的隔离,但是会引起大量的线程切换开销。性能并不是很高。
Sentinel侧重于多样化的流量控制,熔断降级,系统负载保护。是通过对并发线程数和服务的响应时间,来对流量进行控制和熔断降级,因此也就没有线程切换上的开销了,也不用预留线程池资源。性能更好。
xxxxxxxxxx
611package main
2
3import (
4 "fmt"
5 "log"
6 "net/http"
7
8 sentinel "github.com/alibaba/sentinel-golang/api"
9 "github.com/alibaba/sentinel-golang/core/base"
10 "github.com/alibaba/sentinel-golang/core/config"
11 "github.com/alibaba/sentinel-golang/core/flow"
12 "github.com/alibaba/sentinel-golang/logging"
13)
14
15func init() {
16 // 初始化 sentinel
17 conf := config.NewDefaultConfig()
18 conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
19 err := sentinel.InitWithConfig(conf)
20 if err != nil {
21 log.Fatal(err)
22 }
23
24 // 创建限流规则:资源名 testFlowQPSRule 阈值 2 统计间隔 1000毫秒 ==》 每秒最多两次请求
25 _, err = flow.LoadRules([]*flow.Rule{
26 {
27 Resource: "testFlowQPSRule", // 埋点资源名
28 TokenCalculateStrategy: flow.Direct, // 计算策略:Direct-目录,WarmUp-预热流控
29 ControlBehavior: flow.Reject, // 控制行为:Reject-拒绝,Throttling-匀速限流
30 Threshold: 2, // 阀值
31 StatIntervalInMs: 1000, // 统计间隔
32 },
33 })
34 if err != nil {
35 log.Fatalf("Unexpected error: %+v", err)
36 return
37 }
38}
39
40func main() {
41 server := http.Server{
42 Addr: "127.0.0.1:8080",
43 }
44
45 http.HandleFunc("/test_flow_rule", testFlowQPSRule)
46 server.ListenAndServe()
47}
48
49// 测试流控规则
50func testFlowQPSRule(w http.ResponseWriter, r *http.Request) {
51 resourceName := "testFlowQPSRule"
52
53 // 埋点(流控规则方式)
54 e, b := sentinel.Entry(resourceName, sentinel.WithTrafficType(base.Inbound))
55 if b != nil {
56 fmt.Fprintf(w, "限流")
57 } else {
58 fmt.Fprintf(w, "不限流")
59 e.Exit()
60 }
61}
xxxxxxxxxx
631package main
2
3import (
4 "fmt"
5 "log"
6 "net/http"
7
8 sentinel "github.com/alibaba/sentinel-golang/api"
9 "github.com/alibaba/sentinel-golang/core/base"
10 "github.com/alibaba/sentinel-golang/core/config"
11 "github.com/alibaba/sentinel-golang/core/flow"
12 "github.com/alibaba/sentinel-golang/logging"
13)
14
15func init() {
16 // 初始化 sentinel
17 conf := config.NewDefaultConfig()
18 conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
19 err := sentinel.InitWithConfig(conf)
20 if err != nil {
21 log.Fatal(err)
22 }
23
24 // 创建限流规则:资源名 testFlowWarmUpRule 阈值 2 统计间隔 1000毫秒 ==》 每秒最多两次请求
25 _, err = flow.LoadRules([]*flow.Rule{
26 {
27 Resource: "testFlowWarmUpRule", // 埋点资源名
28 TokenCalculateStrategy: flow.WarmUp, // 计算策略:Direct-目录,WarmUp-预热流控
29 ControlBehavior: flow.Reject, // 控制行为:Reject-拒绝,Throttling-匀速限流
30 Threshold: 10, // 阀值
31 WarmUpPeriodSec: 3, // 预热周期(秒)
32 WarmUpColdFactor: 3, // 预热冷因子
33 StatIntervalInMs: 1000, // 统计间隔
34 },
35 })
36 if err != nil {
37 log.Fatalf("Unexpected error: %+v", err)
38 return
39 }
40}
41
42func main() {
43 server := http.Server{
44 Addr: "127.0.0.1:8080",
45 }
46
47 http.HandleFunc("/test_flow_rule", testFlowWarmUpRule)
48 server.ListenAndServe()
49}
50
51// 测试流控规则
52func testFlowWarmUpRule(w http.ResponseWriter, r *http.Request) {
53 resourceName := "testFlowWarmUpRule"
54
55 // 埋点(流控规则方式)
56 e, b := sentinel.Entry(resourceName, sentinel.WithTrafficType(base.Inbound))
57 if b != nil {
58 fmt.Fprintf(w, "限流")
59 } else {
60 fmt.Fprintf(w, "不限流")
61 e.Exit()
62 }
63}
xxxxxxxxxx
731package main
2
3import (
4 "fmt"
5 "log"
6 "math/rand"
7 "net/http"
8 "time"
9
10 sentinel "github.com/alibaba/sentinel-golang/api"
11 "github.com/alibaba/sentinel-golang/core/config"
12 "github.com/alibaba/sentinel-golang/core/isolation"
13 "github.com/alibaba/sentinel-golang/logging"
14)
15
16func init() {
17 // 初始化 sentinel
18 conf := config.NewDefaultConfig()
19 conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
20 err := sentinel.InitWithConfig(conf)
21 if err != nil {
22 logging.Error(err, "fail")
23 return
24 }
25 logging.ResetGlobalLoggerLevel(logging.DebugLevel)
26
27 // 创建并发规则:资源名 testConcurrencyLimitationRule 并发请求数5
28 _, err = isolation.LoadRules([]*isolation.Rule{
29 {
30 Resource: "testConcurrencyLimitationRule", // 埋点资源名
31 MetricType: isolation.Concurrency, // Concurrency-并发
32 Threshold: 5, // 阈值,数值代表并发数
33 },
34 })
35 if err != nil {
36 log.Fatalf("Unexpected error: %+v", err)
37 return
38 }
39}
40
41func main() {
42 server := http.Server{
43 Addr: "127.0.0.1:8080",
44 }
45
46 http.HandleFunc("/test_concurrency_limitation_rule", testConcurrencyLimitationRule)
47 server.ListenAndServe()
48}
49
50// 测试并发规则
51func testConcurrencyLimitationRule(w http.ResponseWriter, r *http.Request) {
52 resourceName := "testConcurrencyLimitationRule"
53 ch := make(chan struct{})
54 for i := 0; i < 10; i++ { // 利用循环goroutine并发
55 go func() {
56 for j := 0; j < 10; j++{
57 // 埋点
58 e, b := sentinel.Entry(resourceName, sentinel.WithBatchCount(1))
59 if b != nil {
60 sleepTime := time.Duration(rand.Uint64()%20) * time.Millisecond
61 time.Sleep(sleepTime)
62 fmt.Fprintf(w, "[Isolation] Blocked reason %v %v %v %v %v\n", b.BlockType().String(), "rule", b.TriggeredRule(), "snapshot", b.TriggeredValue())
63 } else {
64 sleepTime := time.Duration(rand.Uint64()%80+10) * time.Millisecond
65 time.Sleep(sleepTime)
66 fmt.Fprintf(w, "[Isolation] Passed\n")
67 e.Exit()
68 }
69 }
70 }()
71 }
72 <-ch
73}
xxxxxxxxxx
881package main
2
3import (
4 "fmt"
5 "log"
6 "math/rand"
7 "net/http"
8 "time"
9
10 sentinel "github.com/alibaba/sentinel-golang/api"
11 "github.com/alibaba/sentinel-golang/core/base"
12 "github.com/alibaba/sentinel-golang/core/circuitbreaker"
13 "github.com/alibaba/sentinel-golang/core/config"
14 "github.com/alibaba/sentinel-golang/logging"
15 "github.com/alibaba/sentinel-golang/util"
16)
17
18type slowRtRatioStateChangeListener struct {
19}
20
21func (s *slowRtRatioStateChangeListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {
22 fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
23}
24
25func (s *slowRtRatioStateChangeListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
26 fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %.2f, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
27}
28
29func (s *slowRtRatioStateChangeListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {
30 fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
31}
32
33func init() {
34 // 初始化 sentinel
35 conf := config.NewDefaultConfig()
36 conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
37 err := sentinel.InitWithConfig(conf)
38 if err != nil {
39 log.Fatal(err)
40 }
41
42 circuitbreaker.RegisterStateChangeListeners(&slowRtRatioStateChangeListener{})
43
44 // 创建熔断规则:资源名 testCircuitBreakerSlowRtRatioRule 最大慢请求40ms 慢请求率50% 统计间隔 5000ms 恢复时间 3000ms
45 _, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
46 {
47 Resource: "testCircuitBreakerSlowRtRatioRule", // 埋点资源名
48 Strategy: circuitbreaker.SlowRequestRatio, // lowRequestRatio-慢请求,ErrorRatio-请求错误率,ErrorCount-请求错误量
49 RetryTimeoutMs: 3000, // 熔断时长
50 MinRequestAmount: 10, // 最小请求数
51 StatIntervalMs: 5000, // 统计时间
52 MaxAllowedRtMs: 40, // 慢请求的时间阈值
53 Threshold: 0.5, // 阈值,数值根据strategy分别代表:最大慢请求比率(SlowRequestRatio);最大错误请求比率(ErrorRatio);最大错误请求计数(ErrorCount)
54 },
55 })
56 if err != nil {
57 log.Fatalf("Unexpected error: %+v", err)
58 return
59 }
60}
61
62func main() {
63 server := http.Server{
64 Addr: "127.0.0.1:8080",
65 }
66
67 http.HandleFunc("/test_circuit_breaker_slow_rt_ratio_rule", testCircuitBreakerSlowRtRatioRule)
68 server.ListenAndServe()
69}
70
71// 测试熔断规则
72func testCircuitBreakerSlowRtRatioRule(w http.ResponseWriter, r *http.Request) {
73 resourceName := "testCircuitBreakerSlowRtRatioRule"
74
75 // 埋点
76 e, b := sentinel.Entry(resourceName, sentinel.WithTrafficType(base.Inbound))
77 if b != nil {
78 sleepTime := time.Duration(rand.Uint64()%20) * time.Millisecond
79 time.Sleep(sleepTime)
80 fmt.Fprintf(w, "熔断")
81 } else {
82 // 模拟随机响应时间
83 sleepTime := time.Duration(rand.Uint64()%80+10) * time.Millisecond
84 time.Sleep(sleepTime)
85 fmt.Fprintf(w, "不熔断")
86 e.Exit()
87 }
88}
xxxxxxxxxx
771package main
2
3import (
4 "fmt"
5 "log"
6 "math/rand"
7 "net/http"
8 "time"
9
10 sentinel "github.com/alibaba/sentinel-golang/api"
11 "github.com/alibaba/sentinel-golang/core/config"
12 "github.com/alibaba/sentinel-golang/core/hotspot"
13 "github.com/alibaba/sentinel-golang/logging"
14)
15
16func init() {
17 // 初始化 sentinel
18 conf := config.NewDefaultConfig()
19 conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
20 err := sentinel.InitWithConfig(conf)
21 if err != nil {
22 logging.Error(err, "fail")
23 return
24 }
25 logging.ResetGlobalLoggerLevel(logging.DebugLevel)
26
27 // 创建并发规则:资源名 testHotSpotParamsQpsRejectRule 并发请求数5
28 _, err = hotspot.LoadRules([]*hotspot.Rule{
29 {
30 Resource: "testHotSpotParamsQpsRejectRule", // 埋点资源名
31 MetricType: hotspot.QPS, // qps-请求量,Concurrency-并发
32 ControlBehavior: hotspot.Reject, // 控制行为:Reject-拒绝,Throttling-匀速限流
33 ParamIndex: 1, // 上下文参数切片中的索引
34 Threshold: 5, // 阈值,数值代表请求量或并发数
35 BurstCount: 0, // 沉默计数,QPS和Reject时有用
36 DurationInSec: 1, // 统计间隔,QPS时有用
37 },
38 })
39 if err != nil {
40 log.Fatalf("Unexpected error: %+v", err)
41 return
42 }
43}
44
45func main() {
46 server := http.Server{
47 Addr: "127.0.0.1:8080",
48 }
49
50 http.HandleFunc("/test_hot_spot_params_qps_reject_rule", testHotSpotParamsQpsRejectRule)
51 server.ListenAndServe()
52}
53
54// 测试热点防护规则
55func testHotSpotParamsQpsRejectRule(w http.ResponseWriter, r *http.Request) {
56 resourceName := "testHotSpotParamsQpsRejectRule"
57 ch := make(chan struct{})
58 for i := 0; i < 10; i++ { // 利用循环goroutine并发
59 go func() {
60 for j := 0; j < 10; j++ {
61 // 埋点
62 e, b := sentinel.Entry(resourceName, sentinel.WithArgs(true, rand.Uint32()%30, "sentinel"))
63 if b != nil {
64 sleepTime := time.Duration(rand.Uint64()%20) * time.Millisecond
65 time.Sleep(sleepTime)
66 fmt.Fprintf(w, "[HotSpot Reject] Blocked reason %v %v %v %v %v\n", b.BlockType().String(), "rule", b.TriggeredRule(), "snapshot", b.TriggeredValue())
67 } else {
68 sleepTime := time.Duration(rand.Uint64()%20) * time.Millisecond
69 time.Sleep(sleepTime)
70 fmt.Fprintf(w, "[HotSpot Reject] Passed\n")
71 e.Exit()
72 }
73 }
74 }()
75 }
76 <-ch
77}
1、初始化
使用sentinel,需要在程序启动时对sentinel进行相关初始化配置。
2、配置规则
根据业务场景的要求来配置相应的规则。
3、埋点
通过sentinel的Entry()方法,将业务逻辑封装起来,也就是“埋点”。每个埋点都需要指定规则的资源名称(resource),代表触发了这个规则的调用或访问。