@@ -98,7 +98,7 @@ impl<T: Depacketizer> SampleBuilder<T> {
9898 return false ;
9999 }
100100
101- let mut i = location. tail - 1 ;
101+ let mut i = location. tail . wrapping_sub ( 1 ) ;
102102 while i != location. head {
103103 if let Some ( ref packet) = self . buffer [ i as usize ] {
104104 found_tail = Some ( packet. header . timestamp ) ;
@@ -174,13 +174,18 @@ impl<T: Depacketizer> SampleBuilder<T> {
174174 if self . active . has_data ( ) && ( self . active . head == self . filled . head ) {
175175 // attempt to force the active packet to be consumed even though
176176 // outstanding data may be pending arrival
177- if self . build_sample ( true ) . is_some ( ) {
178- continue ;
177+ let err = match self . build_sample ( true ) {
178+ Ok ( _) => continue ,
179+ Err ( e) => e,
180+ } ;
181+
182+ if !matches ! ( err, BuildError :: InvalidParition ( _) ) {
183+ // In the InvalidParition case `build_sample` will have already adjusted `droppped_packets`.
184+ self . dropped_packets += 1 ;
179185 }
180186
181187 // could not build the sample so drop it
182188 self . active . head = self . active . head . wrapping_add ( 1 ) ;
183- self . dropped_packets += 1 ;
184189 }
185190
186191 self . release_packet ( self . filled . head ) ;
@@ -214,13 +219,16 @@ impl<T: Depacketizer> SampleBuilder<T> {
214219 /// Creates a sample from a valid collection of RTP Packets by
215220 /// walking forwards building a sample if everything looks good clear and
216221 /// update buffer+values
217- fn build_sample ( & mut self , purging_buffers : bool ) -> Option < ( ) > {
222+ fn build_sample (
223+ & mut self ,
224+ purging_buffers : bool ,
225+ ) -> Result < SampleSequenceLocation , BuildError > {
218226 if self . active . empty ( ) {
219227 self . active = self . filled ;
220228 }
221229
222230 if self . active . empty ( ) {
223- return None ;
231+ return Err ( BuildError :: NoActiveSegment ) ;
224232 }
225233
226234 if self . filled . compare ( self . active . tail ) == Comparison :: Inside {
@@ -230,37 +238,49 @@ impl<T: Depacketizer> SampleBuilder<T> {
230238 let mut consume = SampleSequenceLocation :: new ( ) ;
231239
232240 let mut i = self . active . head ;
241+ // `self.active` isn't modified in the loop, fetch the timestamp once and cache it.
242+ let head_timestamp = self . fetch_timestamp ( & self . active ) ;
233243 while let Some ( ref packet) = self . buffer [ i as usize ] {
234244 if self . active . compare ( i) == Comparison :: After {
235245 break ;
236246 }
237- if self
247+ let is_same_timestamp = head_timestamp. map ( |t| packet. header . timestamp == t) ;
248+ let is_different_timestamp = is_same_timestamp. map ( std:: ops:: Not :: not) ;
249+ let is_partition_tail = self
238250 . depacketizer
239- . is_partition_tail ( packet. header . marker , & packet. payload )
240- {
251+ . is_partition_tail ( packet. header . marker , & packet. payload ) ;
252+
253+ // If the timestamp is not the same it might be because the next packet is both a start
254+ // and end of the next parition in which case a sample should be generated now. This
255+ // can happen when padding packets are used .e.g:
256+ //
257+ // p1(t=1), p2(t=1), p3(t=1), p4(t=2, marker=true, start=true)
258+ //
259+ // In thic case the generated sample should be p1 through p3, but excluding p4 which is
260+ // its own sample.
261+ if is_partition_tail && is_same_timestamp. unwrap_or ( true ) {
241262 consume. head = self . active . head ;
242263 consume. tail = i. wrapping_add ( 1 ) ;
243264 break ;
244265 }
245- if let Some ( head_timestamp) = self . fetch_timestamp ( & self . active ) {
246- if packet. header . timestamp != head_timestamp {
247- consume. head = self . active . head ;
248- consume. tail = i;
249- break ;
250- }
266+
267+ if is_different_timestamp. unwrap_or ( false ) {
268+ consume. head = self . active . head ;
269+ consume. tail = i;
270+ break ;
251271 }
252272 i = i. wrapping_add ( 1 ) ;
253273 }
254274
255275 if consume. empty ( ) {
256- return None ;
276+ return Err ( BuildError :: NothingToConsume ) ;
257277 }
258278
259279 if !purging_buffers && self . buffer [ consume. tail as usize ] . is_none ( ) {
260280 // wait for the next packet after this set of packets to arrive
261281 // to ensure at least one post sample timestamp is known
262282 // (unless we have to release right now)
263- return None ;
283+ return Err ( BuildError :: PendingTimestampPacket ) ;
264284 }
265285
266286 let sample_timestamp = self . fetch_timestamp ( & self . active ) . unwrap_or ( 0 ) ;
@@ -274,15 +294,13 @@ impl<T: Depacketizer> SampleBuilder<T> {
274294 }
275295 }
276296
277- // the head set of packets is now fully consumed
278- self . active . head = consume. tail ;
279-
280297 // prior to decoding all the packets, check if this packet
281298 // would end being disposed anyway
282- if !self
283- . depacketizer
284- . is_partition_head ( & self . buffer [ consume. head as usize ] . as_ref ( ) ?. payload )
285- {
299+ let head_payload = self . buffer [ consume. head as usize ]
300+ . as_ref ( )
301+ . map ( |p| & p. payload )
302+ . ok_or ( BuildError :: GapInSegment ) ?;
303+ if !self . depacketizer . is_partition_head ( head_payload) {
286304 // libWebRTC will sometimes send several empty padding packets to smooth out send
287305 // rate. These packets don't carry any media payloads.
288306 let is_padding = consume. range ( & self . buffer ) . all ( |p| {
@@ -298,17 +316,28 @@ impl<T: Depacketizer> SampleBuilder<T> {
298316 }
299317 self . purge_consumed_location ( & consume, true ) ;
300318 self . purge_consumed_buffers ( ) ;
301- return None ;
319+
320+ self . active . head = consume. tail ;
321+ return Err ( BuildError :: InvalidParition ( consume) ) ;
302322 }
303323
324+ // the head set of packets is now fully consumed
325+ self . active . head = consume. tail ;
326+
304327 // merge all the buffers into a sample
305328 let mut data: Vec < u8 > = Vec :: new ( ) ;
306329 let mut i = consume. head ;
307330 while i != consume. tail {
331+ let payload = self . buffer [ i as usize ]
332+ . as_ref ( )
333+ . map ( |p| & p. payload )
334+ . ok_or ( BuildError :: GapInSegment ) ?;
335+
308336 let p = self
309337 . depacketizer
310- . depacketize ( & self . buffer [ i as usize ] . as_ref ( ) ?. payload )
311- . ok ( ) ?;
338+ . depacketize ( payload)
339+ . map_err ( |_| BuildError :: DepacketizerFailed ) ?;
340+
312341 data. extend_from_slice ( & p) ;
313342 i = i. wrapping_add ( 1 ) ;
314343 }
@@ -333,13 +362,14 @@ impl<T: Depacketizer> SampleBuilder<T> {
333362 self . purge_consumed_location ( & consume, true ) ;
334363 self . purge_consumed_buffers ( ) ;
335364
336- Some ( ( ) )
365+ Ok ( consume )
337366 }
338367
339368 /// Compiles pushed RTP packets into media samples and then
340369 /// returns the next valid sample (or None if no sample is compiled).
341370 pub fn pop ( & mut self ) -> Option < Sample > {
342- self . build_sample ( false ) ;
371+ let _ = self . build_sample ( false ) ;
372+
343373 if self . prepared . empty ( ) {
344374 return None ;
345375 }
@@ -381,3 +411,26 @@ pub(crate) fn seqnum_distance(x: u16, y: u16) -> u16 {
381411 diff
382412 }
383413}
414+
415+ #[ derive( Debug ) ]
416+ enum BuildError {
417+ /// There's no active segment of RTP packets to consider yet.
418+ NoActiveSegment ,
419+
420+ /// No sample partition could be found in the active segment.
421+ NothingToConsume ,
422+
423+ /// A segment to consume was identified, but a subsequent packet is needed to determine the
424+ /// duration of the sample.
425+ PendingTimestampPacket ,
426+
427+ /// The active segment's head was not aligned with a sample parition head. Some packets were
428+ /// dropped.
429+ InvalidParition ( SampleSequenceLocation ) ,
430+
431+ /// There was a gap in the active segment because of one or more missing RTP packets.
432+ GapInSegment ,
433+
434+ /// We failed to depacketize an RTP packet.
435+ DepacketizerFailed ,
436+ }
0 commit comments