diff --git a/main.go b/main.go index 606c0d1..451cf23 100644 --- a/main.go +++ b/main.go @@ -19,9 +19,8 @@ func wsHandler(w http.ResponseWriter, r *http.Request) { return } - peer := NewPeer(conn) - room.AddPeer(peer) - log.Println("peer added", conn.LocalAddr().String()) + peer := NewPeer(conn, room) + room.Add(peer) go peer.ReadLoop() } @@ -29,6 +28,6 @@ func wsHandler(w http.ResponseWriter, r *http.Request) { func main() { http.HandleFunc("/ws", wsHandler) - log.Println("listening on :8080") + log.Println("SFU listening on :8080") log.Fatal(http.ListenAndServe(":8080", nil)) } diff --git a/peer.go b/peer.go index fa82a0d..ca0c271 100644 --- a/peer.go +++ b/peer.go @@ -2,55 +2,70 @@ package main import ( "encoding/json" + "log" "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/pion/webrtc/v4" ) -type Peer struct { - ID string - Conn *websocket.Conn - PC *webrtc.PeerConnection - Room *Room - Senders map[string]*webrtc.RTPSender -} - type Signal struct { Type string `json:"type"` Data json.RawMessage `json:"data"` } -func NewPeer(conn *websocket.Conn) *Peer { - pc, _ := webrtc.NewPeerConnection(webrtc.Configuration{}) +type Peer struct { + ID string + Conn *websocket.Conn + PC *webrtc.PeerConnection + Room *Room +} - p := &Peer{ - ID: uuid.NewString(), - Conn: conn, - PC: pc, - Senders: make(map[string]*webrtc.RTPSender), +func NewPeer(conn *websocket.Conn, room *Room) *Peer { + pc, err := webrtc.NewPeerConnection(webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + {URLs: []string{"stun:stun.l.google.com:19302"}}, + }, + }) + if err != nil { + panic(err) } - pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { - localTrack := NewTrack(track) - - p.Room.ForwardTrack(p, localTrack) - - for { - pkt, _, err := track.ReadRTP() - if err != nil { - return - } - localTrack.Write(pkt) - } - }) + p := &Peer{ + ID: uuid.NewString(), + Conn: conn, + PC: pc, + Room: room, + } pc.OnICECandidate(func(c *webrtc.ICECandidate) { if c == nil { return } - msg, _ := json.Marshal(c.ToJSON()) - p.Send("ice", msg) + data, _ := json.Marshal(c.ToJSON()) + p.Send("ice", data) + }) + + pc.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { + log.Println("track received", remote.Kind()) + + local := NewTrack(remote) + + p.Room.mu.Lock() + for _, other := range p.Room.peers { + if other.ID != p.ID { + other.AddTrack(local) + } + } + p.Room.mu.Unlock() + + for { + pkt, _, err := remote.ReadRTP() + if err != nil { + return + } + local.Write(pkt) + } }) return p @@ -66,7 +81,11 @@ func (p *Peer) AddTrack(track *Track) { } func (p *Peer) ReadLoop() { - defer p.Conn.Close() + defer func() { + p.Conn.Close() + p.Room.Remove(p.ID) + p.PC.Close() + }() for { var msg Signal @@ -95,6 +114,5 @@ func (p *Peer) ReadLoop() { } func (p *Peer) Send(t string, data []byte) { - msg := Signal{Type: t, Data: data} - p.Conn.WriteJSON(msg) + p.Conn.WriteJSON(Signal{Type: t, Data: data}) } diff --git a/room.go b/room.go index ea92b66..14a5668 100644 --- a/room.go +++ b/room.go @@ -13,29 +13,14 @@ func NewRoom() *Room { } } -func (r *Room) AddPeer(p *Peer) { +func (r *Room) Add(p *Peer) { r.mu.Lock() defer r.mu.Unlock() - r.peers[p.ID] = p - p.Room = r } -func (r *Room) RemovePeer(id string) { +func (r *Room) Remove(id string) { r.mu.Lock() defer r.mu.Unlock() - delete(r.peers, id) } - -func (r *Room) ForwardTrack(from *Peer, track *Track) { - r.mu.Lock() - defer r.mu.Unlock() - - for _, peer := range r.peers { - if peer.ID == from.ID { - continue - } - peer.AddTrack(track) - } -}