From 1ff98068cde228705c73197a57250713d19239ed Mon Sep 17 00:00:00 2001 From: benyamin Date: Fri, 4 Aug 2023 13:16:27 +0330 Subject: [PATCH] fix: race condition in handshake --- controllers/room.go | 2 + repositories/room.go | 143 +++++++++++++++++++++++++------------------ 2 files changed, 86 insertions(+), 59 deletions(-) diff --git a/controllers/room.go b/controllers/room.go index 50fb18b..1fb5757 100644 --- a/controllers/room.go +++ b/controllers/room.go @@ -70,6 +70,7 @@ func (c *RoomController) Offer(ctx *gin.Context) { c.helper.ResponseUnprocessableEntity(ctx) return } + println("offer from", reqModel.ID) answer, err := c.repo.SetPeerOffer(reqModel.RoomId, reqModel.ID, reqModel.SDP) if c.helper.HandleIfErr(ctx, err, nil) { println(err.Error()) @@ -110,6 +111,7 @@ func (c *RoomController) Answer(ctx *gin.Context) { c.helper.ResponseUnprocessableEntity(ctx) return } + println("answer from", reqModel.ID) err := c.repo.SetPeerAnswer(reqModel.RoomId, reqModel.ID, reqModel.SDP) if c.helper.HandleIfErr(ctx, err, nil) { println(err.Error()) diff --git a/repositories/room.go b/repositories/room.go index 124fc44..573eed5 100644 --- a/repositories/room.go +++ b/repositories/room.go @@ -3,6 +3,7 @@ package repositories import ( "bytes" "encoding/json" + "errors" "fmt" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" @@ -20,9 +21,10 @@ type Track struct { } type Peer struct { - ID uint64 - Conn *webrtc.PeerConnection - CanPublish bool + ID uint64 + Conn *webrtc.PeerConnection + CanPublish bool + HandshakeLock *sync.Mutex } type Room struct { @@ -86,11 +88,16 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCall continue } - go peer.Conn.WriteRTCP([]rtcp.Packet{ - &rtcp.PictureLossIndication{ - MediaSSRC: uint32(receiver.Track().SSRC()), - }, - }) + go func(recv *webrtc.RTPReceiver) { + err := peer.Conn.WriteRTCP([]rtcp.Packet{ + &rtcp.PictureLossIndication{ + MediaSSRC: uint32(recv.Track().SSRC()), + }, + }) + if err != nil { + println(`[E] [rtcp] `, err.Error()) + } + }(receiver) } } @@ -99,8 +106,8 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCall }() } - r.Unlock() room := r.Rooms[roomId] + r.Unlock() room.Lock() defer room.Unlock() @@ -118,10 +125,15 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCall peerConn.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { r.onPeerTrack(roomId, id, remote, receiver) }) - + /*peerConn.OnNegotiationNeeded(func() { + println("[PC] negotiating with peer", id) + r.offerPeer(peerConn,roomId,id) + })*/ room.Peers[id] = &Peer{ - ID: id, - Conn: peerConn, + ID: id, + Conn: peerConn, + HandshakeLock: &sync.Mutex{}, + CanPublish: canPublish, } go r.updatePCTracks(roomId) return nil @@ -197,12 +209,12 @@ func (r *RoomRepository) onPeerTrack(roomId string, id uint64, remote *webrtc.Tr } room.trackLock.Unlock() - defer func() { + defer func(trackId string) { room.trackLock.Lock() - delete(room.Tracks, remote.ID()) + delete(room.Tracks, trackId) room.trackLock.Unlock() r.updatePCTracks(roomId) - }() + }(remote.ID()) go r.updatePCTracks(roomId) buffer := make([]byte, 1500) for { @@ -252,10 +264,10 @@ func (r *RoomRepository) updatePCTracks(roomId string) { renegotiate := false for id, track := range room.Tracks { _, alreadySend := alreadySentTracks[id] - _, alreadyReceiver := receivingPeerTracks[id] - if track.OwnerId != peer.ID && (!alreadySend && !alreadyReceiver) { + _, alreadyReceived := receivingPeerTracks[id] + if track.OwnerId != peer.ID && (!alreadySend && !alreadyReceived) { renegotiate = true - println("add track") + println("[PC] add track", track.TrackLocal.ID(), "to", peer.ID) _, err := peer.Conn.AddTrack(track.TrackLocal) if err != nil { println(err.Error()) @@ -263,38 +275,20 @@ func (r *RoomRepository) updatePCTracks(roomId string) { } } } + for trackId, rtpSender := range alreadySentTracks { + if _, exists := room.Tracks[trackId]; !exists { + println("[PC] remove track", trackId, "from", peer.ID) + _ = rtpSender.Stop() + _ = peer.Conn.RemoveTrack(rtpSender) + } + } room.trackLock.Unlock() if renegotiate { - offer, err := peer.Conn.CreateOffer(nil) + err := r.offerPeer(peer, roomId) if err != nil { - println(err.Error()) + println(`[E]`, err.Error()) return } - err = peer.Conn.SetLocalDescription(offer) - if err != nil { - println(err.Error()) - return - } - reqModel := dto.SetSDPReqModel{ - PeerDTO: dto.PeerDTO{ - RoomId: roomId, - ID: peer.ID, - }, - SDP: offer, - } - bodyJson, err := json.Marshal(reqModel) - if err != nil { - println(err.Error()) - return - } - res, err := http.Post(r.conf.LogjamBaseUrl+"/offer", "application/json", bytes.NewReader(bodyJson)) - if err != nil { - println(err.Error()) - return - } - if res.StatusCode > 204 { - println("/offer ", res.Status) - } } } } @@ -340,6 +334,7 @@ func (r *RoomRepository) SetPeerAnswer(roomId string, id uint64, answer webrtc.S if err != nil { return models.NewError(err.Error(), 500, models.MessageResponse{Message: err.Error()}) } + room.Peers[id].HandshakeLock.Unlock() return nil } func (r *RoomRepository) SetPeerOffer(roomId string, id uint64, offer webrtc.SessionDescription) (sdpAnswer *webrtc.SessionDescription, err error) { @@ -348,24 +343,28 @@ func (r *RoomRepository) SetPeerOffer(roomId string, id uint64, offer webrtc.Ses r.Unlock() return nil, models.NewError("room doesn't exists", 403, map[string]any{"roomId": roomId}) } - r.Unlock() room := r.Rooms[roomId] + r.Unlock() room.Lock() - defer room.Unlock() if !r.doesPeerExists(roomId, id) { + room.Unlock() return nil, models.NewError("no such a peer with this id in this room", 403, map[string]any{"roomId": roomId, "peerId": id}) } + peer := room.Peers[id] + room.Unlock() - err = room.Peers[id].Conn.SetRemoteDescription(offer) + peer.HandshakeLock.Lock() + defer peer.HandshakeLock.Unlock() + err = peer.Conn.SetRemoteDescription(offer) if err != nil { return nil, models.NewError(err.Error(), 500, models.MessageResponse{Message: err.Error()}) } - answer, err := room.Peers[id].Conn.CreateAnswer(nil) + answer, err := peer.Conn.CreateAnswer(nil) if err != nil { return nil, models.NewError(err.Error(), 500, models.MessageResponse{Message: err.Error()}) } - err = room.Peers[id].Conn.SetLocalDescription(answer) + err = peer.Conn.SetLocalDescription(answer) if err != nil { return nil, models.NewError(err.Error(), 500, models.MessageResponse{Message: err.Error()}) } @@ -411,23 +410,49 @@ func (r *RoomRepository) ClosePeer(roomId string, id uint64) error { func (r *RoomRepository) ResetRoom(roomId string) error { r.Lock() + defer r.Unlock() if !r.doesRoomExists(roomId) { - r.Unlock() return nil } room := r.Rooms[roomId] - r.Unlock() - room.Lock() - + defer room.Unlock() room.timer.Stop() for _, peer := range room.Peers { - peer.Conn.Close() + _ = peer.Conn.Close() } - room.Unlock() - - r.Lock() delete(r.Rooms, roomId) - r.Unlock() + return nil +} + +func (r *RoomRepository) offerPeer(peer *Peer, roomId string) error { + peer.HandshakeLock.Lock() + println("[PC] negotiating with peer", peer.ID) + offer, err := peer.Conn.CreateOffer(nil) + if err != nil { + return err + } + err = peer.Conn.SetLocalDescription(offer) + if err != nil { + return err + } + reqModel := dto.SetSDPReqModel{ + PeerDTO: dto.PeerDTO{ + RoomId: roomId, + ID: peer.ID, + }, + SDP: offer, + } + bodyJson, err := json.Marshal(reqModel) + if err != nil { + return err + } + res, err := http.Post(r.conf.LogjamBaseUrl+"/offer", "application/json", bytes.NewReader(bodyJson)) + if err != nil { + return err + } + if res.StatusCode > 204 { + return errors.New("POST {logjambaseurl}/offer : " + res.Status) + } return nil }