编辑
2025-01-23
技术
00
请注意,本文编写于 135 天前,最后修改于 134 天前,其中某些信息可能已经过时。

目录

grpc定义
代码生成
接口实现
进阶——服务发现ETCD

整理一下之前工作中用到的GRPC相关内容

grpc定义

grpc是一种接口描述协议,我们可以通过定义proto文件来自动实现相关代码逻辑,举个例子:

bash
syntax = "proto3"; // 标记所属包名 package grpc_test; // 标记生成的go代码目录:即生成代码后会把代码文件丢到当前目录下的grpc_test文件夹中(如果是java-grpc,则是java_package) option go_package = "./grpc_test"; // 可以把这个messsage理解为结构体定义,这个message映射为代码类或实体结构体,这里的=1和=2是在标记这个字段,从1开始,第几个字段标记为几 message GrpcHelloWorldReq { string message = 1; // 可以定义string list repeated string messageList = 2; } message GrpcHelloWorldResp { string message = 1; // 可以定义结构体list repeated GrpcHelloWorldReq req = 2; } service MeetingDeviceService { rpc HelloWorld(GrpcHelloWorldReq) returns (GrpcHelloWorldResp){} // 定义一个rpc接口,输入GrpcHelloWorldReq结构体,返回GrpcHelloWorldResp }

代码生成

在proto文件目录下运行下面的指令

bash
protoc --go_out=. *.proto protoc --go-grpc_out=. *.proto

成功运行后你就会在目录/grpc_test/下看到.pb.go的文件,这个里面包含了根据定义的proto文件生成的代码

接口实现

服务端:

go
package main import ( "context" "fmt" "net" "kafka-test/main/grpc_test" "google.golang.org/grpc" ) type server struct { grpc_test.MeetingDeviceServiceServer } func (s *server) HelloWorld(ctx context.Context, req *grpc_test.GrpcHelloWorldReq) (*grpc_test.GrpcHelloWorldResp, error) { fmt.Println("request:", req.Message) return &grpc_test.GrpcHelloWorldResp{Message: "hello " + req.Message}, nil } func main() { listen, err := net.Listen("tcp", ":8001") if err != nil { fmt.Printf("failed to listen: %v", err) return } s := grpc.NewServer() //reflection.Register(s) defer func() { s.Stop() listen.Close() }() fmt.Println("Serving 8001...") err = s.Serve(listen) if err != nil { fmt.Printf("failed to serve: %v", err) return } }

客户端:

go
package main import ( "context" "fmt" "sync" "kafka-test/main/grpc_test" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) var ( meetingRoomRpcClient grpc_test.MeetingDeviceServiceClient once = sync.Once{} ) func init() { var serviceHost = "127.0.0.1:8001" conn, err := grpc.Dial(serviceHost, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { fmt.Println(err) } defer conn.Close() // 初始化client(这里client作为调用方,所以只需要初始化一次) once.Do(func() { meetingRoomRpcClient = grpc_test.NewMeetingDeviceServiceClient(conn) //meetingRoomRpcClient = grpc_test.NewMeetingDeviceServiceClient("meeting-device-service") }) } func RpcHellWorld() { worldResp, err := meetingRoomRpcClient.HelloWorld(context.Background(), &grpc_test.GrpcHelloWorldReq{ Message: "hello world", MessageList: []string{"hello world", "hello world"}, }) if err != nil { return } fmt.Println(worldResp.Message) }

启动服务后,rpc服务会运行在本地的8001端口,因此作为下游客户端的rpc服务只需要监听这个8001端口即可实现rpc调用。当然,下游的rpc客户端服务也可以定义自己的proto rpc文件,然后被其他服务调用自己的rpc方法。

在rpc服务调用过程中,谁提供方法接口,谁是server,谁调用方法接口,谁是client,只是一个相对概念。

进阶——服务发现ETCD

当RPC服务多了以后,我们需要配置每个RPC服务的IP和端口,非常麻烦,因此这就需要用到注册中心来简化RPC服务管理。用注册中心的话,我们只需要记住每个服务的服务名即可,就不用再挨个记IP了。

放一个官方文档:https://etcd.io/docs/v3.5/quickstart/ 安装请自行参考官方文档

ETCD注册服务端:

golang
import ( "context" "flag" "fmt" "net" "time" grpc_test2 "kafka-test/rpc/grpc_test" // etcd eclient "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/naming/endpoints" "google.golang.org/grpc" ) const ( // grpc 服务名 GrpcTestServiceName = "umb/grpc-test" // etcd 端口 ETCD_ADDR = "http://localhost:2379" ) type server struct { grpc_test2.UnimplementedGrpcHelloWorldServiceServer } func (s *server) HelloWorld(ctx context.Context, req *grpc_test2.GrpcHelloWorldReq) (*grpc_test2.GrpcHelloWorldResp, error) { fmt.Println("request:", req.Message) return &grpc_test2.GrpcHelloWorldResp{Message: "hello " + req.Message}, nil } func registerEndPointToEtcd(ctx context.Context, addr string) { // 创建 etcd 客户端 etcdClient, _ := eclient.NewFromURL(ETCD_ADDR) etcdManager, _ := endpoints.NewManager(etcdClient, GrpcTestServiceName) // 创建一个租约,每隔 10s 需要向 etcd 汇报一次心跳,证明当前节点仍然存活 var ttl int64 = 10 lease, _ := etcdClient.Grant(ctx, ttl) // 添加注册节点到 etcd 中,并且携带上租约 id _ = etcdManager.AddEndpoint(ctx, fmt.Sprintf("%s/%s", GrpcTestServiceName, addr), endpoints.Endpoint{Addr: addr}, eclient.WithLease(lease.ID)) // 每隔 5 s进行一次延续租约的动作 for { select { case <-time.After(5 * time.Second): // 续约操作 resp, _ := etcdClient.KeepAliveOnce(ctx, lease.ID) fmt.Printf("keep alive resp: %+v", resp) case <-ctx.Done(): return } } } func InitRpcServer() { var port int flag.IntVar(&port, "port", 8001, "port") flag.Parse() addr := fmt.Sprintf("http://localhost:%d", port) listen, err := net.Listen("tcp", addr) if err != nil { fmt.Printf("failed to listen: %v", err) return } s := grpc.NewServer() //reflection.Register(s) grpc_test2.RegisterGrpcHelloWorldServiceServer(s, &server{}) defer func() { s.Stop() listen.Close() }() ctx, cancel := context.WithCancel(context.Background()) defer cancel() // 注册 grpc 服务节点到 etcd 中 go registerEndPointToEtcd(ctx, addr) fmt.Println("Serving 8001...") err = s.Serve(listen) if err != nil { fmt.Printf("failed to serve: %v", err) return } }

ETCD注册客户端:

golang
import ( "context" "fmt" "sync" grpc_test2 "kafka-test/rpc/grpc_test" // etcd eclient "go.etcd.io/etcd/client/v3" eresolver "go.etcd.io/etcd/client/v3/naming/resolver" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) var ( meetingRoomRpcClient grpc_test2.GrpcHelloWorldServiceClient once = sync.Once{} ) func init() { // 创建 etcd 客户端 etcdClient, _ := eclient.NewFromURL(ETCD_ADDR) // 创建 etcd 实现的 grpc 服务注册发现模块 resolver etcdResolverBuilder, _ := eresolver.NewBuilder(etcdClient) // 拼接服务名称,需要固定义 etcd:/// 作为前缀 etcdTarget := fmt.Sprintf("etcd:///%s", GrpcTestServiceName) // 创建 grpc 连接代理 conn, _ := grpc.Dial( // 服务名称 etcdTarget, // 注入 etcd resolver grpc.WithResolvers(etcdResolverBuilder), // 声明使用的负载均衡策略为 roundrobin grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)), grpc.WithTransportCredentials(insecure.NewCredentials()), ) defer conn.Close() defer conn.Close() // 初始化client(这里client作为调用方,所以只需要初始化一次) once.Do(func() { meetingRoomRpcClient = grpc_test2.NewGrpcHelloWorldServiceClient(conn) //meetingRoomRpcClient = grpc_test.NewMeetingDeviceServiceClient("meeting-device-service") }) } func RpcHellWorld() { worldResp, err := meetingRoomRpcClient.HelloWorld(context.Background(), &grpc_test2.GrpcHelloWorldReq{ Message: "hello world", MessageList: []string{"hello world", "hello world"}, }) if err != nil { return } fmt.Println(worldResp.Message) }

这样基于ETCD改造之后,server和客户端只需要知道对方的名称就能用了

本文作者:伞菌

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!