Skip to content

Commit

Permalink
prevent error that wg is already in use
Browse files Browse the repository at this point in the history
  • Loading branch information
Termina1 committed Feb 4, 2025
1 parent d9ac223 commit 975bcb9
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions atomic_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,23 @@ func NewAtomicCounter(db *Chotki, rid rdx.ID, offset uint64, updatePeriod time.D
func (a *AtomicCounter) load() (any, error) {
now := time.Now()
if a.data.Load() != nil && now.Sub(a.expiration) < 0 {
a.wg.Add(1)
return a.data.Load(), nil
}

// release this goroutine's wg so, in case its stopped on
// mutex and was not firt to aquire it, it would not block the progress
a.wg.Done()
a.lock.Lock()
a.wg.Add(1)
defer a.lock.Unlock()
if a.data.Load() != nil && now.Sub(a.expiration) < 0 {
a.wg.Add(1)
return a.data.Load(), nil
}

// release this goroutine's wg to proceed on wait
a.wg.Done()
a.wg.Wait()
a.wg.Add(1)
rdt, tlv, err := a.db.ObjectFieldTLV(a.rid.ToOff(a.offset))
if err != nil {
return nil, err
Expand Down Expand Up @@ -98,16 +105,16 @@ func (a *AtomicCounter) load() (any, error) {
}
a.data.Store(data)
a.expiration = now.Add(a.updatePeriod)
a.wg.Add(1)
return data, nil
}

func (a *AtomicCounter) Get(ctx context.Context) (int64, error) {
a.wg.Add(1)
defer a.wg.Done()
data, err := a.load()
if err != nil {
return 0, err
}
defer a.wg.Done()
switch c := data.(type) {
case *atomicNcounter:
return int64(c.total.Load()), nil
Expand All @@ -120,11 +127,12 @@ func (a *AtomicCounter) Get(ctx context.Context) (int64, error) {

// Loads (if needed) and increments counter
func (a *AtomicCounter) Increment(ctx context.Context, val int64) (int64, error) {
a.wg.Add(1)
defer a.wg.Done()
data, err := a.load()
if err != nil {
return 0, err
}
defer a.wg.Done()
var dtlv []byte
var result int64
var rdt byte
Expand Down

0 comments on commit 975bcb9

Please sign in to comment.