v3.0.0 introduces a new Pion specific concept known as a interceptor. A Interceptor is a pluggable RTP/RTCP processor. Via a public API users can easily add and customize operations that are run on inbound/outbound RTP. Interceptors are an interface this means A user could provide their own implementation. Or you can use one of the interceptors Pion will have in-tree.
We designed this with the following vision.
Useful defaults. Nontechnical users should be able to build things without tuning.
Don't block unique use cases. We shouldn't paint ourself in a corner. We want to support interesting WebRTC users
Allow users to bring their own logic. We should encourage easy changing. Innovation in this area is important.
Allow users to learn. Don't put this stuff deep in the code base. Should be easy to jump into so people can learn.
In this release we only are providing a NACK Generator/Responder interceptor. We will be implementing more and you can import the latest at anytime! This means you can update your pion/interceptor version without having to upgrade pion/webrtc!
总的来说,在WebRTC通信中,要收发的包可以分为两类:
RTP包:传递媒体内容,如音视频片段等
RTCP包:传递控制信息,如NACK、接收方报告等
pion/interceptor
也是按照这样的思路进行实现的,其中实现的interceptor都同时要处理RTP包和RTCP包,处理方式基本上都是根据RTP包的收发情况构造RTCP包进行发送,以及根据收到的RTCP包调整RTP包的发送。
比如在
pion/interceptor/pkg/nack
里,就是一方根据RTP包的序列号发送NACK信息,另一方根据NACK信息重发RTP包又比如在
pion/interceptor/pkg/nack
里,就是一方统计RTP包的接收情况发送SenderReport,另一方接收并存储之再比如在
pion/interceptor/pkg/twcc
里,就是一方统计RTP包的接收情况反馈丢包信息,另一方接收然后调整RTP包发送窗口
此外,从逻辑上讲,一个协议必须得收发双方都实现了才能正常运行,而且由于WebRTC是对等连接通信,一方可能既是接收方又是发送方,所以pion/interceptor
里的interceptor都必须得实现收发双方的功能。在程序里,这个思想体现为收发方基础接口不是分SenderInterceptor
和ReceiverInterceptor
两个,而是在一个基础接口Interceptor
中同时包含收发双方的方法。
核心接口
// Interceptor can be used to add functionality to you PeerConnections by modifying any incoming/outgoing rtp/rtcp
// packets, or sending your own packets as needed.
type Interceptor interface {
Interceptor
接口是pion/interceptor
包的核心,pion/interceptor
包的主要功能代码是pion/interceptor/pkg
里继承了Interceptor
类的各种interceptor,其余的代码基本都是这个Interceptor
的方法参数里用到的类。pion/interceptor/pkg
里的这些interceptor都是pion/webrtc
里要用到的。用户也可以自己定义interceptor用在pion/webrtc
里。
BindRTCPReader
方法
// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
// change in the future. The returned method will be called once per packet batch.
BindRTCPReader(reader RTCPReader) RTCPReader
从函数名上看,这是一个绑定
RTCPReader
的接口从注释上看,这个接口是为了让用户自定义修改输入RTCP数据包的过程
从函数的输入输出上看,函数输入一个
RTCPReader
输出一个RTCPReader
,这个方法在使用时应该是可以级联的
再看看这个RTCPReader
是什么
// RTCPReader is used by Interceptor.BindRTCPReader.
type RTCPReader interface {
// Read a batch of rtcp packets
Read([]byte, Attributes) (int, Attributes, error)
}
可以看到,RTCPReader
就是一个Read
函数,输入一段字节数据和Attributes
,输出整型、Attributes
和错误。
结合前面BindRTCPReader
里说的功能,这个Read
应该就是用户实现自定义修改输入RTCP数据包过程的地方。
输出里的错误自不必多讲,这里输入的字节数据应该就是修改前的RTCP数据包,修改过程应该是直接在这个字节输入上进行,后面输出的整型应该是修改后的数据长度,让之后的操作可以直接从字节数据里读出RTCP包。这个Attributes
在后面定义的,是一个map[interface{}]interface{}
,应该是用于传递一些自定义参数的。
pion/interceptor
里还提供了一种简便的构造RTCPReader
的方式:
// RTCPReaderFunc is an adapter for RTCPReader interface
type RTCPReaderFunc func([]byte, Attributes) (int, Attributes, error)
// Read a batch of rtcp packets
func (f RTCPReaderFunc) Read(b []byte, a Attributes) (int, Attributes, error) {
return f(b, a)
}
这样,只要写好Read
里的代码,可以不用再定义一个RTCPReader
子类,直接把函数放进RTCPReaderFunc
就行。函数式编程思想,很妙。pion/interceptor/pkg
里的几个interceptor都是这么用的。
那么这么看,BindRTCPReader
确实是可以级联的,并且BindRTCPReader
里面要实现的操作也能大概猜得到:
以
BindRTCPReader
输入的RTCPReader
构造自己的RTCPReader
作为输出,在自己的RTCPReader
的Read
函数中:调用
BindRTCPReader
输入的RTCPReader
的Read
函数根据返回的整型值,读取修改后的字节数据,反序列化为RTCP包
修改RTCP包和
Attributes
,或进行一些其他自定义操作(比如记录统计信息、转发、筛选等)把修改后RTCP包序列化到字节数据里(可选)
返回整型值和
Attributes
pion/interceptor/pkg
里的几个interceptor都是这样的,不过它们都没有修改字节数据的操作。
比如在
pion/interceptor/pkg/nack
里,interceptor从字节数据里获取RTCP包,然后判断是不是NACK包,如果是就按照NACK里汇报的丢包情况重发RTCP包再比如在
pion/interceptor/pkg/report
里,interceptor从字节数据里获取RTCP包,然后判断是不是SenderReport包,如果是就存储之
于是,一级一级地调用一串interceptor的BindRTCPReader
,每个BindRTCPReader
都以上一个interceptor的BindRTCPReader
返回的RTCPReader
为输入;输出的RTCPReader
的Read
里面先调用了输入的RTCPReader
的Read
,再进行自定义的修改操作,返回修改后的RTCP包字节数据。这样,最后一个interceptor的BindRTCPReader
输出的RTCPReader
的Read
就是一个顺序执行所有自定义操作的RTCP包处理函数。
BindRTCPWriter
方法
// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
// will be called once per packet batch.
BindRTCPWriter(writer RTCPWriter) RTCPWriter
从函数名上看,这是一个绑定
RTCPWriter
的接口从注释上看,这个接口是为了让用户自定义修改输出RTCP数据包的过程
很明显,这个方法在使用时应该和
BindRTCPReader
一样也是可以级联的,级联方式应该也大差不离
// RTCPWriter is used by Interceptor.BindRTCPWriter.
type RTCPWriter interface {
// Write a batch of rtcp packets
Write(pkts []rtcp.Packet, attributes Attributes) (int, error)
}
一股子RTCPReader
的既视感,明显也是可以级联的,要实现的操作应该也差不多:
以
BindRTCPWriter
输入的RTCPWriter
构造自己的RTCPWriter
作为输出,在RTCPWriter
的Write
函数里对输入的rtcp.Packet
列表进行增减(也就是增减要发送的)
但是pion/interceptor/pkg
里的几个interceptor好像都没这样用,它们的BindRTCPWriter
都是直接记录下RTCPWriter
,然后开了个协程写数据:
比如在
pion/interceptor/pkg/nack
里是定期读取接收日志,找到有哪些缺失的包,收集序列号构造NACK包发送再比如在
pion/interceptor/pkg/report
里是定期发送SenderReport包又比如在
pion/interceptor/pkg/twcc
里是定期发送反馈信息
pion/interceptor
里也提供了一种简便的构造RTCPWriter
的方式:
// RTCPWriterFunc is an adapter for RTCPWriter interface
type RTCPWriterFunc func(pkts []rtcp.Packet, attributes Attributes) (int, error)
// Write a batch of rtcp packets
func (f RTCPWriterFunc) Write(pkts []rtcp.Packet, attributes Attributes) (int, error) {
return f(pkts, attributes)
}
和RTCPReaderFunc
一个道理,不必多讲。
BindRemoteStream
方法
// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
BindRemoteStream(info *StreamInfo, reader RTPReader) RTPReader
// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track.
UnbindRemoteStream(info *StreamInfo)
绑定和解绑远端流,从方法和注释上看和BindRTCPReader
是类似的,都是用来绑定处理发送出去的数据包的方法的。
这里绑定的RTPReader
和RTCPReader
的Read
函数里的输入参数是一模一样的:
// RTPReader is used by Interceptor.BindRemoteStream.
type RTPReader interface {
// Read a rtp packet
Read([]byte, Attributes) (int, Attributes, error)
}
BindRemoteStream
和BindRTCPReader
唯一的区别在于包处理的方式:RTP包和RTCP包在逻辑上的不同之处在于,RTP包是从属于一个流的连续序列,而RTCP包是一个个独立的包。因此在BindRTCPReader
中,输入的数据直接就是一个RTCPReader
;而BindRemoteStream
不仅需要指定RTPReader
,还需要指定一个存储流信息的StreamInfo
。
这个StreamInfo
长这样:
// StreamInfo is the Context passed when a StreamLocal or StreamRemote has been Binded or Unbinded
type StreamInfo struct {
ID string
Attributes Attributes
SSRC uint32
PayloadType uint8
RTPHeaderExtensions []RTPHeaderExtension
MimeType string
ClockRate uint32
Channels uint16
SDPFmtpLine string
RTCPFeedback []RTCPFeedback
}
// RTPHeaderExtension represents a negotiated RFC5285 RTP header extension.
type RTPHeaderExtension struct {
URI string
ID int
}
// RTCPFeedback signals the connection to use additional RTCP packet types.
// https://draft.ortc.org/#dom-rtcrtcpfeedback
type RTCPFeedback struct {
// Type is the type of feedback.
// see: https://draft.ortc.org/#dom-rtcrtcpfeedback
// valid: ack, ccm, nack, goog-remb, transport-cc
Type string
// The parameter value depends on the type.
// For example, type="nack" parameter="pli" will send Picture Loss Indicator packets.
Parameter string
}
可以看到,这个StreamInfo
里面放的是一些与流有关的配置信息。由于RTP包承载的是流,流中的包可以看成是一个整体,是一系列相互关联的连续包,不像RTCP包那样是一个个独立的包,一会是NACK、一会又是SenderReport。StreamInfo
就是这些连续RTP包中与流相关的标记信息,它可以用来区分RTP包属于哪个流、区分媒体的类型、记录时钟频率等等。
一些重要的StreamInfo
参数:引自《RTP: audio and video for the Internet》
SSRC
:Synchronization Source(SSRC)标识RTP会话中的参与者。它是一个临时的,每个会话的标识符通过RTP控制协议映射到一个长期的规范名称CNAME。SSRC是一个32位整数,由参与者加入会话时随机选择。具有相同SSRC的所有数据包均构成单个时序和序列号空间的一部分,因此接收方必须按SSRC对数据包进行分组才能进行播放。如果参加者在一个RTP会话中生成多个流(例如,来自不同的摄像机),每个流都必须标识为不同的SSRC,以便接收方可以区分哪些数据包属于每个流。PayloadType
:有效负载类型。RTP头的负载类型(或者PT)与RTP传输的媒体数据关联。接收者应用检测负载类型来甄别如何处理数据,例如,传递给特定的解压缩器。MimeType
:有效负载格式。有效负载格式是根据MIME名称空间命名的。该名称空间最初是为电子邮件定义的,用于标识附件的内容,但此后它已成为媒体格式的通用名称空间,并在许多应用程序中使用。所有有效负载格式都应该具有MIME类型注册。更新的有效负载格式将其包含在其各自规范中; 在线维护MIME类型的完整列表,网址为:http://www.iana.org/assignments/media-types。
还有其他的一些参数在RTP包相关的IETF标准里都应该能找到。
pion/interceptor
里也提供了一种简便的构造RTPReader
的方式:
// RTPReaderFunc is an adapter for RTPReader interface
type RTPReaderFunc func([]byte, Attributes) (int, Attributes, error)
// Read a rtp packet
func (f RTPReaderFunc) Read(b []byte, a Attributes) (int, Attributes, error) {
return f(b, a)
}
和RTCPReaderFunc
一个道理,不必多讲。
BindLocalStream
方法
// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
// will be called once per rtp packet.
BindLocalStream(info *StreamInfo, writer RTPWriter) RTPWriter
// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
UnbindLocalStream(info *StreamInfo)
绑定和解绑本地流,从方法和注释上看和BindRTCPWriter
是类似的,都是用来绑定处理发送出去的数据包的方法的,这里绑定的RTPWriter
也和RTCPWriter
大差不离:
// RTPWriter is used by Interceptor.BindLocalStream.
type RTPWriter interface {
// Write a rtp packet
Write(header *rtp.Header, payload []byte, attributes Attributes) (int, error)
}
可以看到,唯一的区别在于包构建方式:从代码上看,RTCPWriter.Write
的输入直接就是rtcp.Packet
的列表;而RTPWriter.Write
的输入是分开的一个包头rtp.Header
和[]byte
格式的内容。这可能是因为RTCP只会传递一些运行状态数据和控制信息,每种包都有自己独特的结构,而RTP包是由一长串媒体数据切开包装而来,结构比较规整,不能给用户随便调整,所以把包头和包内容分了两个变量。
pion/interceptor
里也提供了一种简便的构造RTPWriter
的方式:
// RTPWriterFunc is an adapter for RTPWrite interface
type RTPWriterFunc func(header *rtp.Header, payload []byte, attributes Attributes) (int, error)
// Write a rtp packet
func (f RTPWriterFunc) Write(header *rtp.Header, payload []byte, attributes Attributes) (int, error) {
return f(header, payload, attributes)
}
和RTCPReaderFunc
一个道理,不必多讲。
Close
io.Closer
}
1
2
这里好理解,当要销毁这个Interceptor
的时候,必须要解绑所有的RTCPReader
、RTCPWriter
、RTPReader
、RTPWriter
,并且停止所有的相关协程,这个只能由实现Interceptor
的用户来做。所以在这里加上了一个io.Closer
,要求用户自己实现一个Close
方法。
以下是一些关于级联和Interceptor
具体如何调用的知识。
在级联的开头,用户需要自行调用
Read
把包传进级联的Reader里在级联的末尾,用户需要自行在
Write
里写上发送包的函数,把级联的Writer传来的包发送出去
比如NACK发送方接收RTP包就是首先获取到RTPReader
:
// Create the writer just for a single SSRC stream
// this is a callback that is fired everytime a RTP packet is ready to be sent
streamReader := chain.BindRemoteStream(&interceptor.StreamInfo{
SSRC: ssrc,
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
}, interceptor.RTPReaderFunc(func(b []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) { return len(b), nil, nil }))
然后在循环里从UDP处收包之后放进RTPReader.Read
:
i, addr, err := conn.ReadFrom(buffer)
if err != nil {
panic(err)
}
log.Println("Received RTP")
if _, _, err := streamReader.Read(buffer[:i], nil); err != nil {
panic(err)
}
由于级联了NACK Interceptor,所以就能执行一些包统计的操作,找出未接收到的RTP包,构造NACK。
然后NACK发送方发NACK包就是写在RTCPWriter.Write
里的:
chain.BindRTCPWriter(interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, _ interceptor.Attributes) (int, error) {
buf, err := rtcp.Marshal(pkts)
if err != nil {
return 0, err
}
return conn.WriteTo(buf, addr)
}))
这样就能完成“收RTP包——统计丢包——发NACK”的操作。
NACK接收方也是一样,先获取RTCPReader
:
// Set the interceptor wide RTCP Reader
// this is a handle to send NACKs back into the interceptor.
rtcpReader := chain.BindRTCPReader(interceptor.RTCPReaderFunc(func(in []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
return len(in), nil, nil
}))
然后也是在循环里UDP收包之后放进RTCPReader.Read
:
i, err := conn.Read(rtcpBuf)
if err != nil {
panic(err)
}
log.Println("Received NACK")
if _, _, err = rtcpWriter.Read(rtcpBuf[:i], nil); err != nil {
panic(err)
}
于是获取到NACK解包出来就知道要重发哪些RTP包了。
然后NACK接收方重发RTP包就是写在RTPWriter.Write
里的:
// Create the writer just for a single SSRC stream
// this is a callback that is fired everytime a RTP packet is ready to be sent
streamWriter := chain.BindLocalStream(&interceptor.StreamInfo{
SSRC: ssrc,
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
}, interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
headerBuf, err := header.Marshal()
if err != nil {
panic(err)
}
return conn.Write(append(headerBuf, payload...))
}))
这样就能完成“接收NACK包——找出需要重发的RTP包——重发RTP包”的操作了。
总结一下Interceptor
的创建过程
首先是继承interceptor.NoOp
,因为实际情况下不一定需要把BindLocalStream
、BindRTCPWriter
、BindRemoteStream
、BindRTCPReader
四个全实现。
实现BindLocalStream
实现一个
RTPWriter
,在其中保存另一个RTPWriter
,并在其Write
函数中调用保存的RTPWriter.Write
实现
BindLocalStream
,将输入的RTPWriter
保存到你实现的RTPWriter
中并返回(常见操作)让你实现的
RTPWriter
可以读到Interceptor
里的数据,然后在BindLocalStream
里开goroutine调用RTPWriter
定期获取Interceptor
里的数据并据此调用保存的另一个RTPWriter
写一些特殊功能的包
实现BindRTCPWriter
同上,只不过RTPWriter
变成RTCPWriter
实现BindRemoteStream
实现一个
RTPReader
,在其中保存另一个RTPReader
,并在其Read
函数中调用保存的RTPReader.Read
实现
BindRemoteStream
,将输入的RTPReader
保存到你实现的RTPReader
中并返回(常见操作)让
RTPReader
可以操作Interceptor
里的数据,从而可以根据RTPReader.Read
输入的数据修改Interceptor
里的数据,进而影响其绑定的RTPWriter
和RTCPWriter
的行为
实现BindRTCPReader
同上,只不过RTPReader
变成RTCPReader