MENU

链路跟踪与观测:OpenTelemetry 应用侧接入

October 17, 2023 • 学习

初始化(每一个接入的应用都必须做的事情)

  • func Init() {
  • ctx := context.Background()
  • // applicationRes 通常一个服务实例共享同一个applicationRes
  • // 用于记录服务名,服务节点名等信息
  • applicationRes := resource.NewWithAttributes(
  • semconv.SchemaURL,
  • semconv.ServiceName("grpcClient"),
  • semconv.K8SNodeName("single-node"),
  • // 还有其他的属性可以配置
  • )
  • // 初始化traceExporter
  • otlpClient := otlptracehttp.NewClient(otlptracehttp.WithEndpoint("127.0.0.1:4318"), otlptracehttp.WithInsecure())
  • traceExporter, err := otlptrace.New(ctx, otlpClient)
  • if err != nil {
  • panic(fmt.Sprintf("creating OTLP trace exporter: %w", err))
  • }
  • otel.SetTracerProvider(sdktrace.NewTracerProvider(
  • sdktrace.WithBatcher(traceExporter),
  • sdktrace.WithResource(applicationRes),
  • ))
  • // 初始化metricExporter
  • metricExporter, err := otlpmetrichttp.New(ctx, otlpmetrichttp.WithEndpoint("127.0.0.1:4318"), otlpmetrichttp.WithInsecure())
  • if err != nil {
  • panic(fmt.Sprintf("creating OTLP metric exporter: %w", err))
  • }
  • otel.SetMeterProvider(sdkmetric.NewMeterProvider(
  • sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter, sdkmetric.WithInterval(30*time.Second))), // 注意此处的传参
  • sdkmetric.WithResource(applicationRes),
  • ))
  • // 初始化传播器
  • otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
  • }

HttpClient

  • //首先,记得初始化资源
  • Init()
  • func(ctx context.Context) {
  • newCtx, span := otel.Tracer("requestTracer").Start(ctx, "httpReqStart")
  • defer span.End() // 关闭Span
  • // 创建一个Baggage用于上下文传递
  • bag, _ := baggage.New()
  • member, _ := baggage.NewMember("user-id", "dextercai") // Key Value
  • setMember, err := bag.SetMember(member)
  • if err != nil {
  • panic(err)
  • }
  • // 向Ctx中注入Baggage信息
  • newCtx = baggage.ContextWithBaggage(newCtx, setMember)
  • span.AddEvent("SendRequest") // 增加一个Event 可以充当Log使用
  • req, err := http.NewRequestWithContext(newCtx, "GET", "http://localhost:3000/api/do/123", nil)
  • //====== 方式一,手动向Header注入上下文信息
  • carrier := propagation.HeaderCarrier(req.Header)
  • otel.GetTextMapPropagator().Inject(newCtx, carrier) // 注入到HttpHeader中进行传递
  • if err != nil {
  • span.RecordError(err)
  • return
  • }
  • client := http.Client{}
  • //====== 方式二,利用插件
  • // "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
  • client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport,
  • otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents))}
  • // 请注意此处的 otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents)
  • // 有助于在ES中额外记录Http请求信息
  • //======
  • resp, err := client.Do(req)
  • if err != nil {
  • panic(err)
  • }
  • body, err := io.ReadAll(resp.Body)
  • defer resp.Body.Close()
  • fmt.Printf("%s", body)
  • }(ctx)
  • time.Sleep(10 * time.Second) // Trace后台协程发送,等待发送完。

HttpServer

  • //首先,记得初始化资源
  • Init()
  • //====== 方式一,直接注册,需要在Handle中手动从Header中拿
  • http.HandleFunc("/", indexHandler)
  • //====== 方式二,利用插件,上下文自动注入
  • // "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
  • http.Handle("/", otelhttp.NewHandler(
  • http.HandlerFunc(indexHandler),
  • "indexHandler",
  • otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents)
  • ))
  • // 我个人比较推荐既有业务增加中间件的形式
  • // "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
  • // 这个包的实现是符合标准库的
  • //======
  • func indexHandler(w http.ResponseWriter, r *http.Request) {
  • ctx := r.Context()
  • // 若不使用插件,需要手动从Header中获取Trace信息
  • // ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(r.Header)) //从Header中取出传播的信息
  • // 同时可以获取上级Span
  • span := trace.SpanFromContext(ctx)
  • bag := baggage.FromContext(ctx)
  • ctx, span := otel.Tracer("doHandleTracer").Start(ctx, "doHandle", trace.WithAttributes(attribute.String("url", r.URL.String())))
  • bag := baggage.FromContext(ctx)
  • defer span.End()
  • span.AddEvent("doHandle 处理开始")
  • // 给Event 打Tag,在ES中表现为Label.user-id
  • span.AddEvent("baggage got:" + bag.String(), trace.WithAttributes(attribute.String("user-id", bag.Member("user-id").String())))
  • // 给Span打Tag
  • t := time.Now()
  • span.SetAttributes(attribute.String("process.time", t.Sub(time.Now()).String()))
  • // 简单地打一个Metric
  • counter, _ := otel.GetMeterProvider().Meter("httpServer").Int64Counter("indexHandlerCounter")
  • counter.Add(ctx, 1)
  • w.Write([]byte(time.Now().String()))
  • }

gRPC Client

  • func main() {
  • Init()
  • ctx := context.Background()
  • dialOptions := []grpc.DialOption{
  • // 使用社区开发的插件,拦截gRPC请求,自动处理Trace传递
  • // "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
  • grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
  • grpc.WithInsecure(),
  • }
  • func(ctx context.Context) {
  • // 创建Span
  • ctx, span := otel.Tracer("grpcClientTracer").Start(ctx, "grpcSayHelloStart")
  • defer span.End()
  • // 连接服务端
  • conn, err := grpc.Dial("127.0.0.1:8080", dialOptions...)
  • if err != nil {
  • fmt.Printf("连接服务端失败: %s", err)
  • span.AddEvent("失败")
  • span.SetStatus(codes.Error, "连接服务端失败")
  • span.RecordError(err)
  • return
  • }
  • defer conn.Close()
  • c := opt.NewTestServiceClient(conn)
  • // 请求第一个RPC方法
  • span.AddEvent("Req SayHello")
  • r, err := c.SayHello(ctx, &opt.EchoRequest{Name: "Sato"})
  • if err != nil {
  • fmt.Printf("调用服务端代码失败: %s", err)
  • span.SetStatus(codes.Error, "连接服务端失败")
  • span.RecordError(err)
  • return
  • }
  • // 请求第二个RPC方法
  • span.AddEvent("Req Add")
  • bag, _ := baggage.New()
  • member, err := baggage.NewMember("user-id", "dextercai")
  • setMember, err := bag.SetMember(member)
  • if err != nil {
  • panic(err)
  • }
  • // 但是Baggage还是需要手动注入Ctx
  • ctx = baggage.ContextWithBaggage(ctx, setMember)
  • c.Add(ctx, &opt.AddRequest{Foo: []int32{
  • 1, 2, 3, 4, 5, 6, 7,
  • }})
  • // 从这个例子中可以看到,我们没有手动显式得向gRPC Metadata注入信息,插件会以拦截器的方式自动完成这一步。
  • }(ctx)
  • time.Sleep(10 * time.Second)
  • }

gRPC Server

  • type serverImpl struct {
  • *opt.UnimplementedTestServiceServer
  • }
  • func (s serverImpl) SayHello(ctx context.Context, request *opt.EchoRequest) (*opt.EchoReply, error) {
  • ctx, span := otel.Tracer("grpcTracer").Start(ctx, "grpcSayHelloServerStart")
  • defer span.End()
  • span.AddEvent("Reply", trace.WithAttributes(
  • attribute.String("username", "unknown")))
  • rpy := &opt.EchoReply{Message: request.GetName()}
  • return rpy, nil
  • }
  • func (s serverImpl) Add(ctx context.Context, request *opt.AddRequest) (*opt.AddReply, error) {
  • ctx, span := otel.Tracer("grpcTracer").Start(ctx, "grpcAddServerStart")
  • bag := baggage.FromContext(ctx)
  • defer span.End()
  • rpy := &opt.AddReply{}
  • for i := range request.GetFoo() {
  • rpy.Result += int64(i)
  • }
  • span.AddEvent("Done")
  • span.AddEvent("user id from baggage:" + bag.Member("user-id").Value())
  • return rpy, nil
  • }
  • func main() {
  • Init()
  • lis, err := net.Listen("tcp", ":8080")
  • if err != nil {
  • fmt.Printf("监听端口失败: %s", err)
  • return
  • }
  • s := grpc.NewServer(
  • // 同样注意这个地方,和请求时的dialOption一样,使用了社区的插件
  • grpc.StatsHandler(otelgrpc.NewServerHandler()),
  • )
  • opt.RegisterTestServiceServer(s, &serverImpl{})
  • reflection.Register(s) // 按需
  • err = s.Serve(lis)
  • if err != nil {
  • fmt.Printf("开启服务失败: %s", err)
  • return
  • }
  • }

即日起视情况关闭全站评论区,您可以通过关于页面的电邮地址和我取得联系,谢谢

Last Modified: October 20, 2023
Archives QR Code
QR Code for this page
Tipping QR Code
Leave a Comment

  • OωO
  • |´・ω・)ノ
  • ヾ(≧∇≦*)ゝ
  • (☆ω☆)
  • (╯‵□′)╯︵┴─┴
  •  ̄﹃ ̄
  • (/ω\)
  • ∠( ᐛ 」∠)_
  • (๑•̀ㅁ•́ฅ)
  • →_→
  • ୧(๑•̀⌄•́๑)૭
  • ٩(ˊᗜˋ*)و
  • (ノ°ο°)ノ
  • (´இ皿இ`)
  • ⌇●﹏●⌇
  • (ฅ´ω`ฅ)
  • (╯°A°)╯︵○○○
  • φ( ̄∇ ̄o)
  • ヾ(´・ ・`。)ノ"
  • ( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
  • (ó﹏ò。)
  • Σ(っ °Д °;)っ
  • ( ,,´・ω・)ノ"(´っω・`。)
  • ╮(╯▽╰)╭
  • o(*////▽////*)q
  • >﹏<
  • ( ๑´•ω•) "(ㆆᴗㆆ)
  • (。•ˇ‸ˇ•。)
  • 泡泡
  • 阿鲁
  • 颜文字