Telemetry Buffer

The current transport implementation in most of our SDKs uses a simple FIFO queue that processes all envelope types with equal priority. As our SDKs collect more telemetry of more types, critical telemetry (such as errors and crashes) may get delayed and de-prioritized because other telemetry (traces, replays, logs) occupies the queue. This is especially relevant for replays, logs, and continuous profiling, which we periodically flush to the queue.

Introduce a per-telemetry buffer layer between Client and Transport to:

  • Batch telemetry when protocol-safe (all types, not only logs)
  • Apply early rate limiting (don’t enqueue when limited)
  • Enforce bounded memory via fixed-capacity ring buffers
  • Prioritize delivery by telemetry criticality via weighted round-robin
  • Keep existing transport/offline cache semantics unchanged (subject to change)
  • (Stretch) Use an HTTP connection per telemetry type (backend SDKs only)

┌───────────────────────────────────────────────────────────────────────────┐
│                              Client                                       │
│   captureEvent / captureTransaction / captureReplay / captureLogs / ...   │
└───────────────────────────────────────────────────────────────────────────┘
┌───────────────────────────────────────────────────────────────────────────┐
│                          TelemetryBuffer                                  │
│  - Holds per-category buffers                                             │
│  - Early rate-limit check (shared RateLimiter)                            │
│  - Method-based submit to per-category buffers                            │
└───────────────────────────────────────────────────────────────────────────┘
               ┌───────────────┼────────────────────────────────┐
               ▼               ▼                                ▼
┌───────────────────────┐ ┌───────────────────────┐ ┌───────────────────────┐
│  Errors/Feedback      │ │  Sessions/CheckIns    │ │  Log                  │
│  (CRITICAL)           │ │  (HIGH)               │ │  (MEDIUM)             │
│  RingBuffer + Batcher │ │  RingBuffer + Batcher │ │  RingBuffer + Batcher │
└───────────────────────┘ └───────────────────────┘ └───────────────────────┘
               │               │                                │
               ▼               ▼                                ▼
┌───────────────────────────────────────────────────────────────────────────┐
│                    EnvelopeScheduler (Weighted RR)                        │
│  - Cross-buffer selection by priority (5..1)                              │
│  - Re-checks RateLimiter before send                                      │
│  - Submits envelopes to transport                                         │
└───────────────────────────────────────────────────────────────────────────┘
┌───────────────────────────────────────────────────────────────────────────┐
│                        Transport (unchanged)                              │
│   - Single worker, disk cache, offline retry, client reports              │
└───────────────────────────────────────────────────────────────────────────┘

  • CRITICAL: Error, Feedback
  • HIGH: Session, CheckIn
  • MEDIUM: Log, ClientReport, Span
  • LOW: Transaction, Profile, ProfileChunk
  • LOWEST: Replay

Configurable via weights; a possible default mapping is sketched below.
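
As an illustration, the default category-to-priority mapping could look like the following sketch. It uses the EnvelopePriority enum from the code sketch further below; the exact DataCategory constant names are assumptions.

val defaultPriorityByCategory: Map<DataCategory, EnvelopePriority> = mapOf(
  // CRITICAL: must never be starved by high-volume telemetry.
  DataCategory.Error to EnvelopePriority.CRITICAL,
  DataCategory.Feedback to EnvelopePriority.CRITICAL,
  // HIGH
  DataCategory.Session to EnvelopePriority.HIGH,
  DataCategory.CheckIn to EnvelopePriority.HIGH,
  // MEDIUM
  DataCategory.Log to EnvelopePriority.MEDIUM,
  DataCategory.ClientReport to EnvelopePriority.MEDIUM,
  DataCategory.Span to EnvelopePriority.MEDIUM,
  // LOW
  DataCategory.Transaction to EnvelopePriority.LOW,
  DataCategory.Profile to EnvelopePriority.LOW,
  DataCategory.ProfileChunk to EnvelopePriority.LOW,
  // LOWEST
  DataCategory.Replay to EnvelopePriority.LOWEST,
)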

  • Client

    • Owns per-category buffers and is the single entry for all capture paths.
    • Consults RateLimiter early; on active rate limit do not enqueue and record DiscardReason.RATELIMIT_BACKOFF.
    • Submits items from capture methods to the matching per-category buffer.
  • TelemetryBuffer<T> (per DataCategory)

    • Fixed-capacity ring buffer (bounded memory).
    • Stores raw items (pre-envelope).
    • Type-aware batching policy (size and/or time). Examples:
      • Errors/Feedback/Sessions/CheckIns: typically single-item; allow small batch if protocol-safe.
      • Logs: size/time-based (reuse semantics of LoggerBatchProcessor).
      • Spans: trace-based.
      • Transactions/Profiles/Replay: default single-item.
    • Overflow policy: drop-oldest (default). Record DiscardReason.QUEUE_OVERFLOW.
  • EnvelopeScheduler

    • Single worker; weighted round-robin across priorities: weights 5,4,3,2,1.
    • Pulls ready batches from buffers; builds envelopes from batches; re-checks RateLimiter at send-time.
    • Submits envelopes to ITransport. If transport unhealthy or recently rejected, backs off briefly.
  • Rate Limiting

    • Shared RateLimiter is the source of truth.
    • Checked both at buffer ingress (to avoid queueing) and at egress (to avoid sending).
  • Transport and Offline Cache

    • Unchanged. Disk caching, retry semantics, client report recording remain in transport.
    • Buffers are in-memory only (for now).

  • bufferCapacityByCategory: map<DataCategory, int> (defaults tuned per volume)
  • priorityWeights: CRITICAL..LOWEST (default 5,4,3,2,1)
  • overflowPolicy: drop_oldest | drop_newest (default drop_oldest)
  • preemptLowerPriorityForCritical: boolean (default false)
  • scheduler:
    • backoffMsOnTransportUnhealthy (e.g., 250–1000 ms for backpressure)
    • maxQueueSize (soft cap; default derived from transport queue)
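
One possible shape for these options, as a sketch (names mirror the list above; types and defaults are illustrative, not a final API):

data class TelemetryBufferOptions(
  // Per-category ring buffer capacities; defaults would be tuned per expected volume.
  val bufferCapacityByCategory: Map<DataCategory, Int> = emptyMap(),
  // Weighted round-robin weights, CRITICAL..LOWEST.
  val priorityWeights: Map<EnvelopePriority, Int> = mapOf(
    EnvelopePriority.CRITICAL to 5,
    EnvelopePriority.HIGH to 4,
    EnvelopePriority.MEDIUM to 3,
    EnvelopePriority.LOW to 2,
    EnvelopePriority.LOWEST to 1,
  ),
  val overflowPolicy: OverflowPolicy = OverflowPolicy.DROP_OLDEST,
  val preemptLowerPriorityForCritical: Boolean = false,
  // Scheduler backpressure: pause briefly when the transport is unhealthy (suggested 250–1000 ms).
  val backoffMsOnTransportUnhealthy: Long = 500,
  // Soft cap; when null, derived from the transport queue size.
  val maxQueueSize: Int? = null,
)

enum class OverflowPolicy { DROP_OLDEST, DROP_NEWEST }

With these options in place, the core types and the scheduler loop could be sketched as follows: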

enum class EnvelopePriority(val weight: Int) {
  CRITICAL(5), HIGH(4), MEDIUM(3), LOW(2), LOWEST(1)
}

interface TelemetryBuffer<T> {
  val category: DataCategory
  val priority: EnvelopePriority
  fun offer(item: T): OfferResult // may drop; record client report
  fun nextBatchReady(nowMs: Long): Boolean
  fun drainBatch(maxItems: Int, nowMs: Long): List<T>
  fun size(): Int
}
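
// OfferResult is referenced above but not defined in this sketch; a minimal
// assumed shape, carrying the drop count that is reported back as a client report:
sealed class OfferResult {
  object Accepted : OfferResult()
  data class Dropped(val count: Int) : OfferResult()
}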

class Client(
  private val buffers: Map<DataCategory, TelemetryBuffer<Any>>,
  private val rateLimiter: RateLimiter,
  private val clientReports: ClientReportRecorder
) {
  fun captureEvent(event: Any) = submit(DataCategory.Error, event)
  fun captureTransaction(tx: Any) = submit(DataCategory.Transaction, tx)
  fun captureReplay(replay: Any) = submit(DataCategory.Replay, replay)
  fun captureLog(log: Any) = submit(DataCategory.Log, log)
  // ... other capture methods ...

  private fun submit(category: DataCategory, item: Any) {
    if (rateLimiter.isRateLimitActive(category)) {
      clientReports.recordLostEvent(DiscardReason.RATELIMIT_BACKOFF, category)
      return
    }
    val res = buffers[category]?.offer(item)
    if (res is OfferResult.Dropped) {
      clientReports.recordLostEvent(DiscardReason.QUEUE_OVERFLOW, category, res.count)
    }
  }
}

class EnvelopeScheduler(
  private val buffersByPriority: Map<EnvelopePriority, List<TelemetryBuffer<*>>>,
  private val transport: ITransport,
  private val rateLimiter: RateLimiter,
  private val clientReports: ClientReportRecorder,
  private val weights: Map<EnvelopePriority, Int>,
  private val backoffMs: Long
) : Thread("TelemetryEnvelopeScheduler") {
  override fun run() {
    val order = generatePriorityCycle(weights) // e.g., [CRITICAL×5, HIGH×4, ...]
    while (true) {
      var sentSomething = false
      for (p in order) {
        val buf = selectReadyBuffer(buffersByPriority[p])
        if (buf != null) {
          val cat = buf.category
          val batch = buf.drainBatch(maxItemsFor(cat), nowMs())
          if (batch.isNotEmpty()) {
            if (!rateLimiter.isRateLimitActive(cat)) {
              val envelopes = buildEnvelopes(cat, batch)
              for (env in envelopes) {
                transport.send(env)
              }
              sentSomething = true
            } else {
              clientReports.recordLostEvent(DiscardReason.RATELIMIT_BACKOFF, cat, batch.size)
            }
          }
        }
      }
      if (!sentSomething) Thread.sleep(backoffMs)
    }
  }

  private fun buildEnvelopes(category: DataCategory, batch: List<*>): List<Envelope> {
    // Applies SDK version, trace context if provided, and size constraints
    // Returns one or more envelopes constructed from the batch
    TODO()
  }
}
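
The helpers referenced but not defined in the scheduler sketch (generatePriorityCycle, selectReadyBuffer, maxItemsFor, nowMs) could be as simple as the following; the values are illustrative only, and in practice these would be private members of the scheduler:

// Expands weights into a flat visiting order, e.g. [CRITICAL, CRITICAL, ..., HIGH, ...].
// A refined version could interleave priorities to smooth out bursts.
fun generatePriorityCycle(weights: Map<EnvelopePriority, Int>): List<EnvelopePriority> =
  EnvelopePriority.entries
    .sortedByDescending { weights[it] ?: it.weight }
    .flatMap { p -> List(weights[p] ?: p.weight) { p } }

// Picks the first buffer of the given priority tier that has a batch ready to drain.
fun selectReadyBuffer(buffers: List<TelemetryBuffer<*>>?): TelemetryBuffer<*>? =
  buffers?.firstOrNull { it.nextBatchReady(nowMs()) }

// Per-category batch size ceiling; 100 for logs mirrors the batching examples below.
fun maxItemsFor(category: DataCategory): Int = when (category) {
  DataCategory.Log -> 100
  else -> 1
}

fun nowMs(): Long = System.currentTimeMillis()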

Ring buffer semantics (per buffer):

  • Fixed capacity N; on offer when full:
    • drop_oldest: evict 1 from head, enqueue new; record queue overflow client report
    • drop_newest: reject incoming; record queue overflow client report
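
A minimal in-memory buffer implementing these overflow semantics, assuming the TelemetryBuffer interface and OfferResult type sketched above (the class name and the simplistic batching are illustrative):

class RingTelemetryBuffer<T>(
  override val category: DataCategory,
  override val priority: EnvelopePriority,
  private val capacity: Int,
  private val dropOldest: Boolean = true, // drop_oldest vs. drop_newest
) : TelemetryBuffer<T> {
  private val items = ArrayDeque<T>(capacity)

  @Synchronized
  override fun offer(item: T): OfferResult {
    if (items.size < capacity) {
      items.addLast(item)
      return OfferResult.Accepted
    }
    return if (dropOldest) {
      // drop_oldest: evict one item from the head, accept the new item,
      // and report a single drop so the caller can record QUEUE_OVERFLOW.
      items.removeFirst()
      items.addLast(item)
      OfferResult.Dropped(count = 1)
    } else {
      // drop_newest: reject the incoming item and report the drop.
      OfferResult.Dropped(count = 1)
    }
  }

  // The real batching policy is type-aware (size/time/trace); this sketch treats
  // any buffered item as "ready" and drains in FIFO order.
  @Synchronized
  override fun nextBatchReady(nowMs: Long): Boolean = items.isNotEmpty()

  @Synchronized
  override fun drainBatch(maxItems: Int, nowMs: Long): List<T> {
    val batch = ArrayList<T>(minOf(maxItems, items.size))
    while (batch.size < maxItems && items.isNotEmpty()) {
      batch.add(items.removeFirst())
    }
    return batch
  }

  @Synchronized
  override fun size(): Int = items.size
}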

Batching policy examples:

  • Logs: batch up to 100 items or 5s; split if envelope size limit reached
  • Spans: batch per trace
  • Errors/Feedback: batch size 1 (default)
  • Sessions/CheckIns: small batches if safe (e.g., 10 or 1s)
  • Transactions/Profiles/Replay: default 1
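
For example, the size/time readiness check for logs could be as simple as this sketch (constant names are illustrative; the thresholds mirror the values above):

const val LOG_BATCH_MAX_ITEMS = 100
const val LOG_BATCH_MAX_AGE_MS = 5_000L

// A log batch is ready when either threshold is reached; the scheduler drains it on its next pass.
fun logBatchReady(bufferedCount: Int, firstItemEnqueuedAtMs: Long?, nowMs: Long): Boolean {
  if (firstItemEnqueuedAtMs == null) return false               // nothing buffered yet
  if (bufferedCount >= LOG_BATCH_MAX_ITEMS) return true         // size threshold
  return nowMs - firstItemEnqueuedAtMs >= LOG_BATCH_MAX_AGE_MS  // time threshold
}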

  • Counters per category: enqueued, sent, queue_overflow, rate_limited
    • Create a dashboard per category with reasons for dropped events
  • Health: include buffer health in Client.isHealthy() alongside transport
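
A minimal shape for the per-category counters (names are illustrative; a real implementation would hook these into the SDK's existing metrics/diagnostics):

import java.util.concurrent.atomic.AtomicLong

class BufferMetrics {
  val enqueued = AtomicLong()
  val sent = AtomicLong()
  val queueOverflow = AtomicLong()
  val rateLimited = AtomicLong()
}

// Kept per category; exposed for dashboards and folded into Client.isHealthy().
val metricsByCategory = mutableMapOf<DataCategory, BufferMetrics>()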

  • Enabled by default with conservative capacities and weights
  • No change to envelope format, transport, or disk caching

  • Default capacities per category (especially logs/replay vs. critical)
  • Category weights (logs and spans will be high volume, so we might want to send them more often)
  • Safe batching across categories beyond logs/client reports
    • Shall we adapt Relay to accept multiple "top-level" items (like errors or transactions) in a single envelope?
  • Multiple HTTP connections per telemetry type