fix: fix deadlock in room repository

pull/1/head
Benyamin Azarkhazin 2023-08-31 16:49:15 +03:30
parent faef0f2962
commit 22599d7dbd
Signed by: benyamin
GPG Key ID: 3AE44F5623C70269
4 changed files with 38 additions and 19 deletions

View File

@ -185,4 +185,15 @@ func (c *RoomController) Start(ctx *gin.Context) {
resbody, _ := io.ReadAll(res.Body) resbody, _ := io.ReadAll(res.Body)
println("get /join "+res.Status, string(resbody)) println("get /join "+res.Status, string(resbody))
} }
c.helper.Response(ctx, nil, http.StatusNoContent)
}
func (c *RoomController) HealthCheck(ctx *gin.Context) {
if len(ctx.Query("roomId")) > 0 {
if !c.repo.DoesRoomExists(ctx.Query("roomId")) {
ctx.Status(http.StatusNotFound)
return
}
}
ctx.Status(204)
} }

View File

@ -1,7 +0,0 @@
package controllers
import "github.com/gin-gonic/gin"
func HealthCheck(ctx *gin.Context) {
ctx.Status(204)
}

View File

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"github.com/pion/rtcp" "github.com/pion/rtcp"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"log"
"net/http" "net/http"
"sourcecode.social/greatape/goldgorilla/models" "sourcecode.social/greatape/goldgorilla/models"
"sourcecode.social/greatape/goldgorilla/models/dto" "sourcecode.social/greatape/goldgorilla/models/dto"
@ -50,6 +49,12 @@ func NewRoomRepository(conf *models.ConfigModel) *RoomRepository {
} }
} }
func (r *RoomRepository) DoesRoomExists(id string) bool {
r.Lock()
defer r.Unlock()
return r.doesRoomExists(id)
}
func (r *RoomRepository) doesRoomExists(id string) bool { func (r *RoomRepository) doesRoomExists(id string) bool {
if _, exists := r.Rooms[id]; exists { if _, exists := r.Rooms[id]; exists {
return true return true
@ -185,7 +190,7 @@ func (r *RoomRepository) onPeerConnectionStateChange(roomId string, id uint64, n
switch newState { switch newState {
case webrtc.PeerConnectionStateFailed: case webrtc.PeerConnectionStateFailed:
if err := room.Peers[id].Conn.Close(); err != nil { if err := room.Peers[id].Conn.Close(); err != nil {
log.Print(err) println(err.Error())
} }
case webrtc.PeerConnectionStateClosed: case webrtc.PeerConnectionStateClosed:
delete(room.Peers, id) delete(room.Peers, id)
@ -225,11 +230,11 @@ func (r *RoomRepository) onPeerTrack(roomId string, id uint64, remote *webrtc.Tr
n, _, err := remote.Read(buffer) n, _, err := remote.Read(buffer)
if err != nil { if err != nil {
println(err.Error()) println(err.Error())
return break
} }
if _, err = trackLocal.Write(buffer[:n]); err != nil { if _, err = trackLocal.Write(buffer[:n]); err != nil {
println(err.Error()) println(err.Error())
return break
} }
} }
} }
@ -271,22 +276,28 @@ func (r *RoomRepository) updatePCTracks(roomId string) {
_, alreadyReceived := receivingPeerTracks[id] _, alreadyReceived := receivingPeerTracks[id]
if track.OwnerId != peer.ID && (!alreadySend && !alreadyReceived) { if track.OwnerId != peer.ID && (!alreadySend && !alreadyReceived) {
renegotiate = true renegotiate = true
if peer.Conn.ConnectionState() == webrtc.PeerConnectionStateClosed {
break
}
println("[PC] add track", track.TrackLocal.ID(), "to", peer.ID) println("[PC] add track", track.TrackLocal.ID(), "to", peer.ID)
_, err := peer.Conn.AddTrack(track.TrackLocal) _, err := peer.Conn.AddTrack(track.TrackLocal)
if err != nil { if err != nil {
println(err.Error()) println(err.Error())
return break
} }
} }
} }
for trackId, rtpSender := range alreadySentTracks { for trackId, rtpSender := range alreadySentTracks {
if _, exists := room.Tracks[trackId]; !exists { if _, exists := room.Tracks[trackId]; !exists {
renegotiate = true renegotiate = true
if peer.Conn.ConnectionState() == webrtc.PeerConnectionStateClosed {
break
}
println("[PC] remove track", trackId, "from", peer.ID) println("[PC] remove track", trackId, "from", peer.ID)
err := peer.Conn.RemoveTrack(rtpSender) err := peer.Conn.RemoveTrack(rtpSender)
if err != nil { if err != nil {
println(err.Error()) println(err.Error())
return break
} }
} }
} }
@ -309,16 +320,18 @@ func (r *RoomRepository) AddPeerIceCandidate(roomId string, id uint64, ic webrtc
r.Unlock() r.Unlock()
return models.NewError("room doesn't exists", 403, map[string]any{"roomId": roomId}) return models.NewError("room doesn't exists", 403, map[string]any{"roomId": roomId})
} }
r.Unlock()
room := r.Rooms[roomId] room := r.Rooms[roomId]
r.Unlock()
room.Lock() room.Lock()
defer room.Unlock()
if !r.doesPeerExists(roomId, id) { if !r.doesPeerExists(roomId, id) {
room.Unlock()
return models.NewError("no such a peer with this id in this room", 403, map[string]any{"roomId": roomId, "peerId": id}) return models.NewError("no such a peer with this id in this room", 403, map[string]any{"roomId": roomId, "peerId": id})
} }
peer := room.Peers[id]
room.Unlock()
err := room.Peers[id].Conn.AddICECandidate(ic) err := peer.Conn.AddICECandidate(ic)
if err != nil { if err != nil {
return models.NewError(err.Error(), 500, models.MessageResponse{Message: err.Error()}) return models.NewError(err.Error(), 500, models.MessageResponse{Message: err.Error()})
} }
@ -431,11 +444,13 @@ func (r *RoomRepository) ResetRoom(roomId string) error {
} }
room := r.Rooms[roomId] room := r.Rooms[roomId]
room.Lock() room.Lock()
defer room.Unlock()
room.timer.Stop() room.timer.Stop()
for _, peer := range room.Peers { for _, peer := range room.Peers {
_ = peer.Conn.Close() go func(conn *webrtc.PeerConnection) {
_ = conn.Close()
}(peer.Conn)
} }
room.Unlock()
delete(r.Rooms, roomId) delete(r.Rooms, roomId)
return nil return nil
} }

View File

@ -13,7 +13,7 @@ func (r *Router) RegisterRoutes(rCtrl *controllers.RoomController) error {
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
r.router = gin.Default() r.router = gin.Default()
registerRoomRoutes(r.router.Group("/room"), rCtrl) registerRoomRoutes(r.router.Group("/room"), rCtrl)
r.router.GET("/healthcheck", controllers.HealthCheck) r.router.GET("/healthcheck", rCtrl.HealthCheck)
return nil return nil
} }