Telemetry Buffer
The current transport implementation in most of our SDKs uses a simple FIFO queue that processes all envelope types with equal priority. As our SDKs collect more and more telemetry of different types, critical telemetry (such as errors and crashes) can be delayed and effectively de-prioritized because other telemetry (traces, replays, logs) occupies the queue. This is especially relevant for replays, logs, and continuous profiling, which we periodically flush to the queue.
Introduce a per-telemetry buffer layer between Client and Transport to:
- Batch telemetry when protocol-safe (all types, not only logs)
- Apply rate limiting early (don't enqueue when limited)
- Enforce bounded memory via fixed-capacity ring buffers
- Prioritize delivery by telemetry criticality via weighted round-robin
- Keep existing transport/offline cache semantics unchanged (may change later)
- (Stretch) Use an HTTP connection per telemetry type (backend SDKs only)
```
┌──────────────────────────────────────────────────────────────────────────┐
│ Client                                                                    │
│ captureEvent / captureTransaction / captureReplay / captureLogs / ...    │
└──────────────────────────────────────────────────────────────────────────┘
                                      │
                                      ▼
┌──────────────────────────────────────────────────────────────────────────┐
│ TelemetryBuffer                                                           │
│ - Holds per-category buffers                                              │
│ - Early rate-limit check (shared RateLimiter)                             │
│ - Method-based submit to per-category buffers                             │
└──────────────────────────────────────────────────────────────────────────┘
                                      │
            ┌─────────────────────────┼─────────────────────────┐
            ▼                         ▼                         ▼
┌───────────────────────┐ ┌───────────────────────┐ ┌───────────────────────┐
│ Errors/Feedback       │ │ Sessions/CheckIns     │ │ Log                   │
│ (CRITICAL)            │ │ (HIGH)                │ │ (MEDIUM)              │
│ RingBuffer + Batcher  │ │ RingBuffer + Batcher  │ │ RingBuffer + Batcher  │
└───────────────────────┘ └───────────────────────┘ └───────────────────────┘
            │                         │                         │
            ▼                         ▼                         ▼
┌──────────────────────────────────────────────────────────────────────────┐
│ EnvelopeScheduler (Weighted RR)                                           │
│ - Cross-buffer selection by priority (5..1)                               │
│ - Re-checks RateLimiter before send                                       │
│ - Submits envelopes to transport                                          │
└──────────────────────────────────────────────────────────────────────────┘
                                      │
                                      ▼
┌──────────────────────────────────────────────────────────────────────────┐
│ Transport (unchanged)                                                     │
│ - Single worker, disk cache, offline retry, client reports                │
└──────────────────────────────────────────────────────────────────────────┘
```
- CRITICAL: Error, Feedback
- HIGH: Session, CheckIn
- MEDIUM: Log, ClientReport, Span
- LOW: Transaction, Profile, ProfileChunk
- LOWEST: Replay
Configurable via weights.
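As a sketch, the category-to-priority mapping could be a plain lookup table. The DataCategory and EnvelopePriority names match the code below, but the exact set of categories shown here is illustrative:

```kotlin
// Illustrative category → priority mapping; extend with ClientReport,
// Profile, ProfileChunk, etc. per the list above.
val priorityByCategory: Map<DataCategory, EnvelopePriority> = mapOf(
    DataCategory.Error to EnvelopePriority.CRITICAL,
    DataCategory.Feedback to EnvelopePriority.CRITICAL,
    DataCategory.Session to EnvelopePriority.HIGH,
    DataCategory.CheckIn to EnvelopePriority.HIGH,
    DataCategory.Log to EnvelopePriority.MEDIUM,
    DataCategory.Span to EnvelopePriority.MEDIUM,
    DataCategory.Transaction to EnvelopePriority.LOW,
    DataCategory.Replay to EnvelopePriority.LOWEST,
)
```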
Client
- Owns per-category buffers and is the single entry point for all capture paths.
- Consults RateLimiter early; on an active rate limit, does not enqueue and records DiscardReason.RATELIMIT_BACKOFF.
- Submits items from capture methods to the matching per-category buffer.
TelemetryBuffer<T> (per DataCategory)
- Fixed-capacity ring buffer (bounded memory).
- Stores raw items (pre-envelope).
- Type-aware batching policy (size and/or time). Examples:
  - Errors/Feedback/Sessions/CheckIns: typically single-item; allow small batches if protocol-safe.
  - Logs: size/time-based (reuse semantics of LoggerBatchProcessor).
  - Spans: trace-based.
  - Transactions/Profiles/Replay: default single-item.
- Overflow policy: drop-oldest (default). Record DiscardReason.QUEUE_OVERFLOW.
EnvelopeScheduler
- Single worker; weighted round-robin across priorities: weights 5,4,3,2,1.
- Pulls ready batches from buffers, builds envelopes from them, and re-checks RateLimiter at send time.
- Submits envelopes to ITransport. If the transport is unhealthy or recently rejected envelopes, backs off briefly.
Rate Limiting
- Shared RateLimiter is the source of truth.
- Checked both at buffer ingress (to avoid queueing) and at egress (to avoid sending).
Shared Transport and Offline Cache
- Unchanged. Disk caching, retry semantics, and client report recording remain in the transport.
- Buffers are in-memory only (for now).
Configuration
- bufferCapacityByCategory: map<DataCategory, int> (defaults tuned per volume)
- priorityWeights: CRITICAL..LOWEST (default 5,4,3,2,1)
- overflowPolicy: drop_oldest | drop_newest (default drop_oldest)
- preemptLowerPriorityForCritical: boolean (default false)
- scheduler:
  - backoffMsOnTransportUnhealthy (e.g., 250–1000 ms for backpressure)
  - maxQueueSize (soft cap; default derived from transport queue)
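As a sketch, these options could surface as a small holder type. The field names mirror the list above; all defaults are assumptions, not final values:

```kotlin
// Hypothetical options holder; names mirror the configuration list above.
enum class OverflowPolicy { DROP_OLDEST, DROP_NEWEST }

data class TelemetryBufferOptions(
    val bufferCapacityByCategory: Map<DataCategory, Int> = emptyMap(),
    val priorityWeights: Map<EnvelopePriority, Int> =
        EnvelopePriority.values().associateWith { it.weight }, // 5,4,3,2,1
    val overflowPolicy: OverflowPolicy = OverflowPolicy.DROP_OLDEST,
    val preemptLowerPriorityForCritical: Boolean = false,
    // scheduler knobs:
    val backoffMsOnTransportUnhealthy: Long = 500, // within the suggested 250–1000 ms
    val maxQueueSize: Int? = null,                 // null = derive from transport queue
)
```

The priority enum and the buffer/scheduler sketches below build on these names: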
```kotlin
enum class EnvelopePriority(val weight: Int) {
CRITICAL(5), HIGH(4), MEDIUM(3), LOW(2), LOWEST(1)
}
interface TelemetryBuffer<T> {
val category: DataCategory
val priority: EnvelopePriority
fun offer(item: T): OfferResult // may drop; record client report
fun nextBatchReady(nowMs: Long): Boolean
fun drainBatch(maxItems: Int, nowMs: Long): List<T>
fun size(): Int
}
class Client(
private val buffers: Map<DataCategory, TelemetryBuffer<Any>>,
private val rateLimiter: RateLimiter,
private val clientReports: ClientReportRecorder
) {
fun captureEvent(event: Any) = submit(DataCategory.Error, event)
fun captureTransaction(tx: Any) = submit(DataCategory.Transaction, tx)
fun captureReplay(replay: Any) = submit(DataCategory.Replay, replay)
fun captureLog(log: Any) = submit(DataCategory.Log, log)
// ... other capture methods ...
private fun submit(category: DataCategory, item: Any) {
if (rateLimiter.isRateLimitActive(category)) {
clientReports.recordLostEvent(DiscardReason.RATELIMIT_BACKOFF, category)
return
}
val res = buffers[category]?.offer(item)
if (res is OfferResult.Dropped) {
clientReports.recordLostEvent(DiscardReason.QUEUE_OVERFLOW, category, res.count)
}
}
}
class EnvelopeScheduler(
private val buffersByPriority: Map<EnvelopePriority, List<TelemetryBuffer<*>>>,
private val transport: ITransport,
private val rateLimiter: RateLimiter,
private val clientReports: ClientReportRecorder,
private val weights: Map<EnvelopePriority, Int>,
private val backoffMs: Long
) : Thread("TelemetryEnvelopeScheduler") {
override fun run() {
val order = generatePriorityCycle(weights) // e.g., [CRITICAL×5, HIGH×4, ...]
while (true) {
var sentSomething = false
for (p in order) {
val buf = selectReadyBuffer(buffersByPriority[p])
if (buf != null) {
val cat = buf.category
val batch = buf.drainBatch(maxItemsFor(cat), nowMs())
if (batch.isNotEmpty()) {
if (!rateLimiter.isRateLimitActive(cat)) {
val envelopes = buildEnvelopes(cat, batch)
for (env in envelopes) {
transport.send(env)
}
sentSomething = true
} else {
clientReports.recordLostEvent(DiscardReason.RATELIMIT_BACKOFF, cat, batch.size)
}
}
}
}
if (!sentSomething) sleep(backoffMs)
}
}
private fun buildEnvelopes(category: DataCategory, batch: List<*>): List<Envelope> {
// Applies SDK version, trace context if provided, and size constraints
// Returns one or more envelopes constructed from the batch
TODO()
}
}
```
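The helpers above (selectReadyBuffer, maxItemsFor, nowMs) are left abstract. As one possible sketch, generatePriorityCycle can expand each priority into weight-many polling slots, matching the [CRITICAL×5, HIGH×4, ...] comment:

```kotlin
// Expand weights into a polling cycle: with the default weights this yields
// [CRITICAL ×5, HIGH ×4, MEDIUM ×3, LOW ×2, LOWEST ×1], so higher-priority
// buffers are polled more often per cycle.
fun generatePriorityCycle(weights: Map<EnvelopePriority, Int>): List<EnvelopePriority> =
    EnvelopePriority.values()
        .sortedByDescending { weights[it] ?: it.weight }
        .flatMap { p -> List(weights[p] ?: p.weight) { p } }
```

An interleaved cycle (CRITICAL, HIGH, CRITICAL, MEDIUM, ...) would smooth latency for lower priorities; either shape works with the scheduler loop above.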
Ring buffer semantics (per buffer):
- Fixed capacity N; on offer when full (both policies sketched below):
  - drop_oldest: evict 1 from head, enqueue new; record queue overflow client report
  - drop_newest: reject incoming; record queue overflow client report
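A minimal sketch of both offer paths, assuming an ArrayDeque-backed buffer and a hypothetical OfferResult type (the interface above only names it):

```kotlin
// Hypothetical result type; Dropped.count matches the client report usage above.
sealed class OfferResult {
    object Accepted : OfferResult()
    data class Dropped(val count: Int) : OfferResult()
}

class RingBufferStorage<T>(private val capacity: Int) {
    private val items = ArrayDeque<T>(capacity)

    // drop_oldest: evict one item from the head to make room, then enqueue.
    fun offerDropOldest(item: T): OfferResult {
        val dropped = if (items.size >= capacity) { items.removeFirst(); 1 } else 0
        items.addLast(item)
        return if (dropped > 0) OfferResult.Dropped(dropped) else OfferResult.Accepted
    }

    // drop_newest: reject the incoming item when full.
    fun offerDropNewest(item: T): OfferResult {
        if (items.size >= capacity) return OfferResult.Dropped(1)
        items.addLast(item)
        return OfferResult.Accepted
    }
}
```

In both cases the caller records DiscardReason.QUEUE_OVERFLOW for the dropped count, as in Client.submit above.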
Batching policy examples:
- Logs: batch up to 100 items or 5 s, whichever comes first; split if the envelope size limit is reached (sketched below)
- Spans: batch per trace
- Errors/Feedback: batch size 1 (default)
- Sessions/CheckIns: small batches if safe (e.g., 10 items or 1 s)
- Transactions/Profiles/Replay: default 1
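For example, the log buffer's nextBatchReady check could combine both triggers. The thresholds come from the list above; the type and field names are illustrative:

```kotlin
// Size/time trigger for logs: a batch is ready at 100 items or once the
// oldest buffered item is 5 s old, whichever comes first.
class LogBatchTrigger(
    private val maxItems: Int = 100,
    private val maxAgeMs: Long = 5_000,
) {
    private var oldestItemMs: Long = -1
    private var count: Int = 0

    fun onItemBuffered(nowMs: Long) {
        if (count == 0) oldestItemMs = nowMs
        count++
    }

    fun nextBatchReady(nowMs: Long): Boolean =
        count >= maxItems || (count > 0 && nowMs - oldestItemMs >= maxAgeMs)

    fun onBatchDrained() {
        count = 0
        oldestItemMs = -1
    }
}
```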
- Counters per category: enqueued, sent, queue_overflow, rate_limited
- Create a dashboard per category with reasons for dropped events
- Health: include buffer health in Client.isHealthy() alongside transport health
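A sketch of the per-category counters, assuming plain atomics keyed by category and metric name:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Counters per category: enqueued, sent, queue_overflow, rate_limited.
class BufferMetrics {
    private val counters = ConcurrentHashMap<Pair<DataCategory, String>, AtomicLong>()

    fun increment(category: DataCategory, metric: String, delta: Long = 1) {
        counters.computeIfAbsent(category to metric) { AtomicLong() }.addAndGet(delta)
    }

    // Snapshot for dashboards / health checks.
    fun snapshot(): Map<Pair<DataCategory, String>, Long> =
        counters.mapValues { (_, v) -> v.get() }
}
```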
- Enabled by default with conservative capacities and weights
- No change to envelope format, transport, or disk caching
Open questions:
- Default capacities per category (especially logs/replay vs. critical)
- Category weights (logs and spans will be high volume, so we might want to send them more often)
- Safe batching across categories beyond logs/client reports
- Shall we adapt Relay to accept multiple "top-level" items (like errors or transactions) in a single envelope?
- Multiple HTTP connections per telemetry type