grpc上手使用

安装

golang版本的grpc要求go版本要在1.6以上

install gRPC

使用go get命令安装grpc

1
$ go get -u google.golang.org/grpc

由于某些不可逆原因,上面命令会报连接超时,可以到github上将项目clone$GOPATH/src/google.golang.org/

1
2
3
> $ cd $GOPATH/src/google.golang.org
> $ git clone git@github.com:grpc/grpc-go.git grpc
>
install Protocol Buffers v3

grpc默认使用protobuf作为序列化工具。

  1. 打开Releases页面,下载对应平台的.zipprotoc-<version>-<platform>.zip
  2. 解压
  3. 添加二进制文件路径导PATH环境变量
install protoc plugin

安装golang版本对应的protobuf生成工具

1
2
$ go get -u github.com/golang/protobuf/protoc-gen-go
$ export PATH=$PATH:$GOPATH/bin

运行demo

进入example目录

1
$ cd $GOPATH/src/google.golang.org/grpc/examples/helloworld

删除原来的helloworld.pb.go文件,并使用protoc生成自己生成一个

1
2
$ rm helloworld/helloworld.pb.go // 删除原来的helloworld.pb.go文件
$ protoc -I helloworld/ helloworld/helloworld.proto --go_out=plugins=grpc:helloworld // 根据 .proto 文件生成对应的.go文件

编写grpc接口时,在.proto文件定义接口通信数据格式和接口信息,然后通过protoc自动生成对应的go代码,大大方便了开发

  • -I PATH:specify the directory in which to search for imports. May be specified multiple times; directories will be searched in order. If not given, the current working directory is used.
  • --go_out:指定输出go代码
  • plugins=grpc.proto中的servicegrpc扩展的功能,需要使用grpc插件进行解析才能生成对应的接口定义代码。

运行 grpc servergrpc client

1
2
$ go run greeter_server/main.go // 启动grpc server
$ go run greeter_client/main.go // 启动grpc client

实践

使用grpc开发一个简单的求和服务。

定义.proto文件

在项目下创建proto/sum.proto文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
syntax = "proto3"; // 使用 proto3

// java生成选项
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";

package proto; // 生成的go所属的package

message SumResp {
int64 sum = 1;
}

message SumReq {
int64 a = 1;
int64 b = 2;
}


service CalcSvc {
// 每个rpc接口声明都必须有且一个参数和一个返回值
rpc Sum(SumReq) returns (SumResp) {}
}
根据接口描述文件生成源码

进入proto目录,执行

1
$ protoc sum.proto --go_out=plugins=grpc:.

可以看到,在本目录下生成sum.pb.go文件,且packageproto

开发服务端接口

首先查看生成的sum.pb.go文件,可以看到根据sum.proto文件中的CalcSvc接口定义生成了对应的接口:

1
2
3
4
5
// CalcSvcServer is the server API for CalcSvc service.
type CalcSvcServer interface {
// 每个rpc接口声明都必须有且一个参数和一个返回值
Sum(context.Context, *SumReq) (*SumResp, error)
}

开发服务端接口只要就是根据这些接口定义实现具体的业务逻辑

在项目下创建service/main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"grpc-demo/proto"
"log"
"net"
)

// 类型断言
var _ proto.CalcSvcServer = new(CalcSvc)

type CalcSvc struct{}

func (CalcSvc) Sum(ctx context.Context, req *proto.SumReq) (resp *proto.SumResp, err error) {
// 建议使用GetA,不要直接使用req.A,可能存在req=nil的情况
a := req.GetA()
b := req.GetB()
log.Println("request coming ...")
return &proto.SumResp{
Sum: a + b,
}, err
}

func main() {
lis, err := net.Listen("tcp", ":8888")
if err != nil {
log.Fatal(err)
}
// 注册服务到gRPC
s := grpc.NewServer()
proto.RegisterCalcSvcServer(s, &CalcSvc{})
// 启用Server Reflection,可以使用gRPC CLI去检查services
// https://github.com/grpc/grpc-go/blob/master/Documentation/server-reflection-tutorial.md
reflection.Register(s)
// 启动服务
if err := s.Serve(lis); err != nil {
log.Fatal(err)
}
}
客户端访问

在项目下创建client/main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"context"
"google.golang.org/grpc"
"grpc-demo/proto"
"log"
)

func main() {
// 创建gRPC连接
// WithInsecure option 指定不启用认证功能
conn, err := grpc.Dial(":8888", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
// 创建gRPC client
client := proto.NewCalcSvcClient(conn)
// 请求gRPC server
resp, err := client.Sum(context.Background(), &proto.SumReq{
A: 5,
B: 10,
})
if err != nil {
log.Fatal(err)
}
log.Printf("5 + 10 = %d", resp.GetSum())
}
运行
1
2
$ go run service/main.go
$ go run client/main.go

grpc连接复用

首先修改服务端代码,添加 1s 的睡眠时间,模拟复杂业务处理场景:

1
2
3
4
5
6
7
8
9
10
func (CalcSvc) Sum(ctx context.Context, req *proto.SumReq) (resp *proto.SumResp, err error) {
a := req.GetA()
b := req.GetB()
log.Println("request coming ...")
// 添加 1s 睡眠,模拟接口执行业务逻辑
time.Sleep(time.Second)
return &proto.SumResp{
Sum: a + b,
}, err
}
http2多路复用

grpc底层使用http2协议进行通信,因此单条连接支持多路复用

修改客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

func main() {
conn ,err:=grpc.Dial(":8888", grpc.WithInsecure())
if err!=nil {
log.Fatal(err)
}
client :=proto.NewCalcSvcClient(conn)

wg := sync.WaitGroup{}
begin := time.Now()
concurrentNum := 1000
wg.Add(concurrentNum)

for i := 0; i < concurrentNum; i++ {
go func() {
resp, err := client.Sum(context.Background(), &proto.SumReq{
A: 5,
B: 10,
})
if err != nil {
log.Fatal(err)
}
log.Printf("5 + 10 = %d", resp.GetSum())
wg.Done()
}()
}
wg.Wait()
log.Printf("用时:%v", time.Now().Sub(begin))
}

在上面代码中,服务端每次都睡眠1s,客户端使用单条连接进行通信,1000个并发请求总共执行时间为1.1s左右

如果是2000个请求,平均在1.2s左右,10000个请求是2s左右。

可见grpc本身单条连接可用提供的并发效果足以满足大部分业务场景。

http2提供了多路复用,即每个连接可以同时创建多个stream,而数据分frame进行传输,每个frame都有streamID来标识属于哪个stream。由于grpc采用http2协议,因此单连接就可以并发发起多个rpc请求。但是,对于同一条连接的所有stream,最终是需要竞争socket的写锁的,因此当并发请求比较高的时候,可以适当添加连接数,来减少锁的竞争。默认grpc是一个后端地址创建一条连接,但是我们可以自己实现Resolver接口,每个后端地址生成多个Addressgrpc实际上是根据Address来创建连接的,每个Address创建一条连接。

连接池

接下来不使用http2的多路复用,采用连接池的方式来创建请求

首先实现一个简单的连接池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package main

import (
"google.golang.org/grpc"
"sync"
"time"
)

// 连接池选项
type Options struct {
Dial Dialer
MaxConn int
MaxIdle int
WaitTimeout time.Duration
}

// 创建连接
type Dialer func() (*grpc.ClientConn, error)

type Pool struct {
dial Dialer
maxConn int // 最大打开连接数
maxIdle int // 最大空闲连接数

waitTimeout time.Duration // 等待连接超时时间
// 等待连接时通过connCh来传输可用连接
connCh chan *grpc.ClientConn

curConnNum int // 记录当前打开的连接数
// 保存空闲连接
freeConn []*grpc.ClientConn
sync.Mutex
}

// 创建连接池
func NewPool(opts Options) *Pool {
if opts.MaxConn <= 0 {
opts.MaxConn = 10
}
if opts.MaxIdle <= 0 {
opts.MaxIdle = 5
}
if opts.MaxIdle > opts.MaxConn {
opts.MaxIdle = opts.MaxIdle
}

return &Pool{
dial: opts.Dial,
maxConn: opts.MaxConn,
maxIdle: opts.MaxIdle,
waitTimeout: opts.WaitTimeout,
connCh: make(chan *grpc.ClientConn),
freeConn: make([]*grpc.ClientConn, 0, opts.MaxIdle),
}

}

// 获取连接
func (p *Pool) Get() (conn *grpc.ClientConn) {
p.Lock()
// 已经到达最大连接数
if p.curConnNum >= p.maxConn {
// 如果等待超时时间为0,直接返回
if p.waitTimeout == 0 {
p.Unlock()
return
}

var tm <-chan time.Time
// 如果等待超时时间小于0,表示无限等待
if p.waitTimeout > 0 {
tm = time.After(p.waitTimeout)
}
p.Unlock()
// 等待可用连接或者超时
select {
case <-tm:
case conn = <-p.connCh:
}
return
}
// 如果存在空闲连接
if ln := len(p.freeConn); ln > 0 {
conn = p.freeConn[0]
p.freeConn[0] = p.freeConn[ln-1]
p.freeConn = p.freeConn[:ln-1]
} else { // 创建新的连接
c, err := p.dial()
if err != nil {
conn = nil
} else {
p.curConnNum++
conn = c
}
}
p.Unlock()
return
}

// 释放连接
func (p *Pool) Put(conn *grpc.ClientConn) error {
if conn == nil {
return nil
}
// 首先判断是否有其他协程在等待连接
select {
case p.connCh <- conn:
return nil
default:
}
p.Lock()
defer p.Unlock()

// 再次判断是否有等待可用连接
select {
case p.connCh <- conn:
return nil
default:
// 放回空闲连接
if len(p.freeConn) < p.maxIdle {
p.freeConn = append(p.freeConn, conn)
return nil
}

// 空闲连接数达到上限,关闭连接
p.curConnNum--
return conn.Close()
}
}

// 统计连接池状态
func (p *Pool) Stat() PoolStat {
p.Lock()
p.Unlock()
return PoolStat{
ConnNum: p.curConnNum,
IdleConnNum: len(p.freeConn),
}
}

type PoolStat struct {
ConnNum int
IdleConnNum int
}

接下来,使用该连接池进行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package main

import (
"context"
"google.golang.org/grpc"
"grpc-demo/proto"
"log"
"sync"
"time"
)

func main() {
opts := Options{
Dial: func() (*grpc.ClientConn, error) {
return grpc.Dial(":8888", grpc.WithInsecure())
},
WaitTimeout: time.Second * 10,
MaxConn: 100, // 设置最大连接数为100
MaxIdle: 50,
}
pool := NewPool(opts)
if pool == nil {
panic("nil pool")
}

wg := sync.WaitGroup{}
begin := time.Now()
concurrentNum := 1000
wg.Add(concurrentNum)
for i := 0; i < concurrentNum; i++ {
go func() {

conn := pool.Get()
if conn == nil {
panic("nil conn")
}
defer pool.Put(conn)
client := proto.NewCalcSvcClient(conn)

resp, err := client.Sum(context.Background(), &proto.SumReq{
A: 5,
B: 10,
})
if err != nil {
log.Fatal(err)
}
log.Printf("5 + 10 = %d", resp.GetSum())
wg.Done()
}()
}
wg.Wait()
log.Printf("用时:%v", time.Now().Sub(begin))
log.Println(pool.Stat())
}

在上面的代码中,每次请求时都从连接池中获取一个连接,请求完成后将其释放。

运行上面代码,1000个并发请求总共需要花费10.15s左右

负载均衡

这里使用dns来进行负载均衡进行演示。

我实验机器上面的本机IP127.0.0.1,虚拟机IP192.168.50.12

首先,修改系统的hosts文件,添加:

1
2
192.168.50.12 www.grpc.com
127.0.0.1 www.grpc.com

然后,同时在本地和虚拟机中启动grpc server

最后,修改grpc client代码:

1
2
3
4
5
conn, err := grpc.Dial("dns:///www.grpc.com:8888", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
log.Fatal(err)
}
client := proto.NewCalcSvcClient(conn)

在创建grpc连接的时候,使用dns:///www.grpc.com:8888,同时指定负载策略为roundrobin

执行grpc client,可用看到两边的grpc server都有打印出请求日志

grpc提供的负载均衡测试是在请求级别上进行负载均衡

grpc会同时为每个grpc server创建一条连接;每次要发起一个请求的时候,都会根据负载策略选择一条连接来发起请求。