feat: added ResetRoom Handler and logic

pull/1/head
Benyamin Azarkhazin 2023-07-31 15:57:40 +03:30
parent 6e792ff588
commit 5fadfbfb20
Signed by: benyamin
GPG Key ID: 3AE44F5623C70269
7 changed files with 129 additions and 84 deletions

33
app.go
View File

@ -3,12 +3,16 @@ package main
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"sourcecode.social/greatape/goldgorilla/controllers" "sourcecode.social/greatape/goldgorilla/controllers"
"sourcecode.social/greatape/goldgorilla/models" "sourcecode.social/greatape/goldgorilla/models"
"sourcecode.social/greatape/goldgorilla/repositories" "sourcecode.social/greatape/goldgorilla/repositories"
"sourcecode.social/greatape/goldgorilla/routers" "sourcecode.social/greatape/goldgorilla/routers"
"io" "syscall"
"net/http"
"time" "time"
) )
@ -18,11 +22,13 @@ type App struct {
src string src string
} }
func (a *App) Init(srcListenAddr string, logjamBaseUrl string, targetRoom string) { func (a *App) Init(srcListenAddr string, svcAddr string, logjamBaseUrl string, targetRoom string) {
println("initializing ..")
a.src = srcListenAddr a.src = srcListenAddr
a.conf = &models.ConfigModel{ a.conf = &models.ConfigModel{
LogjamBaseUrl: logjamBaseUrl + "/auxiliary-node", LogjamBaseUrl: logjamBaseUrl + "/auxiliary-node",
TargetRoom: targetRoom, TargetRoom: targetRoom,
ServiceAddress: svcAddr,
} }
roomRepo := repositories.NewRoomRepository(a.conf) roomRepo := repositories.NewRoomRepository(a.conf)
a.router = &routers.Router{} a.router = &routers.Router{}
@ -31,14 +37,25 @@ func (a *App) Init(srcListenAddr string, logjamBaseUrl string, targetRoom string
err := a.router.RegisterRoutes(roomCtrl) err := a.router.RegisterRoutes(roomCtrl)
panicIfErr(err) panicIfErr(err)
{
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL, syscall.SIGILL, syscall.SIGSTOP)
go func() {
a.onDie(<-sigs)
}()
}
} }
func (a *App) Run() { func (a *App) Run() {
go func() { go func() {
start: start:
buffer, _ := json.Marshal(map[string]any{"roomId": a.conf.TargetRoom}) buffer, _ := json.Marshal(map[string]any{"roomId": a.conf.TargetRoom, "svcAddr": a.conf.ServiceAddress})
body := bytes.NewReader(buffer) body := bytes.NewReader(buffer)
res, err := http.Post(a.conf.LogjamBaseUrl+"/join", "application/json", body) c := &http.Client{
Timeout: 8 * time.Second,
}
res, err := c.Post(a.conf.LogjamBaseUrl+"/join", "application/json", body)
if err != nil { if err != nil {
println(err.Error()) println(err.Error())
time.Sleep(4 * time.Second) time.Sleep(4 * time.Second)
@ -55,6 +72,12 @@ func (a *App) Run() {
panicIfErr(err) panicIfErr(err)
} }
func (a *App) onDie(sig os.Signal) {
fmt.Println("<-", sig)
os.Exit(0)
}
func panicIfErr(err error) { func panicIfErr(err error) {
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -4,10 +4,10 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"net/http"
"sourcecode.social/greatape/goldgorilla/models" "sourcecode.social/greatape/goldgorilla/models"
"sourcecode.social/greatape/goldgorilla/models/dto" "sourcecode.social/greatape/goldgorilla/models/dto"
"sourcecode.social/greatape/goldgorilla/repositories" "sourcecode.social/greatape/goldgorilla/repositories"
"net/http"
) )
type RoomController struct { type RoomController struct {
@ -34,34 +34,11 @@ func (c *RoomController) CreatePeer(ctx *gin.Context) {
c.helper.ResponseUnprocessableEntity(ctx) c.helper.ResponseUnprocessableEntity(ctx)
return return
} }
offer, err := c.repo.CreatePeer(reqModel.RoomId, reqModel.ID, reqModel.CanPublish, reqModel.IsCaller) err := c.repo.CreatePeer(reqModel.RoomId, reqModel.ID, reqModel.CanPublish, reqModel.IsCaller)
if c.helper.HandleIfErr(ctx, err, nil) { if c.helper.HandleIfErr(ctx, err, nil) {
return return
} }
c.helper.Response(ctx, struct{}{}, http.StatusNoContent) c.helper.Response(ctx, struct{}{}, http.StatusNoContent)
if offer != nil {
buffer, err := json.Marshal(dto.SetSDPReqModel{
PeerDTO: dto.PeerDTO{
RoomId: reqModel.RoomId,
ID: reqModel.ID,
},
SDP: *offer,
})
if err != nil {
println(err.Error())
return
}
reader := bytes.NewReader(buffer)
resp, err := http.Post(c.conf.LogjamBaseUrl+"/offer", "application/json", reader)
if err != nil {
println(err.Error())
return
}
if resp.StatusCode > 204 {
println(resp.Status)
}
}
} }
func (c *RoomController) AddICECandidate(ctx *gin.Context) { func (c *RoomController) AddICECandidate(ctx *gin.Context) {
@ -133,6 +110,7 @@ func (c *RoomController) Answer(ctx *gin.Context) {
} }
err := c.repo.SetPeerAnswer(reqModel.RoomId, reqModel.ID, reqModel.SDP) err := c.repo.SetPeerAnswer(reqModel.RoomId, reqModel.ID, reqModel.SDP)
if c.helper.HandleIfErr(ctx, err, nil) { if c.helper.HandleIfErr(ctx, err, nil) {
println(err.Error())
return return
} }
c.helper.Response(ctx, struct{}{}, http.StatusNoContent) c.helper.Response(ctx, struct{}{}, http.StatusNoContent)
@ -154,3 +132,23 @@ func (c *RoomController) ClosePeer(ctx *gin.Context) {
} }
c.helper.Response(ctx, struct{}{}, http.StatusNoContent) c.helper.Response(ctx, struct{}{}, http.StatusNoContent)
} }
func (c *RoomController) ResetRoom(ctx *gin.Context) {
var reqModel map[string]any
badReqSt := 400
if err := ctx.ShouldBindJSON(&reqModel); c.helper.HandleIfErr(ctx, err, &badReqSt) {
return
}
roomId := ""
if rid, exists := reqModel["roomId"]; !exists {
roomId = rid
c.helper.ResponseUnprocessableEntity(ctx)
}
err := c.repo.ResetRoom(roomId)
if c.helper.HandleIfErr(ctx, err, nil) {
return
}
c.helper.Response(ctx, nil, http.StatusNoContent)
}

View File

@ -0,0 +1 @@
package controllers

View File

@ -6,6 +6,7 @@ import (
) )
func main() { func main() {
svcAddr := flag.String("svc-addr", "", "service to register in logjam")
src := flag.String("src", ":8080", "listenhost:listenPort") src := flag.String("src", ":8080", "listenhost:listenPort")
logjamBaseUrl := flag.String("logjam-base-url", "https://example.com", "logjam base url(shouldn't end with /)") logjamBaseUrl := flag.String("logjam-base-url", "https://example.com", "logjam base url(shouldn't end with /)")
targetRoom := flag.String("targetRoom", "testyroom", "target room") targetRoom := flag.String("targetRoom", "testyroom", "target room")
@ -15,7 +16,10 @@ func main() {
if strings.HasSuffix(*logjamBaseUrl, "/") { if strings.HasSuffix(*logjamBaseUrl, "/") {
panic("logjam-base-url shouldn't end with /") panic("logjam-base-url shouldn't end with /")
} }
if strings.HasSuffix(*svcAddr, "/") {
panic("service address shouldn't end with /")
}
app := App{} app := App{}
app.Init(*src, *logjamBaseUrl, *targetRoom) app.Init(*src, *svcAddr, *logjamBaseUrl, *targetRoom)
app.Run() app.Run()
} }

View File

@ -1,6 +1,7 @@
package models package models
type ConfigModel struct { type ConfigModel struct {
ServiceAddress string `json:"serviceAddress"`
LogjamBaseUrl string `json:"logjamBaseUrl"` LogjamBaseUrl string `json:"logjamBaseUrl"`
TargetRoom string `json:"targetRoom"` TargetRoom string `json:"targetRoom"`
} }

View File

@ -6,10 +6,10 @@ import (
"fmt" "fmt"
"github.com/pion/rtcp" "github.com/pion/rtcp"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"sourcecode.social/greatape/goldgorilla/models"
"sourcecode.social/greatape/goldgorilla/models/dto"
"log" "log"
"net/http" "net/http"
"sourcecode.social/greatape/goldgorilla/models"
"sourcecode.social/greatape/goldgorilla/models/dto"
"sync" "sync"
"time" "time"
) )
@ -30,6 +30,7 @@ type Room struct {
Peers map[uint64]*Peer Peers map[uint64]*Peer
trackLock *sync.Mutex trackLock *sync.Mutex
Tracks map[string]*Track Tracks map[string]*Track
timer *time.Timer
} }
type RoomRepository struct { type RoomRepository struct {
@ -64,7 +65,7 @@ func (r *RoomRepository) doesPeerExists(roomId string, id uint64) bool {
return false return false
} }
func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCaller bool) (*webrtc.SessionDescription, error) { func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCaller bool) error {
r.Lock() r.Lock()
if !r.doesRoomExists(roomId) { if !r.doesRoomExists(roomId) {
@ -73,10 +74,11 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCall
Peers: make(map[uint64]*Peer), Peers: make(map[uint64]*Peer),
trackLock: &sync.Mutex{}, trackLock: &sync.Mutex{},
Tracks: make(map[string]*Track), Tracks: make(map[string]*Track),
timer: time.NewTicker(3 * time.Second),
} }
r.Rooms[roomId] = room r.Rooms[roomId] = room
go func() { go func() {
for range time.NewTicker(3 * time.Second).C { for range room.timer.C {
room.Lock() room.Lock()
for _, peer := range room.Peers { for _, peer := range room.Peers {
for _, receiver := range peer.Conn.GetReceivers() { for _, receiver := range peer.Conn.GetReceivers() {
@ -104,14 +106,7 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCall
peerConn, err := webrtc.NewPeerConnection(webrtc.Configuration{}) peerConn, err := webrtc.NewPeerConnection(webrtc.Configuration{})
if err != nil { if err != nil {
return nil, models.NewError("can't create peer connection", 500, models.MessageResponse{Message: err.Error()}) return models.NewError("can't create peer connection", 500, models.MessageResponse{Message: err.Error()})
}
for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
if _, err := peerConn.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendrecv,
}); err != nil {
return nil, models.NewError("unhandled error, contact support #1313", 500, err)
}
} }
peerConn.OnICECandidate(func(ic *webrtc.ICECandidate) { peerConn.OnICECandidate(func(ic *webrtc.ICECandidate) {
@ -124,29 +119,12 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCall
r.onPeerTrack(roomId, id, remote, receiver) r.onPeerTrack(roomId, id, remote, receiver)
}) })
var sdp *webrtc.SessionDescription
if !isCaller {
offer, err := peerConn.CreateOffer(nil)
if err != nil {
println(err.Error())
peerConn.Close()
return nil, models.NewError("unhandled error, contact support #1314", 500, err)
}
err = peerConn.SetLocalDescription(offer)
if err != nil {
println(err.Error())
peerConn.Close()
return nil, models.NewError("unhandled error, contact support #1315", 500, err)
}
sdp = &offer
}
room.Peers[id] = &Peer{ room.Peers[id] = &Peer{
ID: id, ID: id,
Conn: peerConn, Conn: peerConn,
} }
go r.updatePCTracks(roomId)
return sdp, nil return nil
} }
func (r *RoomRepository) onPeerICECandidate(roomId string, id uint64, ic *webrtc.ICECandidate) { func (r *RoomRepository) onPeerICECandidate(roomId string, id uint64, ic *webrtc.ICECandidate) {
@ -271,33 +249,53 @@ func (r *RoomRepository) updatePCTracks(roomId string) {
receivingPeerTracks[track.ID()] = rtpReceiver receivingPeerTracks[track.ID()] = rtpReceiver
} }
room.trackLock.Lock() room.trackLock.Lock()
renegotiate := false
for id, track := range room.Tracks { for id, track := range room.Tracks {
_, alreadySend := alreadySentTracks[id] _, alreadySend := alreadySentTracks[id]
_, alreadyReceiver := receivingPeerTracks[id] _, alreadyReceiver := receivingPeerTracks[id]
if track.OwnerId != peer.ID && (!alreadySend && !alreadyReceiver) { if track.OwnerId != peer.ID && (!alreadySend && !alreadyReceiver) {
go func(peer *Peer, track *Track) { renegotiate = true
println("add track") println("add track")
_, err := peer.Conn.AddTrack(track.TrackLocal) _, err := peer.Conn.AddTrack(track.TrackLocal)
if err != nil { if err != nil {
println(err.Error()) println(err.Error())
return
} }
}(peer, track)
} }
} }
room.trackLock.Unlock() room.trackLock.Unlock()
if renegotiate {
/*room.trackLock.Lock() offer, err := peer.Conn.CreateOffer(nil)
for trackId, _ := range alreadySentTracks {
if _, exists := room.Tracks[trackId]; !exists {
go func(peer *Peer, rtpSender *webrtc.RTPSender) {
err := peer.Conn.RemoveTrack(rtpSender)
if err != nil { if err != nil {
println(err.Error()) println(err.Error())
return
} }
}(peer, alreadySentTracks[trackId]) err = peer.Conn.SetLocalDescription(offer)
if err != nil {
println(err.Error())
return
}
reqModel := dto.SetSDPReqModel{
PeerDTO: dto.PeerDTO{
RoomId: roomId,
ID: peer.ID,
},
SDP: offer,
}
bodyJson, err := json.Marshal(reqModel)
if err != nil {
println(err.Error())
return
}
res, err := http.Post(r.conf.LogjamBaseUrl+"/offer", "application/json", bytes.NewReader(bodyJson))
if err != nil {
println(err.Error())
return
}
if res.StatusCode > 204 {
println("/offer ", res.Status)
} }
} }
room.trackLock.Unlock()*/
} }
} }
@ -410,3 +408,22 @@ func (r *RoomRepository) ClosePeer(roomId string, id uint64) error {
} }
return room.Peers[id].Conn.Close() return room.Peers[id].Conn.Close()
} }
func (r *RoomRepository) ResetRoom(roomId string) error {
r.Lock()
if !r.doesRoomExists(roomId) {
return
}
r.Unlock()
room := r.Rooms[roomId]
room.Lock()
defer room.Unlock()
room.timer.Stop()
for _, peer := range room.Peers {
peer.Conn.Close()
}
r.Lock()
delete(r.Rooms, roomId)
r.Unlock()
}

View File

@ -12,5 +12,6 @@ func registerRoomRoutes(rg *gin.RouterGroup, ctrl *controllers.RoomController) {
rg.POST("/ice", ctrl.AddICECandidate) rg.POST("/ice", ctrl.AddICECandidate)
rg.POST("/answer", ctrl.Answer) rg.POST("/answer", ctrl.Answer)
rg.POST("/offer", ctrl.Offer) rg.POST("/offer", ctrl.Offer)
rg.DELETE("/", ctrl.ResetRoom)
} }