初始化(每一个接入的应用都必须做的事情)
func Init() {
ctx := context.Background()
// applicationRes 通常一个服务实例共享同一个applicationRes
// 用于记录服务名,服务节点名等信息
applicationRes := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("grpcClient"),
semconv.K8SNodeName("single-node"),
// 还有其他的属性可以配置
)
// 初始化traceExporter
otlpClient := otlptracehttp.NewClient(otlptracehttp.WithEndpoint("127.0.0.1:4318"), otlptracehttp.WithInsecure())
traceExporter, err := otlptrace.New(ctx, otlpClient)
if err != nil {
panic(fmt.Sprintf("creating OTLP trace exporter: %w", err))
}
otel.SetTracerProvider(sdktrace.NewTracerProvider(
sdktrace.WithBatcher(traceExporter),
sdktrace.WithResource(applicationRes),
))
// 初始化metricExporter
metricExporter, err := otlpmetrichttp.New(ctx, otlpmetrichttp.WithEndpoint("127.0.0.1:4318"), otlpmetrichttp.WithInsecure())
if err != nil {
panic(fmt.Sprintf("creating OTLP metric exporter: %w", err))
}
otel.SetMeterProvider(sdkmetric.NewMeterProvider(
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter, sdkmetric.WithInterval(30*time.Second))), // 注意此处的传参
sdkmetric.WithResource(applicationRes),
))
// 初始化传播器
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
}
HttpClient
//首先,记得初始化资源
Init()
func(ctx context.Context) {
newCtx, span := otel.Tracer("requestTracer").Start(ctx, "httpReqStart")
defer span.End() // 关闭Span
// 创建一个Baggage用于上下文传递
bag, _ := baggage.New()
member, _ := baggage.NewMember("user-id", "dextercai") // Key Value
setMember, err := bag.SetMember(member)
if err != nil {
panic(err)
}
// 向Ctx中注入Baggage信息
newCtx = baggage.ContextWithBaggage(newCtx, setMember)
span.AddEvent("SendRequest") // 增加一个Event 可以充当Log使用
req, err := http.NewRequestWithContext(newCtx, "GET", "http://localhost:3000/api/do/123", nil)
//====== 方式一,手动向Header注入上下文信息
carrier := propagation.HeaderCarrier(req.Header)
otel.GetTextMapPropagator().Inject(newCtx, carrier) // 注入到HttpHeader中进行传递
if err != nil {
span.RecordError(err)
return
}
client := http.Client{}
//====== 方式二,利用插件
// "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport,
otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents))}
// 请注意此处的 otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents)
// 有助于在ES中额外记录Http请求信息
//======
resp, err := client.Do(req)
if err != nil {
panic(err)
}
body, err := io.ReadAll(resp.Body)
defer resp.Body.Close()
fmt.Printf("%s", body)
}(ctx)
time.Sleep(10 * time.Second) // Trace后台协程发送,等待发送完。
HttpServer
//首先,记得初始化资源
Init()
//====== 方式一,直接注册,需要在Handle中手动从Header中拿
http.HandleFunc("/", indexHandler)
//====== 方式二,利用插件,上下文自动注入
// "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
http.Handle("/", otelhttp.NewHandler(
http.HandlerFunc(indexHandler),
"indexHandler",
otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents)
))
// 我个人比较推荐既有业务增加中间件的形式
// "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
// 这个包的实现是符合标准库的
//======
func indexHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// 若不使用插件,需要手动从Header中获取Trace信息
// ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(r.Header)) //从Header中取出传播的信息
// 同时可以获取上级Span
span := trace.SpanFromContext(ctx)
bag := baggage.FromContext(ctx)
ctx, span := otel.Tracer("doHandleTracer").Start(ctx, "doHandle", trace.WithAttributes(attribute.String("url", r.URL.String())))
bag := baggage.FromContext(ctx)
defer span.End()
span.AddEvent("doHandle 处理开始")
// 给Event 打Tag,在ES中表现为Label.user-id
span.AddEvent("baggage got:" + bag.String(), trace.WithAttributes(attribute.String("user-id", bag.Member("user-id").String())))
// 给Span打Tag
t := time.Now()
span.SetAttributes(attribute.String("process.time", t.Sub(time.Now()).String()))
// 简单地打一个Metric
counter, _ := otel.GetMeterProvider().Meter("httpServer").Int64Counter("indexHandlerCounter")
counter.Add(ctx, 1)
w.Write([]byte(time.Now().String()))
}
gRPC Client
func main() {
Init()
ctx := context.Background()
dialOptions := []grpc.DialOption{
// 使用社区开发的插件,拦截gRPC请求,自动处理Trace传递
// "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
grpc.WithInsecure(),
}
func(ctx context.Context) {
// 创建Span
ctx, span := otel.Tracer("grpcClientTracer").Start(ctx, "grpcSayHelloStart")
defer span.End()
// 连接服务端
conn, err := grpc.Dial("127.0.0.1:8080", dialOptions...)
if err != nil {
fmt.Printf("连接服务端失败: %s", err)
span.AddEvent("失败")
span.SetStatus(codes.Error, "连接服务端失败")
span.RecordError(err)
return
}
defer conn.Close()
c := opt.NewTestServiceClient(conn)
// 请求第一个RPC方法
span.AddEvent("Req SayHello")
r, err := c.SayHello(ctx, &opt.EchoRequest{Name: "Sato"})
if err != nil {
fmt.Printf("调用服务端代码失败: %s", err)
span.SetStatus(codes.Error, "连接服务端失败")
span.RecordError(err)
return
}
// 请求第二个RPC方法
span.AddEvent("Req Add")
bag, _ := baggage.New()
member, err := baggage.NewMember("user-id", "dextercai")
setMember, err := bag.SetMember(member)
if err != nil {
panic(err)
}
// 但是Baggage还是需要手动注入Ctx
ctx = baggage.ContextWithBaggage(ctx, setMember)
c.Add(ctx, &opt.AddRequest{Foo: []int32{
1, 2, 3, 4, 5, 6, 7,
}})
// 从这个例子中可以看到,我们没有手动显式得向gRPC Metadata注入信息,插件会以拦截器的方式自动完成这一步。
}(ctx)
time.Sleep(10 * time.Second)
}
gRPC Server
type serverImpl struct {
*opt.UnimplementedTestServiceServer
}
func (s serverImpl) SayHello(ctx context.Context, request *opt.EchoRequest) (*opt.EchoReply, error) {
ctx, span := otel.Tracer("grpcTracer").Start(ctx, "grpcSayHelloServerStart")
defer span.End()
span.AddEvent("Reply", trace.WithAttributes(
attribute.String("username", "unknown")))
rpy := &opt.EchoReply{Message: request.GetName()}
return rpy, nil
}
func (s serverImpl) Add(ctx context.Context, request *opt.AddRequest) (*opt.AddReply, error) {
ctx, span := otel.Tracer("grpcTracer").Start(ctx, "grpcAddServerStart")
bag := baggage.FromContext(ctx)
defer span.End()
rpy := &opt.AddReply{}
for i := range request.GetFoo() {
rpy.Result += int64(i)
}
span.AddEvent("Done")
span.AddEvent("user id from baggage:" + bag.Member("user-id").Value())
return rpy, nil
}
func main() {
Init()
lis, err := net.Listen("tcp", ":8080")
if err != nil {
fmt.Printf("监听端口失败: %s", err)
return
}
s := grpc.NewServer(
// 同样注意这个地方,和请求时的dialOption一样,使用了社区的插件
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
opt.RegisterTestServiceServer(s, &serverImpl{})
reflection.Register(s) // 按需
err = s.Serve(lis)
if err != nil {
fmt.Printf("开启服务失败: %s", err)
return
}
}
本文标题:链路跟踪与观测:OpenTelemetry 应用侧接入
本文连接:https://blog.dextercai.com/archives/208.html
除另行说明,本站文字内容采用创作共用版权 CC-BY-NC-ND 4.0 许可协议,版权归本人所有。
除另行说明,本站图片内容版权归本人所有,任何形式的使用需提前联系。