初始化(每一个接入的应用都必须做的事情)
- func Init() {
- ctx := context.Background()
-
- // applicationRes 通常一个服务实例共享同一个applicationRes
- // 用于记录服务名,服务节点名等信息
- applicationRes := resource.NewWithAttributes(
- semconv.SchemaURL,
- semconv.ServiceName("grpcClient"),
- semconv.K8SNodeName("single-node"),
- // 还有其他的属性可以配置
- )
-
- // 初始化traceExporter
- otlpClient := otlptracehttp.NewClient(otlptracehttp.WithEndpoint("127.0.0.1:4318"), otlptracehttp.WithInsecure())
- traceExporter, err := otlptrace.New(ctx, otlpClient)
- if err != nil {
- panic(fmt.Sprintf("creating OTLP trace exporter: %w", err))
- }
- otel.SetTracerProvider(sdktrace.NewTracerProvider(
- sdktrace.WithBatcher(traceExporter),
- sdktrace.WithResource(applicationRes),
- ))
-
- // 初始化metricExporter
- metricExporter, err := otlpmetrichttp.New(ctx, otlpmetrichttp.WithEndpoint("127.0.0.1:4318"), otlpmetrichttp.WithInsecure())
- if err != nil {
- panic(fmt.Sprintf("creating OTLP metric exporter: %w", err))
- }
- otel.SetMeterProvider(sdkmetric.NewMeterProvider(
- sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter, sdkmetric.WithInterval(30*time.Second))), // 注意此处的传参
- sdkmetric.WithResource(applicationRes),
- ))
-
- // 初始化传播器
- otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
- }
HttpClient
- //首先,记得初始化资源
- Init()
-
- func(ctx context.Context) {
- newCtx, span := otel.Tracer("requestTracer").Start(ctx, "httpReqStart")
- defer span.End() // 关闭Span
-
- // 创建一个Baggage用于上下文传递
- bag, _ := baggage.New()
- member, _ := baggage.NewMember("user-id", "dextercai") // Key Value
- setMember, err := bag.SetMember(member)
- if err != nil {
- panic(err)
- }
-
- // 向Ctx中注入Baggage信息
- newCtx = baggage.ContextWithBaggage(newCtx, setMember)
- span.AddEvent("SendRequest") // 增加一个Event 可以充当Log使用
- req, err := http.NewRequestWithContext(newCtx, "GET", "http://localhost:3000/api/do/123", nil)
-
-
- //====== 方式一,手动向Header注入上下文信息
- carrier := propagation.HeaderCarrier(req.Header)
- otel.GetTextMapPropagator().Inject(newCtx, carrier) // 注入到HttpHeader中进行传递
- if err != nil {
- span.RecordError(err)
- return
- }
- client := http.Client{}
-
- //====== 方式二,利用插件
- // "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
- client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport,
- otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents))}
- // 请注意此处的 otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents)
- // 有助于在ES中额外记录Http请求信息
- //======
-
- resp, err := client.Do(req)
- if err != nil {
- panic(err)
- }
- body, err := io.ReadAll(resp.Body)
- defer resp.Body.Close()
- fmt.Printf("%s", body)
- }(ctx)
-
- time.Sleep(10 * time.Second) // Trace后台协程发送,等待发送完。
HttpServer
- //首先,记得初始化资源
- Init()
-
- //====== 方式一,直接注册,需要在Handle中手动从Header中拿
- http.HandleFunc("/", indexHandler)
- //====== 方式二,利用插件,上下文自动注入
- // "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
- http.Handle("/", otelhttp.NewHandler(
- http.HandlerFunc(indexHandler),
- "indexHandler",
- otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents)
- ))
- // 我个人比较推荐既有业务增加中间件的形式
- // "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
- // 这个包的实现是符合标准库的
- //======
-
- func indexHandler(w http.ResponseWriter, r *http.Request) {
- ctx := r.Context()
-
- // 若不使用插件,需要手动从Header中获取Trace信息
- // ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(r.Header)) //从Header中取出传播的信息
-
- // 同时可以获取上级Span
- span := trace.SpanFromContext(ctx)
- bag := baggage.FromContext(ctx)
-
- ctx, span := otel.Tracer("doHandleTracer").Start(ctx, "doHandle", trace.WithAttributes(attribute.String("url", r.URL.String())))
-
- bag := baggage.FromContext(ctx)
- defer span.End()
- span.AddEvent("doHandle 处理开始")
-
- // 给Event 打Tag,在ES中表现为Label.user-id
- span.AddEvent("baggage got:" + bag.String(), trace.WithAttributes(attribute.String("user-id", bag.Member("user-id").String())))
-
- // 给Span打Tag
- t := time.Now()
- span.SetAttributes(attribute.String("process.time", t.Sub(time.Now()).String()))
-
- // 简单地打一个Metric
- counter, _ := otel.GetMeterProvider().Meter("httpServer").Int64Counter("indexHandlerCounter")
- counter.Add(ctx, 1)
-
- w.Write([]byte(time.Now().String()))
- }
gRPC Client
- func main() {
- Init()
-
- ctx := context.Background()
- dialOptions := []grpc.DialOption{
- // 使用社区开发的插件,拦截gRPC请求,自动处理Trace传递
- // "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
- grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
- grpc.WithInsecure(),
- }
- func(ctx context.Context) {
- // 创建Span
- ctx, span := otel.Tracer("grpcClientTracer").Start(ctx, "grpcSayHelloStart")
- defer span.End()
- // 连接服务端
- conn, err := grpc.Dial("127.0.0.1:8080", dialOptions...)
- if err != nil {
- fmt.Printf("连接服务端失败: %s", err)
- span.AddEvent("失败")
- span.SetStatus(codes.Error, "连接服务端失败")
- span.RecordError(err)
- return
- }
- defer conn.Close()
- c := opt.NewTestServiceClient(conn)
-
- // 请求第一个RPC方法
- span.AddEvent("Req SayHello")
- r, err := c.SayHello(ctx, &opt.EchoRequest{Name: "Sato"})
- if err != nil {
- fmt.Printf("调用服务端代码失败: %s", err)
- span.SetStatus(codes.Error, "连接服务端失败")
- span.RecordError(err)
- return
- }
-
- // 请求第二个RPC方法
- span.AddEvent("Req Add")
- bag, _ := baggage.New()
- member, err := baggage.NewMember("user-id", "dextercai")
- setMember, err := bag.SetMember(member)
-
- if err != nil {
- panic(err)
- }
- // 但是Baggage还是需要手动注入Ctx
- ctx = baggage.ContextWithBaggage(ctx, setMember)
-
- c.Add(ctx, &opt.AddRequest{Foo: []int32{
- 1, 2, 3, 4, 5, 6, 7,
- }})
-
- // 从这个例子中可以看到,我们没有手动显式得向gRPC Metadata注入信息,插件会以拦截器的方式自动完成这一步。
-
- }(ctx)
-
- time.Sleep(10 * time.Second)
-
- }
gRPC Server
- type serverImpl struct {
- *opt.UnimplementedTestServiceServer
- }
-
- func (s serverImpl) SayHello(ctx context.Context, request *opt.EchoRequest) (*opt.EchoReply, error) {
- ctx, span := otel.Tracer("grpcTracer").Start(ctx, "grpcSayHelloServerStart")
- defer span.End()
- span.AddEvent("Reply", trace.WithAttributes(
- attribute.String("username", "unknown")))
- rpy := &opt.EchoReply{Message: request.GetName()}
- return rpy, nil
- }
-
- func (s serverImpl) Add(ctx context.Context, request *opt.AddRequest) (*opt.AddReply, error) {
- ctx, span := otel.Tracer("grpcTracer").Start(ctx, "grpcAddServerStart")
- bag := baggage.FromContext(ctx)
- defer span.End()
- rpy := &opt.AddReply{}
- for i := range request.GetFoo() {
- rpy.Result += int64(i)
- }
- span.AddEvent("Done")
- span.AddEvent("user id from baggage:" + bag.Member("user-id").Value())
- return rpy, nil
- }
-
- func main() {
- Init()
- lis, err := net.Listen("tcp", ":8080")
- if err != nil {
- fmt.Printf("监听端口失败: %s", err)
- return
- }
-
- s := grpc.NewServer(
- // 同样注意这个地方,和请求时的dialOption一样,使用了社区的插件
- grpc.StatsHandler(otelgrpc.NewServerHandler()),
- )
- opt.RegisterTestServiceServer(s, &serverImpl{})
-
- reflection.Register(s) // 按需
-
- err = s.Serve(lis)
- if err != nil {
- fmt.Printf("开启服务失败: %s", err)
- return
- }
- }
本文标题:链路跟踪与观测:OpenTelemetry 应用侧接入
本文连接:https://blog.dextercai.com/archives/208.html
除另行说明,本站文字内容采用创作共用版权 CC-BY-NC-ND 4.0 许可协议,版权归本人所有。
除另行说明,本站图片内容版权归本人所有,未经许可前,严禁以任何形式的使用。
即日起视情况关闭全站评论区,您可以通过关于页面的电邮地址和我取得联系,谢谢