Skip to content

Commit

Permalink
Merge pull request #15 from drpcorg/broadcast-queue
Browse files Browse the repository at this point in the history
Broadcast queue
  • Loading branch information
Termina1 authored Jan 14, 2025
2 parents 7878715 + f556d6d commit fc920f4
Show file tree
Hide file tree
Showing 4 changed files with 420 additions and 77 deletions.
33 changes: 16 additions & 17 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,16 @@ type Options struct {
Src uint64
Name string
Log1 protocol.Records
MaxLogLen int64
RelaxedOrder bool
Logger utils.Logger
PingPeriod time.Duration
PingWait time.Duration
PingPeriod time.Duration // how often should we ping neighbour replicae if its silent
PingWait time.Duration // how much time we wait until pong received
PebbleWriteOptions *pebble.WriteOptions
BroadcastBatchSize int
BroadcastTimeLimit time.Duration
ReadAccumTimeLimit time.Duration
BroadcastQueueMaxSize int // size in bytes, after reaching it all writes will block
BroadcastQueueMinBatchSize int // reads will wait until they have enough data or timelimit expires
// if this limit expires before read has enough data (BroadcastQueueMinBatchSize) it will return whatever it has,
// writes will cause overflow error which will result in queue shutdown and session end
BroadcastQueueTimeLimit time.Duration
ReadAccumTimeLimit time.Duration //
ReadMaxBufferSize int
ReadMinBufferSizeToProcess int
TcpReadBufferSize int
Expand All @@ -95,10 +96,6 @@ type Options struct {
}

func (o *Options) SetDefaults() {
if o.MaxLogLen == 0 {
o.MaxLogLen = 1 << 23
}

if o.PingPeriod == 0 {
o.PingPeriod = 30 * time.Second
}
Expand All @@ -118,8 +115,11 @@ func (o *Options) SetDefaults() {
o.PebbleWriteOptions = &pebble.WriteOptions{Sync: true}
}

if o.BroadcastTimeLimit == 0 {
o.BroadcastTimeLimit = time.Millisecond
if o.BroadcastQueueTimeLimit == 0 {
o.BroadcastQueueTimeLimit = time.Second
}
if o.BroadcastQueueMaxSize == 0 {
o.BroadcastQueueMaxSize = 10 * 1024 * 1024 // 10Mb
}
if o.ReadAccumTimeLimit == 0 {
o.ReadAccumTimeLimit = 5 * time.Second
Expand Down Expand Up @@ -236,9 +236,8 @@ func Open(dirname string, opts Options) (*Chotki, error) {

cho.net = protocol.NewNet(cho.log,
func(name string) protocol.FeedDrainCloserTraced { // new connection
const outQueueLimit = 1 << 20

queue := utils.NewFDQueue[protocol.Records](outQueueLimit, cho.opts.BroadcastTimeLimit, cho.opts.BroadcastBatchSize)
queue := utils.NewFDQueue[protocol.Records](cho.opts.BroadcastQueueMaxSize, cho.opts.BroadcastQueueTimeLimit, cho.opts.BroadcastQueueMinBatchSize)
if q, loaded := cho.outq.LoadAndStore(name, queue); loaded && q != nil {
cho.log.Warn(fmt.Sprintf("closing the old conn to %s", name))
if err := q.Close(); err != nil {
Expand Down Expand Up @@ -586,8 +585,8 @@ func (n *ChotkiCollector) Collect(m chan<- prometheus.Metric) {
need_to_pass[key] = true
}
n.chotki.outq.Range(func(key string, value protocol.DrainCloser) bool {
if q, ok := value.(*utils.FDQueue[protocol.Records, []byte]); ok {
m <- prometheus.MustNewConstMetric(n.outq_size, prometheus.GaugeValue, float64(q.Len()), key)
if q, ok := value.(*utils.FDQueue[protocol.Records]); ok {
m <- prometheus.MustNewConstMetric(n.outq_size, prometheus.GaugeValue, float64(q.Size()), key)
nw_collected[key] = struct{}{}
need_to_pass[key] = false
}
Expand Down
18 changes: 8 additions & 10 deletions protocol/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ func tlsConfig(servername string) *tls.Config {
}
}

type TracedQueue[S ~[]E, E any] struct {
*utils.FDQueue[S, E]
type TracedQueue[T ~[][]byte] struct {
*utils.FDQueue[T]
}

func (t *TracedQueue[S, E]) GetTraceId() string {
func (t *TracedQueue[T]) GetTraceId() string {
return ""
}

Expand All @@ -77,28 +77,26 @@ func TestTCPDepot_Connect(t *testing.T) {

log := utils.NewDefaultLogger(slog.LevelDebug)

lCon := utils.NewFDQueue[Records](16, time.Millisecond, 0)
lCon := utils.NewFDQueue[Records](1000, time.Minute, 1)
l := NewNet(log, func(_ string) FeedDrainCloserTraced {
return &TracedQueue[Records, []byte]{lCon}
return &TracedQueue[Records]{lCon}
}, func(_ string, t Traced) { lCon.Close() }, &NetTlsConfigOpt{tlsConfig("a.chotki.local")}, &NetWriteTimeoutOpt{Timeout: 1 * time.Minute})

err := l.Listen(loop)
assert.Nil(t, err)

cCon := utils.NewFDQueue[Records](16, time.Millisecond, 0)
cCon := utils.NewFDQueue[Records](1000, time.Minute, 1)
c := NewNet(log, func(_ string) FeedDrainCloserTraced {
return &TracedQueue[Records, []byte]{cCon}
return &TracedQueue[Records]{cCon}
}, func(_ string, t Traced) { cCon.Close() }, &NetTlsConfigOpt{tlsConfig("b.chotki.local")}, &NetWriteTimeoutOpt{Timeout: 1 * time.Minute})

err = c.Connect(loop)
assert.Nil(t, err)
time.Sleep(time.Second) // Wait connection, todo use events

// send a record
err = cCon.Drain(context.Background(), Records{Record('M', []byte("Hi there"))})
assert.Nil(t, err)

time.Sleep(1000 * time.Millisecond)
rec, err := lCon.Feed(context.Background())
assert.Nil(t, err)
assert.Greater(t, len(rec), 0)
Expand Down Expand Up @@ -136,7 +134,7 @@ func TestTCPDepot_ConnectFailed(t *testing.T) {

cCon := utils.NewFDQueue[Records](16, time.Millisecond, 0)
c := NewNet(log, func(_ string) FeedDrainCloserTraced {
return &TracedQueue[Records, []byte]{cCon}
return &TracedQueue[Records]{cCon}
}, func(_ string, t Traced) { cCon.Close() }, &NetTlsConfigOpt{tlsConfig("b.chotki.local")})

err := c.Connect(loop)
Expand Down
Loading

0 comments on commit fc920f4

Please sign in to comment.