Zipkin/Brave中的Span超时回收机制

对 http://www.iocoder.cn/categories/Zipkin/ 的一些补充,分析基于Brave#release-5.11.2分支,可能未来会有所变化

开头先打个广告,我们团队目前正在找人,坐标上海,感兴趣可以投递呀,嘻嘻。高级Java开发工程师(框架开发)

今天我们来讲Zipkin/BraveSpan的超时回收机制。这里的”超时”是广义上的超时,一般是指由于Span停留在内存中的时间过久而触发的回收机制。超时的原因一般可以认为是程序对于Span的处理不当造成的,比如启动/创建了一个Span但由于某些原因没有关闭这个Span,从而导致内存泄露。

Timeout或Deadline机制

在Twitter开发的RPC框架Finagle中我们可以找到这样的一个类DeadlineSpanMap,这个类会负责管理维护所有系统中的Span。以下Finagle的分析基于源码版本finagle-20.4.1

根据文档,DeadlineSpanMap维护的Span会有三种可能的状态流转,

  1. live -> on hold -> logged
  2. live -> on hold -> flushed -> logged
  3. live -> flushed -> logged

DeadlineSpanMap类的构造函数有三个参数,

  • logSpans: Seq[Span] => Future[Unit] 用于记录Span,是一个函数,输入一个Span列表,返回一个Future
  • ttl: Duration 过期时间TTL
  • timer: Timer 定时器,用于设置定时任务
  • hold: Duration = 500.milliseconds 表示Span几乎完成(Almost Complete)到从spanMap中被移除的时间,默认是500ms

我们可以通过测试用例来理解这三种不同的流转过程,

live -> on hold -> logged

  /**
   * Tests state transition sequence (i): live -> on hold -> logged.
   * See the comment in DeadlineSpanMap.scala for more details.
   */
  test("The hold timer catches late spans and on expiry logs the span") {
    // 用于模拟当前的时间
    Time.withCurrentTimeFrozen { tc =>
      var spansLoggedCount = 0
      var annotationCount = 0
      // 记录span的数目以及注解annotation的数目
      val logger: Seq[Span] => Future[Unit] = { spans =>
        spans.foreach { span => annotationCount += span.annotations.length }
        spansLoggedCount += spans.size
        Future.Done
      }

      val timer = new MockTimer
      // TTL时间
      val ttl: Duration = 10.milliseconds
      // Hold时间
      val hold: Duration = 2.milliseconds
      val map = new DeadlineSpanMap(logger, ttl, timer, hold)
      val traceId = TraceId(Some(SpanId(123)), Some(SpanId(123)), SpanId(123), None)

      // 向Span添加一个注解,注解就是类似于Log一样的存在。后面会解释
      map.update(traceId)(
        _.addAnnotation(
          ZipkinAnnotation(Time.now, Constants.CLIENT_RECV, Endpoint.Unknown)
        )
      )

      // 将虚拟的时钟往前拨1ms,但不超过hold的时效
      tc.advance(1.milliseconds) // advance timer but not beyond the hold deadline
      timer.tick()

      // 增加另一个无意义的Annotation
      map.update(traceId)(
        _.addAnnotation(
          ZipkinAnnotation(Time.now, "Extra annotation", Endpoint.Unknown)
        )
      )

      // 将虚拟的时钟往前拨1ms,超过hold的时效,但不超过ttl
      tc.advance(1.milliseconds)
      timer.tick()

      // 将虚拟的时钟往前拨10ms,超过ttl
      tc.advance(10.milliseconds)
      timer.tick()

      // Span应该仅被记录了一次,而Annotation是两个
      assert(spansLoggedCount == 1, "Wrong number of calls to log spans")
      assert(annotationCount == 2, "Wrong number of annotations")
    }
  }

这个测试用例演绎了一次正常的状态流转。

第一次更新的Annotation(Key=Constants.CLIENT_RECV, Value=Endpoint.Unknown),表示一个RPC请求收到了对端(peer)的响应,根据Zipkin的语义,这个注解表示一个RPC请求已经几乎完成。那么,这个Span会被打上onHold的标签,表示Span即将完成,参考 MutableSpan.scala

如果调用DeadlineSpanMap.update方法以后,发现Span处于onHold状态,那么会在定时器上绑定一个complete钩子,在经过hold时间后(默认为500ms)触发complete方法,在这个方法中会把该Span从spanMap中移除,并调用logSpans方法记录这个Span。

live -> on hold -> flushed -> logged

/**
   * Tests state transition sequence (ii): live -> on hold -> flushed -> logged.
   * See the comment in DeadlineSpanMap.scala for more details.
   */
  test("Even if on hold, the span is flushed if ttl expires first") {
    Time.withCurrentTimeFrozen { tc =>
      var spansLoggedCount = 0
      var annotationCount = 0
      val logger: Seq[Span] => Future[Unit] = { spans =>
        spans.foreach { span => annotationCount += span.annotations.length }
        spansLoggedCount += 1
        Future.Done
      }

      val timer = new MockTimer
      // 相比于(i)的测试用例,这里的ttl设定的很短
      val ttl: Duration = 1.milliseconds
      val hold: Duration = 2.milliseconds
      val map = new DeadlineSpanMap(logger, ttl, timer, hold)
      val traceId = TraceId(Some(SpanId(123)), Some(SpanId(123)), SpanId(123), None)

      // Add an annotation to transition the span to hold state.
      map.update(traceId)(
        _.addAnnotation(
          ZipkinAnnotation(Time.now, Constants.CLIENT_RECV, Endpoint.Unknown)
        )
      )

      tc.advance(1.milliseconds) // advance timer beyond the ttl
      timer.tick() // execute scheduled event

      // Add another annotation, which will be logged separately.
      map.update(traceId)(
        _.addAnnotation(
          ZipkinAnnotation(Time.now, "Extra annotation", Endpoint.Unknown)
        )
      )

      tc.advance(1.milliseconds) // advance timer beyond the ttl
      timer.tick() // execute scheduled event

      // Span must have been logged twice.
      assert(spansLoggedCount == 2, "Wrong number of calls to log spans")

      // Flushing adds a "finagle.flush" annotation and we have flushed twice.
      assert(annotationCount == 4, "Wrong number of annotations")
    }
  }

从测试用例的名称我们可以很容易理解这个状态,如果Span的完成耗时较长,那么即使已经设定了complete定时器,也有可以ttl定时器先被触发。此时的话,complete会被触发两次,一次是由timer上注册的flushTask任务触发的,

private class DeadlineSpanMap(
  logSpans: Seq[Span] => Future[Unit],
  ttl: Duration,
  timer: Timer,
  hold: Duration = 500.milliseconds) {
<SNIP>
private[this] val flushTask = timer.schedule(ttl / 2) { flush(ttl.ago) }
<SNIP>
}

另一次则是由于onHold状态被绑定的complete钩子触发的。

live -> flushed -> logged

/**
   * Tests state transition sequence (iii): live -> flushed -> logged.
   * See the comment in DeadlineSpanMap.scala for more details.
   */
  test("DeadlineSpanMap should expire and log spans") {
    Time.withCurrentTimeFrozen { tc =>
      var spansLogged: Boolean = false
      val logger: Seq[Span] => Future[Unit] = { _ =>
        spansLogged = true
        Future.Done
      }

      val timer = new MockTimer
      val map = new DeadlineSpanMap(logger, 1.milliseconds, timer)
      val traceId = TraceId(Some(SpanId(123)), Some(SpanId(123)), SpanId(123), None)

      map.update(traceId)(_.setServiceName("service").setName("name"))
      tc.advance(10.seconds) // advance timer
      timer.tick() // execute scheduled event

      // span must have been logged
      assert(spansLogged)
    }
  }

最后一种情况就是由于超时被回收。

这种超时回收的机制比较微妙的地方是你需要知道应用中的Span大致的耗时,以便对ttl和onHold做出优化,如果ttl设定的太小,那么有可能你的业务还没执行完成,就已经被标记为超时而flush。如果ttl太大,则会浪费内存。

Brave的新的”超时”机制

这里打上引号的原因是,Brave的这种机制本质上并不是根据时间来判断的,而且根据GC的压力来实现的。以下分析根据openzipkin/brave#release-5.11.2

Brave中实现这个逻辑主要是在PendingSpans类中。

public final class PendingSpans extends ReferenceQueue<TraceContext> {
  // Even though we only put by RealKey, we allow get and remove by LookupKey
  final ConcurrentMap<Object, PendingSpan> delegate = new ConcurrentHashMap<>(64);
<SNIP>
  public PendingSpan getOrCreate(TraceContext context, boolean start) {
    if (context == null) throw new NullPointerException("context == null");
    // 每次在创建MutableSpan的时候都会去检查OrphanedSpans
    reportOrphanedSpans();

<SNIP 省略创建MutableSpan和Clock的逻辑留着以后分析> 

    // 把context封装成WeakRef的实现RealKey并放到ConcurrentHashMap中
    PendingSpan previousSpan = delegate.putIfAbsent(new RealKey(context, this), newSpan);
    if (previousSpan != null) return previousSpan; // lost race
    // 是否需要追踪OrphanSpan的创建用于诊断问题
    // 如果需要,则会跟踪响应的线程和堆栈(Throwable.fillInStackTrace())
    if (trackOrphans) {
      newSpan.caller =
        new Throwable("Thread " + Thread.currentThread().getName() + " allocated span here");
    }
    return newSpan;
  }
<SNIP>
}

那么这里我们可以看到其实这里的PendingSpans继承了ReferenceQueue,目的是为了能够回收WeakReference。不太熟悉的同学可以看官方的文档,

  • https://docs.oracle.com/javase/8/docs/api/java/lang/ref/ReferenceQueue.html
  • https://docs.oracle.com/javase/8/docs/api/java/lang/ref/WeakReference.html

Creates a new weak reference that refers to the given object and is registered with the given queue.

在WeakReference的构造函数中可以传递一个ReferenceQueue用于收集那些被GC回收的引用。基于这个原理,可以实现超时回收的机制,具体来看一下代码。

public final class PendingSpans extends ReferenceQueue<TraceContext> {
  // Even though we only put by RealKey, we allow get and remove by LookupKey
  final ConcurrentMap<Object, PendingSpan> delegate = new ConcurrentHashMap<>(64);
<SNIP>
  /** Reports spans orphaned by garbage collection. */
  void reportOrphanedSpans() {
    RealKey contextKey;
    // This is called on critical path of unrelated traced operations. If we have orphaned spans, be
    // careful to not penalize the performance of the caller. It is better to cache time when
    // flushing a span than hurt performance of unrelated operations by calling
    // currentTimeMicroseconds N times
    long flushTime = 0L;
    boolean noop = orphanedSpanHandler == FinishedSpanHandler.NOOP || this.noop.get();
    // 从ReferenceQueue中提取被回收的TraceContext
    while ((contextKey = (RealKey) poll()) != null) {
      // delegate里面保存的是TraceContext到PendingSpan的映射
      PendingSpan value = delegate.remove(contextKey);
      if (noop || value == null) continue;
      if (flushTime == 0L) flushTime = clock.currentTimeMicroseconds();

      boolean isEmpty = value.state.isEmpty();
      Throwable caller = value.caller;

      // 从回收的TraceContext信息恢复TraceContext对象
      TraceContext context = InternalPropagation.instance.newTraceContext(
        contextKey.flags,
        contextKey.traceIdHigh, contextKey.traceId,
        contextKey.localRootId, 0L, contextKey.spanId,
        Collections.emptyList()
      );

      if (caller != null) {
        String message = isEmpty
          ? "Span " + context + " was allocated but never used"
          : "Span " + context + " neither finished nor flushed before GC";
        Platform.get().log(message, caller);
      }
      if (isEmpty) continue;

      // 在MutableSpan中标记一个"brave.flush"事件
      value.state.annotate(flushTime, "brave.flush");
      // 调用相应的Handler来处理OrphanedSpan
      orphanedSpanHandler.handle(context, value.state);
    }
  }
<SNIP>
}

这里PendingSpan value = delegate.remove(contextKey);这句需要注意的是,实际上在这里比较HashCode和对象是否相同的时候,TraceContext重写了相关的方法,使得它可以直接和WeakReference进行对比。

关于orphanedSpanHandler,Brave自带的ZipkinFinishedSpanHandler就是一个支持OrphanedSpans的Handler。

最后,我们通过PendingSpans的测试用例来看一下它的工作方式,

package brave.internal.recorder;
public class PendingSpansTest {
  <SNIP>
  /**
   * This is the key feature. Spans orphaned via GC are reported to zipkin on the next action.
   *
   * <p>This is a customized version of https://github.com/raphw/weak-lock-free/blob/master/src/test/java/com/blogspot/mydailyjava/weaklockfree/WeakConcurrentMapTest.java
   */
  @Test
  public void reportOrphanedSpans_afterGC() {
    // 构建context1
    TraceContext context1 = context.toBuilder().traceId(1).spanId(1).build();
    // 利用Context1创建一个PendingSpan
    PendingSpan span = pendingSpans.getOrCreate(context1, false);
    span.state.name("foo");
    // 移除对span的引用,那么在gc触发的时候span及其关联的Context1就会被回收
    span = null;
    // 类似得,构建context2,context3,context4
    TraceContext context2 = context.toBuilder().traceId(2).spanId(2).build();
    pendingSpans.getOrCreate(context2, false);
    TraceContext context3 = context.toBuilder().traceId(3).spanId(3).build();
    pendingSpans.getOrCreate(context3, false);
    TraceContext context4 = context.toBuilder().traceId(4).spanId(4).build();
    pendingSpans.getOrCreate(context4, false);
    // ensure sampled local spans are not reported when orphaned unless they are also sampled remote
    // context5是一个比较特殊的TraceContext,因为它不需要remote采样,仅设置了local采样,此类Span不会被Reporter收集
    TraceContext context5 =
      context.toBuilder().spanId(5).sampledLocal(true).sampled(false).build();
    pendingSpans.getOrCreate(context5, false);

    int initialClockVal = clock.get();

    // By clearing strong references in this test, we are left with the weak ones in the map
    // 移除context1, 2, 5的强引用,确保这几个对象会被gc
    context1 = context2 = context5 = null;
    // 模拟GC的过程,其实是调用了System.gc()
    GarbageCollectors.blockOnGC();

    // After GC, we expect that the weak references of context1 and context2 to be cleared
    // 在GC发生以后,由于Key是一个WeakReference,context1, 2, 5都会被回收,从而Key变成null
    assertThat(pendingSpans.delegate.keySet()).extracting(o -> ((Reference) o).get())
      .containsExactlyInAnyOrder(null, null, context3, context4, null);

    // Key为null的都会从ConcurrentHashMap中删除
    pendingSpans.reportOrphanedSpans();

    // After reporting, we expect no the weak references of null
    // 此时ConcurrentHashMap仅包含未被回收的context3,4
    assertThat(pendingSpans.delegate.keySet()).extracting(o -> ((Reference) o).get())
      .containsExactlyInAnyOrder(context3, context4);

    // We also expect only the sampled span containing data to have been reported
    assertThat(spans).hasSize(1);
    assertThat(spans.get(0).id()).isEqualTo("0000000000000001");
    assertThat(spans.get(0).name()).isEqualTo("foo"); // data was flushed
    assertThat(spans.get(0).annotations())
      .containsExactly(Annotation.create((initialClockVal + 1) * 1000, "brave.flush"));
  }

  <SNIP>
}

这边其实有一个地方有些问题,如果开发者对Brave库有误用,比如把Span存在了某个List或者以其他方式显式得持有了这个Span或者相应的Scope, 那么其实GC在这里是无能为力的。此时用Timeout机制可能会更好。

那么,以上就是Brave/Zipkin支持超时回收机制的始末和相关原理。