Skip to content

Commit

Permalink
Drop users when coordinator is lost
Browse files Browse the repository at this point in the history
  • Loading branch information
sergystepanov committed Oct 21, 2023
1 parent cb968d7 commit 3e116fc
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 30 deletions.
2 changes: 2 additions & 0 deletions pkg/worker/coordinatorhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke
func (c *coordinator) HandleTerminateSession(rq api.TerminateSessionRequest[com.Uid], w *Worker) {
if user := w.router.FindUser(rq.Id); user != nil {
w.router.Remove(user)
c.log.Debug().Msgf(">>> users: %v", w.router.Users())
user.Disconnect()
}
}
Expand All @@ -173,6 +174,7 @@ func (c *coordinator) HandleTerminateSession(rq api.TerminateSessionRequest[com.
func (c *coordinator) HandleQuitGame(rq api.GameQuitRequest[com.Uid], w *Worker) {
if user := w.router.FindUser(rq.Id); user != nil {
w.router.Remove(user)
c.log.Debug().Msgf(">>> users: %v", w.router.Users())
}
}

Expand Down
12 changes: 11 additions & 1 deletion pkg/worker/room/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (r *Router[T]) FindRoom(id string) *Room[T] {
func (r *Router[T]) Remove(user T) {
if left := r.users.RemoveL(user); left == 0 {
r.Close()
r.SetRoom(nil)
r.SetRoom(nil) // !to remove
}
}

Expand All @@ -130,6 +130,16 @@ func (r *Router[T]) Room() *Room[T] { r.mu.Lock(); defer r.mu.Unlock()
func (r *Router[T]) SetRoom(room *Room[T]) { r.mu.Lock(); r.room = room; r.mu.Unlock() }
func (r *Router[T]) HasRoom() bool { r.mu.Lock(); defer r.mu.Unlock(); return r.room != nil }
func (r *Router[T]) Users() SessionManager[T] { return r.users }
func (r *Router[T]) Reset() {
r.mu.Lock()
if r.room != nil {
r.room.Close()
r.room = nil
}
r.users.ForEach(func(u T) { u.Disconnect() })
r.users.Reset()
r.mu.Unlock()
}

type AppSession struct {
Uid
Expand Down
28 changes: 0 additions & 28 deletions pkg/worker/room/room_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,34 +269,6 @@ func BenchmarkRoom(b *testing.B) {
}
}

type tSession struct{}

func (t tSession) SendAudio([]byte, int32) {}
func (t tSession) SendVideo([]byte, int32) {}
func (t tSession) SendData([]byte) {}
func (t tSession) Disconnect() {}
func (t tSession) Id() string { return "1" }

func TestRouter(t *testing.T) {
u := com.NewNetMap[string, *tSession]()
router := Router[*tSession]{users: &u}

var r *Room[*tSession]

router.SetRoom(&Room[*tSession]{id: "test001"})
room := router.FindRoom("test001")
if room == nil {
t.Errorf("no room, but should be")
}
router.SetRoom(r)
room = router.FindRoom("x")
if room != nil {
t.Errorf("a room, but should not be")
}
router.SetRoom(nil)
router.Close()
}

// expand joins a list of file path elements.
func expand(p ...string) string {
ph, _ := filepath.Abs(filepath.FromSlash(filepath.Join(p...)))
Expand Down
74 changes: 74 additions & 0 deletions pkg/worker/room/router_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package room

import (
"testing"

"github.com/giongto35/cloud-game/v3/pkg/com"
)

type tSession struct {
id string
connected bool
}

func (t *tSession) SendAudio([]byte, int32) {}
func (t *tSession) SendVideo([]byte, int32) {}
func (t *tSession) SendData([]byte) {}
func (t *tSession) Connect() { t.connected = true }
func (t *tSession) Disconnect() { t.connected = false }
func (t *tSession) Id() string { return t.id }

type lookMap struct {
com.NetMap[string, *tSession]
prev com.NetMap[string, *tSession] // we could use pointers in the original :3
}

func (l *lookMap) Reset() {
l.prev = com.NewNetMap[string, *tSession]()
l.Map.ForEach(func(s *tSession) { l.prev.Add(s) })
l.NetMap.Reset()
}

func TestRouter(t *testing.T) {
router := newTestRouter()

var r *Room[*tSession]

router.SetRoom(&Room[*tSession]{id: "test001"})
room := router.FindRoom("test001")
if room == nil {
t.Errorf("no room, but should be")
}
router.SetRoom(r)
room = router.FindRoom("x")
if room != nil {
t.Errorf("a room, but should not be")
}
router.SetRoom(nil)
router.Close()
}

func TestRouterReset(t *testing.T) {
u := lookMap{NetMap: com.NewNetMap[string, *tSession]()}
router := Router[*tSession]{users: &u}

router.AddUser(&tSession{id: "1", connected: true})
router.AddUser(&tSession{id: "2", connected: false})
router.AddUser(&tSession{id: "3", connected: true})

router.Reset()

disconnected := true
u.prev.ForEach(func(u *tSession) { disconnected = disconnected && !u.connected })
if !disconnected {
t.Errorf("not all users were disconnected, but should")
}
if !router.Users().Empty() {
t.Errorf("has users after reset, but should not")
}
}

func newTestRouter() *Router[*tSession] {
u := com.NewNetMap[string, *tSession]()
return &Router[*tSession]{users: &u}
}
5 changes: 4 additions & 1 deletion pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,17 @@ func New(conf config.WorkerConfig, log *logger.Logger) (*Worker, error) {
return worker, nil
}

func (w *Worker) Reset() { w.router.Close() }
func (w *Worker) Reset() { w.router.Reset() }

func (w *Worker) Start(done chan struct{}) {
for _, s := range w.services {
if s != nil {
s.Run()
}
}

// !to restore alive worker info when coordinator connection was lost

go func() {
remoteAddr := w.conf.Worker.Network.CoordinatorAddress
defer func() {
Expand Down

0 comments on commit 3e116fc

Please sign in to comment.