pion/interceptor浅析

v3.0.0 introduces a new Pion specific concept known as a interceptor. A Interceptor is a pluggable RTP/RTCP processor. Via a public API users can easi

v3.0.0 introduces a new Pion specific concept known as a interceptor. A Interceptor is a pluggable RTP/RTCP processor. Via a public API users can easily add and customize operations that are run on inbound/outbound RTP. Interceptors are an interface this means A user could provide their own implementation. Or you can use one of the interceptors Pion will have in-tree.

We designed this with the following vision.

  • Useful defaults. Nontechnical users should be able to build things without tuning.

  • Don't block unique use cases. We shouldn't paint ourself in a corner. We want to support interesting WebRTC users

  • Allow users to bring their own logic. We should encourage easy changing. Innovation in this area is important.

  • Allow users to learn. Don't put this stuff deep in the code base. Should be easy to jump into so people can learn.

In this release we only are providing a NACK Generator/Responder interceptor. We will be implementing more and you can import the latest at anytime! This means you can update your pion/interceptor version without having to upgrade pion/webrtc!

总的来说,在WebRTC通信中,要收发的包可以分为两类:

  • RTP包:传递媒体内容,如音视频片段等

  • RTCP包:传递控制信息,如NACK、接收方报告等

pion/interceptor也是按照这样的思路进行实现的,其中实现的interceptor都同时要处理RTP包和RTCP包,处理方式基本上都是根据RTP包的收发情况构造RTCP包进行发送,以及根据收到的RTCP包调整RTP包的发送。

  • 比如在pion/interceptor/pkg/nack里,就是一方根据RTP包的序列号发送NACK信息,另一方根据NACK信息重发RTP包

  • 又比如在pion/interceptor/pkg/nack里,就是一方统计RTP包的接收情况发送SenderReport,另一方接收并存储之

  • 再比如在pion/interceptor/pkg/twcc里,就是一方统计RTP包的接收情况反馈丢包信息,另一方接收然后调整RTP包发送窗口

此外,从逻辑上讲,一个协议必须得收发双方都实现了才能正常运行,而且由于WebRTC是对等连接通信,一方可能既是接收方又是发送方,所以pion/interceptor里的interceptor都必须得实现收发双方的功能。在程序里,这个思想体现为收发方基础接口不是分SenderInterceptorReceiverInterceptor两个,而是在一个基础接口Interceptor中同时包含收发双方的方法。

核心接口

// Interceptor can be used to add functionality to you PeerConnections by modifying any incoming/outgoing rtp/rtcp
// packets, or sending your own packets as needed.
type Interceptor interface {

Interceptor接口是pion/interceptor包的核心,pion/interceptor包的主要功能代码是pion/interceptor/pkg里继承了Interceptor类的各种interceptor,其余的代码基本都是这个Interceptor的方法参数里用到的类。pion/interceptor/pkg里的这些interceptor都是pion/webrtc里要用到的。用户也可以自己定义interceptor用在pion/webrtc里。

BindRTCPReader方法

	// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
	// change in the future. The returned method will be called once per packet batch.
	BindRTCPReader(reader RTCPReader) RTCPReader

  • 从函数名上看,这是一个绑定RTCPReader的接口

  • 从注释上看,这个接口是为了让用户自定义修改输入RTCP数据包的过程

  • 从函数的输入输出上看,函数输入一个RTCPReader输出一个RTCPReader,这个方法在使用时应该是可以级联的

再看看这个RTCPReader是什么

// RTCPReader is used by Interceptor.BindRTCPReader.
type RTCPReader interface {
	// Read a batch of rtcp packets
	Read([]byte, Attributes) (int, Attributes, error)
}

可以看到,RTCPReader就是一个Read函数,输入一段字节数据和Attributes,输出整型、Attributes和错误。

结合前面BindRTCPReader里说的功能,这个Read应该就是用户实现自定义修改输入RTCP数据包过程的地方。

输出里的错误自不必多讲,这里输入的字节数据应该就是修改前的RTCP数据包,修改过程应该是直接在这个字节输入上进行,后面输出的整型应该是修改后的数据长度,让之后的操作可以直接从字节数据里读出RTCP包。这个Attributes在后面定义的,是一个map[interface{}]interface{},应该是用于传递一些自定义参数的。

pion/interceptor里还提供了一种简便的构造RTCPReader的方式:

// RTCPReaderFunc is an adapter for RTCPReader interface
type RTCPReaderFunc func([]byte, Attributes) (int, Attributes, error)

// Read a batch of rtcp packets
func (f RTCPReaderFunc) Read(b []byte, a Attributes) (int, Attributes, error) {
	return f(b, a)
}

这样,只要写好Read里的代码,可以不用再定义一个RTCPReader子类,直接把函数放进RTCPReaderFunc就行。函数式编程思想,很妙。pion/interceptor/pkg里的几个interceptor都是这么用的。

那么这么看,BindRTCPReader确实是可以级联的,并且BindRTCPReader里面要实现的操作也能大概猜得到:

  • BindRTCPReader输入的RTCPReader构造自己的RTCPReader作为输出,在自己的RTCPReaderRead函数中:

    1. 调用BindRTCPReader输入的RTCPReaderRead函数

    2. 根据返回的整型值,读取修改后的字节数据,反序列化为RTCP包

    3. 修改RTCP包和Attributes,或进行一些其他自定义操作(比如记录统计信息、转发、筛选等)

    4. 把修改后RTCP包序列化到字节数据里(可选)

    5. 返回整型值和Attributes

pion/interceptor/pkg里的几个interceptor都是这样的,不过它们都没有修改字节数据的操作。

  • 比如在pion/interceptor/pkg/nack里,interceptor从字节数据里获取RTCP包,然后判断是不是NACK包,如果是就按照NACK里汇报的丢包情况重发RTCP包

  • 再比如在pion/interceptor/pkg/report里,interceptor从字节数据里获取RTCP包,然后判断是不是SenderReport包,如果是就存储之

于是,一级一级地调用一串interceptor的BindRTCPReader,每个BindRTCPReader都以上一个interceptor的BindRTCPReader返回的RTCPReader为输入;输出的RTCPReaderRead里面先调用了输入的RTCPReaderRead,再进行自定义的修改操作,返回修改后的RTCP包字节数据。这样,最后一个interceptor的BindRTCPReader输出的RTCPReaderRead就是一个顺序执行所有自定义操作的RTCP包处理函数。

BindRTCPWriter方法

	// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
	// will be called once per packet batch.
	BindRTCPWriter(writer RTCPWriter) RTCPWriter

  • 从函数名上看,这是一个绑定RTCPWriter的接口

  • 从注释上看,这个接口是为了让用户自定义修改输出RTCP数据包的过程

  • 很明显,这个方法在使用时应该和BindRTCPReader一样也是可以级联的,级联方式应该也大差不离

// RTCPWriter is used by Interceptor.BindRTCPWriter.
type RTCPWriter interface {
	// Write a batch of rtcp packets
	Write(pkts []rtcp.Packet, attributes Attributes) (int, error)
}

一股子RTCPReader的既视感,明显也是可以级联的,要实现的操作应该也差不多:

  • BindRTCPWriter输入的RTCPWriter构造自己的RTCPWriter作为输出,在RTCPWriterWrite函数里对输入的rtcp.Packet列表进行增减(也就是增减要发送的)

但是pion/interceptor/pkg里的几个interceptor好像都没这样用,它们的BindRTCPWriter都是直接记录下RTCPWriter,然后开了个协程写数据:

  • 比如在pion/interceptor/pkg/nack里是定期读取接收日志,找到有哪些缺失的包,收集序列号构造NACK包发送

  • 再比如在pion/interceptor/pkg/report里是定期发送SenderReport包

  • 又比如在pion/interceptor/pkg/twcc里是定期发送反馈信息

pion/interceptor里也提供了一种简便的构造RTCPWriter的方式:

// RTCPWriterFunc is an adapter for RTCPWriter interface
type RTCPWriterFunc func(pkts []rtcp.Packet, attributes Attributes) (int, error)

// Write a batch of rtcp packets
func (f RTCPWriterFunc) Write(pkts []rtcp.Packet, attributes Attributes) (int, error) {
	return f(pkts, attributes)
}

RTCPReaderFunc一个道理,不必多讲。

BindRemoteStream方法

	// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
	// will be called once per rtp packet.
	BindRemoteStream(info *StreamInfo, reader RTPReader) RTPReader

	// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track.
	UnbindRemoteStream(info *StreamInfo)

绑定和解绑远端流,从方法和注释上看和BindRTCPReader是类似的,都是用来绑定处理发送出去的数据包的方法的。

这里绑定的RTPReaderRTCPReaderRead函数里的输入参数是一模一样的:

// RTPReader is used by Interceptor.BindRemoteStream.
type RTPReader interface {
	// Read a rtp packet
	Read([]byte, Attributes) (int, Attributes, error)
}

BindRemoteStreamBindRTCPReader唯一的区别在于包处理的方式:RTP包和RTCP包在逻辑上的不同之处在于,RTP包是从属于一个流的连续序列,而RTCP包是一个个独立的包。因此在BindRTCPReader中,输入的数据直接就是一个RTCPReader;而BindRemoteStream不仅需要指定RTPReader,还需要指定一个存储流信息的StreamInfo

这个StreamInfo长这样:

// StreamInfo is the Context passed when a StreamLocal or StreamRemote has been Binded or Unbinded
type StreamInfo struct {
	ID                  string
	Attributes          Attributes
	SSRC                uint32
	PayloadType         uint8
	RTPHeaderExtensions []RTPHeaderExtension
	MimeType            string
	ClockRate           uint32
	Channels            uint16
	SDPFmtpLine         string
	RTCPFeedback        []RTCPFeedback
}

// RTPHeaderExtension represents a negotiated RFC5285 RTP header extension.
type RTPHeaderExtension struct {
	URI string
	ID  int
}

// RTCPFeedback signals the connection to use additional RTCP packet types.
// https://draft.ortc.org/#dom-rtcrtcpfeedback
type RTCPFeedback struct {
	// Type is the type of feedback.
	// see: https://draft.ortc.org/#dom-rtcrtcpfeedback
	// valid: ack, ccm, nack, goog-remb, transport-cc
	Type string

	// The parameter value depends on the type.
	// For example, type="nack" parameter="pli" will send Picture Loss Indicator packets.
	Parameter string
}

可以看到,这个StreamInfo里面放的是一些与流有关的配置信息。由于RTP包承载的是流,流中的包可以看成是一个整体,是一系列相互关联的连续包,不像RTCP包那样是一个个独立的包,一会是NACK、一会又是SenderReport。StreamInfo就是这些连续RTP包中与流相关的标记信息,它可以用来区分RTP包属于哪个流、区分媒体的类型、记录时钟频率等等。

一些重要的StreamInfo参数:引自《RTP: audio and video for the Internet》

  • SSRC:Synchronization Source(SSRC)标识RTP会话中的参与者。它是一个临时的,每个会话的标识符通过RTP控制协议映射到一个长期的规范名称CNAME。SSRC是一个32位整数,由参与者加入会话时随机选择。具有相同SSRC的所有数据包均构成单个时序和序列号空间的一部分,因此接收方必须按SSRC对数据包进行分组才能进行播放。如果参加者在一个RTP会话中生成多个流(例如,来自不同的摄像机),每个流都必须标识为不同的SSRC,以便接收方可以区分哪些数据包属于每个流。

  • PayloadType:有效负载类型。RTP头的负载类型(或者PT)与RTP传输的媒体数据关联。接收者应用检测负载类型来甄别如何处理数据,例如,传递给特定的解压缩器。

  • MimeType:有效负载格式。有效负载格式是根据MIME名称空间命名的。该名称空间最初是为电子邮件定义的,用于标识附件的内容,但此后它已成为媒体格式的通用名称空间,并在许多应用程序中使用。所有有效负载格式都应该具有MIME类型注册。更新的有效负载格式将其包含在其各自规范中; 在线维护MIME类型的完整列表,网址为:http://www.iana.org/assignments/media-types

还有其他的一些参数在RTP包相关的IETF标准里都应该能找到。

pion/interceptor里也提供了一种简便的构造RTPReader的方式:

// RTPReaderFunc is an adapter for RTPReader interface
type RTPReaderFunc func([]byte, Attributes) (int, Attributes, error)

// Read a rtp packet
func (f RTPReaderFunc) Read(b []byte, a Attributes) (int, Attributes, error) {
	return f(b, a)
}

RTCPReaderFunc一个道理,不必多讲。

BindLocalStream方法

	// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
	// will be called once per rtp packet.
	BindLocalStream(info *StreamInfo, writer RTPWriter) RTPWriter

	// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
	UnbindLocalStream(info *StreamInfo)

绑定和解绑本地流,从方法和注释上看和BindRTCPWriter是类似的,都是用来绑定处理发送出去的数据包的方法的,这里绑定的RTPWriter也和RTCPWriter大差不离:

// RTPWriter is used by Interceptor.BindLocalStream.
type RTPWriter interface {
	// Write a rtp packet
	Write(header *rtp.Header, payload []byte, attributes Attributes) (int, error)
}

可以看到,唯一的区别在于包构建方式:从代码上看,RTCPWriter.Write的输入直接就是rtcp.Packet的列表;而RTPWriter.Write的输入是分开的一个包头rtp.Header[]byte格式的内容。这可能是因为RTCP只会传递一些运行状态数据和控制信息,每种包都有自己独特的结构,而RTP包是由一长串媒体数据切开包装而来,结构比较规整,不能给用户随便调整,所以把包头和包内容分了两个变量。

pion/interceptor里也提供了一种简便的构造RTPWriter的方式:

// RTPWriterFunc is an adapter for RTPWrite interface
type RTPWriterFunc func(header *rtp.Header, payload []byte, attributes Attributes) (int, error)

// Write a rtp packet
func (f RTPWriterFunc) Write(header *rtp.Header, payload []byte, attributes Attributes) (int, error) {
	return f(header, payload, attributes)
}

RTCPReaderFunc一个道理,不必多讲。

Close

	io.Closer
}

1
2

这里好理解,当要销毁这个Interceptor的时候,必须要解绑所有的RTCPReaderRTCPWriterRTPReaderRTPWriter,并且停止所有的相关协程,这个只能由实现Interceptor的用户来做。所以在这里加上了一个io.Closer,要求用户自己实现一个Close方法。

以下是一些关于级联和Interceptor具体如何调用的知识。

  • 在级联的开头,用户需要自行调用Read把包传进级联的Reader里

  • 在级联的末尾,用户需要自行在Write里写上发送包的函数,把级联的Writer传来的包发送出去

比如NACK发送方接收RTP包就是首先获取到RTPReader

// Create the writer just for a single SSRC stream
// this is a callback that is fired everytime a RTP packet is ready to be sent
streamReader := chain.BindRemoteStream(&interceptor.StreamInfo{
	SSRC:         ssrc,
	RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
}, interceptor.RTPReaderFunc(func(b []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) { return len(b), nil, nil }))

然后在循环里从UDP处收包之后放进RTPReader.Read

i, addr, err := conn.ReadFrom(buffer)
if err != nil {
	panic(err)
}

log.Println("Received RTP")

if _, _, err := streamReader.Read(buffer[:i], nil); err != nil {
	panic(err)
}

由于级联了NACK Interceptor,所以就能执行一些包统计的操作,找出未接收到的RTP包,构造NACK。

然后NACK发送方发NACK包就是写在RTCPWriter.Write里的:

chain.BindRTCPWriter(interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, _ interceptor.Attributes) (int, error) {
	buf, err := rtcp.Marshal(pkts)
	if err != nil {
		return 0, err
	}

	return conn.WriteTo(buf, addr)
}))

这样就能完成“收RTP包——统计丢包——发NACK”的操作。

NACK接收方也是一样,先获取RTCPReader

// Set the interceptor wide RTCP Reader
// this is a handle to send NACKs back into the interceptor.
rtcpReader := chain.BindRTCPReader(interceptor.RTCPReaderFunc(func(in []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
	return len(in), nil, nil
}))

然后也是在循环里UDP收包之后放进RTCPReader.Read

i, err := conn.Read(rtcpBuf)
if err != nil {
	panic(err)
}

log.Println("Received NACK")

if _, _, err = rtcpWriter.Read(rtcpBuf[:i], nil); err != nil {
	panic(err)
}

于是获取到NACK解包出来就知道要重发哪些RTP包了。

然后NACK接收方重发RTP包就是写在RTPWriter.Write里的:

// Create the writer just for a single SSRC stream
// this is a callback that is fired everytime a RTP packet is ready to be sent
streamWriter := chain.BindLocalStream(&interceptor.StreamInfo{
	SSRC:         ssrc,
	RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
}, interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
	headerBuf, err := header.Marshal()
	if err != nil {
		panic(err)
	}

	return conn.Write(append(headerBuf, payload...))
}))

这样就能完成“接收NACK包——找出需要重发的RTP包——重发RTP包”的操作了。

总结一下Interceptor的创建过程

首先是继承interceptor.NoOp,因为实际情况下不一定需要把BindLocalStreamBindRTCPWriterBindRemoteStreamBindRTCPReader四个全实现。

实现BindLocalStream

  1. 实现一个RTPWriter,在其中保存另一个RTPWriter,并在其Write函数中调用保存的RTPWriter.Write

  2. 实现BindLocalStream,将输入的RTPWriter保存到你实现的RTPWriter中并返回

  3. (常见操作)让你实现的RTPWriter可以读到Interceptor里的数据,然后在BindLocalStream里开goroutine调用RTPWriter定期获取Interceptor里的数据并据此调用保存的另一个RTPWriter写一些特殊功能的包

实现BindRTCPWriter

同上,只不过RTPWriter变成RTCPWriter

实现BindRemoteStream

  1. 实现一个RTPReader,在其中保存另一个RTPReader,并在其Read函数中调用保存的RTPReader.Read

  2. 实现BindRemoteStream,将输入的RTPReader保存到你实现的RTPReader中并返回

  3. (常见操作)让RTPReader可以操作Interceptor里的数据,从而可以根据RTPReader.Read输入的数据修改Interceptor里的数据,进而影响其绑定的RTPWriterRTCPWriter的行为

实现BindRTCPReader

同上,只不过RTPReader变成RTCPReader

LICENSED UNDER CC BY-NC-SA 4.0
Comment