Compare commits
2 Commits
2261bf55f4
...
4d89da72ad
Author | SHA1 | Date |
---|---|---|
Benyamin Azarkhazin | 4d89da72ad | |
Benyamin Azarkhazin | 0a186d9bbd |
40
app.go
40
app.go
|
@ -23,7 +23,7 @@ type App struct {
|
||||||
src string
|
src string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) Init(srcListenAddr string, svcAddr string, logjamBaseUrl string, targetRoom string, iceTCPMUXListenPort uint, customICEHostCandidateIP string) {
|
func (a *App) Init(srcListenAddr string, logjamBaseUrl string, iceTCPMUXListenPort uint, customICEHostCandidateIP string) {
|
||||||
println("initializing ..")
|
println("initializing ..")
|
||||||
a.src = srcListenAddr
|
a.src = srcListenAddr
|
||||||
var iceServers []webrtc.ICEServer
|
var iceServers []webrtc.ICEServer
|
||||||
|
@ -36,11 +36,9 @@ func (a *App) Init(srcListenAddr string, svcAddr string, logjamBaseUrl string, t
|
||||||
panic("[E] can't parse ice.servers.json: " + err.Error())
|
panic("[E] can't parse ice.servers.json: " + err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
startRejoinCH := make(chan bool, 2)
|
startRejoinCH := make(chan models.RejoinMode, 2)
|
||||||
a.conf = &models.ConfigModel{
|
a.conf = &models.ConfigModel{
|
||||||
LogjamBaseUrl: logjamBaseUrl + "/auxiliary-node",
|
LogjamBaseUrl: logjamBaseUrl,
|
||||||
TargetRoom: targetRoom,
|
|
||||||
ServiceAddress: svcAddr,
|
|
||||||
ICEServers: iceServers,
|
ICEServers: iceServers,
|
||||||
ICETCPMUXListenPort: iceTCPMUXListenPort,
|
ICETCPMUXListenPort: iceTCPMUXListenPort,
|
||||||
CustomICEHostCandidateIP: customICEHostCandidateIP,
|
CustomICEHostCandidateIP: customICEHostCandidateIP,
|
||||||
|
@ -65,51 +63,59 @@ func (a *App) Init(srcListenAddr string, svcAddr string, logjamBaseUrl string, t
|
||||||
|
|
||||||
func (a *App) Run() {
|
func (a *App) Run() {
|
||||||
go func() {
|
go func() {
|
||||||
*a.conf.StartRejoinCH <- true
|
//*a.conf.StartRejoinCH <- true
|
||||||
for simplyJoin := range *a.conf.StartRejoinCH {
|
|
||||||
if simplyJoin {
|
|
||||||
buffer, _ := json.Marshal(map[string]any{"roomId": a.conf.TargetRoom, "svcAddr": a.conf.ServiceAddress})
|
|
||||||
body := bytes.NewReader(buffer)
|
|
||||||
c := &http.Client{
|
c := &http.Client{
|
||||||
Timeout: 8 * time.Second,
|
Timeout: 8 * time.Second,
|
||||||
}
|
}
|
||||||
|
for data := range *a.conf.StartRejoinCH {
|
||||||
|
if data.SimplyJoin {
|
||||||
|
buffer, _ := json.Marshal(map[string]any{"roomId": data.RoomId})
|
||||||
|
body := bytes.NewReader(buffer)
|
||||||
res, err := c.Post(a.conf.LogjamBaseUrl+"/join", "application/json", body)
|
res, err := c.Post(a.conf.LogjamBaseUrl+"/join", "application/json", body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
println(err.Error())
|
println(err.Error())
|
||||||
time.Sleep(4 * time.Second)
|
time.Sleep(4 * time.Second)
|
||||||
*a.conf.StartRejoinCH <- true
|
*a.conf.StartRejoinCH <- data
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if res.StatusCode > 204 {
|
if res.StatusCode > 204 {
|
||||||
resbody, _ := io.ReadAll(res.Body)
|
resbody, _ := io.ReadAll(res.Body)
|
||||||
println("get /join "+res.Status, string(resbody))
|
println("get /join "+res.Status, string(resbody))
|
||||||
time.Sleep(4 * time.Second)
|
time.Sleep(4 * time.Second)
|
||||||
*a.conf.StartRejoinCH <- true
|
*a.conf.StartRejoinCH <- data
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
data.SimplyJoin = true
|
||||||
reqModel := struct {
|
reqModel := struct {
|
||||||
RoomId string `json:"roomId"`
|
RoomId string `json:"roomId"`
|
||||||
}{
|
}{
|
||||||
RoomId: a.conf.TargetRoom,
|
RoomId: data.RoomId,
|
||||||
}
|
}
|
||||||
serializedReqBody, err := json.Marshal(reqModel)
|
serializedReqBody, err := json.Marshal(reqModel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
println(err.Error())
|
println(err.Error())
|
||||||
*a.conf.StartRejoinCH <- true
|
*a.conf.StartRejoinCH <- data
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := http.Post(a.conf.LogjamBaseUrl+"/rejoin", "application/json", bytes.NewReader(serializedReqBody))
|
resp, err := c.Post(a.conf.LogjamBaseUrl+"/rejoin", "application/json", bytes.NewReader(serializedReqBody))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
println(err.Error())
|
println(err.Error())
|
||||||
*a.conf.StartRejoinCH <- true
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if resp.StatusCode > 204 {
|
if resp.StatusCode > 204 {
|
||||||
println("/rejoin", resp.Status)
|
println("/rejoin", resp.Status)
|
||||||
*a.conf.StartRejoinCH <- true
|
|
||||||
}
|
}
|
||||||
|
/*if err != nil {
|
||||||
|
println(err.Error())
|
||||||
|
*a.conf.StartRejoinCH <- data
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if resp.StatusCode > 204 {
|
||||||
|
println("/rejoin", resp.Status)
|
||||||
|
*a.conf.StartRejoinCH <- data
|
||||||
|
}*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -36,7 +36,7 @@ func (c *RoomController) CreatePeer(ctx *gin.Context) {
|
||||||
c.helper.ResponseUnprocessableEntity(ctx)
|
c.helper.ResponseUnprocessableEntity(ctx)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err := c.repo.CreatePeer(reqModel.RoomId, reqModel.ID, reqModel.CanPublish, reqModel.IsCaller)
|
err := c.repo.CreatePeer(reqModel.RoomId, reqModel.ID, reqModel.CanPublish, reqModel.IsCaller, reqModel.GGID)
|
||||||
if c.helper.HandleIfErr(ctx, err, nil) {
|
if c.helper.HandleIfErr(ctx, err, nil) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -88,11 +88,13 @@ func (c *RoomController) Offer(ctx *gin.Context) {
|
||||||
}
|
}
|
||||||
c.helper.Response(ctx, struct{}{}, http.StatusNoContent)
|
c.helper.Response(ctx, struct{}{}, http.StatusNoContent)
|
||||||
{
|
{
|
||||||
|
ggid := c.repo.GetRoomGGID(reqModel.RoomId)
|
||||||
buffer, err := json.Marshal(dto.SetSDPReqModel{
|
buffer, err := json.Marshal(dto.SetSDPReqModel{
|
||||||
PeerDTO: dto.PeerDTO{
|
PeerDTO: dto.PeerDTO{
|
||||||
RoomId: reqModel.RoomId,
|
RoomId: reqModel.RoomId,
|
||||||
ID: reqModel.ID,
|
ID: reqModel.ID,
|
||||||
},
|
},
|
||||||
|
GGID: *ggid,
|
||||||
SDP: *answer,
|
SDP: *answer,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -165,25 +167,50 @@ func (c *RoomController) ResetRoom(ctx *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err := c.repo.ResetRoom(roomId)
|
ggid, err := c.repo.ResetRoom(roomId)
|
||||||
if c.helper.HandleIfErr(ctx, err, nil) {
|
if c.helper.HandleIfErr(ctx, err, nil) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.helper.Response(ctx, nil, http.StatusNoContent)
|
c.helper.Response(ctx, struct {
|
||||||
|
GGID uint64 `json:"ggid"`
|
||||||
|
}{
|
||||||
|
GGID: ggid,
|
||||||
|
}, http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RoomController) Start(ctx *gin.Context) {
|
func (c *RoomController) Start(ctx *gin.Context) {
|
||||||
buffer, _ := json.Marshal(map[string]any{"roomId": c.conf.TargetRoom, "svcAddr": c.conf.ServiceAddress})
|
reqModel := struct {
|
||||||
|
RoomId string `json:"roomId"`
|
||||||
|
}{}
|
||||||
|
if err := ctx.ShouldBindJSON(&reqModel); err != nil {
|
||||||
|
c.helper.ResponseBadReq(ctx)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
buffer, _ := json.Marshal(map[string]any{"roomId": reqModel.RoomId})
|
||||||
body := bytes.NewReader(buffer)
|
body := bytes.NewReader(buffer)
|
||||||
res, err := http.Post(c.conf.LogjamBaseUrl+"/join", "application/json", body)
|
res, err := http.Post(c.conf.LogjamBaseUrl+"/join", "application/json", body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
println(err.Error())
|
println(err.Error())
|
||||||
time.Sleep(4 * time.Second)
|
time.Sleep(4 * time.Second)
|
||||||
}
|
}
|
||||||
if res != nil && res.StatusCode > 204 {
|
if res != nil {
|
||||||
|
if res.StatusCode > 204 {
|
||||||
resbody, _ := io.ReadAll(res.Body)
|
resbody, _ := io.ReadAll(res.Body)
|
||||||
println("get /join "+res.Status, string(resbody))
|
println("get /join "+res.Status, string(resbody))
|
||||||
|
} else {
|
||||||
|
resbody, _ := io.ReadAll(res.Body)
|
||||||
|
println(string(resbody))
|
||||||
|
respData := make(map[string]any)
|
||||||
|
if len(resbody) > 2 {
|
||||||
|
err := json.Unmarshal(resbody, &respData)
|
||||||
|
if err != nil {
|
||||||
|
println(err.Error())
|
||||||
|
c.helper.Response(ctx, nil, http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
c.helper.Response(ctx, nil, http.StatusNoContent)
|
c.helper.Response(ctx, nil, http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
8
main.go
8
main.go
|
@ -6,10 +6,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
svcAddr := flag.String("svc-addr", "http://localhost:8080", "service baseurl to register in logjam ( shouldn't end with / )")
|
|
||||||
src := flag.String("src", ":8080", "listenHost:listenPort")
|
src := flag.String("src", ":8080", "listenHost:listenPort")
|
||||||
logjamBaseUrl := flag.String("logjam-base-url", "http://localhost:8090", "logjam base url( shouldn't end with / )")
|
logjamBaseUrl := flag.String("logjam-base-url", "http://localhost:8090", "logjam base url( shouldn't end with / )")
|
||||||
targetRoom := flag.String("targetRoom", "test", "target room")
|
|
||||||
icetcpmuxListenPort := flag.Uint("ice-tcp-mux-listen-port", 4444, "listen port to use for tcp ice candidates")
|
icetcpmuxListenPort := flag.Uint("ice-tcp-mux-listen-port", 4444, "listen port to use for tcp ice candidates")
|
||||||
customICEHostCandidateIP := flag.String("custom-ice-host-candidate-ip", "", "set to override host ice candidates address")
|
customICEHostCandidateIP := flag.String("custom-ice-host-candidate-ip", "", "set to override host ice candidates address")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
@ -17,10 +15,8 @@ func main() {
|
||||||
if strings.HasSuffix(*logjamBaseUrl, "/") {
|
if strings.HasSuffix(*logjamBaseUrl, "/") {
|
||||||
panic("logjam-base-url shouldn't end with /")
|
panic("logjam-base-url shouldn't end with /")
|
||||||
}
|
}
|
||||||
if strings.HasSuffix(*svcAddr, "/") {
|
|
||||||
panic("service address shouldn't end with /")
|
|
||||||
}
|
|
||||||
app := App{}
|
app := App{}
|
||||||
app.Init(*src, *svcAddr, *logjamBaseUrl, *targetRoom, *icetcpmuxListenPort, *customICEHostCandidateIP)
|
*logjamBaseUrl += "/goldgorilla"
|
||||||
|
app.Init(*src, *logjamBaseUrl, *icetcpmuxListenPort, *customICEHostCandidateIP)
|
||||||
app.Run()
|
app.Run()
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,12 +2,14 @@ package models
|
||||||
|
|
||||||
import "github.com/pion/webrtc/v3"
|
import "github.com/pion/webrtc/v3"
|
||||||
|
|
||||||
|
type RejoinMode struct {
|
||||||
|
SimplyJoin bool
|
||||||
|
RoomId string
|
||||||
|
}
|
||||||
type ConfigModel struct {
|
type ConfigModel struct {
|
||||||
ServiceAddress string `json:"serviceAddress"`
|
|
||||||
LogjamBaseUrl string `json:"logjamBaseUrl"`
|
LogjamBaseUrl string `json:"logjamBaseUrl"`
|
||||||
TargetRoom string `json:"targetRoom"`
|
|
||||||
ICETCPMUXListenPort uint `json:"ice_tcpmux_listenPort"`
|
ICETCPMUXListenPort uint `json:"ice_tcpmux_listenPort"`
|
||||||
CustomICEHostCandidateIP string `json:"customICEHostCandidateIP"`
|
CustomICEHostCandidateIP string `json:"customICEHostCandidateIP"`
|
||||||
ICEServers []webrtc.ICEServer `json:"iceServers"`
|
ICEServers []webrtc.ICEServer `json:"iceServers"`
|
||||||
StartRejoinCH *chan bool
|
StartRejoinCH *chan RejoinMode
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,12 +13,14 @@ func (model *PeerDTO) Validate() bool {
|
||||||
|
|
||||||
type CreatePeerReqModel struct {
|
type CreatePeerReqModel struct {
|
||||||
PeerDTO
|
PeerDTO
|
||||||
|
GGID uint64 `json:"ggid"`
|
||||||
CanPublish bool `json:"canPublish"`
|
CanPublish bool `json:"canPublish"`
|
||||||
IsCaller bool `json:"isCaller"`
|
IsCaller bool `json:"isCaller"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type AddPeerICECandidateReqModel struct {
|
type AddPeerICECandidateReqModel struct {
|
||||||
PeerDTO
|
PeerDTO
|
||||||
|
GGID uint64 `json:"ggid"`
|
||||||
ICECandidate webrtc.ICECandidateInit `json:"iceCandidate"`
|
ICECandidate webrtc.ICECandidateInit `json:"iceCandidate"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,6 +30,7 @@ func (model *AddPeerICECandidateReqModel) Validate() bool {
|
||||||
|
|
||||||
type SetSDPReqModel struct {
|
type SetSDPReqModel struct {
|
||||||
PeerDTO
|
PeerDTO
|
||||||
|
GGID uint64 `json:"ggid"`
|
||||||
SDP webrtc.SessionDescription `json:"sdp"`
|
SDP webrtc.SessionDescription `json:"sdp"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,7 @@ type Room struct {
|
||||||
trackLock *sync.Mutex
|
trackLock *sync.Mutex
|
||||||
Tracks map[string]*Track
|
Tracks map[string]*Track
|
||||||
timer *time.Ticker
|
timer *time.Ticker
|
||||||
|
ggId uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
type RoomRepository struct {
|
type RoomRepository struct {
|
||||||
|
@ -114,7 +115,7 @@ func (r *RoomRepository) doesPeerExists(roomId string, id uint64) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, isCaller bool) error {
|
func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, isCaller bool, ggid uint64) error {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
|
|
||||||
if !r.doesRoomExists(roomId) {
|
if !r.doesRoomExists(roomId) {
|
||||||
|
@ -124,6 +125,7 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, i
|
||||||
trackLock: &sync.Mutex{},
|
trackLock: &sync.Mutex{},
|
||||||
Tracks: make(map[string]*Track),
|
Tracks: make(map[string]*Track),
|
||||||
timer: time.NewTicker(3 * time.Second),
|
timer: time.NewTicker(3 * time.Second),
|
||||||
|
ggId: ggid,
|
||||||
}
|
}
|
||||||
r.Rooms[roomId] = room
|
r.Rooms[roomId] = room
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -164,7 +166,7 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, i
|
||||||
}
|
}
|
||||||
|
|
||||||
peerConn.OnICECandidate(func(ic *webrtc.ICECandidate) {
|
peerConn.OnICECandidate(func(ic *webrtc.ICECandidate) {
|
||||||
r.onPeerICECandidate(roomId, id, ic)
|
r.onPeerICECandidate(roomId, id, room.ggId, ic)
|
||||||
})
|
})
|
||||||
peerConn.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
peerConn.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
|
@ -209,14 +211,17 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish bool, i
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RoomRepository) onCallerDisconnected(roomId string) {
|
func (r *RoomRepository) onCallerDisconnected(roomId string) {
|
||||||
if err := r.ResetRoom(roomId); err != nil {
|
if _, err := r.ResetRoom(roomId); err != nil {
|
||||||
println(err.Error())
|
println(err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
*r.conf.StartRejoinCH <- false
|
*r.conf.StartRejoinCH <- models.RejoinMode{
|
||||||
|
SimplyJoin: false,
|
||||||
|
RoomId: roomId,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RoomRepository) onPeerICECandidate(roomId string, id uint64, ic *webrtc.ICECandidate) {
|
func (r *RoomRepository) onPeerICECandidate(roomId string, id, ggid uint64, ic *webrtc.ICECandidate) {
|
||||||
if ic == nil {
|
if ic == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -225,6 +230,7 @@ func (r *RoomRepository) onPeerICECandidate(roomId string, id uint64, ic *webrtc
|
||||||
RoomId: roomId,
|
RoomId: roomId,
|
||||||
ID: id,
|
ID: id,
|
||||||
},
|
},
|
||||||
|
GGID: ggid,
|
||||||
ICECandidate: ic.ToJSON(),
|
ICECandidate: ic.ToJSON(),
|
||||||
}
|
}
|
||||||
serializedReqBody, err := json.Marshal(reqModel)
|
serializedReqBody, err := json.Marshal(reqModel)
|
||||||
|
@ -518,14 +524,15 @@ func (r *RoomRepository) ClosePeer(roomId string, id uint64) error {
|
||||||
return peer.Conn.Close()
|
return peer.Conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RoomRepository) ResetRoom(roomId string) error {
|
func (r *RoomRepository) ResetRoom(roomId string) (uint64, error) {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
if !r.doesRoomExists(roomId) {
|
if !r.doesRoomExists(roomId) {
|
||||||
return nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
room := r.Rooms[roomId]
|
room := r.Rooms[roomId]
|
||||||
room.Lock()
|
room.Lock()
|
||||||
|
ggid := room.ggId
|
||||||
room.timer.Stop()
|
room.timer.Stop()
|
||||||
for _, peer := range room.Peers {
|
for _, peer := range room.Peers {
|
||||||
go func(conn *webrtc.PeerConnection) {
|
go func(conn *webrtc.PeerConnection) {
|
||||||
|
@ -534,7 +541,7 @@ func (r *RoomRepository) ResetRoom(roomId string) error {
|
||||||
}
|
}
|
||||||
room.Unlock()
|
room.Unlock()
|
||||||
delete(r.Rooms, roomId)
|
delete(r.Rooms, roomId)
|
||||||
return nil
|
return ggid, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RoomRepository) offerPeer(peer *Peer, roomId string) error {
|
func (r *RoomRepository) offerPeer(peer *Peer, roomId string) error {
|
||||||
|
@ -548,7 +555,9 @@ func (r *RoomRepository) offerPeer(peer *Peer, roomId string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
ggid := r.GetRoomGGID(roomId)
|
||||||
reqModel := dto.SetSDPReqModel{
|
reqModel := dto.SetSDPReqModel{
|
||||||
|
GGID: *ggid,
|
||||||
PeerDTO: dto.PeerDTO{
|
PeerDTO: dto.PeerDTO{
|
||||||
RoomId: roomId,
|
RoomId: roomId,
|
||||||
ID: peer.ID,
|
ID: peer.ID,
|
||||||
|
@ -568,3 +577,13 @@ func (r *RoomRepository) offerPeer(peer *Peer, roomId string) error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *RoomRepository) GetRoomGGID(roomId string) *uint64 {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
if !r.doesRoomExists(roomId) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
gid := r.Rooms[roomId].ggId
|
||||||
|
return &gid
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue