OpenTelemetry实现go应用的可观测性 - gRPC

 提示:转载请注明原文链接

 本文永久链接:https://www.360us.net/article/88.html

上一篇文章用的是http协议做服务间的调用协议,这篇改成gRPC。

首先安装包otelgrpc

go get go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc

创建一个api目录,创建rpc.proto文件:

syntax = "proto3";

package api;

// advanced目录执行编译: protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative api/rpc.proto

option go_package ="github.com/ilaziness/gopkg/opentelemetry/as/api";

service AsRpc {
    rpc Test(Req) returns (Resp) {}
}

service BsRpc {
    rpc Test(Req) returns (Resp) {}
}

message Req {
    string a = 1;
    int32 b = 2;
}

message Resp {
    string s = 1;
}

asbs服务都定义了一个Test方法。

定义好后生成代码。

然后在被调用端加上rpc服务监听和服务实现的代码。

as:

// runRpcServer 启动RPC服务
func runRpcServer() {
	lis, err := net.Listen("tcp", ":7000")
	if err != nil {
		log.Fatal(err)
	}
	s := grpc.NewServer(
		grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
		grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
	)
	api.RegisterAsRpcServer(s, &AsRpc{})
	log.Println("rpc server listening at port", "7000")
	if err := s.Serve(lis); err != nil {
		log.Fatal(err)
	}
}

type AsRpc struct {
	api.UnimplementedAsRpcServer
}

// Test test rpc服务接口
func (a *AsRpc) Test(ctx context.Context, req *api.Req) (*api.Resp, error) {
	ctx, span := tracer.Start(ctx, "as rpc Test")
	defer span.End()
	log.Println("as rpc server receive data:", req.GetA(), req.GetB())

	return &api.Resp{S: "this is as rpc server response"}, nil
}

实现了as服务器的Test接口,然后启动服务在7000端口。

grpc.NewServer方法要加上otelgrpc的拦截器,这样才能收到传播的追踪上下文信息。

bs:

// runRpcServer 启动RPC服务
func runRpcServer() {
	lis, err := net.Listen("tcp", ":7001")
	if err != nil {
		log.Fatal(err)
	}
	s := grpc.NewServer(
		grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
		grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
	)
	api.RegisterBsRpcServer(s, &BsRpc{})
	log.Println("rpc server listening at port", "7000")
	if err := s.Serve(lis); err != nil {
		log.Fatal(err)
	}
}

type BsRpc struct {
	api.UnimplementedBsRpcServer
}

// Test test rpc服务接口
func (a *BsRpc) Test(ctx context.Context, req *api.Req) (*api.Resp, error) {
	_, span := tracer.Start(ctx, "bs rpc Test")
	defer span.End()
	log.Println("bs rpc server receive data:", req.GetA(), req.GetB())
	return &api.Resp{S: "this is bs rpc server response"}, nil
}

和上面as一样,bs监听在7001端口。

然后是实现客户端调用,本例的调用关系是mian -> as -> bs

定义两个函数:

// CallAsTest 调用as服务的test
func CallAsTest(ctx context.Context, c api.AsRpcClient) {
	resp, err := c.Test(ctx, &api.Req{A: "a", B: 34})
	if err != nil {
		log.Fatal("call as test error:", err)
	}
	log.Println("call as response:", resp.S)
}

// CallBsTest 调用bs服务的test
func CallBsTest(ctx context.Context, c api.BsRpcClient) {
	resp, err := c.Test(ctx, &api.Req{A: "b", B: 35})
	if err != nil {
		log.Fatal("call bs test error:", err)
	}
	log.Println("call bs response:", resp.S)
}

上面函数的功能是用grpc分别调用asbsTest

然后在main添加调用as的代码, runApp

// ......
// rpc client
addr := "127.0.0.1:7000"
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
    grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
)
if err != nil {
    log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
rpcclient := api.NewAsRpcClient(conn)
_, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
    //......

    // 新增一条rpc的span
    ctx, span = tracer.Start(ctx, "main hello call as")
    defer span.End()
    // rpc call 调用as服务的Test
    rpcall.CallAsTest(ctx, rpcclient)

    fmt.Fprintf(w, "hello")
}
//.....

创建了客户端对象,在/hello handler里面调用了CallAsTest函数。

添加as调用bsTest代码:

func runApp() {
    // rpc client
    addr := "127.0.0.1:7001"
    conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
        grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
    )
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()
    rpcClient = api.NewBsRpcClient(conn)
    _, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    // .......
}

var rpcClient api.BsRpcClient

// ....
// Test test rpc服务接口
func (a *AsRpc) Test(ctx context.Context, req *api.Req) (*api.Resp, error) {
	ctx, span := tracer.Start(ctx, "as rpc Test")
	defer span.End()
	log.Println("as rpc server receive data:", req.GetA(), req.GetB())

	// rpc call 调用Bs服务的Test
	rpcall.CallBsTest(ctx, rpcClient)

	return &api.Resp{S: "this is as rpc server response"}, nil
}

创建客户端对象,在Test服务方法里面添加了调用bs的函数CallBsTest

到这里三个服务应该都是可以正常运行,并且可以互相通过gRPC调用了。

分别运行三个服务,浏览器访问http://127.0.0.1:8080/hello,正常输出一个hello字符串。

打开jaeger UI查看追踪记录,效果如下:

jaeger ui

有两条链路,http的是http协议的调用链路,rpc的是gRPC的调用链路。

从图上可以看到,rpc链路在客户端和服务的端都会生成一条数据,而http只会在发起端记录一条http请求的记录。

总结

gRPC的链路追踪是在服务端NewServer和客户端Dial时注册otelgrpc的拦截器。

客户端调用时把spancontext做为具体rpc客户端方法的context参数。

服务端对应方法取出context,用来继续添加span就可以连上了。

完整代码参考:https://github.com/ilaziness/gopkg/tree/main/opentelemetry/advanced

 评论
暂无评论