diff --git a/app.go b/app.go index e0fac51..7be5d78 100644 --- a/app.go +++ b/app.go @@ -3,12 +3,16 @@ package main import ( "bytes" "encoding/json" + "fmt" + "io" + "net/http" + "os" + "os/signal" "sourcecode.social/greatape/goldgorilla/controllers" "sourcecode.social/greatape/goldgorilla/models" "sourcecode.social/greatape/goldgorilla/repositories" "sourcecode.social/greatape/goldgorilla/routers" - "io" - "net/http" + "syscall" "time" ) @@ -18,11 +22,13 @@ type App struct { src string } -func (a *App) Init(srcListenAddr string, logjamBaseUrl string, targetRoom string) { +func (a *App) Init(srcListenAddr string, svcAddr string, logjamBaseUrl string, targetRoom string) { + println("initializing ..") a.src = srcListenAddr a.conf = &models.ConfigModel{ - LogjamBaseUrl: logjamBaseUrl + "/auxiliary-node", - TargetRoom: targetRoom, + LogjamBaseUrl: logjamBaseUrl + "/auxiliary-node", + TargetRoom: targetRoom, + ServiceAddress: svcAddr, } roomRepo := repositories.NewRoomRepository(a.conf) a.router = &routers.Router{} @@ -31,14 +37,25 @@ func (a *App) Init(srcListenAddr string, logjamBaseUrl string, targetRoom string err := a.router.RegisterRoutes(roomCtrl) panicIfErr(err) + + { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL, syscall.SIGILL, syscall.SIGSTOP) + go func() { + a.onDie(<-sigs) + }() + } } func (a *App) Run() { go func() { start: - buffer, _ := json.Marshal(map[string]any{"roomId": a.conf.TargetRoom}) + buffer, _ := json.Marshal(map[string]any{"roomId": a.conf.TargetRoom, "svcAddr": a.conf.ServiceAddress}) body := bytes.NewReader(buffer) - res, err := http.Post(a.conf.LogjamBaseUrl+"/join", "application/json", body) + c := &http.Client{ + Timeout: 8 * time.Second, + } + res, err := c.Post(a.conf.LogjamBaseUrl+"/join", "application/json", body) if err != nil { println(err.Error()) time.Sleep(4 * time.Second) @@ -55,6 +72,12 @@ func (a *App) Run() { panicIfErr(err) } +func (a *App) onDie(sig os.Signal) { + fmt.Println("<-", sig) + + os.Exit(0) +} + func panicIfErr(err error) { if err != nil { panic(err) diff --git a/controllers/room.go b/controllers/room.go index e5f7c25..3d5b018 100644 --- a/controllers/room.go +++ b/controllers/room.go @@ -4,10 +4,10 @@ import ( "bytes" "encoding/json" "github.com/gin-gonic/gin" + "net/http" "sourcecode.social/greatape/goldgorilla/models" "sourcecode.social/greatape/goldgorilla/models/dto" "sourcecode.social/greatape/goldgorilla/repositories" - "net/http" ) type RoomController struct { @@ -34,34 +34,11 @@ func (c *RoomController) CreatePeer(ctx *gin.Context) { c.helper.ResponseUnprocessableEntity(ctx) return } - offer, err := c.repo.CreatePeer(reqModel.RoomId, reqModel.ID, reqModel.CanPublish, reqModel.IsCaller) + err := c.repo.CreatePeer(reqModel.RoomId, reqModel.ID, reqModel.CanPublish, reqModel.IsCaller) if c.helper.HandleIfErr(ctx, err, nil) { return } c.helper.Response(ctx, struct{}{}, http.StatusNoContent) - - if offer != nil { - buffer, err := json.Marshal(dto.SetSDPReqModel{ - PeerDTO: dto.PeerDTO{ - RoomId: reqModel.RoomId, - ID: reqModel.ID, - }, - SDP: *offer, - }) - if err != nil { - println(err.Error()) - return - } - reader := bytes.NewReader(buffer) - resp, err := http.Post(c.conf.LogjamBaseUrl+"/offer", "application/json", reader) - if err != nil { - println(err.Error()) - return - } - if resp.StatusCode > 204 { - println(resp.Status) - } - } } func (c *RoomController) AddICECandidate(ctx *gin.Context) { @@ -133,6 +110,7 @@ func (c *RoomController) Answer(ctx *gin.Context) { } err := c.repo.SetPeerAnswer(reqModel.RoomId, reqModel.ID, reqModel.SDP) if c.helper.HandleIfErr(ctx, err, nil) { + println(err.Error()) return } c.helper.Response(ctx, struct{}{}, http.StatusNoContent) @@ -154,3 +132,23 @@ func (c *RoomController) ClosePeer(ctx *gin.Context) { } c.helper.Response(ctx, struct{}{}, http.StatusNoContent) } + +func (c *RoomController) ResetRoom(ctx *gin.Context) { + var reqModel map[string]any + badReqSt := 400 + if err := ctx.ShouldBindJSON(&reqModel); c.helper.HandleIfErr(ctx, err, &badReqSt) { + return + } + roomId := "" + if rid, exists := reqModel["roomId"]; !exists { + roomId = rid + c.helper.ResponseUnprocessableEntity(ctx) + } + + err := c.repo.ResetRoom(roomId) + if c.helper.HandleIfErr(ctx, err, nil) { + return + } + + c.helper.Response(ctx, nil, http.StatusNoContent) +} diff --git a/controllers/status.go b/controllers/status.go new file mode 100644 index 0000000..2d32936 --- /dev/null +++ b/controllers/status.go @@ -0,0 +1 @@ +package controllers diff --git a/main.go b/main.go index b45e14d..f40c3bc 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( ) func main() { + svcAddr := flag.String("svc-addr", "", "service to register in logjam") src := flag.String("src", ":8080", "listenhost:listenPort") logjamBaseUrl := flag.String("logjam-base-url", "https://example.com", "logjam base url(shouldn't end with /)") targetRoom := flag.String("targetRoom", "testyroom", "target room") @@ -15,7 +16,10 @@ func main() { if strings.HasSuffix(*logjamBaseUrl, "/") { panic("logjam-base-url shouldn't end with /") } + if strings.HasSuffix(*svcAddr, "/") { + panic("service address shouldn't end with /") + } app := App{} - app.Init(*src, *logjamBaseUrl, *targetRoom) + app.Init(*src, *svcAddr, *logjamBaseUrl, *targetRoom) app.Run() } diff --git a/models/config.go b/models/config.go index d687881..8e043cf 100644 --- a/models/config.go +++ b/models/config.go @@ -1,6 +1,7 @@ package models type ConfigModel struct { - LogjamBaseUrl string `json:"logjamBaseUrl"` - TargetRoom string `json:"targetRoom"` + ServiceAddress string `json:"serviceAddress"` + LogjamBaseUrl string `json:"logjamBaseUrl"` + TargetRoom string `json:"targetRoom"` } diff --git a/repositories/room.go b/repositories/room.go index 68db5b2..71a496b 100644 --- a/repositories/room.go +++ b/repositories/room.go @@ -6,10 +6,10 @@ import ( "fmt" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" - "sourcecode.social/greatape/goldgorilla/models" - "sourcecode.social/greatape/goldgorilla/models/dto" "log" "net/http" + "sourcecode.social/greatape/goldgorilla/models" + "sourcecode.social/greatape/goldgorilla/models/dto" "sync" "time" ) @@ -30,6 +30,7 @@ type Room struct { Peers map[uint64]*Peer trackLock *sync.Mutex Tracks map[string]*Track + timer *time.Timer } type RoomRepository struct { @@ -64,7 +65,7 @@ func (r *RoomRepository) doesPeerExists(roomId string, id uint64) bool { return false } -func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCaller bool) (*webrtc.SessionDescription, error) { +func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCaller bool) error { r.Lock() if !r.doesRoomExists(roomId) { @@ -73,10 +74,11 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCall Peers: make(map[uint64]*Peer), trackLock: &sync.Mutex{}, Tracks: make(map[string]*Track), + timer: time.NewTicker(3 * time.Second), } r.Rooms[roomId] = room go func() { - for range time.NewTicker(3 * time.Second).C { + for range room.timer.C { room.Lock() for _, peer := range room.Peers { for _, receiver := range peer.Conn.GetReceivers() { @@ -104,14 +106,7 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCall peerConn, err := webrtc.NewPeerConnection(webrtc.Configuration{}) if err != nil { - return nil, models.NewError("can't create peer connection", 500, models.MessageResponse{Message: err.Error()}) - } - for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} { - if _, err := peerConn.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{ - Direction: webrtc.RTPTransceiverDirectionSendrecv, - }); err != nil { - return nil, models.NewError("unhandled error, contact support #1313", 500, err) - } + return models.NewError("can't create peer connection", 500, models.MessageResponse{Message: err.Error()}) } peerConn.OnICECandidate(func(ic *webrtc.ICECandidate) { @@ -124,29 +119,12 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCall r.onPeerTrack(roomId, id, remote, receiver) }) - var sdp *webrtc.SessionDescription - if !isCaller { - offer, err := peerConn.CreateOffer(nil) - if err != nil { - println(err.Error()) - peerConn.Close() - return nil, models.NewError("unhandled error, contact support #1314", 500, err) - } - err = peerConn.SetLocalDescription(offer) - if err != nil { - println(err.Error()) - peerConn.Close() - return nil, models.NewError("unhandled error, contact support #1315", 500, err) - } - sdp = &offer - } - room.Peers[id] = &Peer{ ID: id, Conn: peerConn, } - - return sdp, nil + go r.updatePCTracks(roomId) + return nil } func (r *RoomRepository) onPeerICECandidate(roomId string, id uint64, ic *webrtc.ICECandidate) { @@ -271,33 +249,53 @@ func (r *RoomRepository) updatePCTracks(roomId string) { receivingPeerTracks[track.ID()] = rtpReceiver } room.trackLock.Lock() + renegotiate := false for id, track := range room.Tracks { _, alreadySend := alreadySentTracks[id] _, alreadyReceiver := receivingPeerTracks[id] if track.OwnerId != peer.ID && (!alreadySend && !alreadyReceiver) { - go func(peer *Peer, track *Track) { - println("add track") - _, err := peer.Conn.AddTrack(track.TrackLocal) - if err != nil { - println(err.Error()) - } - }(peer, track) + renegotiate = true + println("add track") + _, err := peer.Conn.AddTrack(track.TrackLocal) + if err != nil { + println(err.Error()) + return + } } } room.trackLock.Unlock() - - /*room.trackLock.Lock() - for trackId, _ := range alreadySentTracks { - if _, exists := room.Tracks[trackId]; !exists { - go func(peer *Peer, rtpSender *webrtc.RTPSender) { - err := peer.Conn.RemoveTrack(rtpSender) - if err != nil { - println(err.Error()) - } - }(peer, alreadySentTracks[trackId]) + if renegotiate { + offer, err := peer.Conn.CreateOffer(nil) + if err != nil { + println(err.Error()) + return + } + err = peer.Conn.SetLocalDescription(offer) + if err != nil { + println(err.Error()) + return + } + reqModel := dto.SetSDPReqModel{ + PeerDTO: dto.PeerDTO{ + RoomId: roomId, + ID: peer.ID, + }, + SDP: offer, + } + bodyJson, err := json.Marshal(reqModel) + if err != nil { + println(err.Error()) + return + } + res, err := http.Post(r.conf.LogjamBaseUrl+"/offer", "application/json", bytes.NewReader(bodyJson)) + if err != nil { + println(err.Error()) + return + } + if res.StatusCode > 204 { + println("/offer ", res.Status) } } - room.trackLock.Unlock()*/ } } @@ -410,3 +408,22 @@ func (r *RoomRepository) ClosePeer(roomId string, id uint64) error { } return room.Peers[id].Conn.Close() } + +func (r *RoomRepository) ResetRoom(roomId string) error { + r.Lock() + if !r.doesRoomExists(roomId) { + return + } + r.Unlock() + room := r.Rooms[roomId] + room.Lock() + defer room.Unlock() + room.timer.Stop() + for _, peer := range room.Peers { + peer.Conn.Close() + } + + r.Lock() + delete(r.Rooms, roomId) + r.Unlock() +} diff --git a/routers/room.go b/routers/room.go index de0224e..980e713 100644 --- a/routers/room.go +++ b/routers/room.go @@ -12,5 +12,6 @@ func registerRoomRoutes(rg *gin.RouterGroup, ctrl *controllers.RoomController) { rg.POST("/ice", ctrl.AddICECandidate) rg.POST("/answer", ctrl.Answer) rg.POST("/offer", ctrl.Offer) + rg.DELETE("/", ctrl.ResetRoom) }