From 22599d7dbd006c8fc7d39cba8e8e7b6fb9de8c75 Mon Sep 17 00:00:00 2001 From: benyamin Date: Thu, 31 Aug 2023 16:49:15 +0330 Subject: [PATCH] fix: fix deadlock in room repository --- controllers/room.go | 11 +++++++++++ controllers/status.go | 7 ------- repositories/room.go | 37 ++++++++++++++++++++++++++----------- routers/router.go | 2 +- 4 files changed, 38 insertions(+), 19 deletions(-) delete mode 100644 controllers/status.go diff --git a/controllers/room.go b/controllers/room.go index 6eb2d1b..7e36bce 100644 --- a/controllers/room.go +++ b/controllers/room.go @@ -185,4 +185,15 @@ func (c *RoomController) Start(ctx *gin.Context) { resbody, _ := io.ReadAll(res.Body) println("get /join "+res.Status, string(resbody)) } + c.helper.Response(ctx, nil, http.StatusNoContent) +} + +func (c *RoomController) HealthCheck(ctx *gin.Context) { + if len(ctx.Query("roomId")) > 0 { + if !c.repo.DoesRoomExists(ctx.Query("roomId")) { + ctx.Status(http.StatusNotFound) + return + } + } + ctx.Status(204) } diff --git a/controllers/status.go b/controllers/status.go deleted file mode 100644 index b17c977..0000000 --- a/controllers/status.go +++ /dev/null @@ -1,7 +0,0 @@ -package controllers - -import "github.com/gin-gonic/gin" - -func HealthCheck(ctx *gin.Context) { - ctx.Status(204) -} diff --git a/repositories/room.go b/repositories/room.go index 4189fb5..6858da2 100644 --- a/repositories/room.go +++ b/repositories/room.go @@ -7,7 +7,6 @@ import ( "fmt" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" - "log" "net/http" "sourcecode.social/greatape/goldgorilla/models" "sourcecode.social/greatape/goldgorilla/models/dto" @@ -50,6 +49,12 @@ func NewRoomRepository(conf *models.ConfigModel) *RoomRepository { } } +func (r *RoomRepository) DoesRoomExists(id string) bool { + r.Lock() + defer r.Unlock() + return r.doesRoomExists(id) +} + func (r *RoomRepository) doesRoomExists(id string) bool { if _, exists := r.Rooms[id]; exists { return true @@ -185,7 +190,7 @@ func (r *RoomRepository) onPeerConnectionStateChange(roomId string, id uint64, n switch newState { case webrtc.PeerConnectionStateFailed: if err := room.Peers[id].Conn.Close(); err != nil { - log.Print(err) + println(err.Error()) } case webrtc.PeerConnectionStateClosed: delete(room.Peers, id) @@ -225,11 +230,11 @@ func (r *RoomRepository) onPeerTrack(roomId string, id uint64, remote *webrtc.Tr n, _, err := remote.Read(buffer) if err != nil { println(err.Error()) - return + break } if _, err = trackLocal.Write(buffer[:n]); err != nil { println(err.Error()) - return + break } } } @@ -271,22 +276,28 @@ func (r *RoomRepository) updatePCTracks(roomId string) { _, alreadyReceived := receivingPeerTracks[id] if track.OwnerId != peer.ID && (!alreadySend && !alreadyReceived) { renegotiate = true + if peer.Conn.ConnectionState() == webrtc.PeerConnectionStateClosed { + break + } println("[PC] add track", track.TrackLocal.ID(), "to", peer.ID) _, err := peer.Conn.AddTrack(track.TrackLocal) if err != nil { println(err.Error()) - return + break } } } for trackId, rtpSender := range alreadySentTracks { if _, exists := room.Tracks[trackId]; !exists { renegotiate = true + if peer.Conn.ConnectionState() == webrtc.PeerConnectionStateClosed { + break + } println("[PC] remove track", trackId, "from", peer.ID) err := peer.Conn.RemoveTrack(rtpSender) if err != nil { println(err.Error()) - return + break } } } @@ -309,16 +320,18 @@ func (r *RoomRepository) AddPeerIceCandidate(roomId string, id uint64, ic webrtc r.Unlock() return models.NewError("room doesn't exists", 403, map[string]any{"roomId": roomId}) } - r.Unlock() room := r.Rooms[roomId] + r.Unlock() room.Lock() - defer room.Unlock() if !r.doesPeerExists(roomId, id) { + room.Unlock() return models.NewError("no such a peer with this id in this room", 403, map[string]any{"roomId": roomId, "peerId": id}) } + peer := room.Peers[id] + room.Unlock() - err := room.Peers[id].Conn.AddICECandidate(ic) + err := peer.Conn.AddICECandidate(ic) if err != nil { return models.NewError(err.Error(), 500, models.MessageResponse{Message: err.Error()}) } @@ -431,11 +444,13 @@ func (r *RoomRepository) ResetRoom(roomId string) error { } room := r.Rooms[roomId] room.Lock() - defer room.Unlock() room.timer.Stop() for _, peer := range room.Peers { - _ = peer.Conn.Close() + go func(conn *webrtc.PeerConnection) { + _ = conn.Close() + }(peer.Conn) } + room.Unlock() delete(r.Rooms, roomId) return nil } diff --git a/routers/router.go b/routers/router.go index 172e1cc..bb37137 100644 --- a/routers/router.go +++ b/routers/router.go @@ -13,7 +13,7 @@ func (r *Router) RegisterRoutes(rCtrl *controllers.RoomController) error { gin.SetMode(gin.ReleaseMode) r.router = gin.Default() registerRoomRoutes(r.router.Group("/room"), rCtrl) - r.router.GET("/healthcheck", controllers.HealthCheck) + r.router.GET("/healthcheck", rCtrl.HealthCheck) return nil }