From 0a186d9bbdae3fe68c4f57ccb3dac141c6f83ad7 Mon Sep 17 00:00:00 2001 From: benyamin Date: Thu, 2 Nov 2023 15:12:30 +0330 Subject: [PATCH] sync with new logjam structure --- app.go | 44 +++++++++++++++++++++++++------------------- controllers/room.go | 43 +++++++++++++++++++++++++++++++++++-------- main.go | 8 ++------ models/config.go | 8 +++++--- models/dto/room.go | 9 ++++++--- repositories/room.go | 35 +++++++++++++++++++++++++++-------- 6 files changed, 100 insertions(+), 47 deletions(-) diff --git a/app.go b/app.go index 7438f94..25a7b00 100644 --- a/app.go +++ b/app.go @@ -23,7 +23,7 @@ type App struct { src string } -func (a *App) Init(srcListenAddr string, svcAddr string, logjamBaseUrl string, targetRoom string, iceTCPMUXListenPort uint, customICEHostCandidateIP string) { +func (a *App) Init(srcListenAddr string, logjamBaseUrl string, iceTCPMUXListenPort uint, customICEHostCandidateIP string) { println("initializing ..") a.src = srcListenAddr var iceServers []webrtc.ICEServer @@ -36,11 +36,9 @@ func (a *App) Init(srcListenAddr string, svcAddr string, logjamBaseUrl string, t panic("[E] can't parse ice.servers.json: " + err.Error()) } } - startRejoinCH := make(chan bool, 2) + startRejoinCH := make(chan models.RejoinMode, 2) a.conf = &models.ConfigModel{ - LogjamBaseUrl: logjamBaseUrl + "/auxiliary-node", - TargetRoom: targetRoom, - ServiceAddress: svcAddr, + LogjamBaseUrl: logjamBaseUrl, ICEServers: iceServers, ICETCPMUXListenPort: iceTCPMUXListenPort, CustomICEHostCandidateIP: customICEHostCandidateIP, @@ -65,51 +63,59 @@ func (a *App) Init(srcListenAddr string, svcAddr string, logjamBaseUrl string, t func (a *App) Run() { go func() { - *a.conf.StartRejoinCH <- true - for simplyJoin := range *a.conf.StartRejoinCH { - if simplyJoin { - buffer, _ := json.Marshal(map[string]any{"roomId": a.conf.TargetRoom, "svcAddr": a.conf.ServiceAddress}) + //*a.conf.StartRejoinCH <- true + c := &http.Client{ + Timeout: 8 * time.Second, + } + for data := range *a.conf.StartRejoinCH { + if data.SimplyJoin { + buffer, _ := json.Marshal(map[string]any{"roomId": data.RoomId}) body := bytes.NewReader(buffer) - c := &http.Client{ - Timeout: 8 * time.Second, - } res, err := c.Post(a.conf.LogjamBaseUrl+"/join", "application/json", body) if err != nil { println(err.Error()) time.Sleep(4 * time.Second) - *a.conf.StartRejoinCH <- true + *a.conf.StartRejoinCH <- data continue } if res.StatusCode > 204 { resbody, _ := io.ReadAll(res.Body) println("get /join "+res.Status, string(resbody)) time.Sleep(4 * time.Second) - *a.conf.StartRejoinCH <- true + *a.conf.StartRejoinCH <- data continue } } else { + data.SimplyJoin = true reqModel := struct { RoomId string `json:"roomId"` }{ - RoomId: a.conf.TargetRoom, + RoomId: data.RoomId, } serializedReqBody, err := json.Marshal(reqModel) if err != nil { println(err.Error()) - *a.conf.StartRejoinCH <- true + *a.conf.StartRejoinCH <- data continue } - resp, err := http.Post(a.conf.LogjamBaseUrl+"/rejoin", "application/json", bytes.NewReader(serializedReqBody)) + resp, err := c.Post(a.conf.LogjamBaseUrl+"/rejoin", "application/json", bytes.NewReader(serializedReqBody)) if err != nil { println(err.Error()) - *a.conf.StartRejoinCH <- true continue } if resp.StatusCode > 204 { println("/rejoin", resp.Status) - *a.conf.StartRejoinCH <- true } + /*if err != nil { + println(err.Error()) + *a.conf.StartRejoinCH <- data + continue + } + if resp.StatusCode > 204 { + println("/rejoin", resp.Status) + *a.conf.StartRejoinCH <- data + }*/ } } }() diff --git a/controllers/room.go b/controllers/room.go index 0dfc478..62266bc 100644 --- a/controllers/room.go +++ b/controllers/room.go @@ -36,7 +36,7 @@ func (c *RoomController) CreatePeer(ctx *gin.Context) { c.helper.ResponseUnprocessableEntity(ctx) return } - err := c.repo.CreatePeer(reqModel.RoomId, reqModel.ID, reqModel.CanPublish, reqModel.IsCaller) + err := c.repo.CreatePeer(reqModel.RoomId, reqModel.ID, reqModel.CanPublish, reqModel.IsCaller, reqModel.GGID) if c.helper.HandleIfErr(ctx, err, nil) { return } @@ -88,12 +88,14 @@ func (c *RoomController) Offer(ctx *gin.Context) { } c.helper.Response(ctx, struct{}{}, http.StatusNoContent) { + ggid := c.repo.GetRoomGGID(reqModel.RoomId) buffer, err := json.Marshal(dto.SetSDPReqModel{ PeerDTO: dto.PeerDTO{ RoomId: reqModel.RoomId, ID: reqModel.ID, }, - SDP: *answer, + GGID: *ggid, + SDP: *answer, }) if err != nil { println(err.Error()) @@ -165,25 +167,50 @@ func (c *RoomController) ResetRoom(ctx *gin.Context) { return } } - err := c.repo.ResetRoom(roomId) + ggid, err := c.repo.ResetRoom(roomId) if c.helper.HandleIfErr(ctx, err, nil) { return } - c.helper.Response(ctx, nil, http.StatusNoContent) + c.helper.Response(ctx, struct { + GGID uint64 `json:"ggid"` + }{ + GGID: ggid, + }, http.StatusOK) } func (c *RoomController) Start(ctx *gin.Context) { - buffer, _ := json.Marshal(map[string]any{"roomId": c.conf.TargetRoom, "svcAddr": c.conf.ServiceAddress}) + reqModel := struct { + RoomId string `json:"roomId"` + }{} + if err := ctx.ShouldBindJSON(&reqModel); err != nil { + c.helper.ResponseBadReq(ctx) + return + } + buffer, _ := json.Marshal(map[string]any{"roomId": reqModel.RoomId}) body := bytes.NewReader(buffer) res, err := http.Post(c.conf.LogjamBaseUrl+"/join", "application/json", body) if err != nil { println(err.Error()) time.Sleep(4 * time.Second) } - if res != nil && res.StatusCode > 204 { - resbody, _ := io.ReadAll(res.Body) - println("get /join "+res.Status, string(resbody)) + if res != nil { + if res.StatusCode > 204 { + resbody, _ := io.ReadAll(res.Body) + println("get /join "+res.Status, string(resbody)) + } else { + resbody, _ := io.ReadAll(res.Body) + println(string(resbody)) + respData := make(map[string]any) + if len(resbody) > 2 { + err := json.Unmarshal(resbody, &respData) + if err != nil { + println(err.Error()) + c.helper.Response(ctx, nil, http.StatusBadRequest) + return + } + } + } } c.helper.Response(ctx, nil, http.StatusNoContent) } diff --git a/main.go b/main.go index a5ff3cf..90f8ef9 100644 --- a/main.go +++ b/main.go @@ -6,10 +6,8 @@ import ( ) func main() { - svcAddr := flag.String("svc-addr", "http://localhost:8080", "service baseurl to register in logjam ( shouldn't end with / )") src := flag.String("src", ":8080", "listenHost:listenPort") logjamBaseUrl := flag.String("logjam-base-url", "http://localhost:8090", "logjam base url( shouldn't end with / )") - targetRoom := flag.String("targetRoom", "test", "target room") icetcpmuxListenPort := flag.Uint("ice-tcp-mux-listen-port", 4444, "listen port to use for tcp ice candidates") customICEHostCandidateIP := flag.String("custom-ice-host-candidate-ip", "", "set to override host ice candidates address") flag.Parse() @@ -17,10 +15,8 @@ func main() { if strings.HasSuffix(*logjamBaseUrl, "/") { panic("logjam-base-url shouldn't end with /") } - if strings.HasSuffix(*svcAddr, "/") { - panic("service address shouldn't end with /") - } app := App{} - app.Init(*src, *svcAddr, *logjamBaseUrl, *targetRoom, *icetcpmuxListenPort, *customICEHostCandidateIP) + *logjamBaseUrl += "/goldgorilla" + app.Init(*src, *logjamBaseUrl, *icetcpmuxListenPort, *customICEHostCandidateIP) app.Run() } diff --git a/models/config.go b/models/config.go index 8c5269c..6dcf3c0 100644 --- a/models/config.go +++ b/models/config.go @@ -2,12 +2,14 @@ package models import "github.com/pion/webrtc/v3" +type RejoinMode struct { + SimplyJoin bool + RoomId string +} type ConfigModel struct { - ServiceAddress string `json:"serviceAddress"` LogjamBaseUrl string `json:"logjamBaseUrl"` - TargetRoom string `json:"targetRoom"` ICETCPMUXListenPort uint `json:"ice_tcpmux_listenPort"` CustomICEHostCandidateIP string `json:"customICEHostCandidateIP"` ICEServers []webrtc.ICEServer `json:"iceServers"` - StartRejoinCH *chan bool + StartRejoinCH *chan RejoinMode } diff --git a/models/dto/room.go b/models/dto/room.go index 0f2f676..d2005d2 100644 --- a/models/dto/room.go +++ b/models/dto/room.go @@ -13,12 +13,14 @@ func (model *PeerDTO) Validate() bool { type CreatePeerReqModel struct { PeerDTO - CanPublish bool `json:"canPublish"` - IsCaller bool `json:"isCaller"` + GGID uint64 `json:"ggid"` + CanPublish bool `json:"canPublish"` + IsCaller bool `json:"isCaller"` } type AddPeerICECandidateReqModel struct { PeerDTO + GGID uint64 `json:"ggid"` ICECandidate webrtc.ICECandidateInit `json:"iceCandidate"` } @@ -28,7 +30,8 @@ func (model *AddPeerICECandidateReqModel) Validate() bool { type SetSDPReqModel struct { PeerDTO - SDP webrtc.SessionDescription `json:"sdp"` + GGID uint64 `json:"ggid"` + SDP webrtc.SessionDescription `json:"sdp"` } func (model *SetSDPReqModel) Validate() bool { diff --git a/repositories/room.go b/repositories/room.go index 7b1a42e..d869e36 100644 --- a/repositories/room.go +++ b/repositories/room.go @@ -38,6 +38,7 @@ type Room struct { trackLock *sync.Mutex Tracks map[string]*Track timer *time.Ticker + ggId uint64 } type RoomRepository struct { @@ -114,7 +115,7 @@ func (r *RoomRepository) doesPeerExists(roomId string, id uint64) bool { return false } -func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, isCaller bool) error { +func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, isCaller bool, ggid uint64) error { r.Lock() if !r.doesRoomExists(roomId) { @@ -124,6 +125,7 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, i trackLock: &sync.Mutex{}, Tracks: make(map[string]*Track), timer: time.NewTicker(3 * time.Second), + ggId: ggid, } r.Rooms[roomId] = room go func() { @@ -164,7 +166,7 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, i } peerConn.OnICECandidate(func(ic *webrtc.ICECandidate) { - r.onPeerICECandidate(roomId, id, ic) + r.onPeerICECandidate(roomId, id, room.ggId, ic) }) peerConn.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { r.Lock() @@ -209,14 +211,17 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, i } func (r *RoomRepository) onCallerDisconnected(roomId string) { - if err := r.ResetRoom(roomId); err != nil { + if _, err := r.ResetRoom(roomId); err != nil { println(err.Error()) return } - *r.conf.StartRejoinCH <- false + *r.conf.StartRejoinCH <- models.RejoinMode{ + SimplyJoin: false, + RoomId: roomId, + } } -func (r *RoomRepository) onPeerICECandidate(roomId string, id uint64, ic *webrtc.ICECandidate) { +func (r *RoomRepository) onPeerICECandidate(roomId string, id, ggid uint64, ic *webrtc.ICECandidate) { if ic == nil { return } @@ -225,6 +230,7 @@ func (r *RoomRepository) onPeerICECandidate(roomId string, id uint64, ic *webrtc RoomId: roomId, ID: id, }, + GGID: ggid, ICECandidate: ic.ToJSON(), } serializedReqBody, err := json.Marshal(reqModel) @@ -518,14 +524,15 @@ func (r *RoomRepository) ClosePeer(roomId string, id uint64) error { return peer.Conn.Close() } -func (r *RoomRepository) ResetRoom(roomId string) error { +func (r *RoomRepository) ResetRoom(roomId string) (uint64, error) { r.Lock() defer r.Unlock() if !r.doesRoomExists(roomId) { - return nil + return 0, nil } room := r.Rooms[roomId] room.Lock() + ggid := room.ggId room.timer.Stop() for _, peer := range room.Peers { go func(conn *webrtc.PeerConnection) { @@ -534,7 +541,7 @@ func (r *RoomRepository) ResetRoom(roomId string) error { } room.Unlock() delete(r.Rooms, roomId) - return nil + return ggid, nil } func (r *RoomRepository) offerPeer(peer *Peer, roomId string) error { @@ -548,7 +555,9 @@ func (r *RoomRepository) offerPeer(peer *Peer, roomId string) error { if err != nil { return err } + ggid := r.GetRoomGGID(roomId) reqModel := dto.SetSDPReqModel{ + GGID: *ggid, PeerDTO: dto.PeerDTO{ RoomId: roomId, ID: peer.ID, @@ -568,3 +577,13 @@ func (r *RoomRepository) offerPeer(peer *Peer, roomId string) error { } return nil } + +func (r *RoomRepository) GetRoomGGID(roomId string) *uint64 { + r.Lock() + defer r.Unlock() + if !r.doesRoomExists(roomId) { + return nil + } + gid := r.Rooms[roomId].ggId + return &gid +}