grpc上手使用
安装
golang
版本的grpc
要求go
版本要在1.6
以上
install gRPC
使用go get
命令安装grpc
包
1 | $ go get -u google.golang.org/grpc |
由于某些不可逆原因,上面命令会报连接超时,可以到
github
上将项目clone
到$GOPATH/src/google.golang.org/
下
1
2
3 > $ cd $GOPATH/src/google.golang.org
> $ git clone git@github.com:grpc/grpc-go.git grpc
>
install Protocol Buffers v3
grpc
默认使用protobuf
作为序列化工具。
- 打开Releases页面,下载对应平台的
.zip
包protoc-<version>-<platform>.zip
- 解压
- 添加二进制文件路径导
PATH
环境变量
install protoc plugin
安装golang
版本对应的protobuf
生成工具
1 | $ go get -u github.com/golang/protobuf/protoc-gen-go |
运行demo
进入example
目录
1 | $ cd $GOPATH/src/google.golang.org/grpc/examples/helloworld |
删除原来的helloworld.pb.go
文件,并使用protoc
生成自己生成一个
1 | $ rm helloworld/helloworld.pb.go // 删除原来的helloworld.pb.go文件 |
编写grpc
接口时,在.proto
文件定义接口通信数据格式和接口信息,然后通过protoc
自动生成对应的go
代码,大大方便了开发
-I PATH
:specify the directory in which to search for imports. May be specified multiple times; directories will be searched in order. If not given, the current working directory is used.--go_out
:指定输出go
代码plugins=grpc
:.proto
中的service
是grpc
扩展的功能,需要使用grpc
插件进行解析才能生成对应的接口定义代码。
运行 grpc server
和 grpc client
1 | $ go run greeter_server/main.go // 启动grpc server |
实践
使用grpc
开发一个简单的求和服务。
定义.proto文件
在项目下创建proto/sum.proto
文件:
1 | syntax = "proto3"; // 使用 proto3 |
根据接口描述文件生成源码
进入proto
目录,执行
1 | $ protoc sum.proto --go_out=plugins=grpc:. |
可以看到,在本目录下生成sum.pb.go
文件,且package
为proto
开发服务端接口
首先查看生成的sum.pb.go
文件,可以看到根据sum.proto
文件中的CalcSvc
接口定义生成了对应的接口:
1 | // CalcSvcServer is the server API for CalcSvc service. |
开发服务端接口只要就是根据这些接口定义实现具体的业务逻辑
在项目下创建service/main.go
:
1 | package main |
客户端访问
在项目下创建client/main.go
:
1 | package main |
运行
1 | $ go run service/main.go |
grpc连接复用
首先修改服务端代码,添加 1s
的睡眠时间,模拟复杂业务处理场景:
1 | func (CalcSvc) Sum(ctx context.Context, req *proto.SumReq) (resp *proto.SumResp, err error) { |
http2多路复用
grpc
底层使用http2
协议进行通信,因此单条连接支持多路复用
修改客户端代码:
1 |
|
在上面代码中,服务端每次都睡眠1s
,客户端使用单条连接进行通信,1000个并发请求总共执行时间为1.1s
左右
如果是2000
个请求,平均在1.2s
左右,10000
个请求是2
s左右。
可见grpc
本身单条连接可用提供的并发效果足以满足大部分业务场景。
http2
提供了多路复用,即每个连接可以同时创建多个stream
,而数据分frame
进行传输,每个frame
都有streamID
来标识属于哪个stream
。由于grpc
采用http2
协议,因此单连接就可以并发发起多个rpc请求。但是,对于同一条连接的所有stream,最终是需要竞争socket的写锁的,因此当并发请求比较高的时候,可以适当添加连接数,来减少锁的竞争。默认grpc是一个后端地址创建一条连接,但是我们可以自己实现Resolver
接口,每个后端地址生成多个Address
,grpc
实际上是根据Address
来创建连接的,每个Address
创建一条连接。
连接池
接下来不使用http2
的多路复用,采用连接池的方式来创建请求
首先实现一个简单的连接池:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144package main
import (
"google.golang.org/grpc"
"sync"
"time"
)
// 连接池选项
type Options struct {
Dial Dialer
MaxConn int
MaxIdle int
WaitTimeout time.Duration
}
// 创建连接
type Dialer func() (*grpc.ClientConn, error)
type Pool struct {
dial Dialer
maxConn int // 最大打开连接数
maxIdle int // 最大空闲连接数
waitTimeout time.Duration // 等待连接超时时间
// 等待连接时通过connCh来传输可用连接
connCh chan *grpc.ClientConn
curConnNum int // 记录当前打开的连接数
// 保存空闲连接
freeConn []*grpc.ClientConn
sync.Mutex
}
// 创建连接池
func NewPool(opts Options) *Pool {
if opts.MaxConn <= 0 {
opts.MaxConn = 10
}
if opts.MaxIdle <= 0 {
opts.MaxIdle = 5
}
if opts.MaxIdle > opts.MaxConn {
opts.MaxIdle = opts.MaxIdle
}
return &Pool{
dial: opts.Dial,
maxConn: opts.MaxConn,
maxIdle: opts.MaxIdle,
waitTimeout: opts.WaitTimeout,
connCh: make(chan *grpc.ClientConn),
freeConn: make([]*grpc.ClientConn, 0, opts.MaxIdle),
}
}
// 获取连接
func (p *Pool) Get() (conn *grpc.ClientConn) {
p.Lock()
// 已经到达最大连接数
if p.curConnNum >= p.maxConn {
// 如果等待超时时间为0,直接返回
if p.waitTimeout == 0 {
p.Unlock()
return
}
var tm <-chan time.Time
// 如果等待超时时间小于0,表示无限等待
if p.waitTimeout > 0 {
tm = time.After(p.waitTimeout)
}
p.Unlock()
// 等待可用连接或者超时
select {
case <-tm:
case conn = <-p.connCh:
}
return
}
// 如果存在空闲连接
if ln := len(p.freeConn); ln > 0 {
conn = p.freeConn[0]
p.freeConn[0] = p.freeConn[ln-1]
p.freeConn = p.freeConn[:ln-1]
} else { // 创建新的连接
c, err := p.dial()
if err != nil {
conn = nil
} else {
p.curConnNum++
conn = c
}
}
p.Unlock()
return
}
// 释放连接
func (p *Pool) Put(conn *grpc.ClientConn) error {
if conn == nil {
return nil
}
// 首先判断是否有其他协程在等待连接
select {
case p.connCh <- conn:
return nil
default:
}
p.Lock()
defer p.Unlock()
// 再次判断是否有等待可用连接
select {
case p.connCh <- conn:
return nil
default:
// 放回空闲连接
if len(p.freeConn) < p.maxIdle {
p.freeConn = append(p.freeConn, conn)
return nil
}
// 空闲连接数达到上限,关闭连接
p.curConnNum--
return conn.Close()
}
}
// 统计连接池状态
func (p *Pool) Stat() PoolStat {
p.Lock()
p.Unlock()
return PoolStat{
ConnNum: p.curConnNum,
IdleConnNum: len(p.freeConn),
}
}
type PoolStat struct {
ConnNum int
IdleConnNum int
}
接下来,使用该连接池进行测试:
1 | package main |
在上面的代码中,每次请求时都从连接池中获取一个连接,请求完成后将其释放。
运行上面代码,1000
个并发请求总共需要花费10.15s
左右。
负载均衡
这里使用dns
来进行负载均衡进行演示。
我实验机器上面的本机IP
是127.0.0.1
,虚拟机IP
是192.168.50.12
首先,修改系统的hosts
文件,添加:
1 | 192.168.50.12 www.grpc.com |
然后,同时在本地和虚拟机中启动grpc server
最后,修改grpc client
代码:
1 | conn, err := grpc.Dial("dns:///www.grpc.com:8888", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) |
在创建grpc
连接的时候,使用dns:///www.grpc.com:8888
,同时指定负载策略为roundrobin
。
执行grpc client
,可用看到两边的grpc server
都有打印出请求日志。
grpc
提供的负载均衡测试是在请求级别上进行负载均衡。
grpc
会同时为每个grpc server
创建一条连接;每次要发起一个请求的时候,都会根据负载策略选择一条连接来发起请求。