pion/ion-sfu
和ion中的SFU服务之间的区别和联系?
ion中的SFU服务是在
pion/ion-sfu
的基础上添加了GRPC信令传输功能得来的ion中的SFU服务代码主要是传输信令和根据信令调用
pion/ion-sfu
中的函数
可以控制pion/ion-sfu
主动连接其他SFU吗
pion/ion-sfu
主要为被动接收连接请求设计,所以不能CreateOffer
,ion中的SFU服务只有一个信令服务器,想要发起连接只能用pion/ion-go-sdk
将本地流推送到SFU服务,而不能控制SFU服务主动向其他SFU发起请求但
pion/ion-sfu
中有OnOffer
,如果hack一下pion/ion-go-sdk
Session相关的代码都在
pion/ion-sfu
里面,ion中的SFU服务的代码中基本没有操作Session的逻辑
可以用本地视频文件创建一个没有上行流的SFU服务吗?
pion/ion-sfu
中是如何处理新增的Track的?
首先,pion/ion-sfu
中根据视频流的传输方向抽象出了几种传输控制类:
Publisher
和PublisherTrack
:处理从外面“Publish”到本SFU的流,即上行流Subscriber
和DownTrack
:处理外面“Subscribe”本SFU的流,即下行流
这两种传输控制类分别有各自的PeerConnection,所以pion/ion-sfu
中是没有双向的PeerConnection的,收和发分别由两个PeerConnection控制。两个PeerConnection怎么处理Offer和Answer过程见后文。
Publisher
和Subscriber
的初始化函数大体相同,都会创建PeerConnection。而在Publisher
的初始化函数比Subscriber
的初始化函数多了这么一段代码:
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
Logger.V(1).Info("Peer got remote track id",
"peer_id", p.id,
"track_id", track.ID(),
"mediaSSRC", track.SSRC(),
"rid", track.RID(),
"stream_id", track.StreamID(),
)
r, pub := p.router.AddReceiver(receiver, track, track.ID(), track.StreamID())
if pub {
p.session.Publish(p.router, r)
p.mu.Lock()
publisherTrack := PublisherTrack{track, r, true}
p.tracks = append(p.tracks, publisherTrack)
for _, rp := range p.relayPeers {
if err = p.createRelayTrack(track, r, rp.peer); err != nil {
Logger.V(1).Error(err, "Creating relay track.", "peer_id", p.id)
}
}
p.mu.Unlock()
if handler, ok := p.onPublisherTrack.Load().(func(PublisherTrack)); ok && handler != nil {
handler(publisherTrack)
}
} else {
p.mu.Lock()
p.tracks = append(p.tracks, PublisherTrack{track, r, false})
p.mu.Unlock()
}
})
pc.OnDataChannel(func(dc *webrtc.DataChannel) {
if dc.Label() == APIChannelLabel {
// terminate api data channel
return
}
p.session.AddDatachannel(id, dc)
})
可以看到,这段代码分别注册了OnTrack
和OnDataChannel
两个函数,在对面有新Track和新DataChannel加进来的时候执行操作,很明显最核心的就是这个p.session.Publish(p.router, r)
和p.session.AddDatachannel(id, dc)
。把这两个函数打开看看:
首先是Publish
:
func (s *SessionLocal) Publish(router Router, r Receiver) {
for _, p := range s.Peers() {
// Don't sub to self
if router.ID() == p.ID() || p.Subscriber() == nil {
continue
}
Logger.V(0).Info("Publishing track to peer", "peer_id", p.ID())
if err := router.AddDownTracks(p.Subscriber(), r); err != nil {
Logger.Error(err, "Error subscribing transport to Router")
continue
}
}
}
太明显了,这就是一个循环把Track加进这个Session的所有Peer的Subscriber
的DownTracks里面。
然后是AddDatachannel
:
func (s *SessionLocal) AddDatachannel(owner string, dc *webrtc.DataChannel) {
label := dc.Label()
s.mu.Lock()
for _, lbl := range s.fanOutDCs {
if label == lbl {
s.mu.Unlock()
return
}
}
s.fanOutDCs = append(s.fanOutDCs, label)
peerOwner := s.peers[owner]
s.mu.Unlock()
peers := s.Peers()
peerOwner.Subscriber().RegisterDatachannel(label, dc)
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
s.FanOutMessage(owner, label, msg)
})
for _, p := range peers {
peer := p
if peer.ID() == owner || peer.Subscriber() == nil {
continue
}
ndc, err := peer.Subscriber().AddDataChannel(label)
if err != nil {
Logger.Error(err, "error adding datachannel")
continue
}
if peer.Publisher() != nil && peer.Publisher().Relayed() {
peer.Publisher().AddRelayFanOutDataChannel(label)
}
pid := peer.ID()
ndc.OnMessage(func(msg webrtc.DataChannelMessage) {
s.FanOutMessage(pid, label, msg)
if peer.Publisher().Relayed() {
for _, rdc := range peer.Publisher().GetRelayedDataChannels(label) {
if msg.IsString {
if err = rdc.SendText(string(msg.Data)); err != nil {
Logger.Error(err, "Sending dc message err")
}
} else {
if err = rdc.Send(msg.Data); err != nil {
Logger.Error(err, "Sending dc message err")
}
}
}
}
})
peer.Subscriber().negotiate()
}
}
一看就是在搞消息转发,就不细看了
pion/ion-sfu
中是如何处理关闭track的?
相关操作在AddDownTrack
的时候就已经通过OnCloseHandler
定好了
downTrack.OnCloseHandler(func() {
if sub.pc.ConnectionState() != webrtc.PeerConnectionStateClosed {
if err := sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err != nil {
if err == webrtc.ErrConnectionClosed {
return
}
Logger.Error(err, "Error closing down track")
} else {
sub.RemoveDownTrack(recv.StreamID(), downTrack)
sub.negotiate()
}
}
})
一个Publisher
里过来的Track可能会通过AddDownTrack
加到很多个Subscriber
里,当Publish侧的SDK通过UnPublish
函数关闭了一个流,Publisher
里会有流关闭,进而触发所有这些Subscriber
里的OnCloseHandler
函数,从而达到删除流的目的。
pion/ion-sfu
中的JoinConfig
是如何控制SFU的转发逻辑的?
JoinConfig
长这样:
// JoinConfig allow adding more control to the peers joining a SessionLocal.
type JoinConfig struct {
// If true the peer will not be allowed to publish tracks to SessionLocal.
NoPublish bool
// If true the peer will not be allowed to subscribe to other peers in SessionLocal.
NoSubscribe bool
// If true the peer will not automatically subscribe all tracks,
// and then the peer can use peer.Subscriber().AddDownTrack/RemoveDownTrack
// to customize the subscrbe stream combination as needed.
// this parameter depends on NoSubscribe=false.
NoAutoSubscribe bool
}
首先,在Peer的初始化过程中有NoSubscribe
和NoPublish
发挥作用:
if !conf.NoSubscribe {
p.subscriber, err = NewSubscriber(uid, cfg)
if err != nil {
return fmt.Errorf("error creating transport: %v", err)
}
p.subscriber.noAutoSubscribe = conf.NoAutoSubscribe
p.subscriber.OnNegotiationNeeded(func() {
p.Lock()
defer p.Unlock()
if p.remoteAnswerPending {
p.negotiationPending = true
return
}
Logger.V(1).Info("Negotiation needed", "peer_id", p.id)
offer, err := p.subscriber.CreateOffer()
if err != nil {
Logger.Error(err, "CreateOffer error")
return
}
p.remoteAnswerPending = true
if p.OnOffer != nil && !p.closed.get() {
Logger.V(0).Info("Send offer", "peer_id", p.id)
p.OnOffer(&offer)
}
})
p.subscriber.OnICECandidate(func(c *webrtc.ICECandidate) {
Logger.V(1).Info("On subscriber ice candidate called for peer", "peer_id", p.id)
if c == nil {
return
}
if p.OnIceCandidate != nil && !p.closed.get() {
json := c.ToJSON()
p.OnIceCandidate(&json, subscriber)
}
})
}
if !conf.NoSubscribe {
p.session.Subscribe(p)
}
显然,这NoSubscribe
在生成PeerLocal
时控制Subscriber
的初始化,如果NoSubscribe=true
就不会有Subscriber
生成。从而也就没法AddDownTrack
向外传出Track。
if !conf.NoPublish {
p.publisher, err = NewPublisher(uid, p.session, &cfg)
if err != nil {
return fmt.Errorf("error creating transport: %v", err)
}
if !conf.NoSubscribe {
for _, dc := range p.session.GetDCMiddlewares() {
if err := p.subscriber.AddDatachannel(p, dc); err != nil {
return fmt.Errorf("setting subscriber default dc datachannel: %w", err)
}
}
}
p.publisher.OnICECandidate(func(c *webrtc.ICECandidate) {
Logger.V(1).Info("on publisher ice candidate called for peer", "peer_id", p.id)
if c == nil {
return
}
if p.OnIceCandidate != nil && !p.closed.get() {
json := c.ToJSON()
p.OnIceCandidate(&json, publisher)
}
})
p.publisher.OnICEConnectionStateChange(func(s webrtc.ICEConnectionState) {
if p.OnICEConnectionStateChange != nil && !p.closed.get() {
p.OnICEConnectionStateChange(s)
}
})
}
显然,这NoPublish
在生成PeerLocal
时控制Publisher
的初始化,如果NoPublish=true
就不会有Publisher
生成。根据上一节的分析,PublisherTrack
增减相关的操作主要就是在Publisher
的初始化过程中执行的,没有了Publisher
也就不会有对传入的PublisherTrack
的那些操作了,从而也就不会接收传入的Track。
此外,我们发现NoAutoSubscribe
被赋值给了p.subscriber.noAutoSubscribe
这个值主要在AddDownTracks
的里面发挥作用:
func (r *router) AddDownTracks(s *Subscriber, recv Receiver) error {
r.Lock()
defer r.Unlock()
if s.noAutoSubscribe {
Logger.Info("peer turns off automatic subscription, skip tracks add")
return nil
}
if recv != nil {
if _, err := r.AddDownTrack(s, recv); err != nil {
return err
}
s.negotiate()
return nil
}
if len(r.receivers) > 0 {
for _, rcv := range r.receivers {
if _, err := r.AddDownTrack(s, rcv); err != nil {
return err
}
}
s.negotiate()
}
return nil
}
所以,当调用Publish
的时候,NoAutoSubscribe=true
的router不会被调用AddDownTrack
。根据前面对Publisher
的初始化函数的分析,Publisher
有新Track到达的时候会对所有Session里的Peer调用Publish
,所以NoAutoSubscribe=true
不调用AddDownTrack
就意味着新Track到达的时候这个Peer没法AddDownTrack
,所以达到了“No Auto Subscribe”的目的。
Publisher
和Subscriber
两个PeerConnection怎么处理Offer和Answer的?
从SDK的代码上看,信令的传输也会被分类为两种。在SDK侧,接收到的所有Offer都交给negotiate
函数处理,接收到的所有Answer都交给setRemoteSDP
函数处理:
var sdpType webrtc.SDPType
if payload.Description.Type == "offer" {
sdpType = webrtc.SDPTypeOffer
} else {
sdpType = webrtc.SDPTypeAnswer
}
sdp := webrtc.SessionDescription{
SDP: payload.Description.Sdp,
Type: sdpType,
}
if sdp.Type == webrtc.SDPTypeOffer {
log.Infof("[%v] [description] got offer call s.OnNegotiate sdp=%+v", r.uid, sdp)
err := r.negotiate(sdp)
if err != nil {
log.Errorf("error: %v", err)
}
} else if sdp.Type == webrtc.SDPTypeAnswer {
log.Infof("[%v] [description] got answer call sdp=%+v", r.uid, sdp)
err = r.setRemoteSDP(sdp)
if err != nil {
log.Errorf("[%v] [description] setRemoteSDP err=%s", r.uid, err)
}
}
并且可以看到negotiate
函数里基本上都是对Subscribe方向的操作、setRemoteSDP
函数里基本上都是对Publish方向的操作。
所以,所有从SFU到SDK的流(即“Subscribe”)都是SFU向SDK发Offer、SDK向SFU回Answer;所有从SDK到SFU的流(即“Publish”)都是SDK向SFU发Offer、SFU向SDK回Answer。
所以:
如果在SFU那边收到了Offer,那必然是“Publish”流里的,应该给
Publisher
里的PeerConnection用,并且让Publisher
里的PeerConnection回复一个Answer。代码位于这里。如果在SFU那边收到了Answer,那必然是“Subscribe”流里的,应该给
Subscriber
里的PeerConnection用。代码位于这里。如果在SDK这边收到了Offer,那必然是“Subscribe”流里的,应该给Subscribe方向的PeerConnection用,并且让Subscribe方向的PeerConnection回复一个Answer。代码就是上面介绍的
negotiate
。如果在SDK这边收到了Answer,那必然是“Publish”流里的,应该给Publish方向的PeerConnection用。代码就是上面介绍的
setRemoteSDP
。
那两个方向的Offer都是从哪来的?
“Publish”流的Offer是SDK在Join
函数里发出的:
offer, err := r.pub.pc.CreateOffer(nil)
if err != nil {
return err
}
err = r.pub.pc.SetLocalDescription(offer)
if err != nil {
return err
}
if len(config) > 0 {
err = r.SendJoin(sid, r.uid, offer, *config[0])
} else {
err = r.SendJoin(sid, r.uid, offer, nil)
}
这里的SendJoin
就是将SDP打包在rtc.Request_Join
里发出。
“Subscribe”流的Offer是在SFU处理上面这SDK发的rtc.Request_Join
请求时通过设置OnOffer
发出的:
// Notify user of new offer
peer.OnOffer = func(o *webrtc.SessionDescription) {
log.Debugf("[S=>C] peer.OnOffer: %v", o.SDP)
err = sig.Send(&rtc.Reply{
Payload: &rtc.Reply_Description{
Description: &rtc.SessionDescription{
Target: rtc.Target(rtc.Target_SUBSCRIBER),
Sdp: o.SDP,
Type: o.Type.String(),
},
},
})
if err != nil {
log.Errorf("negotiation error: %v", err)
}
}
很明显,不用多讲。
进一步,SDK接收“Subscribe”流和SFU接收“Publish”流用的都是OnTrack
,SFU里的操作前面已经介绍了,SDK里的OnTrack
在这:
pion/ion-sfu
里面是如何AddDownTrack
的?
NoAutoSubscribe=true
时的AddDownTrack
从SFU里的Request_Subscription
处理函数这里开始解析:
for _, p := range peer.Session().Peers() {
if p.ID() != peer.ID() {
for _, track := range p.Publisher().PublisherTracks() {
if track.Receiver.TrackID() == trackInfo.TrackId && track.Track.RID() == trackInfo.Layer {
log.Infof("Add RemoteTrack: %v to peer %v %v %v", trackInfo.TrackId, peer.ID(), track.Track.Kind(), track.Track.RID())
dt, err := peer.Publisher().GetRouter().AddDownTrack(peer.Subscriber(), track.Receiver)
if err != nil {
log.Errorf("AddDownTrack error: %v", err)
}
首先很显然这里的peer.Publisher().GetRouter().AddDownTrack(peer.Subscriber(), track.Receiver)
就是把别人的接收到Track的track.Receiver
加进自己的发送器peer.Subscriber()
里。
这个函数长这样:
func (r *router) AddDownTrack(sub *Subscriber, recv Receiver) (*DownTrack, error) {
for _, dt := range sub.GetDownTracks(recv.StreamID()) {
if dt.ID() == recv.TrackID() {
return dt, nil
}
}
codec := recv.Codec()
if err := sub.me.RegisterCodec(codec, recv.Kind()); err != nil {
return nil, err
}
downTrack, err := NewDownTrack(webrtc.RTPCodecCapability{
MimeType: codec.MimeType,
ClockRate: codec.ClockRate,
Channels: codec.Channels,
SDPFmtpLine: codec.SDPFmtpLine,
RTCPFeedback: []webrtc.RTCPFeedback{{"goog-remb", ""}, {"nack", ""}, {"nack", "pli"}},
}, recv, r.bufferFactory, sub.id, r.config.MaxPacketTrack)
if err != nil {
return nil, err
}
// Create webrtc sender for the peer we are sending track to
if downTrack.transceiver, err = sub.pc.AddTransceiverFromTrack(downTrack, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly,
}); err != nil {
return nil, err
}
// nolint:scopelint
downTrack.OnCloseHandler(func() {
if sub.pc.ConnectionState() != webrtc.PeerConnectionStateClosed {
if err := sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err != nil {
if err == webrtc.ErrConnectionClosed {
return
}
Logger.Error(err, "Error closing down track")
} else {
sub.RemoveDownTrack(recv.StreamID(), downTrack)
sub.negotiate()
}
}
})
downTrack.OnBind(func() {
go sub.sendStreamDownTracksReports(recv.StreamID())
})
sub.AddDownTrack(recv.StreamID(), downTrack)
recv.AddDownTrack(downTrack, r.config.Simulcast.BestQualityFirst)
return downTrack, nil
}
这里的NewDownTrack
生成的downTrack
是一个继承了TrackLocal
的类,可以看到被AddTransceiverFromTrack
加进PeerConnection里了,并且在最后用sub.AddDownTrack
和recv.AddDownTrack
加进Subscriber和Receiver里了。
这两个AddDownTrack
都是简单的用变量记录DownTrack
:
func (s *Subscriber) AddDownTrack(streamID string, downTrack *DownTrack) {
s.Lock()
defer s.Unlock()
if dt, ok := s.tracks[streamID]; ok {
dt = append(dt, downTrack)
s.tracks[streamID] = dt
} else {
s.tracks[streamID] = []*DownTrack{downTrack}
}
}
func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) {
...
w.Lock()
w.storeDownTrack(layer, track)
w.Unlock()
}
...
func (w *WebRTCReceiver) storeDownTrack(layer int, dt *DownTrack) {
dts := w.downTracks[layer].Load().([]*DownTrack)
ndts := make([]*DownTrack, len(dts)+1)
copy(ndts, dts)
ndts[len(ndts)-1] = dt
w.downTracks[layer].Store(ndts)
}
那Receiver里收到的东西到底是怎么通过这个DownTrack进的Subscriber? 在Receiver的AddUpTrack
里可以看见:
func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer, bestQualityFirst bool) {
if w.closed.get() {
return
}
var layer int
switch track.RID() {
case fullResolution:
layer = 2
case halfResolution:
layer = 1
default:
layer = 0
}
w.Lock()
w.upTracks[layer] = track
w.buffers[layer] = buff
w.available[layer].set(true)
w.downTracks[layer].Store(make([]*DownTrack, 0, 10))
w.pendingTracks[layer] = make([]*DownTrack, 0, 10)
w.Unlock()
subBestQuality := func(targetLayer int) {
for l := 0; l < targetLayer; l++ {
dts := w.downTracks[l].Load()
if dts == nil {
continue
}
for _, dt := range dts.([]*DownTrack) {
_ = dt.SwitchSpatialLayer(int32(targetLayer), false)
}
}
}
subLowestQuality := func(targetLayer int) {
for l := 2; l != targetLayer; l-- {
dts := w.downTracks[l].Load()
if dts == nil {
continue
}
for _, dt := range dts.([]*DownTrack) {
_ = dt.SwitchSpatialLayer(int32(targetLayer), false)
}
}
}
if w.isSimulcast {
if bestQualityFirst && (!w.available[2].get() || layer == 2) {
subBestQuality(layer)
} else if !bestQualityFirst && (!w.available[0].get() || layer == 0) {
subLowestQuality(layer)
}
}
go w.writeRTP(layer)
}
这个函数看样子只一个只能调用一次的函数,它最后创建了一个go程w.writeRTP
,在这个函数里面,我们终于看到了包转发WriteRTP
的过程:
func (w *WebRTCReceiver) writeRTP(layer int) {
defer func() {
w.closeOnce.Do(func() {
w.closed.set(true)
w.closeTracks()
})
}()
pli := []rtcp.Packet{
&rtcp.PictureLossIndication{SenderSSRC: rand.Uint32(), MediaSSRC: w.SSRC(layer)},
}
for {
pkt, err := w.buffers[layer].ReadExtended()
if err == io.EOF {
return
}
if w.isSimulcast {
if w.pending[layer].get() {
if pkt.KeyFrame {
w.Lock()
for idx, dt := range w.pendingTracks[layer] {
w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.id)
w.storeDownTrack(layer, dt)
dt.SwitchSpatialLayerDone(int32(layer))
w.pendingTracks[layer][idx] = nil
}
w.pendingTracks[layer] = w.pendingTracks[layer][:0]
w.pending[layer].set(false)
w.Unlock()
} else {
w.SendRTCP(pli)
}
}
}
for _, dt := range w.downTracks[layer].Load().([]*DownTrack) {
if err = dt.WriteRTP(pkt, layer); err != nil {
if err == io.EOF && err == io.ErrClosedPipe {
w.Lock()
w.deleteDownTrack(layer, dt.id)
w.Unlock()
}
Logger.Error(err, "Error writing to down track", "id", dt.id)
}
}
}
}
所以就是把包给所有DownTrack都写一份。
所以这Receiver的结构也很清楚了,就是一个UpTrack和多个DownTrack,设置好UpTrack后开go程不断把包从UpTrack复制几份写进所有的DownTrack里面。
所以综上所述,可以看出真正的包转发操作全是在Publisher相关代码里完成的,Subscriber实际上只起一个记录的作用。
NoAutoSubscribe=false
时的AddDownTrack
前面已经介绍过在Publisher
的初始化函数比Subscriber
的初始化函数多的这么一段代码,可以看到最后也是回到Router
中的AddDownTrack
函数里。