Skip to content

Commit

Permalink
collect outq sizes to prom
Browse files Browse the repository at this point in the history
  • Loading branch information
Termina1 committed Jan 10, 2025
1 parent 016e3f2 commit 7878715
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
43 changes: 43 additions & 0 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,48 @@ func (n *NetCollector) Collect(m chan<- prometheus.Metric) {
}
}

type ChotkiCollector struct {
chotki *Chotki
outq_size *prometheus.Desc
collected_prev map[string]struct{}
lock sync.Mutex
}

func NewChotkiCollector(chotki *Chotki) *ChotkiCollector {
return &ChotkiCollector{
chotki: chotki,
outq_size: prometheus.NewDesc("chotki_outq_len", "", []string{"name"}, prometheus.Labels{}),
}
}

func (c *ChotkiCollector) Describe(d chan<- *prometheus.Desc) {
d <- c.outq_size
}

func (n *ChotkiCollector) Collect(m chan<- prometheus.Metric) {
n.lock.Lock()
defer n.lock.Unlock()

nw_collected := make(map[string]struct{})
need_to_pass := make(map[string]bool)
for key := range n.collected_prev {
need_to_pass[key] = true
}
n.chotki.outq.Range(func(key string, value protocol.DrainCloser) bool {
if q, ok := value.(*utils.FDQueue[protocol.Records, []byte]); ok {
m <- prometheus.MustNewConstMetric(n.outq_size, prometheus.GaugeValue, float64(q.Len()), key)
nw_collected[key] = struct{}{}
need_to_pass[key] = false
}
return true
})
for name, v := range need_to_pass {
if v { // we previously set this, but now it was deleted
m <- prometheus.MustNewConstMetric(n.outq_size, prometheus.GaugeValue, 0, name)
}
}
n.collected_prev = nw_collected
}
func (cho *Chotki) Metrics() []prometheus.Collector {
cho.db.Metrics()
return []prometheus.Collector{
Expand All @@ -566,6 +608,7 @@ func (cho *Chotki) Metrics() []prometheus.Collector {
NewNetCollector(cho.net),
EventsBatchSize,
NewPebbleCollector(cho.db),
NewChotkiCollector(cho),
OpenedIterators,
OpenedSnapshots,
SessionsStates,
Expand Down
7 changes: 7 additions & 0 deletions utils/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ func (q *FDQueue[S, E]) Close() error {
return nil
}

func (q *FDQueue[S, E]) Len() int {
if q.ctx.Err() != nil {
return 0
}
return len(q.ch)
}

func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error {
if q.ctx.Err() != nil {
return ErrClosed
Expand Down

0 comments on commit 7878715

Please sign in to comment.