Skip to content

Commit

Permalink
ZING-32005: Fix data races. (#27)
Browse files Browse the repository at this point in the history
* fix data race

* fix data race on writer

* add sleep to manager_test.go

* fix Makefile

* replace Addwg()

* Use different strategy to fix dataraces

---------

Co-authored-by: Danylo Kondratiev <knightpp@proton.me>
  • Loading branch information
JuanC1303 and knightpp authored Dec 13, 2023
1 parent 373c1ca commit 966eaeb
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 19 deletions.
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,18 @@ test: COVERAGE_PROFILE := coverprofile.out
test: COVERAGE_HTML := $(COVERAGE_DIR)/index.html
test: COVERAGE_XML := $(COVERAGE_DIR)/coverage.xml
test: fmt lint
@echo "Please ENABLE RACE DETECTOR"
@mkdir -p $(COVERAGE_DIR)
@$(GINKGO) \
run \
-r \
--tags integration \
--cover \
--coverprofile $(COVERAGE_PROFILE) \
--covermode=count \
--covermode=atomic \
--race \
--junit-report=junit.xml
$Q $(GO) tool cover -html=$(COVERAGE_PROFILE) -o $(COVERAGE_HTML)
$Q $(GOCOV) convert $(COVERAGE_PROFILE) | $(GOCOVXML) > $(COVERAGE_XML)
@echo "Please ENABLE RACE DETECTOR"

.PHONY: test-containerized
test-containerized:
Expand Down
5 changes: 4 additions & 1 deletion health/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func StopCollectorSingleton() {
collectorLock.Lock()
defer collectorLock.Unlock()
if collector != nil {
close(collector.done)
collector.doneOnce.Do(func() {
close(collector.done)
})
}
}

Expand All @@ -79,6 +81,7 @@ type healthCollector struct {
cycleDuration time.Duration
metricsIn chan<- *TargetMeasurement
done chan struct{}
doneOnce sync.Once
}

func (hc *healthCollector) HeartBeat(targetID string) (context.CancelFunc, error) {
Expand Down
37 changes: 30 additions & 7 deletions health/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

logging "github.com/zenoss/zenoss-go-sdk/health/log"
Expand All @@ -42,19 +43,41 @@ func FrameworkStart(ctx context.Context, cfg *Config, m Manager, writer w.Health
c := NewCollector(cfg.CollectionCycle, measurementsCh)
SetCollectorSingleton(c)

ctx, cancel := context.WithCancel(ctx)
var doneWg sync.WaitGroup

doneWg.Add(1)
go func() {
defer doneWg.Done()
defer cancel()
<-ctx.Done()

StopCollectorSingleton()
}()

go m.Start(ctx, measurementsCh, healthCh, targetCh)
doneWg.Add(1)
go func() {
defer doneWg.Done()
defer cancel()

go writer.Start(ctx, healthCh, targetCh)
m.Start(ctx, measurementsCh, healthCh, targetCh)
}()

doneWg.Add(1)
go func() {
defer doneWg.Done()
defer cancel()

writer.Start(ctx, healthCh, targetCh)
}()

return func() {
cancel()

StopCollectorSingleton()
m.Shutdown()
writer.Shutdown()
doneWg.Wait()
}
}

Expand Down Expand Up @@ -104,7 +127,7 @@ type healthManager struct {
wg *sync.WaitGroup

mu sync.Mutex
started bool
started atomic.Bool
}

func (hm *healthManager) Start(
Expand All @@ -131,10 +154,10 @@ func (hm *healthManager) Start(
hm.healthForwarder(ctx, healthIn)
}()

hm.started = true
hm.started.Store(true)
go func() {
hm.wg.Wait()
hm.started = false
hm.started.Store(false)
hm.stopWait <- struct{}{}
}()

Expand All @@ -150,7 +173,7 @@ func (hm *healthManager) Shutdown() {
}

func (hm *healthManager) IsStarted() bool {
return hm.started
return hm.started.Load()
}

func (hm *healthManager) AddTargets(targets []*target.Target) {
Expand All @@ -159,7 +182,7 @@ func (hm *healthManager) AddTargets(targets []*target.Target) {

for _, newTarget := range targets {
hm.registry.setRawHealthForTarget(newRawHealth(newTarget))
if hm.started {
if hm.IsStarted() {
hm.targetIn <- newTarget
}
}
Expand Down
1 change: 1 addition & 0 deletions health/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ var _ = Describe("Health Manager", func() {
CounterChange: counterValue,
}
mesuresCh <- counterMeasure
time.Sleep(200 * time.Microsecond)

go func() {
testTarget := <-targetCh
Expand Down
7 changes: 0 additions & 7 deletions health/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package writer

import (
"context"
"sync"

"github.com/zenoss/zenoss-go-sdk/health/log"
"github.com/zenoss/zenoss-go-sdk/health/target"
Expand All @@ -29,7 +28,6 @@ func New(dests []Destination) HealthWriter {
return &writer{
destinations: dests,
stopSig: stopSig,
wg: &sync.WaitGroup{},
}
}

Expand All @@ -38,13 +36,9 @@ type writer struct {
// we can also create and add some data transformers if
// health data should be changed somehow before send
stopSig chan struct{}
wg *sync.WaitGroup
}

func (w *writer) Start(ctx context.Context, healthIn <-chan *target.Health, targetIn <-chan *target.Target) {
w.wg.Add(1)
defer w.wg.Done()

for {
select {
case healthData, more := <-healthIn:
Expand Down Expand Up @@ -75,5 +69,4 @@ func (w *writer) Start(ctx context.Context, healthIn <-chan *target.Health, targ

func (w *writer) Shutdown() {
close(w.stopSig)
w.wg.Wait()
}
10 changes: 9 additions & 1 deletion health/writer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -83,14 +84,21 @@ var _ = Describe("Writer", func() {
)
h := target.NewHealth(targetID, utils.DefaultTargetType)

var wg sync.WaitGroup
hCh := make(chan *target.Health)
tCh := make(chan *target.Target)
go healthWriter.Start(ctx, hCh, tCh)

wg.Add(1)
go func() {
defer wg.Done()
healthWriter.Start(ctx, hCh, tCh)
}()
tCh <- hTarget
hCh <- h
time.Sleep(1 * time.Second)
close(hCh)
close(tCh)
wg.Wait()
out := buf.String()

Ω(out).Should(ContainSubstring(fmt.Sprintf("TargetID: %s, Status=Healthy", targetID)))
Expand Down

0 comments on commit 966eaeb

Please sign in to comment.