MENU

链路跟踪与观测:OpenTelemetry 应用侧接入

October 17, 2023 • 学习

初始化(每一个接入的应用都必须做的事情)

func Init() {
    ctx := context.Background()

    // applicationRes 通常一个服务实例共享同一个applicationRes
    // 用于记录服务名,服务节点名等信息
    applicationRes := resource.NewWithAttributes(
        semconv.SchemaURL,
        semconv.ServiceName("grpcClient"),
        semconv.K8SNodeName("single-node"),
        // 还有其他的属性可以配置
    )

    // 初始化traceExporter
    otlpClient := otlptracehttp.NewClient(otlptracehttp.WithEndpoint("127.0.0.1:4318"), otlptracehttp.WithInsecure()) 
    traceExporter, err := otlptrace.New(ctx, otlpClient)
    if err != nil {
        panic(fmt.Sprintf("creating OTLP trace exporter: %w", err))
    }
    otel.SetTracerProvider(sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(traceExporter),
        sdktrace.WithResource(applicationRes),
    ))
    
    // 初始化metricExporter
    metricExporter, err := otlpmetrichttp.New(ctx, otlpmetrichttp.WithEndpoint("127.0.0.1:4318"), otlpmetrichttp.WithInsecure())
    if err != nil {
        panic(fmt.Sprintf("creating OTLP metric exporter: %w", err))
    }
    otel.SetMeterProvider(sdkmetric.NewMeterProvider(
        sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter, sdkmetric.WithInterval(30*time.Second))), // 注意此处的传参
        sdkmetric.WithResource(applicationRes),
    ))
    
    // 初始化传播器
    otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
}

HttpClient

//首先,记得初始化资源
Init()

func(ctx context.Context) {
  newCtx, span := otel.Tracer("requestTracer").Start(ctx, "httpReqStart")
    defer span.End() // 关闭Span

    // 创建一个Baggage用于上下文传递
    bag, _ := baggage.New()
    member, _ := baggage.NewMember("user-id", "dextercai") // Key Value
    setMember, err := bag.SetMember(member)
    if err != nil {
        panic(err)
    }

    // 向Ctx中注入Baggage信息
    newCtx = baggage.ContextWithBaggage(newCtx, setMember)
    span.AddEvent("SendRequest") // 增加一个Event 可以充当Log使用
    req, err := http.NewRequestWithContext(newCtx, "GET", "http://localhost:3000/api/do/123", nil)
    

    //====== 方式一,手动向Header注入上下文信息 
    carrier := propagation.HeaderCarrier(req.Header)
    otel.GetTextMapPropagator().Inject(newCtx, carrier) // 注入到HttpHeader中进行传递
    if err != nil {
        span.RecordError(err)
        return
    }
    client := http.Client{}

    //====== 方式二,利用插件
    // "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
    client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport, 
                                                                                                                    otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents))}
    // 请注意此处的 otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents)
  // 有助于在ES中额外记录Http请求信息
    //======

    resp, err := client.Do(req)
    if err != nil {
        panic(err)
    }
    body, err := io.ReadAll(resp.Body)
    defer resp.Body.Close()
    fmt.Printf("%s", body)
}(ctx)

time.Sleep(10 * time.Second) // Trace后台协程发送,等待发送完。

HttpServer

//首先,记得初始化资源
Init()

//====== 方式一,直接注册,需要在Handle中手动从Header中拿
http.HandleFunc("/", indexHandler)
//====== 方式二,利用插件,上下文自动注入
// "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
http.Handle("/", otelhttp.NewHandler(
        http.HandlerFunc(indexHandler), 
        "indexHandler", 
        otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents)
))
// 我个人比较推荐既有业务增加中间件的形式
// "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
// 这个包的实现是符合标准库的
//====== 

func indexHandler(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    
    // 若不使用插件,需要手动从Header中获取Trace信息
    // ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(r.Header)) //从Header中取出传播的信息
    
    // 同时可以获取上级Span
    span := trace.SpanFromContext(ctx)
    bag := baggage.FromContext(ctx)

    ctx, span := otel.Tracer("doHandleTracer").Start(ctx, "doHandle", trace.WithAttributes(attribute.String("url", r.URL.String())))

    bag := baggage.FromContext(ctx)
    defer span.End()
    span.AddEvent("doHandle 处理开始")

    // 给Event 打Tag,在ES中表现为Label.user-id
    span.AddEvent("baggage got:" + bag.String(), trace.WithAttributes(attribute.String("user-id", bag.Member("user-id").String())))
    
    // 给Span打Tag
    t := time.Now()
    span.SetAttributes(attribute.String("process.time", t.Sub(time.Now()).String()))

    // 简单地打一个Metric
    counter, _ := otel.GetMeterProvider().Meter("httpServer").Int64Counter("indexHandlerCounter")
    counter.Add(ctx, 1)

    w.Write([]byte(time.Now().String()))
}

gRPC Client

func main() {
    Init()

    ctx := context.Background()
    dialOptions := []grpc.DialOption{
        // 使用社区开发的插件,拦截gRPC请求,自动处理Trace传递
        // "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
        grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
        grpc.WithInsecure(),
    }
    func(ctx context.Context) {
        // 创建Span
        ctx, span := otel.Tracer("grpcClientTracer").Start(ctx, "grpcSayHelloStart")
        defer span.End()
        // 连接服务端
        conn, err := grpc.Dial("127.0.0.1:8080", dialOptions...)
        if err != nil {
            fmt.Printf("连接服务端失败: %s", err)
            span.AddEvent("失败")
            span.SetStatus(codes.Error, "连接服务端失败")
            span.RecordError(err)
            return
        }
        defer conn.Close()
        c := opt.NewTestServiceClient(conn)
        
        // 请求第一个RPC方法
        span.AddEvent("Req SayHello")
        r, err := c.SayHello(ctx, &opt.EchoRequest{Name: "Sato"})
        if err != nil {
            fmt.Printf("调用服务端代码失败: %s", err)
            span.SetStatus(codes.Error, "连接服务端失败")
            span.RecordError(err)
            return
        }

        // 请求第二个RPC方法
        span.AddEvent("Req Add")
        bag, _ := baggage.New()
        member, err := baggage.NewMember("user-id", "dextercai")
        setMember, err := bag.SetMember(member)

        if err != nil {
            panic(err)
        }
        // 但是Baggage还是需要手动注入Ctx
        ctx = baggage.ContextWithBaggage(ctx, setMember)

        c.Add(ctx, &opt.AddRequest{Foo: []int32{
            1, 2, 3, 4, 5, 6, 7,
        }})
    
        // 从这个例子中可以看到,我们没有手动显式得向gRPC Metadata注入信息,插件会以拦截器的方式自动完成这一步。

    }(ctx)

    time.Sleep(10 * time.Second)

}

gRPC Server

type serverImpl struct {
    *opt.UnimplementedTestServiceServer
}

func (s serverImpl) SayHello(ctx context.Context, request *opt.EchoRequest) (*opt.EchoReply, error) {
    ctx, span := otel.Tracer("grpcTracer").Start(ctx, "grpcSayHelloServerStart")
    defer span.End()
    span.AddEvent("Reply", trace.WithAttributes(
        attribute.String("username", "unknown")))
    rpy := &opt.EchoReply{Message: request.GetName()}
    return rpy, nil
}

func (s serverImpl) Add(ctx context.Context, request *opt.AddRequest) (*opt.AddReply, error) {
    ctx, span := otel.Tracer("grpcTracer").Start(ctx, "grpcAddServerStart")
    bag := baggage.FromContext(ctx)
    defer span.End()
    rpy := &opt.AddReply{}
    for i := range request.GetFoo() {
        rpy.Result += int64(i)
    }
    span.AddEvent("Done")
    span.AddEvent("user id from baggage:" + bag.Member("user-id").Value())
    return rpy, nil
}

func main() {
    Init()
    lis, err := net.Listen("tcp", ":8080")
    if err != nil {
        fmt.Printf("监听端口失败: %s", err)
        return
    }

    s := grpc.NewServer(
        // 同样注意这个地方,和请求时的dialOption一样,使用了社区的插件
        grpc.StatsHandler(otelgrpc.NewServerHandler()),
    )
    opt.RegisterTestServiceServer(s, &serverImpl{})

    reflection.Register(s) // 按需

    err = s.Serve(lis)
    if err != nil {
        fmt.Printf("开启服务失败: %s", err)
        return
    }
}

即日起视情况关闭全站评论区,您可以通过关于页面的电邮地址和我取得联系,谢谢

Last Modified: October 20, 2023
Archives QR Code
QR Code for this page
Tipping QR Code