整理一下之前工作中用到的GRPC相关内容
grpc是一种接口描述协议,我们可以通过定义proto文件来自动实现相关代码逻辑,举个例子:
bashsyntax = "proto3";
// 标记所属包名
package grpc_test;
// 标记生成的go代码目录:即生成代码后会把代码文件丢到当前目录下的grpc_test文件夹中(如果是java-grpc,则是java_package)
option go_package = "./grpc_test";
// 可以把这个messsage理解为结构体定义,这个message映射为代码类或实体结构体,这里的=1和=2是在标记这个字段,从1开始,第几个字段标记为几
message GrpcHelloWorldReq {
string message = 1;
// 可以定义string list
repeated string messageList = 2;
}
message GrpcHelloWorldResp {
string message = 1;
// 可以定义结构体list
repeated GrpcHelloWorldReq req = 2;
}
service MeetingDeviceService {
rpc HelloWorld(GrpcHelloWorldReq) returns (GrpcHelloWorldResp){}
// 定义一个rpc接口,输入GrpcHelloWorldReq结构体,返回GrpcHelloWorldResp
}
在proto文件目录下运行下面的指令
bashprotoc --go_out=. *.proto protoc --go-grpc_out=. *.proto
成功运行后你就会在目录/grpc_test/下看到.pb.go的文件,这个里面包含了根据定义的proto文件生成的代码
服务端:
gopackage main
import (
"context"
"fmt"
"net"
"kafka-test/main/grpc_test"
"google.golang.org/grpc"
)
type server struct {
grpc_test.MeetingDeviceServiceServer
}
func (s *server) HelloWorld(ctx context.Context, req *grpc_test.GrpcHelloWorldReq) (*grpc_test.GrpcHelloWorldResp, error) {
fmt.Println("request:", req.Message)
return &grpc_test.GrpcHelloWorldResp{Message: "hello " + req.Message}, nil
}
func main() {
listen, err := net.Listen("tcp", ":8001")
if err != nil {
fmt.Printf("failed to listen: %v", err)
return
}
s := grpc.NewServer()
//reflection.Register(s)
defer func() {
s.Stop()
listen.Close()
}()
fmt.Println("Serving 8001...")
err = s.Serve(listen)
if err != nil {
fmt.Printf("failed to serve: %v", err)
return
}
}
客户端:
gopackage main
import (
"context"
"fmt"
"sync"
"kafka-test/main/grpc_test"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
var (
meetingRoomRpcClient grpc_test.MeetingDeviceServiceClient
once = sync.Once{}
)
func init() {
var serviceHost = "127.0.0.1:8001"
conn, err := grpc.Dial(serviceHost, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
fmt.Println(err)
}
defer conn.Close()
// 初始化client(这里client作为调用方,所以只需要初始化一次)
once.Do(func() {
meetingRoomRpcClient = grpc_test.NewMeetingDeviceServiceClient(conn)
//meetingRoomRpcClient = grpc_test.NewMeetingDeviceServiceClient("meeting-device-service")
})
}
func RpcHellWorld() {
worldResp, err := meetingRoomRpcClient.HelloWorld(context.Background(), &grpc_test.GrpcHelloWorldReq{
Message: "hello world",
MessageList: []string{"hello world", "hello world"},
})
if err != nil {
return
}
fmt.Println(worldResp.Message)
}
启动服务后,rpc服务会运行在本地的8001端口,因此作为下游客户端的rpc服务只需要监听这个8001端口即可实现rpc调用。当然,下游的rpc客户端服务也可以定义自己的proto rpc文件,然后被其他服务调用自己的rpc方法。
在rpc服务调用过程中,谁提供方法接口,谁是server,谁调用方法接口,谁是client,只是一个相对概念。
当RPC服务多了以后,我们需要配置每个RPC服务的IP和端口,非常麻烦,因此这就需要用到注册中心来简化RPC服务管理。用注册中心的话,我们只需要记住每个服务的服务名即可,就不用再挨个记IP了。
放一个官方文档:https://etcd.io/docs/v3.5/quickstart/ 安装请自行参考官方文档
ETCD注册服务端:
golang
import (
"context"
"flag"
"fmt"
"net"
"time"
grpc_test2 "kafka-test/rpc/grpc_test"
// etcd
eclient "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"google.golang.org/grpc"
)
const (
// grpc 服务名
GrpcTestServiceName = "umb/grpc-test"
// etcd 端口
ETCD_ADDR = "http://localhost:2379"
)
type server struct {
grpc_test2.UnimplementedGrpcHelloWorldServiceServer
}
func (s *server) HelloWorld(ctx context.Context, req *grpc_test2.GrpcHelloWorldReq) (*grpc_test2.GrpcHelloWorldResp, error) {
fmt.Println("request:", req.Message)
return &grpc_test2.GrpcHelloWorldResp{Message: "hello " + req.Message}, nil
}
func registerEndPointToEtcd(ctx context.Context, addr string) {
// 创建 etcd 客户端
etcdClient, _ := eclient.NewFromURL(ETCD_ADDR)
etcdManager, _ := endpoints.NewManager(etcdClient, GrpcTestServiceName)
// 创建一个租约,每隔 10s 需要向 etcd 汇报一次心跳,证明当前节点仍然存活
var ttl int64 = 10
lease, _ := etcdClient.Grant(ctx, ttl)
// 添加注册节点到 etcd 中,并且携带上租约 id
_ = etcdManager.AddEndpoint(ctx, fmt.Sprintf("%s/%s", GrpcTestServiceName, addr), endpoints.Endpoint{Addr: addr}, eclient.WithLease(lease.ID))
// 每隔 5 s进行一次延续租约的动作
for {
select {
case <-time.After(5 * time.Second):
// 续约操作
resp, _ := etcdClient.KeepAliveOnce(ctx, lease.ID)
fmt.Printf("keep alive resp: %+v", resp)
case <-ctx.Done():
return
}
}
}
func InitRpcServer() {
var port int
flag.IntVar(&port, "port", 8001, "port")
flag.Parse()
addr := fmt.Sprintf("http://localhost:%d", port)
listen, err := net.Listen("tcp", addr)
if err != nil {
fmt.Printf("failed to listen: %v", err)
return
}
s := grpc.NewServer()
//reflection.Register(s)
grpc_test2.RegisterGrpcHelloWorldServiceServer(s, &server{})
defer func() {
s.Stop()
listen.Close()
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 注册 grpc 服务节点到 etcd 中
go registerEndPointToEtcd(ctx, addr)
fmt.Println("Serving 8001...")
err = s.Serve(listen)
if err != nil {
fmt.Printf("failed to serve: %v", err)
return
}
}
ETCD注册客户端:
golangimport (
"context"
"fmt"
"sync"
grpc_test2 "kafka-test/rpc/grpc_test"
// etcd
eclient "go.etcd.io/etcd/client/v3"
eresolver "go.etcd.io/etcd/client/v3/naming/resolver"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
var (
meetingRoomRpcClient grpc_test2.GrpcHelloWorldServiceClient
once = sync.Once{}
)
func init() {
// 创建 etcd 客户端
etcdClient, _ := eclient.NewFromURL(ETCD_ADDR)
// 创建 etcd 实现的 grpc 服务注册发现模块 resolver
etcdResolverBuilder, _ := eresolver.NewBuilder(etcdClient)
// 拼接服务名称,需要固定义 etcd:/// 作为前缀
etcdTarget := fmt.Sprintf("etcd:///%s", GrpcTestServiceName)
// 创建 grpc 连接代理
conn, _ := grpc.Dial(
// 服务名称
etcdTarget,
// 注入 etcd resolver
grpc.WithResolvers(etcdResolverBuilder),
// 声明使用的负载均衡策略为 roundrobin grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
defer conn.Close()
defer conn.Close()
// 初始化client(这里client作为调用方,所以只需要初始化一次)
once.Do(func() {
meetingRoomRpcClient = grpc_test2.NewGrpcHelloWorldServiceClient(conn)
//meetingRoomRpcClient = grpc_test.NewMeetingDeviceServiceClient("meeting-device-service")
})
}
func RpcHellWorld() {
worldResp, err := meetingRoomRpcClient.HelloWorld(context.Background(), &grpc_test2.GrpcHelloWorldReq{
Message: "hello world",
MessageList: []string{"hello world", "hello world"},
})
if err != nil {
return
}
fmt.Println(worldResp.Message)
}
这样基于ETCD改造之后,server和客户端只需要知道对方的名称就能用了
本文作者:伞菌
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!