Merge pull request 'sync with new logjam structure' (#1) from newlogjamsyncronization into master

Reviewed-on: #1
master
Benyamin Azarkhazin 2023-11-02 15:14:58 +00:00
commit 4d89da72ad
6 changed files with 100 additions and 47 deletions

40
app.go
View File

@ -23,7 +23,7 @@ type App struct {
src string
}
func (a *App) Init(srcListenAddr string, svcAddr string, logjamBaseUrl string, targetRoom string, iceTCPMUXListenPort uint, customICEHostCandidateIP string) {
func (a *App) Init(srcListenAddr string, logjamBaseUrl string, iceTCPMUXListenPort uint, customICEHostCandidateIP string) {
println("initializing ..")
a.src = srcListenAddr
var iceServers []webrtc.ICEServer
@ -36,11 +36,9 @@ func (a *App) Init(srcListenAddr string, svcAddr string, logjamBaseUrl string, t
panic("[E] can't parse ice.servers.json: " + err.Error())
}
}
startRejoinCH := make(chan bool, 2)
startRejoinCH := make(chan models.RejoinMode, 2)
a.conf = &models.ConfigModel{
LogjamBaseUrl: logjamBaseUrl + "/auxiliary-node",
TargetRoom: targetRoom,
ServiceAddress: svcAddr,
LogjamBaseUrl: logjamBaseUrl,
ICEServers: iceServers,
ICETCPMUXListenPort: iceTCPMUXListenPort,
CustomICEHostCandidateIP: customICEHostCandidateIP,
@ -65,51 +63,59 @@ func (a *App) Init(srcListenAddr string, svcAddr string, logjamBaseUrl string, t
func (a *App) Run() {
go func() {
*a.conf.StartRejoinCH <- true
for simplyJoin := range *a.conf.StartRejoinCH {
if simplyJoin {
buffer, _ := json.Marshal(map[string]any{"roomId": a.conf.TargetRoom, "svcAddr": a.conf.ServiceAddress})
body := bytes.NewReader(buffer)
//*a.conf.StartRejoinCH <- true
c := &http.Client{
Timeout: 8 * time.Second,
}
for data := range *a.conf.StartRejoinCH {
if data.SimplyJoin {
buffer, _ := json.Marshal(map[string]any{"roomId": data.RoomId})
body := bytes.NewReader(buffer)
res, err := c.Post(a.conf.LogjamBaseUrl+"/join", "application/json", body)
if err != nil {
println(err.Error())
time.Sleep(4 * time.Second)
*a.conf.StartRejoinCH <- true
*a.conf.StartRejoinCH <- data
continue
}
if res.StatusCode > 204 {
resbody, _ := io.ReadAll(res.Body)
println("get /join "+res.Status, string(resbody))
time.Sleep(4 * time.Second)
*a.conf.StartRejoinCH <- true
*a.conf.StartRejoinCH <- data
continue
}
} else {
data.SimplyJoin = true
reqModel := struct {
RoomId string `json:"roomId"`
}{
RoomId: a.conf.TargetRoom,
RoomId: data.RoomId,
}
serializedReqBody, err := json.Marshal(reqModel)
if err != nil {
println(err.Error())
*a.conf.StartRejoinCH <- true
*a.conf.StartRejoinCH <- data
continue
}
resp, err := http.Post(a.conf.LogjamBaseUrl+"/rejoin", "application/json", bytes.NewReader(serializedReqBody))
resp, err := c.Post(a.conf.LogjamBaseUrl+"/rejoin", "application/json", bytes.NewReader(serializedReqBody))
if err != nil {
println(err.Error())
*a.conf.StartRejoinCH <- true
continue
}
if resp.StatusCode > 204 {
println("/rejoin", resp.Status)
*a.conf.StartRejoinCH <- true
}
/*if err != nil {
println(err.Error())
*a.conf.StartRejoinCH <- data
continue
}
if resp.StatusCode > 204 {
println("/rejoin", resp.Status)
*a.conf.StartRejoinCH <- data
}*/
}
}
}()

View File

@ -36,7 +36,7 @@ func (c *RoomController) CreatePeer(ctx *gin.Context) {
c.helper.ResponseUnprocessableEntity(ctx)
return
}
err := c.repo.CreatePeer(reqModel.RoomId, reqModel.ID, reqModel.CanPublish, reqModel.IsCaller)
err := c.repo.CreatePeer(reqModel.RoomId, reqModel.ID, reqModel.CanPublish, reqModel.IsCaller, reqModel.GGID)
if c.helper.HandleIfErr(ctx, err, nil) {
return
}
@ -88,11 +88,13 @@ func (c *RoomController) Offer(ctx *gin.Context) {
}
c.helper.Response(ctx, struct{}{}, http.StatusNoContent)
{
ggid := c.repo.GetRoomGGID(reqModel.RoomId)
buffer, err := json.Marshal(dto.SetSDPReqModel{
PeerDTO: dto.PeerDTO{
RoomId: reqModel.RoomId,
ID: reqModel.ID,
},
GGID: *ggid,
SDP: *answer,
})
if err != nil {
@ -165,25 +167,50 @@ func (c *RoomController) ResetRoom(ctx *gin.Context) {
return
}
}
err := c.repo.ResetRoom(roomId)
ggid, err := c.repo.ResetRoom(roomId)
if c.helper.HandleIfErr(ctx, err, nil) {
return
}
c.helper.Response(ctx, nil, http.StatusNoContent)
c.helper.Response(ctx, struct {
GGID uint64 `json:"ggid"`
}{
GGID: ggid,
}, http.StatusOK)
}
func (c *RoomController) Start(ctx *gin.Context) {
buffer, _ := json.Marshal(map[string]any{"roomId": c.conf.TargetRoom, "svcAddr": c.conf.ServiceAddress})
reqModel := struct {
RoomId string `json:"roomId"`
}{}
if err := ctx.ShouldBindJSON(&reqModel); err != nil {
c.helper.ResponseBadReq(ctx)
return
}
buffer, _ := json.Marshal(map[string]any{"roomId": reqModel.RoomId})
body := bytes.NewReader(buffer)
res, err := http.Post(c.conf.LogjamBaseUrl+"/join", "application/json", body)
if err != nil {
println(err.Error())
time.Sleep(4 * time.Second)
}
if res != nil && res.StatusCode > 204 {
if res != nil {
if res.StatusCode > 204 {
resbody, _ := io.ReadAll(res.Body)
println("get /join "+res.Status, string(resbody))
} else {
resbody, _ := io.ReadAll(res.Body)
println(string(resbody))
respData := make(map[string]any)
if len(resbody) > 2 {
err := json.Unmarshal(resbody, &respData)
if err != nil {
println(err.Error())
c.helper.Response(ctx, nil, http.StatusBadRequest)
return
}
}
}
}
c.helper.Response(ctx, nil, http.StatusNoContent)
}

View File

@ -6,10 +6,8 @@ import (
)
func main() {
svcAddr := flag.String("svc-addr", "http://localhost:8080", "service baseurl to register in logjam ( shouldn't end with / )")
src := flag.String("src", ":8080", "listenHost:listenPort")
logjamBaseUrl := flag.String("logjam-base-url", "http://localhost:8090", "logjam base url( shouldn't end with / )")
targetRoom := flag.String("targetRoom", "test", "target room")
icetcpmuxListenPort := flag.Uint("ice-tcp-mux-listen-port", 4444, "listen port to use for tcp ice candidates")
customICEHostCandidateIP := flag.String("custom-ice-host-candidate-ip", "", "set to override host ice candidates address")
flag.Parse()
@ -17,10 +15,8 @@ func main() {
if strings.HasSuffix(*logjamBaseUrl, "/") {
panic("logjam-base-url shouldn't end with /")
}
if strings.HasSuffix(*svcAddr, "/") {
panic("service address shouldn't end with /")
}
app := App{}
app.Init(*src, *svcAddr, *logjamBaseUrl, *targetRoom, *icetcpmuxListenPort, *customICEHostCandidateIP)
*logjamBaseUrl += "/goldgorilla"
app.Init(*src, *logjamBaseUrl, *icetcpmuxListenPort, *customICEHostCandidateIP)
app.Run()
}

View File

@ -2,12 +2,14 @@ package models
import "github.com/pion/webrtc/v3"
type RejoinMode struct {
SimplyJoin bool
RoomId string
}
type ConfigModel struct {
ServiceAddress string `json:"serviceAddress"`
LogjamBaseUrl string `json:"logjamBaseUrl"`
TargetRoom string `json:"targetRoom"`
ICETCPMUXListenPort uint `json:"ice_tcpmux_listenPort"`
CustomICEHostCandidateIP string `json:"customICEHostCandidateIP"`
ICEServers []webrtc.ICEServer `json:"iceServers"`
StartRejoinCH *chan bool
StartRejoinCH *chan RejoinMode
}

View File

@ -13,12 +13,14 @@ func (model *PeerDTO) Validate() bool {
type CreatePeerReqModel struct {
PeerDTO
GGID uint64 `json:"ggid"`
CanPublish bool `json:"canPublish"`
IsCaller bool `json:"isCaller"`
}
type AddPeerICECandidateReqModel struct {
PeerDTO
GGID uint64 `json:"ggid"`
ICECandidate webrtc.ICECandidateInit `json:"iceCandidate"`
}
@ -28,6 +30,7 @@ func (model *AddPeerICECandidateReqModel) Validate() bool {
type SetSDPReqModel struct {
PeerDTO
GGID uint64 `json:"ggid"`
SDP webrtc.SessionDescription `json:"sdp"`
}

View File

@ -38,6 +38,7 @@ type Room struct {
trackLock *sync.Mutex
Tracks map[string]*Track
timer *time.Ticker
ggId uint64
}
type RoomRepository struct {
@ -114,7 +115,7 @@ func (r *RoomRepository) doesPeerExists(roomId string, id uint64) bool {
return false
}
func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, isCaller bool) error {
func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, isCaller bool, ggid uint64) error {
r.Lock()
if !r.doesRoomExists(roomId) {
@ -124,6 +125,7 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, i
trackLock: &sync.Mutex{},
Tracks: make(map[string]*Track),
timer: time.NewTicker(3 * time.Second),
ggId: ggid,
}
r.Rooms[roomId] = room
go func() {
@ -164,7 +166,7 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, i
}
peerConn.OnICECandidate(func(ic *webrtc.ICECandidate) {
r.onPeerICECandidate(roomId, id, ic)
r.onPeerICECandidate(roomId, id, room.ggId, ic)
})
peerConn.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
r.Lock()
@ -209,14 +211,17 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, i
}
func (r *RoomRepository) onCallerDisconnected(roomId string) {
if err := r.ResetRoom(roomId); err != nil {
if _, err := r.ResetRoom(roomId); err != nil {
println(err.Error())
return
}
*r.conf.StartRejoinCH <- false
*r.conf.StartRejoinCH <- models.RejoinMode{
SimplyJoin: false,
RoomId: roomId,
}
}
func (r *RoomRepository) onPeerICECandidate(roomId string, id uint64, ic *webrtc.ICECandidate) {
func (r *RoomRepository) onPeerICECandidate(roomId string, id, ggid uint64, ic *webrtc.ICECandidate) {
if ic == nil {
return
}
@ -225,6 +230,7 @@ func (r *RoomRepository) onPeerICECandidate(roomId string, id uint64, ic *webrtc
RoomId: roomId,
ID: id,
},
GGID: ggid,
ICECandidate: ic.ToJSON(),
}
serializedReqBody, err := json.Marshal(reqModel)
@ -518,14 +524,15 @@ func (r *RoomRepository) ClosePeer(roomId string, id uint64) error {
return peer.Conn.Close()
}
func (r *RoomRepository) ResetRoom(roomId string) error {
func (r *RoomRepository) ResetRoom(roomId string) (uint64, error) {
r.Lock()
defer r.Unlock()
if !r.doesRoomExists(roomId) {
return nil
return 0, nil
}
room := r.Rooms[roomId]
room.Lock()
ggid := room.ggId
room.timer.Stop()
for _, peer := range room.Peers {
go func(conn *webrtc.PeerConnection) {
@ -534,7 +541,7 @@ func (r *RoomRepository) ResetRoom(roomId string) error {
}
room.Unlock()
delete(r.Rooms, roomId)
return nil
return ggid, nil
}
func (r *RoomRepository) offerPeer(peer *Peer, roomId string) error {
@ -548,7 +555,9 @@ func (r *RoomRepository) offerPeer(peer *Peer, roomId string) error {
if err != nil {
return err
}
ggid := r.GetRoomGGID(roomId)
reqModel := dto.SetSDPReqModel{
GGID: *ggid,
PeerDTO: dto.PeerDTO{
RoomId: roomId,
ID: peer.ID,
@ -568,3 +577,13 @@ func (r *RoomRepository) offerPeer(peer *Peer, roomId string) error {
}
return nil
}
func (r *RoomRepository) GetRoomGGID(roomId string) *uint64 {
r.Lock()
defer r.Unlock()
if !r.doesRoomExists(roomId) {
return nil
}
gid := r.Rooms[roomId].ggId
return &gid
}