Pion community ion-sfu source code walkthrough notes

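What are the differences and connections between pion/ion-sfu and the SFU service in ion?

  • The SFU service in ion is pion/ion-sfu with gRPC signaling transport added on top

  • The SFU service code in ion mainly transports signaling and calls functions in pion/ion-sfu according to that signaling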

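Can pion/ion-sfu be made to actively connect to other SFUs?

  • pion/ion-sfu is designed mainly to accept incoming connection requests, so it cannot start a connection by creating the first Offer. The SFU service in ion only has a signaling server: the only way to initiate a connection is to push a local stream to the SFU service with pion/ion-go-sdk, and the SFU service cannot be told to initiate a request to another SFU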

  • pion/ion-sfu does have OnOffer; one could perhaps hack pion/ion-go-sdk to exploit it

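  • All Session-related code lives in pion/ion-sfu; the SFU service code in ion contains almost no logic that manipulates Sessions

Can an SFU service with no upstream be created from a local video file?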

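How does pion/ion-sfu handle a newly added Track?

First, pion/ion-sfu abstracts several transport control types according to the direction in which the media flows:

  • Publisher and PublisherTrack: handle streams “published” to this SFU from outside, i.e. upstream

  • Subscriber and DownTrack: handle streams that the outside “subscribes” to from this SFU, i.e. downstream

Each of these two transport control types owns its own PeerConnection, so pion/ion-sfu has no bidirectional PeerConnection: receiving and sending are handled by two separate PeerConnections. How the two PeerConnections handle the Offer/Answer exchange is covered later.

The initialization functions of Publisher and Subscriber are largely the same, and both create a PeerConnection. The Publisher initialization function, however, contains this extra piece of code: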

	pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
		Logger.V(1).Info("Peer got remote track id",
			"peer_id", p.id,
			"track_id", track.ID(),
			"mediaSSRC", track.SSRC(),
			"rid", track.RID(),
			"stream_id", track.StreamID(),
		)

		r, pub := p.router.AddReceiver(receiver, track, track.ID(), track.StreamID())
		if pub {
			p.session.Publish(p.router, r)
			p.mu.Lock()
			publisherTrack := PublisherTrack{track, r, true}
			p.tracks = append(p.tracks, publisherTrack)
			for _, rp := range p.relayPeers {
				if err = p.createRelayTrack(track, r, rp.peer); err != nil {
					Logger.V(1).Error(err, "Creating relay track.", "peer_id", p.id)
				}
			}
			p.mu.Unlock()
			if handler, ok := p.onPublisherTrack.Load().(func(PublisherTrack)); ok && handler != nil {
				handler(publisherTrack)
			}
		} else {
			p.mu.Lock()
			p.tracks = append(p.tracks, PublisherTrack{track, r, false})
			p.mu.Unlock()
		}
	})

	pc.OnDataChannel(func(dc *webrtc.DataChannel) {
		if dc.Label() == APIChannelLabel {
			// terminate api data channel
			return
		}
		p.session.AddDatachannel(id, dc)
	})

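As you can see, this code registers two callbacks, OnTrack and OnDataChannel, which run when the remote side adds a new Track or a new DataChannel. The core of it is clearly p.session.Publish(p.router, r) and p.session.AddDatachannel(id, dc). Let's open up these two functions:

First, Publish: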

func (s *SessionLocal) Publish(router Router, r Receiver) {
	for _, p := range s.Peers() {
		// Don't sub to self
		if router.ID() == p.ID() || p.Subscriber() == nil {
			continue
		}

		Logger.V(0).Info("Publishing track to peer", "peer_id", p.ID())

		if err := router.AddDownTracks(p.Subscriber(), r); err != nil {
			Logger.Error(err, "Error subscribing transport to Router")
			continue
		}
	}
}

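This is plainly a loop that adds the Track to the DownTracks of the Subscriber of every other Peer in the Session; the publishing Peer itself and Peers without a Subscriber are skipped.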
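To make the loop concrete, here is a minimal standard-library sketch of the same fan-out. The peer and subscriber types and the publish function are hypothetical stand-ins invented for illustration, not ion-sfu's real types; a nil sub models a Peer that has no Subscriber.

```go
package main

import "fmt"

// subscriber is a hypothetical stand-in for ion-sfu's Subscriber:
// it only records the IDs of the tracks forwarded to it.
type subscriber struct {
	downTracks []string
}

// peer is a hypothetical stand-in for a session peer.
// A nil sub models a peer without a Subscriber.
type peer struct {
	id  string
	sub *subscriber
}

// publish mirrors the shape of the loop in SessionLocal.Publish: every peer
// except the publisher itself (and peers without a subscriber) gets the
// new track. It returns how many peers received it.
func publish(peers []*peer, publisherID, trackID string) int {
	n := 0
	for _, p := range peers {
		// Don't subscribe the publisher to its own track.
		if p.id == publisherID || p.sub == nil {
			continue
		}
		p.sub.downTracks = append(p.sub.downTracks, trackID)
		n++
	}
	return n
}

func main() {
	peers := []*peer{
		{id: "alice", sub: &subscriber{}},
		{id: "bob", sub: &subscriber{}},
		{id: "carol"}, // no subscriber
	}
	fmt.Println("receivers:", publish(peers, "alice", "cam-1"))
}
```

In this sketch only bob receives the track: alice is the publisher and carol has no subscriber.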

Next, AddDatachannel:

func (s *SessionLocal) AddDatachannel(owner string, dc *webrtc.DataChannel) {
	label := dc.Label()

	s.mu.Lock()
	for _, lbl := range s.fanOutDCs {
		if label == lbl {
			s.mu.Unlock()
			return
		}
	}
	s.fanOutDCs = append(s.fanOutDCs, label)
	peerOwner := s.peers[owner]
	s.mu.Unlock()
	peers := s.Peers()
	peerOwner.Subscriber().RegisterDatachannel(label, dc)

	dc.OnMessage(func(msg webrtc.DataChannelMessage) {
		s.FanOutMessage(owner, label, msg)
	})

	for _, p := range peers {
		peer := p
		if peer.ID() == owner || peer.Subscriber() == nil {
			continue
		}
		ndc, err := peer.Subscriber().AddDataChannel(label)

		if err != nil {
			Logger.Error(err, "error adding datachannel")
			continue
		}

		if peer.Publisher() != nil && peer.Publisher().Relayed() {
			peer.Publisher().AddRelayFanOutDataChannel(label)
		}

		pid := peer.ID()
		ndc.OnMessage(func(msg webrtc.DataChannelMessage) {
			s.FanOutMessage(pid, label, msg)

			if peer.Publisher().Relayed() {
				for _, rdc := range peer.Publisher().GetRelayedDataChannels(label) {
					if msg.IsString {
						if err = rdc.SendText(string(msg.Data)); err != nil {
							Logger.Error(err, "Sending dc message err")
						}
					} else {
						if err = rdc.Send(msg.Data); err != nil {
							Logger.Error(err, "Sending dc message err")
						}
					}
				}
			}
		})

		peer.Subscriber().negotiate()
	}
}

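This is evidently message fan-out, so I won't go through it in detail.

How does pion/ion-sfu handle closing a track?

The relevant behavior is already set up through OnCloseHandler when AddDownTrack is called: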

	downTrack.OnCloseHandler(func() {
		if sub.pc.ConnectionState() != webrtc.PeerConnectionStateClosed {
			if err := sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err != nil {
				if err == webrtc.ErrConnectionClosed {
					return
				}
				Logger.Error(err, "Error closing down track")
			} else {
				sub.RemoveDownTrack(recv.StreamID(), downTrack)
				sub.negotiate()
			}
		}
	})

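A Track arriving at one Publisher may be added to many Subscribers via AddDownTrack. When the publishing SDK closes a stream by calling UnPublish, the stream closes in the Publisher, which in turn triggers the OnCloseHandler callbacks in all of those Subscribers, and that is how the stream gets removed.

How does JoinConfig in pion/ion-sfu control the SFU's forwarding logic?

JoinConfig looks like this: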
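The cascade described above can be sketched with plain Go callbacks. Everything here (receiver, subscriber, downTrack and their methods) is a simplified, hypothetical model rather than ion-sfu's implementation; it only shows the idea of registering cleanup at attach time and firing it when the published stream ends.

```go
package main

import "fmt"

// downTrack is a hypothetical stand-in for ion-sfu's DownTrack.
type downTrack struct {
	id      string
	onClose func()
}

// subscriber records the down tracks currently forwarded to one peer.
type subscriber struct {
	tracks map[string]*downTrack
}

func newSubscriber() *subscriber {
	return &subscriber{tracks: make(map[string]*downTrack)}
}

// receiver models one published track and the down tracks fed from it.
type receiver struct {
	trackID    string
	downTracks []*downTrack
}

// addDownTrack attaches sub to the receiver and registers the cleanup
// callback up front, in the spirit of OnCloseHandler in AddDownTrack.
func (r *receiver) addDownTrack(sub *subscriber) {
	dt := &downTrack{id: r.trackID}
	dt.onClose = func() { delete(sub.tracks, dt.id) }
	sub.tracks[dt.id] = dt
	r.downTracks = append(r.downTracks, dt)
}

// close models the publisher's stream ending: every attached down track
// runs its close callback, which removes it from its subscriber.
func (r *receiver) close() {
	for _, dt := range r.downTracks {
		dt.onClose()
	}
	r.downTracks = nil
}

func main() {
	recv := &receiver{trackID: "cam-1"}
	bob, carol := newSubscriber(), newSubscriber()
	recv.addDownTrack(bob)
	recv.addDownTrack(carol)
	fmt.Println("before close:", len(bob.tracks), len(carol.tracks))
	recv.close()
	fmt.Println("after close:", len(bob.tracks), len(carol.tracks))
}
```

The real handler additionally removes the sender from the PeerConnection and renegotiates; the sketch leaves that out.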

// JoinConfig allow adding more control to the peers joining a SessionLocal.
type JoinConfig struct {
	// If true the peer will not be allowed to publish tracks to SessionLocal.
	NoPublish bool
	// If true the peer will not be allowed to subscribe to other peers in SessionLocal.
	NoSubscribe bool
	// If true the peer will not automatically subscribe all tracks,
	// and then the peer can use peer.Subscriber().AddDownTrack/RemoveDownTrack
	// to customize the subscribe stream combination as needed.
	// this parameter depends on NoSubscribe=false.
	NoAutoSubscribe bool
}

First, NoSubscribe and NoPublish take effect during Peer initialization:

	if !conf.NoSubscribe {
		p.subscriber, err = NewSubscriber(uid, cfg)
		if err != nil {
			return fmt.Errorf("error creating transport: %v", err)
		}

		p.subscriber.noAutoSubscribe = conf.NoAutoSubscribe

		p.subscriber.OnNegotiationNeeded(func() {
			p.Lock()
			defer p.Unlock()

			if p.remoteAnswerPending {
				p.negotiationPending = true
				return
			}

			Logger.V(1).Info("Negotiation needed", "peer_id", p.id)
			offer, err := p.subscriber.CreateOffer()
			if err != nil {
				Logger.Error(err, "CreateOffer error")
				return
			}

			p.remoteAnswerPending = true
			if p.OnOffer != nil && !p.closed.get() {
				Logger.V(0).Info("Send offer", "peer_id", p.id)
				p.OnOffer(&offer)
			}
		})

		p.subscriber.OnICECandidate(func(c *webrtc.ICECandidate) {
			Logger.V(1).Info("On subscriber ice candidate called for peer", "peer_id", p.id)
			if c == nil {
				return
			}

			if p.OnIceCandidate != nil && !p.closed.get() {
				json := c.ToJSON()
				p.OnIceCandidate(&json, subscriber)
			}
		})
	}

	if !conf.NoSubscribe {
		p.session.Subscribe(p)
	}

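Clearly, NoSubscribe controls whether the Subscriber is initialized when the PeerLocal is set up: with NoSubscribe=true no Subscriber is created, so there is nothing to AddDownTrack on and no Track can be sent out to this peer.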

	if !conf.NoPublish {
		p.publisher, err = NewPublisher(uid, p.session, &cfg)
		if err != nil {
			return fmt.Errorf("error creating transport: %v", err)
		}
		if !conf.NoSubscribe {
			for _, dc := range p.session.GetDCMiddlewares() {
				if err := p.subscriber.AddDatachannel(p, dc); err != nil {
					return fmt.Errorf("setting subscriber default dc datachannel: %w", err)
				}
			}
		}

		p.publisher.OnICECandidate(func(c *webrtc.ICECandidate) {
			Logger.V(1).Info("on publisher ice candidate called for peer", "peer_id", p.id)
			if c == nil {
				return
			}

			if p.OnIceCandidate != nil && !p.closed.get() {
				json := c.ToJSON()
				p.OnIceCandidate(&json, publisher)
			}
		})

		p.publisher.OnICEConnectionStateChange(func(s webrtc.ICEConnectionState) {
			if p.OnICEConnectionStateChange != nil && !p.closed.get() {
				p.OnICEConnectionStateChange(s)
			}
		})
	}

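Likewise, NoPublish controls whether the Publisher is initialized when the PeerLocal is set up: with NoPublish=true no Publisher is created. As analyzed in the previous section, the handling of added and removed PublisherTracks is registered during Publisher initialization, so without a Publisher none of that handling exists and incoming Tracks are not received.

In addition, NoAutoSubscribe is assigned to p.subscriber.noAutoSubscribe. This value mainly takes effect inside AddDownTracks: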
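The effect of these two flags can be summarized in a small sketch. The joinConfig, transport and peer types and the join function below are hypothetical simplifications for illustration; only the two flag names are taken from JoinConfig.

```go
package main

import "fmt"

// joinConfig copies the two flags discussed above from JoinConfig.
type joinConfig struct {
	NoPublish   bool
	NoSubscribe bool
}

// transport is a hypothetical placeholder for Publisher / Subscriber.
type transport struct{ role string }

// peer holds whichever transports the join configuration allowed.
type peer struct {
	publisher  *transport
	subscriber *transport
}

// join creates only the transports that the configuration permits,
// which is the net effect of the two `if !conf.No...` blocks above.
func join(conf joinConfig) *peer {
	p := &peer{}
	if !conf.NoSubscribe {
		p.subscriber = &transport{role: "subscriber"}
	}
	if !conf.NoPublish {
		p.publisher = &transport{role: "publisher"}
	}
	return p
}

func main() {
	viewer := join(joinConfig{NoPublish: true})
	fmt.Println("viewer can publish:", viewer.publisher != nil)
	fmt.Println("viewer can subscribe:", viewer.subscriber != nil)
}
```

A receive-only viewer would join with NoPublish set, and a send-only source with NoSubscribe set.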

func (r *router) AddDownTracks(s *Subscriber, recv Receiver) error {
	r.Lock()
	defer r.Unlock()

	if s.noAutoSubscribe {
		Logger.Info("peer turns off automatic subscription, skip tracks add")
		return nil
	}

	if recv != nil {
		if _, err := r.AddDownTrack(s, recv); err != nil {
			return err
		}
		s.negotiate()
		return nil
	}

	if len(r.receivers) > 0 {
		for _, rcv := range r.receivers {
			if _, err := r.AddDownTrack(s, rcv); err != nil {
				return err
			}
		}
		s.negotiate()
	}
	return nil
}

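So when Publish is called, AddDownTracks returns early for any Subscriber with noAutoSubscribe=true and never reaches AddDownTrack. From the analysis of the Publisher initialization above, when a new Track arrives at a Publisher, Publish calls AddDownTracks for every other Peer in the Session. With NoAutoSubscribe=true that call is a no-op, so the new Track is not added to this Peer automatically, which is exactly what “No Auto Subscribe” means.

How do the two PeerConnections of Publisher and Subscriber handle Offer and Answer?

Judging from the SDK code, signaling also falls into two categories. On the SDK side, every received Offer is handed to the negotiate function and every received Answer is handed to the setRemoteSDP function: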
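The difference between the automatic path and the manual path can be sketched as follows. The router and subscriber types here are hypothetical and reduce tracks to plain strings; the point is only that the plural addDownTracks honors noAutoSubscribe while the singular addDownTrack does not.

```go
package main

import "fmt"

// subscriber is a hypothetical stand-in that records attached track IDs.
type subscriber struct {
	noAutoSubscribe bool
	tracks          []string
}

// router is a hypothetical stand-in holding the IDs of known receivers.
type router struct {
	receivers []string
}

// addDownTrack attaches one receiver to s, ignoring duplicates.
// It does not look at noAutoSubscribe, so it is the manual path.
func (r *router) addDownTrack(s *subscriber, recv string) {
	for _, t := range s.tracks {
		if t == recv {
			return
		}
	}
	s.tracks = append(s.tracks, recv)
}

// addDownTracks is the automatic path: it does nothing for subscribers
// that opted out. An empty recv means "attach every known receiver".
func (r *router) addDownTracks(s *subscriber, recv string) {
	if s.noAutoSubscribe {
		return
	}
	if recv != "" {
		r.addDownTrack(s, recv)
		return
	}
	for _, rcv := range r.receivers {
		r.addDownTrack(s, rcv)
	}
}

func main() {
	r := &router{receivers: []string{"cam", "mic"}}
	auto := &subscriber{}
	manual := &subscriber{noAutoSubscribe: true}

	r.addDownTracks(auto, "")
	r.addDownTracks(manual, "")
	fmt.Println("auto:", auto.tracks, "manual:", manual.tracks)

	// The opted-out subscriber picks tracks explicitly.
	r.addDownTrack(manual, "cam")
	fmt.Println("manual after explicit add:", manual.tracks)
}
```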

			var sdpType webrtc.SDPType
			if payload.Description.Type == "offer" {
				sdpType = webrtc.SDPTypeOffer
			} else {
				sdpType = webrtc.SDPTypeAnswer
			}
			sdp := webrtc.SessionDescription{
				SDP:  payload.Description.Sdp,
				Type: sdpType,
			}
			if sdp.Type == webrtc.SDPTypeOffer {
				log.Infof("[%v] [description] got offer call s.OnNegotiate sdp=%+v", r.uid, sdp)
				err := r.negotiate(sdp)
				if err != nil {
					log.Errorf("error: %v", err)
				}
			} else if sdp.Type == webrtc.SDPTypeAnswer {
				log.Infof("[%v] [description] got answer call sdp=%+v", r.uid, sdp)
				err = r.setRemoteSDP(sdp)
				if err != nil {
					log.Errorf("[%v] [description] setRemoteSDP err=%s", r.uid, err)
				}
			}

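You can also see that negotiate deals almost entirely with the Subscribe direction, while setRemoteSDP deals almost entirely with the Publish direction.

So every stream from the SFU to the SDK (“Subscribe”) is negotiated with the SFU sending the Offer and the SDK replying with the Answer; every stream from the SDK to the SFU (“Publish”) is negotiated with the SDK sending the Offer and the SFU replying with the Answer.

Therefore:

  • If the SFU receives an Offer, it must belong to a “Publish” stream: it goes to the PeerConnection in the Publisher, which then replies with an Answer.

  • If the SFU receives an Answer, it must belong to a “Subscribe” stream: it goes to the PeerConnection in the Subscriber.

  • If the SDK receives an Offer, it must belong to a “Subscribe” stream: it goes to the Subscribe-direction PeerConnection, which then replies with an Answer. This is the negotiate function described above.

  • If the SDK receives an Answer, it must belong to a “Publish” stream: it goes to the Publish-direction PeerConnection. This is the setRemoteSDP function described above.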
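The four rules above amount to a small lookup table. The sketch below encodes them in a hypothetical route function; the side and sdpType types and the string return values are invented for illustration and do not appear in either codebase.

```go
package main

import "fmt"

type side string
type sdpType string

const (
	onSFU  side    = "sfu"
	onSDK  side    = "sdk"
	offer  sdpType = "offer"
	answer sdpType = "answer"
)

// route returns which PeerConnection should consume a received SDP and
// whether that PeerConnection has to send an Answer back.
func route(where side, t sdpType) (pc string, reply bool) {
	switch {
	case where == onSFU && t == offer:
		return "publisher", true // SDK is publishing; SFU answers
	case where == onSFU && t == answer:
		return "subscriber", false // SDK answered the SFU's subscribe offer
	case where == onSDK && t == offer:
		return "subscriber", true // SFU is pushing tracks; SDK answers
	case where == onSDK && t == answer:
		return "publisher", false // SFU answered the SDK's publish offer
	}
	return "", false // unknown combination
}

func main() {
	for _, w := range []side{onSFU, onSDK} {
		for _, t := range []sdpType{offer, answer} {
			pc, reply := route(w, t)
			fmt.Printf("%s receives %s -> %s PeerConnection, reply=%v\n", w, t, pc, reply)
		}
	}
}
```

Because the direction is implied by who receives which SDP type, neither side needs to inspect the SDP body to pick the right PeerConnection.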

Where do the Offers in the two directions come from?

The Offer for the “Publish” direction is sent by the SDK in its Join function:

	offer, err := r.pub.pc.CreateOffer(nil)
	if err != nil {
		return err
	}

	err = r.pub.pc.SetLocalDescription(offer)
	if err != nil {
		return err
	}

	if len(config) > 0 {
		err = r.SendJoin(sid, r.uid, offer, *config[0])
	} else {
		err = r.SendJoin(sid, r.uid, offer, nil)
	}

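SendJoin here packs the SDP into an rtc.Request_Join and sends it.

The Offer for the “Subscribe” direction is sent through the OnOffer callback, which the SFU sets while handling the rtc.Request_Join request sent by the SDK above: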

			// Notify user of new offer
			peer.OnOffer = func(o *webrtc.SessionDescription) {
				log.Debugf("[S=>C] peer.OnOffer: %v", o.SDP)
				err = sig.Send(&rtc.Reply{
					Payload: &rtc.Reply_Description{
						Description: &rtc.SessionDescription{
							Target: rtc.Target(rtc.Target_SUBSCRIBER),
							Sdp:    o.SDP,
							Type:   o.Type.String(),
						},
					},
				})
				if err != nil {
					log.Errorf("negotiation error: %v", err)
				}
			}

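This is self-explanatory.

Furthermore, both the SDK receiving a “Subscribe” stream and the SFU receiving a “Publish” stream rely on OnTrack. The SFU side was covered above; the SDK-side OnTrack handler is not reproduced here.

How does pion/ion-sfu do AddDownTrack?

AddDownTrack when NoAutoSubscribe=true

Start from the Request_Subscription handler in the SFU: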

					for _, p := range peer.Session().Peers() {
						if p.ID() != peer.ID() {
							for _, track := range p.Publisher().PublisherTracks() {
								if track.Receiver.TrackID() == trackInfo.TrackId && track.Track.RID() == trackInfo.Layer {
									log.Infof("Add RemoteTrack: %v to peer %v %v %v", trackInfo.TrackId, peer.ID(), track.Track.Kind(), track.Track.RID())
									dt, err := peer.Publisher().GetRouter().AddDownTrack(peer.Subscriber(), track.Receiver)
									if err != nil {
										log.Errorf("AddDownTrack error: %v", err)
									}

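The call peer.Publisher().GetRouter().AddDownTrack(peer.Subscriber(), track.Receiver) here takes track.Receiver, which is receiving a Track from another peer, and adds it to this peer's own sender, peer.Subscriber().

The router's AddDownTrack function looks like this: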
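The lookup performed by the nested loops in the handler excerpt can be sketched on its own. The types and the findPublishers function below are hypothetical; the sketch only reproduces the matching rule visible in the excerpt: skip the requesting peer, then match on track ID and on RID (the simulcast layer).

```go
package main

import "fmt"

// publishedTrack is a hypothetical record of one uplink track.
type publishedTrack struct {
	trackID string
	rid     string // simulcast layer identifier
}

// peer is a hypothetical session member with its published tracks.
type peer struct {
	id     string
	tracks []publishedTrack
}

// findPublishers returns the IDs of peers, other than selfID, that publish
// a track matching both the requested track ID and the requested layer.
func findPublishers(peers []peer, selfID, trackID, layer string) []string {
	var owners []string
	for _, p := range peers {
		if p.id == selfID {
			continue // never subscribe a peer to its own tracks
		}
		for _, t := range p.tracks {
			if t.trackID == trackID && t.rid == layer {
				owners = append(owners, p.id)
			}
		}
	}
	return owners
}

func main() {
	peers := []peer{
		{id: "me", tracks: []publishedTrack{{trackID: "cam", rid: "f"}}},
		{id: "bob", tracks: []publishedTrack{{trackID: "cam", rid: "f"}, {trackID: "cam", rid: "h"}}},
	}
	fmt.Println(findPublishers(peers, "me", "cam", "f"))
}
```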

func (r *router) AddDownTrack(sub *Subscriber, recv Receiver) (*DownTrack, error) {
	for _, dt := range sub.GetDownTracks(recv.StreamID()) {
		if dt.ID() == recv.TrackID() {
			return dt, nil
		}
	}

	codec := recv.Codec()
	if err := sub.me.RegisterCodec(codec, recv.Kind()); err != nil {
		return nil, err
	}

	downTrack, err := NewDownTrack(webrtc.RTPCodecCapability{
		MimeType:     codec.MimeType,
		ClockRate:    codec.ClockRate,
		Channels:     codec.Channels,
		SDPFmtpLine:  codec.SDPFmtpLine,
		RTCPFeedback: []webrtc.RTCPFeedback{{"goog-remb", ""}, {"nack", ""}, {"nack", "pli"}},
	}, recv, r.bufferFactory, sub.id, r.config.MaxPacketTrack)
	if err != nil {
		return nil, err
	}
	// Create webrtc sender for the peer we are sending track to
	if downTrack.transceiver, err = sub.pc.AddTransceiverFromTrack(downTrack, webrtc.RTPTransceiverInit{
		Direction: webrtc.RTPTransceiverDirectionSendonly,
	}); err != nil {
		return nil, err
	}

	// nolint:scopelint
	downTrack.OnCloseHandler(func() {
		if sub.pc.ConnectionState() != webrtc.PeerConnectionStateClosed {
			if err := sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err != nil {
				if err == webrtc.ErrConnectionClosed {
					return
				}
				Logger.Error(err, "Error closing down track")
			} else {
				sub.RemoveDownTrack(recv.StreamID(), downTrack)
				sub.negotiate()
			}
		}
	})

	downTrack.OnBind(func() {
		go sub.sendStreamDownTracksReports(recv.StreamID())
	})

	sub.AddDownTrack(recv.StreamID(), downTrack)
	recv.AddDownTrack(downTrack, r.config.Simulcast.BestQualityFirst)
	return downTrack, nil
}

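The downTrack produced by NewDownTrack here is a type that implements the TrackLocal interface. You can see it being added to the PeerConnection by AddTransceiverFromTrack, and at the end it is added to both the Subscriber and the Receiver by sub.AddDownTrack and recv.AddDownTrack.

Both of these AddDownTrack functions simply record the DownTrack in a variable: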

func (s *Subscriber) AddDownTrack(streamID string, downTrack *DownTrack) {
	s.Lock()
	defer s.Unlock()
	if dt, ok := s.tracks[streamID]; ok {
		dt = append(dt, downTrack)
		s.tracks[streamID] = dt
	} else {
		s.tracks[streamID] = []*DownTrack{downTrack}
	}
}

func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) {

	...

	w.Lock()
	w.storeDownTrack(layer, track)
	w.Unlock()
}

...


func (w *WebRTCReceiver) storeDownTrack(layer int, dt *DownTrack) {
	dts := w.downTracks[layer].Load().([]*DownTrack)
	ndts := make([]*DownTrack, len(dts)+1)
	copy(ndts, dts)
	ndts[len(ndts)-1] = dt
	w.downTracks[layer].Store(ndts)
}
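storeDownTrack above never appends in place: it copies the current slice, adds the new element, and stores the new slice back into an atomic.Value, so a reader that already loaded the old slice keeps iterating over an unchanged snapshot. Below is a minimal sketch of that copy-on-write pattern using only sync and sync/atomic. The receiver type and its methods are hypothetical, and unlike the original, where the caller holds the lock, the sketch takes the writer lock inside store.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// downTrack is a hypothetical stand-in for ion-sfu's DownTrack.
type downTrack struct{ id string }

// receiver keeps its down tracks in an atomic.Value so that readers can
// load a consistent snapshot without taking a lock.
type receiver struct {
	mu         sync.Mutex   // serializes writers only
	downTracks atomic.Value // always holds a []*downTrack
}

func newReceiver() *receiver {
	r := &receiver{}
	r.downTracks.Store(make([]*downTrack, 0))
	return r
}

// store publishes a new slice containing the old elements plus dt.
// The old slice is never modified.
func (r *receiver) store(dt *downTrack) {
	r.mu.Lock()
	defer r.mu.Unlock()
	old := r.downTracks.Load().([]*downTrack)
	next := make([]*downTrack, len(old)+1)
	copy(next, old)
	next[len(next)-1] = dt
	r.downTracks.Store(next)
}

// snapshot returns the slice that is current at the time of the call.
func (r *receiver) snapshot() []*downTrack {
	return r.downTracks.Load().([]*downTrack)
}

func main() {
	r := newReceiver()
	before := r.snapshot()
	r.store(&downTrack{id: "bob"})
	after := r.snapshot()
	fmt.Println("old snapshot length:", len(before))
	fmt.Println("new snapshot length:", len(after))
}
```

The trade-off is an allocation on every add or remove in exchange for a lock-free read on the per-packet forwarding path.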

So how does what the Receiver receives actually get to the Subscriber through this DownTrack? The answer is visible in the Receiver's AddUpTrack:

func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer, bestQualityFirst bool) {
	if w.closed.get() {
		return
	}

	var layer int
	switch track.RID() {
	case fullResolution:
		layer = 2
	case halfResolution:
		layer = 1
	default:
		layer = 0
	}

	w.Lock()
	w.upTracks[layer] = track
	w.buffers[layer] = buff
	w.available[layer].set(true)
	w.downTracks[layer].Store(make([]*DownTrack, 0, 10))
	w.pendingTracks[layer] = make([]*DownTrack, 0, 10)
	w.Unlock()

	subBestQuality := func(targetLayer int) {
		for l := 0; l < targetLayer; l++ {
			dts := w.downTracks[l].Load()
			if dts == nil {
				continue
			}
			for _, dt := range dts.([]*DownTrack) {
				_ = dt.SwitchSpatialLayer(int32(targetLayer), false)
			}
		}
	}

	subLowestQuality := func(targetLayer int) {
		for l := 2; l != targetLayer; l-- {
			dts := w.downTracks[l].Load()
			if dts == nil {
				continue
			}
			for _, dt := range dts.([]*DownTrack) {
				_ = dt.SwitchSpatialLayer(int32(targetLayer), false)
			}
		}
	}

	if w.isSimulcast {
		if bestQualityFirst && (!w.available[2].get() || layer == 2) {
			subBestQuality(layer)
		} else if !bestQualityFirst && (!w.available[0].get() || layer == 0) {
			subLowestQuality(layer)
		}
	}
	go w.writeRTP(layer)
}

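This function looks like it is meant to be called only once per uplink track. At the end it starts a goroutine running w.writeRTP, and inside that function we finally see the packet forwarding done through WriteRTP: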

func (w *WebRTCReceiver) writeRTP(layer int) {
	defer func() {
		w.closeOnce.Do(func() {
			w.closed.set(true)
			w.closeTracks()
		})
	}()

	pli := []rtcp.Packet{
		&rtcp.PictureLossIndication{SenderSSRC: rand.Uint32(), MediaSSRC: w.SSRC(layer)},
	}

	for {
		pkt, err := w.buffers[layer].ReadExtended()
		if err == io.EOF {
			return
		}

		if w.isSimulcast {
			if w.pending[layer].get() {
				if pkt.KeyFrame {
					w.Lock()
					for idx, dt := range w.pendingTracks[layer] {
						w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.id)
						w.storeDownTrack(layer, dt)
						dt.SwitchSpatialLayerDone(int32(layer))
						w.pendingTracks[layer][idx] = nil
					}
					w.pendingTracks[layer] = w.pendingTracks[layer][:0]
					w.pending[layer].set(false)
					w.Unlock()
				} else {
					w.SendRTCP(pli)
				}
			}
		}

		for _, dt := range w.downTracks[layer].Load().([]*DownTrack) {
			if err = dt.WriteRTP(pkt, layer); err != nil {
				if err == io.EOF && err == io.ErrClosedPipe {
					w.Lock()
					w.deleteDownTrack(layer, dt.id)
					w.Unlock()
				}
				Logger.Error(err, "Error writing to down track", "id", dt.id)
			}
		}
	}

}

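So it writes a copy of each packet to every DownTrack. (As quoted, the condition err == io.EOF && err == io.ErrClosedPipe can never be true, because a single error value cannot equal both; || looks like what was intended.)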

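The structure of the Receiver is now clear: one UpTrack and multiple DownTracks. Once the UpTrack is set, a goroutine keeps copying packets from the UpTrack and writing them into all the DownTracks.

To sum up, the actual packet forwarding is done entirely in Publisher-related code; the Subscriber really only serves as bookkeeping.

AddDownTrack when NoAutoSubscribe=false

As described earlier, the Publisher initialization function has an extra piece of code compared with the Subscriber's (the OnTrack callback); following it through Publish and AddDownTracks, it also ends up in the Router's AddDownTrack function.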
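The "one UpTrack, many DownTracks" structure can be modeled with a channel and a goroutine. This is a simplified sketch under stated assumptions: a channel stands in for the uplink buffer, packets are reduced to sequence numbers, and the packet, downTrack and forward names are hypothetical.

```go
package main

import "fmt"

// packet is a hypothetical stand-in for an RTP packet.
type packet struct{ seq int }

// downTrack records the sequence numbers written to it.
type downTrack struct {
	id       string
	received []int
}

func (d *downTrack) writeRTP(p packet) {
	d.received = append(d.received, p.seq)
}

// forward drains the uplink until it is closed and writes every packet
// to every down track, which is the core of the writeRTP loop above.
func forward(up <-chan packet, downTracks []*downTrack) {
	for p := range up {
		for _, dt := range downTracks {
			dt.writeRTP(p)
		}
	}
}

func main() {
	up := make(chan packet)
	dts := []*downTrack{{id: "bob"}, {id: "carol"}}

	// One goroutine per uplink, started once the uplink is set up.
	done := make(chan struct{})
	go func() {
		forward(up, dts)
		close(done)
	}()

	for seq := 1; seq <= 3; seq++ {
		up <- packet{seq: seq}
	}
	close(up) // plays the role of io.EOF from the buffer
	<-done

	for _, dt := range dts {
		fmt.Println(dt.id, dt.received)
	}
}
```

The sketch fixes the set of down tracks up front; the real receiver reloads the current list for every packet so that tracks can be added and removed while forwarding is running.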
