Compare commits

..

2 Commits

8 changed files with 165 additions and 84 deletions

37
app.go
View File

@ -3,12 +3,16 @@ package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"sourcecode.social/greatape/goldgorilla/controllers"
"sourcecode.social/greatape/goldgorilla/models"
"sourcecode.social/greatape/goldgorilla/repositories"
"sourcecode.social/greatape/goldgorilla/routers"
"io"
"net/http"
"syscall"
"time"
)
@ -18,11 +22,13 @@ type App struct {
src string
}
func (a *App) Init(srcListenAddr string, logjamBaseUrl string, targetRoom string) {
func (a *App) Init(srcListenAddr string, svcAddr string, logjamBaseUrl string, targetRoom string) {
println("initializing ..")
a.src = srcListenAddr
a.conf = &models.ConfigModel{
LogjamBaseUrl: logjamBaseUrl + "/auxiliary-node",
TargetRoom: targetRoom,
LogjamBaseUrl: logjamBaseUrl + "/auxiliary-node",
TargetRoom: targetRoom,
ServiceAddress: svcAddr,
}
roomRepo := repositories.NewRoomRepository(a.conf)
a.router = &routers.Router{}
@ -31,14 +37,25 @@ func (a *App) Init(srcListenAddr string, logjamBaseUrl string, targetRoom string
err := a.router.RegisterRoutes(roomCtrl)
panicIfErr(err)
{
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL, syscall.SIGILL, syscall.SIGSTOP)
go func() {
a.onDie(<-sigs)
}()
}
}
func (a *App) Run() {
go func() {
start:
buffer, _ := json.Marshal(map[string]any{"roomId": a.conf.TargetRoom})
buffer, _ := json.Marshal(map[string]any{"roomId": a.conf.TargetRoom, "svcAddr": a.conf.ServiceAddress})
body := bytes.NewReader(buffer)
res, err := http.Post(a.conf.LogjamBaseUrl+"/join", "application/json", body)
c := &http.Client{
Timeout: 8 * time.Second,
}
res, err := c.Post(a.conf.LogjamBaseUrl+"/join", "application/json", body)
if err != nil {
println(err.Error())
time.Sleep(4 * time.Second)
@ -55,6 +72,12 @@ func (a *App) Run() {
panicIfErr(err)
}
func (a *App) onDie(sig os.Signal) {
fmt.Println("<-", sig)
os.Exit(0)
}
func panicIfErr(err error) {
if err != nil {
panic(err)

View File

@ -4,10 +4,12 @@ import (
"bytes"
"encoding/json"
"github.com/gin-gonic/gin"
"io"
"net/http"
"sourcecode.social/greatape/goldgorilla/models"
"sourcecode.social/greatape/goldgorilla/models/dto"
"sourcecode.social/greatape/goldgorilla/repositories"
"net/http"
"time"
)
type RoomController struct {
@ -34,34 +36,11 @@ func (c *RoomController) CreatePeer(ctx *gin.Context) {
c.helper.ResponseUnprocessableEntity(ctx)
return
}
offer, err := c.repo.CreatePeer(reqModel.RoomId, reqModel.ID, reqModel.CanPublish, reqModel.IsCaller)
err := c.repo.CreatePeer(reqModel.RoomId, reqModel.ID, reqModel.CanPublish, reqModel.IsCaller)
if c.helper.HandleIfErr(ctx, err, nil) {
return
}
c.helper.Response(ctx, struct{}{}, http.StatusNoContent)
if offer != nil {
buffer, err := json.Marshal(dto.SetSDPReqModel{
PeerDTO: dto.PeerDTO{
RoomId: reqModel.RoomId,
ID: reqModel.ID,
},
SDP: *offer,
})
if err != nil {
println(err.Error())
return
}
reader := bytes.NewReader(buffer)
resp, err := http.Post(c.conf.LogjamBaseUrl+"/offer", "application/json", reader)
if err != nil {
println(err.Error())
return
}
if resp.StatusCode > 204 {
println(resp.Status)
}
}
}
func (c *RoomController) AddICECandidate(ctx *gin.Context) {
@ -133,6 +112,7 @@ func (c *RoomController) Answer(ctx *gin.Context) {
}
err := c.repo.SetPeerAnswer(reqModel.RoomId, reqModel.ID, reqModel.SDP)
if c.helper.HandleIfErr(ctx, err, nil) {
println(err.Error())
return
}
c.helper.Response(ctx, struct{}{}, http.StatusNoContent)
@ -154,3 +134,43 @@ func (c *RoomController) ClosePeer(ctx *gin.Context) {
}
c.helper.Response(ctx, struct{}{}, http.StatusNoContent)
}
func (c *RoomController) ResetRoom(ctx *gin.Context) {
var reqModel map[string]any
badReqSt := 400
if err := ctx.ShouldBindJSON(&reqModel); c.helper.HandleIfErr(ctx, err, &badReqSt) {
return
}
roomId := ""
if rid, exists := reqModel["roomId"]; !exists {
c.helper.ResponseUnprocessableEntity(ctx)
return
} else {
if castedrid, stringItIs := rid.(string); stringItIs {
roomId = castedrid
} else {
c.helper.ResponseUnprocessableEntity(ctx)
return
}
}
err := c.repo.ResetRoom(roomId)
if c.helper.HandleIfErr(ctx, err, nil) {
return
}
c.helper.Response(ctx, nil, http.StatusNoContent)
}
func (c *RoomController) Start(ctx *gin.Context) {
buffer, _ := json.Marshal(map[string]any{"roomId": c.conf.TargetRoom, "svcAddr": c.conf.ServiceAddress})
body := bytes.NewReader(buffer)
res, err := http.Post(c.conf.LogjamBaseUrl+"/join", "application/json", body)
if err != nil {
println(err.Error())
time.Sleep(4 * time.Second)
}
if res.StatusCode > 204 {
resbody, _ := io.ReadAll(res.Body)
println("get /join "+res.Status, string(resbody))
}
}

View File

@ -0,0 +1,7 @@
package controllers
import "github.com/gin-gonic/gin"
func HealthCheck(ctx *gin.Context) {
ctx.Status(204)
}

View File

@ -6,6 +6,7 @@ import (
)
func main() {
svcAddr := flag.String("svc-addr", "", "service baseurl to register in logjam ( shoudln't end with / )")
src := flag.String("src", ":8080", "listenhost:listenPort")
logjamBaseUrl := flag.String("logjam-base-url", "https://example.com", "logjam base url(shouldn't end with /)")
targetRoom := flag.String("targetRoom", "testyroom", "target room")
@ -15,7 +16,10 @@ func main() {
if strings.HasSuffix(*logjamBaseUrl, "/") {
panic("logjam-base-url shouldn't end with /")
}
if strings.HasSuffix(*svcAddr, "/") {
panic("service address shouldn't end with /")
}
app := App{}
app.Init(*src, *logjamBaseUrl, *targetRoom)
app.Init(*src, *svcAddr, *logjamBaseUrl, *targetRoom)
app.Run()
}

View File

@ -1,6 +1,7 @@
package models
type ConfigModel struct {
LogjamBaseUrl string `json:"logjamBaseUrl"`
TargetRoom string `json:"targetRoom"`
ServiceAddress string `json:"serviceAddress"`
LogjamBaseUrl string `json:"logjamBaseUrl"`
TargetRoom string `json:"targetRoom"`
}

View File

@ -6,10 +6,10 @@ import (
"fmt"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"sourcecode.social/greatape/goldgorilla/models"
"sourcecode.social/greatape/goldgorilla/models/dto"
"log"
"net/http"
"sourcecode.social/greatape/goldgorilla/models"
"sourcecode.social/greatape/goldgorilla/models/dto"
"sync"
"time"
)
@ -30,6 +30,7 @@ type Room struct {
Peers map[uint64]*Peer
trackLock *sync.Mutex
Tracks map[string]*Track
timer *time.Ticker
}
type RoomRepository struct {
@ -64,7 +65,7 @@ func (r *RoomRepository) doesPeerExists(roomId string, id uint64) bool {
return false
}
func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCaller bool) (*webrtc.SessionDescription, error) {
func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCaller bool) error {
r.Lock()
if !r.doesRoomExists(roomId) {
@ -73,10 +74,11 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCall
Peers: make(map[uint64]*Peer),
trackLock: &sync.Mutex{},
Tracks: make(map[string]*Track),
timer: time.NewTicker(3 * time.Second),
}
r.Rooms[roomId] = room
go func() {
for range time.NewTicker(3 * time.Second).C {
for range room.timer.C {
room.Lock()
for _, peer := range room.Peers {
for _, receiver := range peer.Conn.GetReceivers() {
@ -104,14 +106,7 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCall
peerConn, err := webrtc.NewPeerConnection(webrtc.Configuration{})
if err != nil {
return nil, models.NewError("can't create peer connection", 500, models.MessageResponse{Message: err.Error()})
}
for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
if _, err := peerConn.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendrecv,
}); err != nil {
return nil, models.NewError("unhandled error, contact support #1313", 500, err)
}
return models.NewError("can't create peer connection", 500, models.MessageResponse{Message: err.Error()})
}
peerConn.OnICECandidate(func(ic *webrtc.ICECandidate) {
@ -124,29 +119,12 @@ func (r *RoomRepository) CreatePeer(roomId string, id uint64, canPublish, isCall
r.onPeerTrack(roomId, id, remote, receiver)
})
var sdp *webrtc.SessionDescription
if !isCaller {
offer, err := peerConn.CreateOffer(nil)
if err != nil {
println(err.Error())
peerConn.Close()
return nil, models.NewError("unhandled error, contact support #1314", 500, err)
}
err = peerConn.SetLocalDescription(offer)
if err != nil {
println(err.Error())
peerConn.Close()
return nil, models.NewError("unhandled error, contact support #1315", 500, err)
}
sdp = &offer
}
room.Peers[id] = &Peer{
ID: id,
Conn: peerConn,
}
return sdp, nil
go r.updatePCTracks(roomId)
return nil
}
func (r *RoomRepository) onPeerICECandidate(roomId string, id uint64, ic *webrtc.ICECandidate) {
@ -271,33 +249,53 @@ func (r *RoomRepository) updatePCTracks(roomId string) {
receivingPeerTracks[track.ID()] = rtpReceiver
}
room.trackLock.Lock()
renegotiate := false
for id, track := range room.Tracks {
_, alreadySend := alreadySentTracks[id]
_, alreadyReceiver := receivingPeerTracks[id]
if track.OwnerId != peer.ID && (!alreadySend && !alreadyReceiver) {
go func(peer *Peer, track *Track) {
println("add track")
_, err := peer.Conn.AddTrack(track.TrackLocal)
if err != nil {
println(err.Error())
}
}(peer, track)
renegotiate = true
println("add track")
_, err := peer.Conn.AddTrack(track.TrackLocal)
if err != nil {
println(err.Error())
return
}
}
}
room.trackLock.Unlock()
/*room.trackLock.Lock()
for trackId, _ := range alreadySentTracks {
if _, exists := room.Tracks[trackId]; !exists {
go func(peer *Peer, rtpSender *webrtc.RTPSender) {
err := peer.Conn.RemoveTrack(rtpSender)
if err != nil {
println(err.Error())
}
}(peer, alreadySentTracks[trackId])
if renegotiate {
offer, err := peer.Conn.CreateOffer(nil)
if err != nil {
println(err.Error())
return
}
err = peer.Conn.SetLocalDescription(offer)
if err != nil {
println(err.Error())
return
}
reqModel := dto.SetSDPReqModel{
PeerDTO: dto.PeerDTO{
RoomId: roomId,
ID: peer.ID,
},
SDP: offer,
}
bodyJson, err := json.Marshal(reqModel)
if err != nil {
println(err.Error())
return
}
res, err := http.Post(r.conf.LogjamBaseUrl+"/offer", "application/json", bytes.NewReader(bodyJson))
if err != nil {
println(err.Error())
return
}
if res.StatusCode > 204 {
println("/offer ", res.Status)
}
}
room.trackLock.Unlock()*/
}
}
@ -410,3 +408,26 @@ func (r *RoomRepository) ClosePeer(roomId string, id uint64) error {
}
return room.Peers[id].Conn.Close()
}
func (r *RoomRepository) ResetRoom(roomId string) error {
r.Lock()
if !r.doesRoomExists(roomId) {
r.Unlock()
return nil
}
room := r.Rooms[roomId]
r.Unlock()
room.Lock()
room.timer.Stop()
for _, peer := range room.Peers {
peer.Conn.Close()
}
room.Unlock()
r.Lock()
delete(r.Rooms, roomId)
r.Unlock()
return nil
}

View File

@ -9,8 +9,12 @@ func registerRoomRoutes(rg *gin.RouterGroup, ctrl *controllers.RoomController) {
rg.POST("/peer", ctrl.CreatePeer)
rg.DELETE("/peer", ctrl.ClosePeer)
rg.POST("/ice", ctrl.AddICECandidate)
rg.POST("/answer", ctrl.Answer)
rg.POST("/offer", ctrl.Offer)
rg.POST("/", ctrl.Start)
rg.DELETE("/", ctrl.ResetRoom)
}

View File

@ -13,6 +13,7 @@ func (r *Router) RegisterRoutes(rCtrl *controllers.RoomController) error {
gin.SetMode(gin.ReleaseMode)
r.router = gin.Default()
registerRoomRoutes(r.router.Group("/room"), rCtrl)
r.router.GET("/healthcheck", controllers.HealthCheck)
return nil
}