diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 000000000..01d43dfff --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,3 @@ +[env] +SMOLTCP_IFACE_MAX_ADDR_COUNT = "3" +RUST_LOG = "off" \ No newline at end of file diff --git a/.gitignore b/.gitignore index 41ca801d0..19adafb46 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /target Cargo.lock *.pcap +/*.md +*.log diff --git a/Cargo.toml b/Cargo.toml index f22497499..4f41c18c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,6 +93,9 @@ defmt = ["dep:defmt", "heapless/defmt"] # Enable Reno TCP congestion control algorithm, and it is used as a default congestion controller. "socket-tcp-reno" = [] +# Enable BBR TCP congestion control algorithm, and it is used as a default congestion controller. +"socket-tcp-bbr" = [] + "packetmeta-id" = [] "async" = [] @@ -323,6 +326,14 @@ required-features = ["std", "medium-ethernet", "medium-ip", "phy-tuntap_interfac name = "benchmark" required-features = ["std", "medium-ethernet", "medium-ip", "phy-tuntap_interface", "proto-ipv4", "socket-raw", "socket-udp"] +[[example]] +name = "perf_server" +required-features = ["std", "medium-ethernet", "medium-ip", "phy-tuntap_interface", "proto-ipv4", "socket-tcp", "socket-tcp-cubic", "socket-tcp-bbr"] + +[[example]] +name = "perf_client" +required-features = ["std", "medium-ethernet", "medium-ip", "phy-tuntap_interface", "proto-ipv4", "socket-tcp", "socket-tcp-cubic", "socket-tcp-bbr"] + [[example]] name = "dhcp_client" required-features = ["std", "medium-ethernet", "medium-ip", "phy-tuntap_interface", "proto-ipv4", "proto-dhcpv4", "socket-raw"] diff --git a/examples/client.rs b/examples/client.rs index b089f7b94..e729557db 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -31,7 +31,7 @@ fn main() { // Create interface let mut config = match device.capabilities().medium { Medium::Ethernet => { - Config::new(EthernetAddress([0x02, 0x00, 0x00, 0x00, 0x00, 0x01]).into()) + Config::new(EthernetAddress([0x02, 0x00, 0x00, 0x00, 0x00, 0x02]).into()) } Medium::Ip => Config::new(smoltcp::wire::HardwareAddress::Ip), Medium::Ieee802154 => todo!(), @@ -41,13 +41,13 @@ fn main() { let mut iface = Interface::new(config, &mut device, Instant::now()); iface.update_ip_addrs(|ip_addrs| { ip_addrs - .push(IpCidr::new(IpAddress::v4(192, 168, 69, 1), 24)) + .push(IpCidr::new(IpAddress::v4(192, 168, 69, 2), 24)) .unwrap(); ip_addrs - .push(IpCidr::new(IpAddress::v6(0xfdaa, 0, 0, 0, 0, 0, 0, 1), 64)) + .push(IpCidr::new(IpAddress::v6(0xfdaa, 0, 0, 0, 0, 0, 0, 2), 64)) .unwrap(); ip_addrs - .push(IpCidr::new(IpAddress::v6(0xfe80, 0, 0, 0, 0, 0, 0, 1), 64)) + .push(IpCidr::new(IpAddress::v6(0xfe80, 0, 0, 0, 0, 0, 0, 2), 64)) .unwrap(); }); iface diff --git a/examples/perf_client.rs b/examples/perf_client.rs new file mode 100644 index 000000000..6d3d5b006 --- /dev/null +++ b/examples/perf_client.rs @@ -0,0 +1,274 @@ +mod utils; + +use log::info; +use std::os::unix::io::AsRawFd; +use std::str::FromStr; + +use smoltcp::iface::{Config, Interface, SocketSet}; +use smoltcp::phy::{Device, Medium, wait as phy_wait}; +use smoltcp::socket::tcp::{self, CongestionControl, State}; +use smoltcp::time::Instant; +use smoltcp::wire::{EthernetAddress, IpAddress, IpCidr}; + +const BUFFER_SIZE: usize = 6 * 1024 * 1024; + +struct LatencyStats { + samples: Vec, + sum: i64, + count: usize, + min: i64, + max: i64, +} + +impl LatencyStats { + fn new() -> Self { + LatencyStats { + samples: Vec::new(), + sum: 0, + count: 0, + min: i64::MAX, + max: 0, + } + } + + fn 
add_sample(&mut self, latency_us: i64) { + self.samples.push(latency_us); + self.sum += latency_us; + self.count += 1; + self.min = self.min.min(latency_us); + self.max = self.max.max(latency_us); + } + + fn mean(&self) -> f64 { + if self.count == 0 { + 0.0 + } else { + self.sum as f64 / self.count as f64 + } + } + + fn percentile(&mut self, p: f64) -> i64 { + if self.samples.is_empty() { + return 0; + } + self.samples.sort_unstable(); + let idx = ((p / 100.0) * self.samples.len() as f64) as usize; + self.samples[idx.min(self.samples.len() - 1)] + } + + fn print_summary(&mut self) { + if self.count == 0 { + info!("No latency samples collected"); + return; + } + + info!(""); + info!("Latency Statistics:"); + info!(" Samples: {}", self.count); + info!(" Min: {:.3} ms", self.min as f64 / 1000.0); + info!(" Mean: {:.3} ms", self.mean() / 1000.0); + info!(" p50: {:.3} ms", self.percentile(50.0) as f64 / 1000.0); + info!(" p95: {:.3} ms", self.percentile(95.0) as f64 / 1000.0); + info!(" p99: {:.3} ms", self.percentile(99.0) as f64 / 1000.0); + info!(" Max: {:.3} ms", self.max as f64 / 1000.0); + } +} + +fn parse_congestion_control(s: &str) -> CongestionControl { + match s.to_lowercase().as_str() { + "none" => CongestionControl::None, + #[cfg(feature = "socket-tcp-reno")] + "reno" => CongestionControl::Reno, + #[cfg(feature = "socket-tcp-cubic")] + "cubic" => CongestionControl::Cubic, + #[cfg(feature = "socket-tcp-bbr")] + "bbr" => CongestionControl::Bbr, + _ => { + eprintln!("Unknown congestion control algorithm: {}", s); + eprintln!("Available options:"); + eprintln!(" none"); + #[cfg(feature = "socket-tcp-reno")] + eprintln!(" reno"); + #[cfg(feature = "socket-tcp-cubic")] + eprintln!(" cubic"); + #[cfg(feature = "socket-tcp-bbr")] + eprintln!(" bbr"); + std::process::exit(1); + } + } +} + +fn main() { + utils::setup_logging("info"); + + let (mut opts, mut free) = utils::create_options(); + utils::add_tuntap_options(&mut opts, &mut free); + utils::add_middleware_options(&mut opts, &mut free); + + opts.optopt("c", "congestion", "Congestion control algorithm (none/reno/cubic/bbr)", "ALGO"); + opts.optopt("s", "server", "Server address", "ADDRESS"); + opts.optopt("p", "port", "Server port", "PORT"); + + let mut matches = utils::parse_options(&opts, vec![]); + + let cc_algo = parse_congestion_control( + &matches.opt_str("c").unwrap_or_else(|| { + #[cfg(feature = "socket-tcp-bbr")] + { "bbr".to_string() } + #[cfg(not(feature = "socket-tcp-bbr"))] + #[cfg(feature = "socket-tcp-cubic")] + { "cubic".to_string() } + #[cfg(not(any(feature = "socket-tcp-bbr", feature = "socket-tcp-cubic")))] + #[cfg(feature = "socket-tcp-reno")] + { "reno".to_string() } + #[cfg(not(any(feature = "socket-tcp-bbr", feature = "socket-tcp-cubic", feature = "socket-tcp-reno")))] + { "none".to_string() } + }) + ); + + let server_addr = IpAddress::from_str( + &matches.opt_str("s").unwrap_or_else(|| "192.168.69.1".to_string()) + ).expect("invalid server address"); + + let server_port = matches.opt_str("p") + .and_then(|s| u16::from_str(&s).ok()) + .unwrap_or(8000); + + let device = utils::parse_tuntap_options(&mut matches); + let fd = device.as_raw_fd(); + let mut device = utils::parse_middleware_options(&mut matches, device, false); + + // Create interface + let mut config = match device.capabilities().medium { + Medium::Ethernet => { + Config::new(EthernetAddress([0x02, 0x00, 0x00, 0x00, 0x00, 0x02]).into()) + } + Medium::Ip => Config::new(smoltcp::wire::HardwareAddress::Ip), + Medium::Ieee802154 => todo!(), + }; + 
config.random_seed = rand::random(); + + let mut iface = Interface::new(config, &mut device, Instant::now()); + iface.update_ip_addrs(|ip_addrs| { + ip_addrs + .push(IpCidr::new(IpAddress::v4(192, 168, 69, 2), 24)) + .unwrap(); + }); + + // Create TCP socket with large buffers + let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; BUFFER_SIZE]); + let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; BUFFER_SIZE]); + let mut tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer); + + // Set congestion control algorithm + tcp_socket.set_congestion_control(cc_algo); + + let mut sockets = SocketSet::new(vec![]); + let tcp_handle = sockets.add(tcp_socket); + + info!("Performance Client"); + info!("=================="); + info!("Congestion Control: {:?}", cc_algo); + info!("Connecting to: {}:{}", server_addr, server_port); + info!("Buffer size: {} bytes", BUFFER_SIZE); + info!(""); + + // Connect to server + let socket = sockets.get_mut::(tcp_handle); + socket.connect(iface.context(), (server_addr, server_port), 49500).unwrap(); + + let mut bytes_received = 0usize; + let mut start_time: Option = None; + let mut last_report = Instant::now(); + let mut tcp_active = false; + let mut latency_stats = LatencyStats::new(); + let mut sample_interval = 0; // Sample every Nth chunk + + loop { + let timestamp = Instant::now(); + iface.poll(timestamp, &mut device, &mut sockets); + + let socket = sockets.get_mut::(tcp_handle); + + // Track connection state + if socket.is_active() && !tcp_active { + info!("Connected to server"); + start_time = Some(timestamp); + bytes_received = 0; + last_report = timestamp; + latency_stats = LatencyStats::new(); + } else if !socket.is_active() && tcp_active { + if let Some(start) = start_time { + let elapsed = (timestamp - start).total_millis() as f64 / 1000.0; + let throughput_gbps = (bytes_received as f64 * 8.0) / elapsed / 1e9; + info!(""); + info!("Connection closed"); + info!("=================="); + info!("Total received: {:.2} MB", bytes_received as f64 / 1e6); + info!("Time: {:.2} seconds", elapsed); + info!("Throughput: {:.3} Gbps", throughput_gbps); + latency_stats.print_summary(); + } + break; + } + tcp_active = socket.is_active(); + + // Check if server has closed (received FIN) + if socket.state() == State::CloseWait { + // Server has closed its side, close our side too + socket.close(); + } + + // Receive data + if socket.may_recv() { + let recv_result = socket.recv(|buffer| { + let len = buffer.len(); + + // Sample latency from timestamps in the data + // Extract timestamp from every 100000th 8-byte chunk to minimize overhead + if len >= 8 { + for chunk in buffer.chunks_exact(8) { + sample_interval += 1; + if sample_interval >= 100000 { + sample_interval = 0; + + let mut ts_bytes = [0u8; 8]; + ts_bytes.copy_from_slice(chunk); + let sent_time_us = i64::from_le_bytes(ts_bytes); + let now_us = timestamp.total_micros(); + + // Calculate one-way delay (approximation) + let latency_us = now_us - sent_time_us; + + // Only record reasonable latency values (< 10 seconds) + if latency_us > 0 && latency_us < 10_000_000 { + latency_stats.add_sample(latency_us); + } + break; // Only sample once per recv + } + } + } + + (len, len) + }).unwrap(); + + bytes_received += recv_result; + + // Report progress every 5 seconds + if (timestamp - last_report).total_millis() >= 5000 { + if let Some(start) = start_time { + let elapsed = (timestamp - start).total_millis() as f64 / 1000.0; + if elapsed > 0.0 { + let throughput_gbps = (bytes_received as f64 * 8.0) / elapsed / 1e9; + let 
avg_latency = latency_stats.mean() / 1000.0; // Convert to ms + info!("{:.2} MB received | {:.3} Gbps | Avg Latency: {:.3} ms", + bytes_received as f64 / 1e6, throughput_gbps, avg_latency); + } + } + last_report = timestamp; + } + } + + phy_wait(fd, iface.poll_delay(timestamp, &sockets)).expect("wait error"); + } +} diff --git a/examples/perf_server.rs b/examples/perf_server.rs new file mode 100644 index 000000000..144f85b59 --- /dev/null +++ b/examples/perf_server.rs @@ -0,0 +1,206 @@ +mod utils; + +use log::info; +use std::os::unix::io::AsRawFd; +use std::str::FromStr; + +use smoltcp::iface::{Config, Interface, SocketSet}; +use smoltcp::phy::{Device, Medium, wait as phy_wait}; +use smoltcp::socket::tcp::{self, CongestionControl}; +use smoltcp::time::Instant; +use smoltcp::wire::{EthernetAddress, IpAddress, IpCidr}; + +const BUFFER_SIZE: usize = 6 * 1024 * 1024; +const DATA_SIZE: usize = 3 * 1024 * 1024 * 1024; // 3 GB total + +fn parse_congestion_control(s: &str) -> CongestionControl { + match s.to_lowercase().as_str() { + "none" => CongestionControl::None, + #[cfg(feature = "socket-tcp-reno")] + "reno" => CongestionControl::Reno, + #[cfg(feature = "socket-tcp-cubic")] + "cubic" => CongestionControl::Cubic, + #[cfg(feature = "socket-tcp-bbr")] + "bbr" => CongestionControl::Bbr, + _ => { + eprintln!("Unknown congestion control algorithm: {}", s); + eprintln!("Available options:"); + eprintln!(" none"); + #[cfg(feature = "socket-tcp-reno")] + eprintln!(" reno"); + #[cfg(feature = "socket-tcp-cubic")] + eprintln!(" cubic"); + #[cfg(feature = "socket-tcp-bbr")] + eprintln!(" bbr"); + std::process::exit(1); + } + } +} + +fn main() { + utils::setup_logging("info"); + + let (mut opts, mut free) = utils::create_options(); + utils::add_tuntap_options(&mut opts, &mut free); + utils::add_middleware_options(&mut opts, &mut free); + + opts.optopt("c", "congestion", "Congestion control algorithm (none/reno/cubic/bbr)", "ALGO"); + opts.optopt("p", "port", "Port to listen on", "PORT"); + + free.push(""); // Make port optional via flag + + let mut matches = utils::parse_options(&opts, vec![]); + + let cc_algo = parse_congestion_control( + &matches.opt_str("c").unwrap_or_else(|| { + #[cfg(feature = "socket-tcp-bbr")] + { "bbr".to_string() } + #[cfg(not(feature = "socket-tcp-bbr"))] + #[cfg(feature = "socket-tcp-cubic")] + { "cubic".to_string() } + #[cfg(not(any(feature = "socket-tcp-bbr", feature = "socket-tcp-cubic")))] + #[cfg(feature = "socket-tcp-reno")] + { "reno".to_string() } + #[cfg(not(any(feature = "socket-tcp-bbr", feature = "socket-tcp-cubic", feature = "socket-tcp-reno")))] + { "none".to_string() } + }) + ); + + let port = matches.opt_str("p") + .and_then(|s| u16::from_str(&s).ok()) + .unwrap_or(8000); + + let device = utils::parse_tuntap_options(&mut matches); + let fd = device.as_raw_fd(); + let mut device = utils::parse_middleware_options(&mut matches, device, false); + + // Create interface + let mut config = match device.capabilities().medium { + Medium::Ethernet => { + Config::new(EthernetAddress([0x02, 0x00, 0x00, 0x00, 0x00, 0x01]).into()) + } + Medium::Ip => Config::new(smoltcp::wire::HardwareAddress::Ip), + Medium::Ieee802154 => todo!(), + }; + config.random_seed = rand::random(); + + let mut iface = Interface::new(config, &mut device, Instant::now()); + iface.update_ip_addrs(|ip_addrs| { + ip_addrs + .push(IpCidr::new(IpAddress::v4(192, 168, 69, 1), 24)) + .unwrap(); + }); + + // Create TCP socket with large buffers + let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 
BUFFER_SIZE]); + let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; BUFFER_SIZE]); + let mut tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer); + + // Set congestion control algorithm + tcp_socket.set_congestion_control(cc_algo); + + let mut sockets = SocketSet::new(vec![]); + let tcp_handle = sockets.add(tcp_socket); + + info!("Performance Server"); + info!("=================="); + info!("Congestion Control: {:?}", cc_algo); + info!("Listening on port: {}", port); + info!("Buffer size: {} bytes", BUFFER_SIZE); + info!("Total data to send: {} GB", DATA_SIZE / (1024 * 1024 * 1024)); + info!(""); + + let mut bytes_sent = 0usize; + let mut start_time: Option = None; + let mut last_report = Instant::now(); + let mut close_called = false; + let mut close_time: Option = None; + + loop { + let timestamp = Instant::now(); + iface.poll(timestamp, &mut device, &mut sockets); + + let socket = sockets.get_mut::(tcp_handle); + + // Listen for connections if not open + if !socket.is_open() { + socket.listen(port).unwrap(); + bytes_sent = 0; + start_time = None; + } + + // Track connection state + if socket.is_active() && !close_called { + if !start_time.is_some() { + info!("Client connected"); + start_time = Some(timestamp); + bytes_sent = 0; + last_report = timestamp; + } + } + + // Exit after close and grace period + if close_called { + if let Some(close) = close_time { + if (timestamp - close).total_millis() >= 1000 { + if let Some(start) = start_time { + let elapsed = (timestamp - start).total_millis() as f64 / 1000.0; + let throughput_gbps = (bytes_sent as f64 * 8.0) / elapsed / 1e9; + info!(""); + info!("Test Complete"); + info!("============="); + info!("Total sent: {:.2} MB", bytes_sent as f64 / 1e6); + info!("Time: {:.2} seconds", elapsed); + info!("Throughput: {:.3} Gbps", throughput_gbps); + } + break; + } + } + } + + // Send data if connected + if socket.can_send() && bytes_sent < DATA_SIZE { + let sent = socket + .send(|buffer| { + let to_send = std::cmp::min(buffer.len(), DATA_SIZE - bytes_sent); + + // Fill buffer with timestamp every 8 bytes for latency measurement + let now_micros = timestamp.total_micros(); + for chunk in buffer[..to_send].chunks_mut(8) { + let timestamp_bytes = now_micros.to_le_bytes(); + let copy_len = std::cmp::min(chunk.len(), 8); + chunk[..copy_len].copy_from_slice(×tamp_bytes[..copy_len]); + } + + (to_send, to_send) + }) + .unwrap(); + + bytes_sent += sent; + + // Report progress every 5 seconds + if (timestamp - last_report).total_millis() >= 5000 { + if let Some(start) = start_time { + let elapsed = (timestamp - start).total_millis() as f64 / 1000.0; + if elapsed > 0.0 { + let throughput_gbps = (bytes_sent as f64 * 8.0) / elapsed / 1e9; + let progress = (bytes_sent as f64 / DATA_SIZE as f64) * 100.0; + info!("Progress: {:.1}% | {:.2} MB sent | {:.3} Gbps", + progress, bytes_sent as f64 / 1e6, throughput_gbps); + } + } + last_report = timestamp; + } + + // Close after sending all data + if bytes_sent >= DATA_SIZE && !close_called { + info!("Finished sending {} bytes, closing connection", bytes_sent); + socket.close(); + close_called = true; + close_time = Some(timestamp); + } + } + + phy_wait(fd, iface.poll_delay(timestamp, &sockets)).expect("wait error"); + } +} diff --git a/scripts/cleanup.sh b/scripts/cleanup.sh new file mode 100755 index 000000000..e77177ec9 --- /dev/null +++ b/scripts/cleanup.sh @@ -0,0 +1,5 @@ +#!/usr/bin/bash + +sudo ip link delete dev tap0 +sudo ip link delete dev tap1 +sudo ip link delete dev br0 diff --git 
a/scripts/emulate_setup.sh b/scripts/emulate_setup.sh new file mode 100755 index 000000000..a22713664 --- /dev/null +++ b/scripts/emulate_setup.sh @@ -0,0 +1,61 @@ +#!/usr/bin/bash + +# Set up tap0 +sudo ip tuntap add name tap0 mode tap user $USER +sudo ip link set tap0 up +sudo ip addr add 192.168.69.1/24 dev tap0 + +# Set up tap1 +sudo ip tuntap add name tap1 mode tap user $USER +sudo ip link set tap1 up +sudo ip addr add 192.168.69.2/24 dev tap1 + +# Create a bridge +sudo ip link add name br0 type bridge + +# Add both TAP interfaces to the bridge +sudo ip link set tap0 master br0 +sudo ip link set tap1 master br0 + +# Bring up the bridge +sudo ip link set br0 up + +# Network emulation parameters (customize these!) +DELAY="10ms" # One-way delay (RTT will be 2x this) +BANDWIDTH="400mbit" # Bandwidth limit +BUFFER_PACKETS="4000" # Buffer size in packets (router queue) +MTU=1500 # Maximum transmission unit +NETEM_LIMIT="4000" # Netem queue limit in packets (must be large for high BDP!) + +# Calculate buffer size in bytes +BUFFER_BYTES=$((BUFFER_PACKETS * MTU)) + +# Calculate burst size (should be at least rate/HZ, typically rate/10 for smoother shaping) +# For 10mbit: 10000000/8/10 = 125000 bytes +BURST=$((10 * MTU)) + +# Apply traffic control on tap0 (server side) +# netem for delay with large buffer to handle BDP +sudo tc qdisc add dev tap0 root handle 1: netem delay $DELAY limit $NETEM_LIMIT +# tbf for bandwidth limiting with tail-drop +sudo tc qdisc add dev tap0 parent 1:1 handle 10: tbf rate $BANDWIDTH burst $BURST limit $BUFFER_BYTES + +# Apply traffic control on tap1 (client side) +sudo tc qdisc add dev tap1 root handle 1: netem delay $DELAY limit $NETEM_LIMIT +sudo tc qdisc add dev tap1 parent 1:1 handle 10: tbf rate $BANDWIDTH burst $BURST limit $BUFFER_BYTES + +echo "Network emulation setup complete:" +echo " - Delay: $DELAY per direction (RTT: ~$((2 * ${DELAY%ms}))ms)" +echo " - Bandwidth: $BANDWIDTH" +echo " - Buffer: $BUFFER_PACKETS packets ($BUFFER_BYTES bytes)" +echo " - Loss: Only on buffer overflow (tail-drop)" +echo "" +echo "BDP calculation for reference:" +BDP_BYTES=$((${BANDWIDTH%mbit} * 1000000 / 8 * ${DELAY%ms} / 1000)) +BDP_PACKETS=$((BDP_BYTES / MTU)) +echo " - Bandwidth-Delay Product: ~$BDP_PACKETS packets ($BDP_BYTES bytes)" +if [ $BUFFER_PACKETS -lt $BDP_PACKETS ]; then + echo " ⚠ Warning: Buffer < BDP, expect losses even at full link utilization" +else + echo " ✓ Buffer >= BDP, good for testing" +fi diff --git a/scripts/perf_client_run.sh b/scripts/perf_client_run.sh new file mode 100755 index 000000000..a17766d7d --- /dev/null +++ b/scripts/perf_client_run.sh @@ -0,0 +1,8 @@ +#!/usr/bin/bash + +# Usage: ./scripts/perf_client_run.sh [bbr|cubic|reno|none] +# Default is bbr + +ALGO=${1:-bbr} + +SMOLTCP_IFACE_MAX_ADDR_COUNT=3 ./target/release/examples/perf_client --tap tap1 -c $ALGO -s 192.168.69.1 -p 8000 diff --git a/scripts/perf_server_run.sh b/scripts/perf_server_run.sh new file mode 100755 index 000000000..d89ee46cb --- /dev/null +++ b/scripts/perf_server_run.sh @@ -0,0 +1,8 @@ +#!/usr/bin/bash + +# Usage: ./scripts/perf_server_run.sh [bbr|cubic|reno|none] +# Default is bbr + +ALGO=${1:-bbr} + +SMOLTCP_IFACE_MAX_ADDR_COUNT=3 ./target/release/examples/perf_server --tap tap0 -c $ALGO diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 2d2de388f..ef885a08f 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -161,6 +161,11 @@ const RTTE_MIN_RTO: u32 = 1000; // seconds const RTTE_MAX_RTO: u32 = 60_000; +// BBR: Window length for min_rtt filter (in 
seconds) +// This matches the Linux kernel BBR implementation (tcp_bbr.c:135) +#[cfg(feature = "socket-tcp-bbr")] +const BBR_MIN_RTT_WIN_SEC: u64 = 10; + #[derive(Debug, Clone, Copy)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] struct RttEstimator { @@ -176,6 +181,12 @@ struct RttEstimator { timestamp: Option<(Instant, TcpSeqNumber)>, max_seq_sent: Option, rto_count: u8, + /// BBR: Minimum RTT observed in the last 10 seconds + #[cfg(feature = "socket-tcp-bbr")] + min_rtt_value: u32, + /// BBR: Timestamp when min_rtt was last updated + #[cfg(feature = "socket-tcp-bbr")] + min_rtt_stamp: Option, } impl Default for RttEstimator { @@ -188,6 +199,10 @@ impl Default for RttEstimator { timestamp: None, max_seq_sent: None, rto_count: 0, + #[cfg(feature = "socket-tcp-bbr")] + min_rtt_value: u32::MAX, + #[cfg(feature = "socket-tcp-bbr")] + min_rtt_stamp: None, } } } @@ -197,7 +212,33 @@ impl RttEstimator { Duration::from_millis(self.rto as _) } - fn sample(&mut self, new_rtt: u32) { + #[cfg(feature = "socket-tcp-bbr")] + pub(super) fn min_rtt(&self) -> Duration { + // Return the actual minimum RTT observed in the window, not SRTT + // If no measurement yet, fall back to SRTT as a reasonable estimate + if self.min_rtt_value == u32::MAX { + Duration::from_millis(self.srtt as _) + } else { + Duration::from_millis(self.min_rtt_value as _) + } + } + + #[cfg(feature = "socket-tcp-bbr")] + pub(super) fn is_min_rtt_expired(&self, now: Instant) -> bool { + // Check if the min_rtt window has expired (10 seconds) + // This matches Linux kernel BBR behavior (tcp_bbr.c:948-949) + if let Some(stamp) = self.min_rtt_stamp { + if now >= stamp { + (now - stamp) > Duration::from_secs(BBR_MIN_RTT_WIN_SEC) + } else { + false // Time went backwards, don't consider expired + } + } else { + true // No measurement yet, consider expired + } + } + + fn sample(&mut self, new_rtt: u32, #[allow(unused_variables)] now: Instant) { if self.have_measurement { // RFC 6298 (2.3) When a subsequent RTT measurement R' is made, a host MUST set (...) let diff = (self.srtt as i32 - new_rtt as i32).unsigned_abs(); @@ -216,6 +257,27 @@ impl RttEstimator { self.rto_count = 0; + // BBR: Track minimum RTT in a sliding 10-second window + // This matches Linux kernel BBR behavior (tcp_bbr.c:947-955) + #[cfg(feature = "socket-tcp-bbr")] + { + let expired = self.is_min_rtt_expired(now); + + // Update min_rtt if: + // 1. New sample is lower than current minimum, OR + // 2. The window has expired (need fresh measurement) + if new_rtt < self.min_rtt_value || expired { + self.min_rtt_value = new_rtt; + self.min_rtt_stamp = Some(now); + + tcp_trace!( + "rtte: min_rtt updated to {:?}ms (expired={})", + new_rtt, + expired + ); + } + } + tcp_trace!( "rtte: sample={:?} srtt={:?} rttvar={:?} rto={:?}", new_rtt, @@ -242,7 +304,7 @@ impl RttEstimator { fn on_ack(&mut self, timestamp: Instant, seq: TcpSeqNumber) { if let Some((sent_timestamp, sent_seq)) = self.timestamp { if seq >= sent_seq { - self.sample((timestamp - sent_timestamp).total_millis() as u32); + self.sample((timestamp - sent_timestamp).total_millis() as u32, timestamp); self.timestamp = None; } } @@ -454,6 +516,9 @@ pub enum CongestionControl { #[cfg(feature = "socket-tcp-cubic")] Cubic, + + #[cfg(feature = "socket-tcp-bbr")] + Bbr, } /// A Transmission Control Protocol socket. @@ -533,6 +598,10 @@ pub struct Socket<'a> { /// The congestion control algorithm. congestion_controller: congestion::AnyController, + /// Pacing: next time a packet can be sent (for rate limiting). 
+ /// If None, pacing is not active. + pacing_next_send_at: Option, + /// tsval generator - if some, tcp timestamp is enabled tsval_generator: Option, @@ -605,6 +674,7 @@ impl<'a> Socket<'a> { tsval_generator: None, last_remote_tsval: 0, congestion_controller: congestion::AnyController::new(), + pacing_next_send_at: None, #[cfg(feature = "async")] rx_waker: WakerRegistration::new(), @@ -657,6 +727,9 @@ impl<'a> Socket<'a> { #[cfg(feature = "socket-tcp-cubic")] CongestionControl::Cubic => AnyController::Cubic(cubic::Cubic::new()), + + #[cfg(feature = "socket-tcp-bbr")] + CongestionControl::Bbr => AnyController::Bbr(bbr::Bbr::new()), } } @@ -672,6 +745,9 @@ impl<'a> Socket<'a> { #[cfg(feature = "socket-tcp-cubic")] AnyController::Cubic(_) => CongestionControl::Cubic, + + #[cfg(feature = "socket-tcp-bbr")] + AnyController::Bbr(_) => CongestionControl::Bbr, } } @@ -1794,9 +1870,17 @@ impl<'a> Socket<'a> { } self.rtte.on_ack(cx.now(), ack_number); + // bytes_in_flight should be the bytes remaining in flight AFTER this ACK + // This is: (last sent seq) - (seq being ACKed) = data sent but not yet ACKed + // This excludes data still buffered but not yet sent (important for BBR Drain exit) + let bytes_in_flight = if self.remote_last_seq >= ack_number { + (self.remote_last_seq - ack_number) as usize + } else { + 0 + }; self.congestion_controller .inner_mut() - .on_ack(cx.now(), ack_len, &self.rtte); + .on_ack(cx.now(), ack_len, &self.rtte, bytes_in_flight); } } @@ -2368,6 +2452,12 @@ impl<'a> Socket<'a> { .inner_mut() .pre_transmit(cx.now()); + // Notify congestion controller about available data for app-limited tracking + let bytes_available = self.tx_buffer.len(); + self.congestion_controller + .inner_mut() + .on_send_ready(cx.now(), bytes_available); + // Check if any state needs to be changed because of a timer. if self.timed_out(cx.now()) { // If a timeout expires, we should abort the connection. @@ -2402,6 +2492,15 @@ impl<'a> Socket<'a> { return Ok(()); } + // Check if pacing delays transmission + if let Some(next_send_at) = self.pacing_next_send_at { + if cx.now() < next_send_at && self.seq_to_transmit(cx) { + // Pacing prevents us from sending data right now + tcp_trace!("pacing delayed until {:?}", next_send_at); + return Ok(()); + } + } + // Decide whether we're sending a packet. if self.seq_to_transmit(cx) { // If we have data to transmit and it fits into partner's window, do it. @@ -2643,6 +2742,26 @@ impl<'a> Socket<'a> { self.congestion_controller .inner_mut() .post_transmit(cx.now(), repr.segment_len()); + + // Update pacing: calculate when the next packet can be sent + let pacing_rate = self.congestion_controller.inner_mut().pacing_rate(); + if pacing_rate > 0 { + // Calculate delay: (packet_size_bytes * 1_000_000) / pacing_rate_bytes_per_sec + // This gives us microseconds until the next packet can be sent + let packet_size = repr.segment_len() as u64; + let delay_micros = (packet_size * 1_000_000) / pacing_rate; + let delay = crate::time::Duration::from_micros(delay_micros); + self.pacing_next_send_at = Some(cx.now() + delay); + tcp_trace!( + "pacing: sent {} bytes at rate {} bytes/s, next send at {:?}", + packet_size, + pacing_rate, + self.pacing_next_send_at + ); + } else { + // No pacing + self.pacing_next_send_at = None; + } } if repr.segment_len() > 0 && !self.timer.is_retransmit() { @@ -2680,7 +2799,11 @@ impl<'a> Socket<'a> { PollAt::Now } else if self.seq_to_transmit(cx) { // We have a data or flag packet to transmit. - PollAt::Now + // Check if pacing delays it. 
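+            // Returning `PollAt::Time` rather than `PollAt::Now` lets the interface
+            // sleep until the pacing gate opens instead of repeatedly re-polling a
+            // socket that is not yet allowed to transmit.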
+ match self.pacing_next_send_at { + Some(next_send_at) if cx.now() < next_send_at => PollAt::Time(next_send_at), + _ => PollAt::Now, + } } else if self.window_to_update() { // The receive window has been raised significantly. PollAt::Now @@ -8570,9 +8693,11 @@ mod test { 2076, 2060, 2048, 2036, 2028, 2024, 2020, 2016, 2012, 2012, ]; + let mut now = Instant::from_millis(0); for &rto in rtos { - r.sample(2000); + r.sample(2000, now); assert_eq!(r.retransmission_timeout(), Duration::from_millis(rto)); + now += Duration::from_millis(100); // Advance time } } diff --git a/src/socket/tcp/congestion.rs b/src/socket/tcp/congestion.rs index c904f214f..95454190f 100644 --- a/src/socket/tcp/congestion.rs +++ b/src/socket/tcp/congestion.rs @@ -10,6 +10,9 @@ pub(super) mod cubic; #[cfg(feature = "socket-tcp-reno")] pub(super) mod reno; +#[cfg(feature = "socket-tcp-bbr")] +pub(super) mod bbr; + #[allow(unused_variables)] pub(super) trait Controller { /// Returns the number of bytes that can be sent. @@ -18,7 +21,7 @@ pub(super) trait Controller { /// Set the remote window size. fn set_remote_window(&mut self, remote_window: usize) {} - fn on_ack(&mut self, now: Instant, len: usize, rtt: &RttEstimator) {} + fn on_ack(&mut self, now: Instant, len: usize, rtt: &RttEstimator, bytes_in_flight: usize) {} fn on_retransmit(&mut self, now: Instant) {} @@ -30,10 +33,23 @@ pub(super) trait Controller { /// Set the maximum segment size. fn set_mss(&mut self, mss: usize) {} + + /// Called when the socket is about to send data. + /// `bytes_available` indicates how many bytes are waiting in the send buffer. + /// This allows the congestion controller to track whether the application + /// is app-limited (not enough data to send) or cwnd-limited. + fn on_send_ready(&mut self, now: Instant, bytes_available: usize) {} + + /// Returns the pacing rate in bytes per second. + /// Returns 0 if pacing is not supported or not active. + fn pacing_rate(&self) -> u64 { + 0 + } } #[derive(Debug)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] +#[allow(clippy::large_enum_variant)] pub(super) enum AnyController { None(no_control::NoControl), @@ -42,15 +58,20 @@ pub(super) enum AnyController { #[cfg(feature = "socket-tcp-cubic")] Cubic(cubic::Cubic), + + #[cfg(feature = "socket-tcp-bbr")] + Bbr(bbr::Bbr), } impl AnyController { /// Create a new congestion controller. /// `AnyController::new()` selects the best congestion controller based on the features. /// + /// - If `socket-tcp-bbr` feature is enabled, it will use `Bbr`. /// - If `socket-tcp-cubic` feature is enabled, it will use `Cubic`. /// - If `socket-tcp-reno` feature is enabled, it will use `Reno`. - /// - If both `socket-tcp-cubic` and `socket-tcp-reno` features are enabled, it will use `Cubic`. + /// - Priority: BBR > Cubic > Reno > NoControl + /// - `BBR` is optimized for high bandwidth-delay product networks. /// - `Cubic` is more efficient regarding throughput. /// - `Reno` is more conservative and is suitable for low-power devices. /// - If no congestion controller is available, it will use `NoControl`. 
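Since this change routes `CongestionControl::Bbr` through `Socket::set_congestion_control`, here is a minimal illustrative sketch of per-socket opt-in, mirroring what the new perf examples do (buffer sizes here are arbitrary; assumes the `socket-tcp-bbr` feature and `std` for `Vec`):

```rust
use smoltcp::socket::tcp;

// Illustrative helper, not part of this change: build a TCP socket that uses BBR
// explicitly instead of relying on the feature-based default controller.
fn bbr_socket() -> tcp::Socket<'static> {
    let rx = tcp::SocketBuffer::new(vec![0; 64 * 1024]);
    let tx = tcp::SocketBuffer::new(vec![0; 64 * 1024]);
    let mut socket = tcp::Socket::new(rx, tx);
    // Without this call the socket keeps the compile-time default, which this
    // change makes BBR > Cubic > Reno > NoControl, depending on enabled features.
    #[cfg(feature = "socket-tcp-bbr")]
    socket.set_congestion_control(tcp::CongestionControl::Bbr);
    socket
}
```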
@@ -60,6 +81,11 @@ impl AnyController { #[allow(unreachable_code)] #[inline] pub fn new() -> Self { + #[cfg(feature = "socket-tcp-bbr")] + { + return AnyController::Bbr(bbr::Bbr::new()); + } + #[cfg(feature = "socket-tcp-cubic")] { return AnyController::Cubic(cubic::Cubic::new()); @@ -83,6 +109,9 @@ impl AnyController { #[cfg(feature = "socket-tcp-cubic")] AnyController::Cubic(c) => c, + + #[cfg(feature = "socket-tcp-bbr")] + AnyController::Bbr(b) => b, } } @@ -96,6 +125,9 @@ impl AnyController { #[cfg(feature = "socket-tcp-cubic")] AnyController::Cubic(c) => c, + + #[cfg(feature = "socket-tcp-bbr")] + AnyController::Bbr(b) => b, } } } diff --git a/src/socket/tcp/congestion/bbr.rs b/src/socket/tcp/congestion/bbr.rs new file mode 100644 index 000000000..2139dbe42 --- /dev/null +++ b/src/socket/tcp/congestion/bbr.rs @@ -0,0 +1,1039 @@ +use crate::time::{Duration, Instant}; + +use super::{Controller, RttEstimator}; + +mod bw_estimation; +mod min_max; + +use bw_estimation::BandwidthEstimation; + +/// Experimental BBR congestion control algorithm. +/// +/// Aims for reduced buffer bloat and improved performance over high bandwidth-delay product networks. +/// Based on google's quiche implementation +/// of BBR . +#[derive(Debug, Clone)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub struct Bbr { + max_bandwidth: BandwidthEstimation, + acked_bytes: u64, + mode: Mode, + loss_state: LossState, + recovery_state: RecoveryState, + recovery_window: usize, + is_at_full_bandwidth: bool, + pacing_gain: f32, + high_gain: f32, + drain_gain: f32, + cwnd_gain: f32, + high_cwnd_gain: f32, + last_cycle_start: Option, + current_cycle_offset: u8, + init_cwnd: usize, + min_cwnd: usize, + prev_in_flight_count: usize, + exit_probe_rtt_at: Option, + probe_rtt_last_started_at: Option, + min_rtt: Duration, + // Idle restart flag: set when restarting after idle period + // This matches Linux BBR (tcp_bbr.c:101) + idle_restart: bool, + pacing_rate: u64, + max_acked_packet_number: u64, + max_sent_packet_number: u64, + end_recovery_at_packet_number: u64, + cwnd: usize, + current_round_trip_end_packet_number: u64, + round_count: u64, + bw_at_last_round: u64, + round_wo_bw_gain: u64, + ack_aggregation: AckAggregationState, + rwnd: usize, + // Simple linear congruential generator for randomness (no_std compatible) + rng_state: u32, + // App-limited tracking: true when the application doesn't have enough data to fill cwnd + app_limited: bool, + // Prior cwnd before loss recovery (for restoration after recovery exits) + prior_cwnd: usize, + // Packet conservation flag: follow packet conservation principle during first round of recovery + // This matches Linux BBR (tcp_bbr.c:99) + packet_conservation: bool, + // Previous congestion avoidance state for tracking recovery entry/exit + // This matches Linux BBR (tcp_bbr.c:98) but simplified to bool (in_recovery) + prev_in_recovery: bool, + // Round start flag: indicates if we've started a new round trip + // This matches Linux BBR (tcp_bbr.c:100) + round_start: bool, +} + +impl Bbr { + pub fn new() -> Self { + let initial_window = 1024 * 10; + let min_window = 1024 * 2; + Self { + max_bandwidth: BandwidthEstimation::default(), + acked_bytes: 0, + mode: Mode::Startup, + loss_state: Default::default(), + recovery_state: RecoveryState::NotInRecovery, + recovery_window: 0, + is_at_full_bandwidth: false, + pacing_gain: K_DEFAULT_HIGH_GAIN, + high_gain: K_DEFAULT_HIGH_GAIN, + drain_gain: 1.0 / K_DEFAULT_HIGH_GAIN, + cwnd_gain: K_DEFAULT_HIGH_GAIN, + high_cwnd_gain: 
K_DEFAULT_HIGH_GAIN, + last_cycle_start: None, + current_cycle_offset: 0, + init_cwnd: initial_window, + min_cwnd: min_window, + prev_in_flight_count: 0, + exit_probe_rtt_at: None, + probe_rtt_last_started_at: None, + min_rtt: Duration::ZERO, + idle_restart: false, + pacing_rate: 0, + max_acked_packet_number: 0, + max_sent_packet_number: 0, + end_recovery_at_packet_number: 0, + cwnd: initial_window, + current_round_trip_end_packet_number: 0, + round_count: 0, + bw_at_last_round: 0, + round_wo_bw_gain: 0, + ack_aggregation: AckAggregationState { + extra_acked: [0, 0], + extra_acked_win_idx: 0, + extra_acked_win_rtts: 0, + ack_epoch_mstamp: None, + ack_epoch_acked: 0, + }, + rwnd: 64 * 1024, + rng_state: 12345, // Arbitrary seed + app_limited: false, + prior_cwnd: initial_window, + packet_conservation: false, + prev_in_recovery: false, + round_start: false, + } + } + + // Simple pseudo-random number generator (LCG) + fn random_range(&mut self, max: u8) -> u8 { + self.rng_state = self.rng_state.wrapping_mul(1103515245).wrapping_add(12345); + ((self.rng_state / 65536) % max as u32) as u8 + } + + fn enter_startup_mode(&mut self) { + self.mode = Mode::Startup; + self.pacing_gain = self.high_gain; + self.cwnd_gain = self.high_cwnd_gain; + } + + fn enter_probe_bandwidth_mode(&mut self, now: Instant) { + self.mode = Mode::ProbeBw; + self.cwnd_gain = K_DERIVED_HIGH_CWNDGAIN; + self.last_cycle_start = Some(now); + // Pick a random offset for the gain cycle out of {0, 2..7} range. 1 is + // excluded because in that case increased gain and decreased gain would not + // follow each other. + let mut rand_index = self.random_range((K_PACING_GAIN.len() as u8) - 1); + if rand_index >= 1 { + rand_index += 1; + } + self.current_cycle_offset = rand_index; + self.pacing_gain = K_PACING_GAIN[rand_index as usize]; + } + + fn save_cwnd(&mut self) { + // Save current cwnd before entering recovery + // This matches Linux BBR (tcp_bbr.c:756, 884) + if self.recovery_state == RecoveryState::NotInRecovery && self.mode != Mode::ProbeRtt { + self.prior_cwnd = self.cwnd; + } + } + + fn restore_cwnd(&mut self) { + // Restore cwnd when exiting recovery + // This matches Linux BBR (tcp_bbr.c:785, 903) + self.cwnd = self.cwnd.max(self.prior_cwnd); + } + + /// Packet conservation: handle recovery and restoration of cwnd. + /// Matches Linux BBR bbr_set_cwnd_to_recover_or_restore() (tcp_bbr.c:480-514) + /// + /// On the first round of recovery, follow packet conservation principle: + /// send P packets per P packets acked. After that, slow-start and send + /// at most 2*P packets per P packets acked. + fn set_cwnd_to_recover_or_restore( + &mut self, + bytes_acked: usize, + bytes_lost: usize, + bytes_in_flight: usize, + ) -> Option { + let in_recovery = self.recovery_state.in_recovery(); + let mut cwnd = self.cwnd; + + // An ACK for P pkts should release at most 2*P packets. We do this + // in two steps. First, here we deduct the number of lost packets. + // Then, in calculate_cwnd() we slow start up toward the target cwnd. + // Matches tcp_bbr.c:492-493 + if bytes_lost > 0 { + cwnd = cwnd.saturating_sub(bytes_lost).max(MAX_SEGMENT_SIZE); + } + + // Entering recovery: start packet conservation + // Matches tcp_bbr.c:495-500 + if in_recovery && !self.prev_in_recovery { + // Starting 1st round of Recovery, so do packet conservation. 
+ self.packet_conservation = true; + // Start new round now + self.current_round_trip_end_packet_number = self.max_sent_packet_number; + // Cut unused cwnd from app behavior or other factors + cwnd = bytes_in_flight.saturating_add(bytes_acked); + } + // Exiting recovery: restore cwnd + // Matches tcp_bbr.c:501-504 + else if !in_recovery && self.prev_in_recovery { + // Exiting loss recovery; restore cwnd saved before recovery. + cwnd = cwnd.max(self.prior_cwnd); + self.packet_conservation = false; + } + + // Update prev state for next time + self.prev_in_recovery = in_recovery; + + // If using packet conservation, ensure cwnd >= inflight + acked + // Matches tcp_bbr.c:508-513 + if self.packet_conservation { + let conserved_cwnd = bytes_in_flight.saturating_add(bytes_acked).max(cwnd); + Some(conserved_cwnd) + } else { + Some(cwnd) + } + } + + fn update_recovery_state(&mut self, is_round_start: bool) { + // Exit recovery when there are no losses for a round. + if self.loss_state.has_losses() { + self.end_recovery_at_packet_number = self.max_sent_packet_number; + } + match self.recovery_state { + // Enter conservation on the first loss. + RecoveryState::NotInRecovery if self.loss_state.has_losses() => { + // Save cwnd before entering recovery (matches Linux BBR) + self.save_cwnd(); + self.recovery_state = RecoveryState::Conservation; + // This will cause the |recovery_window| to be set to the + // correct value in calculate_recovery_window(). + self.recovery_window = 0; + // Since the conservation phase is meant to be lasting for a whole + // round, extend the current round as if it were started right now. + self.current_round_trip_end_packet_number = self.max_sent_packet_number; + } + RecoveryState::Growth | RecoveryState::Conservation => { + if self.recovery_state == RecoveryState::Conservation && is_round_start { + self.recovery_state = RecoveryState::Growth; + } + // Exit recovery if appropriate. + if !self.loss_state.has_losses() + && self.max_acked_packet_number > self.end_recovery_at_packet_number + { + // Restore cwnd when exiting recovery (matches Linux BBR) + self.restore_cwnd(); + self.recovery_state = RecoveryState::NotInRecovery; + } + } + _ => {} + } + } + + fn update_gain_cycle_phase(&mut self, now: Instant, in_flight: usize) { + // In most cases, the cycle is advanced after an RTT passes. + let mut should_advance_gain_cycling = self + .last_cycle_start + .map(|last_cycle_start| { + if now > last_cycle_start { + now - last_cycle_start > self.min_rtt + } else { + false + } + }) + .unwrap_or(false); + + // If the pacing gain is above 1.0, the connection is trying to probe the + // bandwidth by increasing the number of bytes in flight to at least + // pacing_gain * BDP. Make sure that it actually reaches the target, as + // long as there are no losses suggesting that the buffers are not able to + // hold that much. + if self.pacing_gain > 1.0 + && !self.loss_state.has_losses() + && self.prev_in_flight_count < self.get_target_cwnd(self.pacing_gain) + { + should_advance_gain_cycling = false; + } + + // If pacing gain is below 1.0, the connection is trying to drain the extra + // queue which could have been incurred by probing prior to it. If the + // number of bytes in flight falls down to the estimated BDP value earlier, + // conclude that the queue has been successfully drained and exit this cycle + // early. 
+ if self.pacing_gain < 1.0 && in_flight <= self.get_target_cwnd(1.0) { + should_advance_gain_cycling = true; + } + + if should_advance_gain_cycling { + self.current_cycle_offset = (self.current_cycle_offset + 1) % K_PACING_GAIN.len() as u8; + self.last_cycle_start = Some(now); + // Stay in low gain mode until the target BDP is hit. Low gain mode + // will be exited immediately when the target BDP is achieved. + if DRAIN_TO_TARGET + && self.pacing_gain < 1.0 + && (K_PACING_GAIN[self.current_cycle_offset as usize] - 1.0).abs() < f32::EPSILON + && in_flight > self.get_target_cwnd(1.0) + { + return; + } + self.pacing_gain = K_PACING_GAIN[self.current_cycle_offset as usize]; + } + } + + fn maybe_exit_startup_or_drain(&mut self, now: Instant, in_flight: usize) { + if self.mode == Mode::Startup && self.is_at_full_bandwidth { + #[cfg(feature = "log")] + log::info!("[BBR MODE] Startup → Drain | pacing_gain={:.2}", self.drain_gain); + + self.mode = Mode::Drain; + self.pacing_gain = self.drain_gain; + self.cwnd_gain = self.high_cwnd_gain; + } + if self.mode == Mode::Drain { + let target = self.get_target_cwnd(1.0); + #[cfg(feature = "log")] + if self.round_start { + log::debug!("[BBR DRAIN] in_flight={} | target_cwnd={} | will_exit={}", + in_flight, target, in_flight <= target); + } + + if in_flight <= target { + #[cfg(feature = "log")] + log::info!("[BBR MODE] Drain → ProbeBw | in_flight={} | target_cwnd={}", + in_flight, target); + + self.enter_probe_bandwidth_mode(now); + } + } + } + + fn is_min_rtt_expired(&self, now: Instant) -> bool { + !self.app_limited + && self + .probe_rtt_last_started_at + .map(|last| { + if now > last { + now - last > Duration::from_secs(10) + } else { + false + } + }) + .unwrap_or(false) // Never entered ProbeRtt before -> not expired + } + + fn maybe_enter_or_exit_probe_rtt( + &mut self, + now: Instant, + is_round_start: bool, + bytes_in_flight: usize, + _app_limited: bool, + ) { + let min_rtt_expired = self.is_min_rtt_expired(now); + // Enter ProbeRTT if min_rtt expired, not restarting from idle, and not already in ProbeRTT + // CRITICAL: Don't enter ProbeRtt during Startup - let BBR probe bandwidth first! + // Matches tcp_bbr.c:957-962 + if min_rtt_expired && !self.idle_restart && self.mode != Mode::ProbeRtt && self.mode != Mode::Startup { + // Save cwnd before entering ProbeRTT (matches Linux BBR tcp_bbr.c:960) + self.save_cwnd(); + self.mode = Mode::ProbeRtt; + self.pacing_gain = 1.0; + // Do not decide on the time to exit ProbeRtt until the + // |bytes_in_flight| is at the target small value. + self.exit_probe_rtt_at = None; + self.probe_rtt_last_started_at = Some(now); + + // CRITICAL FIX: Actually reduce cwnd when entering ProbeRtt! + // This is what makes bytes_in_flight drain down + self.cwnd = self.get_probe_rtt_cwnd(); + + #[cfg(feature = "log")] + log::info!( + "[BBR ProbeRtt] ENTERED ProbeRtt | old_cwnd saved | new_cwnd={} | bytes_in_flight={} | target={}", + self.cwnd, + bytes_in_flight, + self.get_probe_rtt_cwnd() + MAX_SEGMENT_SIZE + ); + } + + if self.mode == Mode::ProbeRtt { + if self.exit_probe_rtt_at.is_none() { + // If the window has reached the appropriate size, schedule exiting + // ProbeRtt. The CWND during ProbeRtt is + // kMinimumCongestionWindow, but we allow an extra packet since QUIC + // checks CWND before sending a packet. 
+ + #[cfg(feature = "log")] + if bytes_in_flight >= self.get_probe_rtt_cwnd() + MAX_SEGMENT_SIZE { + log::debug!( + "[BBR ProbeRtt] WAITING for cwnd drain | bytes_in_flight={} | target={} | cwnd={}", + bytes_in_flight, + self.get_probe_rtt_cwnd() + MAX_SEGMENT_SIZE, + self.cwnd + ); + } + + if bytes_in_flight < self.get_probe_rtt_cwnd() + MAX_SEGMENT_SIZE { + const K_PROBE_RTT_TIME: Duration = Duration::from_millis(200); + self.exit_probe_rtt_at = Some(now + K_PROBE_RTT_TIME); + + #[cfg(feature = "log")] + log::debug!( + "[BBR ProbeRtt] SCHEDULED EXIT in 200ms | bytes_in_flight={} | target={}", + bytes_in_flight, + self.get_probe_rtt_cwnd() + MAX_SEGMENT_SIZE + ); + } + } else { + // Check if we can exit ProbeRtt (after 200ms has passed) + if let Some(exit_time) = self.exit_probe_rtt_at { + if now >= exit_time { + // Restore cwnd when exiting ProbeRTT (matches Linux BBR tcp_bbr.c:918) + self.restore_cwnd(); + + #[cfg(feature = "log")] + log::info!( + "[BBR ProbeRtt] EXITING ProbeRtt | restored_cwnd={} | is_at_full_bandwidth={} | is_round_start={}", + self.cwnd, + self.is_at_full_bandwidth, + is_round_start + ); + + if !self.is_at_full_bandwidth { + self.enter_startup_mode(); + } else { + self.enter_probe_bandwidth_mode(now); + } + } + } + } + } + } + + fn get_target_cwnd(&self, gain: f32) -> usize { + let bw = self.max_bandwidth.get_estimate(); + let bdp = self.min_rtt.total_micros() * bw; + let bdpf = bdp as f64; + let cwnd = ((gain as f64 * bdpf) / 1_000_000f64) as usize; + // BDP estimate will be zero if no bandwidth samples are available yet. + if cwnd == 0 { + return self.init_cwnd; + } + cwnd.max(self.min_cwnd) + } + + fn get_probe_rtt_cwnd(&self) -> usize { + const K_MODERATE_PROBE_RTT_MULTIPLIER: f32 = 0.75; + if PROBE_RTT_BASED_ON_BDP { + return self.get_target_cwnd(K_MODERATE_PROBE_RTT_MULTIPLIER); + } + self.min_cwnd + } + + fn calculate_pacing_rate(&mut self) { + let bw = self.max_bandwidth.get_estimate(); + + // If no bandwidth estimate yet, initialize pacing rate from cwnd/RTT + // Matches Linux BBR bbr_init_pacing_rate_from_rtt (tcp_bbr.c:266-283) + if bw == 0 { + // Use measured RTT if available, otherwise use 1ms default (like Linux) + let rtt_us = if self.min_rtt.total_micros() != 0 { + self.min_rtt.total_micros() + } else { + 1000 // 1ms default RTT (USEC_PER_MSEC) + }; + + // Calculate initial bandwidth: init_cwnd / RTT + let rtt_duration = Duration::from_micros(rtt_us); + let initial_bw = BandwidthEstimation::bw_from_delta(self.init_cwnd as u64, rtt_duration) + .unwrap_or(0); + + if initial_bw > 0 { + // Apply high_gain to initial pacing rate (Startup mode) + let initial_rate = (initial_bw as f64 * self.pacing_gain as f64) as u64; + self.pacing_rate = initial_rate; + + #[cfg(feature = "log")] + log::debug!( + "[BBR PACING] Initial pacing rate: cwnd={} / rtt={}us * gain={:.2} = {} B/s ({:.3} Mbps)", + self.init_cwnd, + rtt_us, + self.pacing_gain, + self.pacing_rate, + (self.pacing_rate as f64 * 8.0) / 1_000_000.0 + ); + } + return; + } + + // Calculate target rate with pacing gain + let mut target_rate = (bw as f64 * self.pacing_gain as f64) as u64; + + // Apply pacing margin: pace at ~1% below estimated bandwidth + // This matches Linux BBR (tcp_bbr.c:251) to reduce queue buildup at bottleneck + target_rate = (target_rate * (100 - BBR_PACING_MARGIN_PERCENT as u64)) / 100; + + #[cfg(feature = "log")] + log::trace!( + "[BBR PACING] bw_estimate={} B/s ({:.3} Mbps) | pacing_gain={} | target_rate={} B/s ({:.3} Mbps) | mode={:?}", + bw, + (bw as f64 * 8.0) / 1_000_000.0, + 
self.pacing_gain, + target_rate, + (target_rate as f64 * 8.0) / 1_000_000.0, + self.mode + ); + + if self.is_at_full_bandwidth { + self.pacing_rate = target_rate; + return; + } + + // Do not decrease the pacing rate during startup. + // Matches Linux BBR (tcp_bbr.c:294) + if self.pacing_rate < target_rate { + self.pacing_rate = target_rate; + } + } + + fn calculate_cwnd(&mut self, bytes_acked: usize) { + if self.mode == Mode::ProbeRtt { + return; + } + + // No packet fully ACKed; just apply caps + // Matches tcp_bbr.c:526-527 + if bytes_acked == 0 { + // Enforce minimum cwnd + if self.cwnd < self.min_cwnd { + self.cwnd = self.min_cwnd; + } + return; + } + + // Handle recovery and restoration with packet conservation + // Matches tcp_bbr.c:529-530 + if let Some(new_cwnd) = self.set_cwnd_to_recover_or_restore( + bytes_acked, + self.loss_state.lost_bytes, + self.cwnd, // Use current cwnd as bytes_in_flight approximation + ) { + self.cwnd = new_cwnd; + // If packet conservation is active, skip normal cwnd growth + // and just enforce minimum. Matches tcp_bbr.c:529-530 (goto done) + if self.packet_conservation { + if self.cwnd < self.min_cwnd { + self.cwnd = self.min_cwnd; + } + return; + } + } + + // Normal cwnd calculation: compute target cwnd based on BDP + // Matches tcp_bbr.c:532-538 + let mut target_window = self.get_target_cwnd(self.cwnd_gain); + + // Add ACK aggregation cwnd increment + // Matches tcp_bbr.c:537 + let bw = self.max_bandwidth.get_estimate(); + target_window = target_window.saturating_add( + self.ack_aggregation + .ack_aggregation_cwnd(bw, self.is_at_full_bandwidth), + ); + + // Note: bbr_quantization_budget (tcp_bbr.c:538) is omitted as it's + // TSO-specific and not applicable to smoltcp + + // Slow start cwnd toward target cwnd + // Matches tcp_bbr.c:541-545 + if self.is_at_full_bandwidth { + // Only cut cwnd if we filled the pipe + self.cwnd = target_window.min(self.cwnd.saturating_add(bytes_acked)); + } else if (self.cwnd < target_window) + || (self.acked_bytes < self.init_cwnd as u64) + { + // If the connection is not yet out of startup phase, do not decrease + // the window. + self.cwnd = self.cwnd.saturating_add(bytes_acked); + } + + // Enforce the limits on the congestion window. + // Matches tcp_bbr.c:545 + if self.cwnd < self.min_cwnd { + self.cwnd = self.min_cwnd; + } + } + + fn calculate_recovery_window( + &mut self, + bytes_acked: usize, + bytes_lost: usize, + in_flight: usize, + ) { + if !self.recovery_state.in_recovery() { + return; + } + // Set up the initial recovery window. + if self.recovery_window == 0 { + self.recovery_window = self.min_cwnd.max(in_flight.saturating_add(bytes_acked)); + return; + } + + // Remove losses from the recovery window, while accounting for a potential + // integer underflow. + if self.recovery_window >= bytes_lost { + self.recovery_window -= bytes_lost; + } else { + self.recovery_window = MAX_SEGMENT_SIZE; + } + // In CONSERVATION mode, just subtracting losses is sufficient. In GROWTH, + // release additional |bytes_acked| to achieve a slow-start-like behavior. + if self.recovery_state == RecoveryState::Growth { + self.recovery_window = self.recovery_window.saturating_add(bytes_acked); + } + + // Sanity checks. Ensure that we always allow to send at least an MSS or + // |bytes_acked| in response, whichever is larger. 
+ self.recovery_window = self + .recovery_window + .max(in_flight.saturating_add(bytes_acked)) + .max(self.min_cwnd); + } + + /// + /// + fn check_if_full_bw_reached(&mut self) { + if self.app_limited { + return; + } + let target = (self.bw_at_last_round as f64 * K_STARTUP_GROWTH_TARGET as f64) as u64; + let bw = self.max_bandwidth.get_estimate(); + + #[cfg(feature = "log")] + log::info!( + "[BBR STARTUP] Check full BW: current={} B/s ({:.1} Mbps) | target={} B/s ({:.1} Mbps) | last={} B/s | rounds_wo_gain={} | in_recovery={}", + bw, (bw as f64 * 8.0) / 1e6, + target, (target as f64 * 8.0) / 1e6, + self.bw_at_last_round, + self.round_wo_bw_gain, + self.recovery_state.in_recovery() + ); + + if bw >= target { + self.bw_at_last_round = bw; + self.round_wo_bw_gain = 0; + // Reset ACK aggregation tracking when bandwidth increases + self.ack_aggregation.extra_acked = [0, 0]; + self.ack_aggregation.extra_acked_win_rtts = 0; + + #[cfg(feature = "log")] + log::info!("[BBR STARTUP] Bandwidth grew! Resetting counter"); + return; + } + + self.round_wo_bw_gain += 1; + + #[cfg(feature = "log")] + log::info!("[BBR STARTUP] No growth, counter now: {}", self.round_wo_bw_gain); + + if self.round_wo_bw_gain >= K_ROUND_TRIPS_WITHOUT_GROWTH_BEFORE_EXITING_STARTUP as u64 + || (self.recovery_state.in_recovery()) + { + self.is_at_full_bandwidth = true; + + #[cfg(feature = "log")] + log::info!("[BBR STARTUP] *** EXITING STARTUP *** is_at_full_bandwidth=true"); + } + } + + fn on_ack_impl(&mut self, now: Instant, len: usize, rtt: &RttEstimator, bytes_in_flight: usize) { + let bytes = len as u64; + // Simulate packet numbers using bytes + let packet_number = self.max_acked_packet_number + 1; + self.max_acked_packet_number = packet_number; + + // Track round start BEFORE updating bandwidth estimation + // This ensures bandwidth estimation sees the correct round number + // Matches tcp_bbr.c:767, 772-777 + self.round_start = false; + let is_round_start = + self.max_acked_packet_number > self.current_round_trip_end_packet_number; + + if is_round_start { + self.round_start = true; + self.current_round_trip_end_packet_number = self.max_sent_packet_number; + self.round_count += 1; + // Reset packet conservation on round start + // Matches tcp_bbr.c:776 + self.packet_conservation = false; + + #[cfg(feature = "log")] + log::trace!( + "[BBR ROUND] round={} | is_round_start={} | max_acked={} | round_end={} | max_sent={} | mode={:?}", + self.round_count, + is_round_start, + self.max_acked_packet_number, + self.current_round_trip_end_packet_number, + self.max_sent_packet_number, + self.mode + ); + } + + // Update bandwidth estimation with app_limited state + // Now uses the UPDATED round_count if we just started a new round + self.max_bandwidth + .on_ack(now, now, bytes, self.round_count, self.app_limited); + self.acked_bytes += bytes; + + // Update min_rtt from the RttEstimator's windowed minimum + // The RttEstimator now properly tracks the minimum RTT over a 10-second window + // and handles expiration, matching Linux BBR behavior + let current_min_rtt = rtt.min_rtt(); + if self.min_rtt == Duration::ZERO || self.min_rtt > current_min_rtt { + self.min_rtt = current_min_rtt; + } + + // End of acks processing + let bytes_acked = self.max_bandwidth.bytes_acked_this_window() as usize; + self.max_bandwidth + .end_acks(self.round_count, self.app_limited); + + self.update_recovery_state(self.round_start); + + // Update ACK aggregation tracking + // Matches tcp_bbr.c:1019 (bbr_update_ack_aggregation call in bbr_update_model) + 
self.ack_aggregation.update_ack_aggregation( + bytes_acked as u64, + now, + self.round_start, + self.max_bandwidth.get_estimate(), + self.cwnd, + ); + + if self.mode == Mode::ProbeBw { + self.update_gain_cycle_phase(now, bytes_in_flight); + } + + if self.round_start && !self.is_at_full_bandwidth { + self.check_if_full_bw_reached(); + } + + self.maybe_exit_startup_or_drain(now, bytes_in_flight); + + self.maybe_enter_or_exit_probe_rtt(now, self.round_start, bytes_in_flight, self.app_limited); + + // After the model is updated, recalculate the pacing rate and congestion window. + self.calculate_pacing_rate(); + self.calculate_cwnd(bytes_acked); + self.calculate_recovery_window(bytes_acked, self.loss_state.lost_bytes, self.cwnd); + + // Reset idle_restart after processing new data delivery + // Matches tcp_bbr.c:983-984: "Restart after idle ends only once we process a new S/ACK for data" + if bytes_acked > 0 { + self.idle_restart = false; + } + + self.prev_in_flight_count = bytes_in_flight; + self.loss_state.reset(); + } + + fn on_transmit_impl(&mut self, now: Instant, len: usize) { + let bytes = len as u64; + let packet_number = self.max_sent_packet_number + 1; + self.max_sent_packet_number = packet_number; + self.max_bandwidth.on_sent(now, bytes); + } +} + +impl Controller for Bbr { + fn window(&self) -> usize { + let cwnd = if self.mode == Mode::ProbeRtt { + self.get_probe_rtt_cwnd() + } else if self.recovery_state.in_recovery() && self.mode != Mode::Startup { + self.cwnd.min(self.recovery_window) + } else { + self.cwnd + }; + cwnd.min(self.rwnd) + } + + fn set_remote_window(&mut self, remote_window: usize) { + if self.rwnd < remote_window { + self.rwnd = remote_window; + } + } + + fn on_ack(&mut self, now: Instant, len: usize, rtt: &RttEstimator, bytes_in_flight: usize) { + self.on_ack_impl(now, len, rtt, bytes_in_flight); + } + + fn on_retransmit(&mut self, _now: Instant) { + self.loss_state.lost_bytes = self.loss_state.lost_bytes.saturating_add(1); + } + + fn on_duplicate_ack(&mut self, _now: Instant) { + self.loss_state.lost_bytes = self.loss_state.lost_bytes.saturating_add(1); + } + + fn pre_transmit(&mut self, _now: Instant) { + // BBR doesn't need pre-transmission processing + } + + fn post_transmit(&mut self, now: Instant, len: usize) { + self.on_transmit_impl(now, len); + } + + fn set_mss(&mut self, mss: usize) { + self.min_cwnd = mss * 2; + if self.cwnd < self.min_cwnd { + self.cwnd = self.min_cwnd; + } + } + + fn on_send_ready(&mut self, now: Instant, bytes_available: usize) { + // Detect idle restart: transmission starting when app_limited + // Matches tcp_bbr.c:337-348 (CA_EVENT_TX_START) + if self.app_limited && bytes_available > 0 { + self.idle_restart = true; + // Reset ACK aggregation epoch on idle restart + self.ack_aggregation.ack_epoch_mstamp = Some(now); + self.ack_aggregation.ack_epoch_acked = 0; + // Note: Pacing rate adjustment happens in set_pacing_rate() calls + // which are made during normal cwnd/pacing updates + } + + // Track app-limited state: true when bytes_available < cwnd + // This follows Quinn's approach where app_limited indicates the application + // doesn't have enough data to fill the congestion window. + let cwnd = self.window(); + self.app_limited = bytes_available < cwnd; + } + + fn pacing_rate(&self) -> u64 { + self.pacing_rate + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +enum Mode { + // Startup phase of the connection. 
+    Startup,
+    // After achieving the highest possible bandwidth during the startup, lower
+    // the pacing rate in order to drain the queue.
+    Drain,
+    // Cruising mode.
+    ProbeBw,
+    // Temporarily slow down sending in order to empty the buffer and measure
+    // the real minimum RTT.
+    ProbeRtt,
+}
+
+// Indicates how the congestion control limits the amount of bytes in flight.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+enum RecoveryState {
+    // Do not limit.
+    NotInRecovery,
+    // Allow an extra outstanding byte for each byte acknowledged.
+    Conservation,
+    // Allow two extra outstanding bytes for each byte acknowledged (slow
+    // start).
+    Growth,
+}
+
+impl RecoveryState {
+    pub fn in_recovery(&self) -> bool {
+        !matches!(self, RecoveryState::NotInRecovery)
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+struct AckAggregationState {
+    // Windowed max filter for tracking maximum extra acked
+    // Matches tcp_bbr.c:123 (extra_acked[2])
+    extra_acked: [u64; 2],
+    // Current window index for extra_acked array
+    // Matches tcp_bbr.c:126 (extra_acked_win_idx)
+    extra_acked_win_idx: usize,
+    // Age of extra_acked window in round trips
+    // Matches tcp_bbr.c:125 (extra_acked_win_rtts)
+    extra_acked_win_rtts: u32,
+    // Start time of current ACK aggregation epoch
+    // Matches tcp_bbr.c:122 (ack_epoch_mstamp)
+    ack_epoch_mstamp: Option<Instant>,
+    // Packets ACKed in current sampling epoch
+    // Matches tcp_bbr.c:124 (ack_epoch_acked)
+    ack_epoch_acked: u64,
+}
+
+impl AckAggregationState {
+    /// Return maximum extra acked in past k-2k round trips, where k = BBR_EXTRA_ACKED_WIN_RTTS
+    /// Matches Linux BBR bbr_extra_acked() (tcp_bbr.c:233-238)
+    fn extra_acked(&self) -> u64 {
+        self.extra_acked[0].max(self.extra_acked[1])
+    }
+
+    /// Estimates the windowed max degree of ACK aggregation.
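+    /// Data ACKed beyond max_bw * epoch_elapsed is treated as aggregation and is
+    /// later credited to the cwnd target via ack_aggregation_cwnd(), so sending
+    /// can continue through inter-ACK silences.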
+    /// Matches Linux BBR bbr_update_ack_aggregation() (tcp_bbr.c:817-863)
+    fn update_ack_aggregation(
+        &mut self,
+        newly_acked_bytes: u64,
+        now: Instant,
+        round_start: bool,
+        max_bandwidth: u64,
+        cwnd: usize,
+    ) {
+        // Check if we should skip (no gain configured or invalid input)
+        // Matches tcp_bbr.c:824-826
+        if BBR_EXTRA_ACKED_GAIN == 0 || newly_acked_bytes == 0 {
+            return;
+        }
+
+        // Advance the windowed max filter on round start
+        // Matches tcp_bbr.c:828-836
+        if round_start {
+            self.extra_acked_win_rtts = (self.extra_acked_win_rtts + 1).min(0x1F);
+            if self.extra_acked_win_rtts >= BBR_EXTRA_ACKED_WIN_RTTS {
+                self.extra_acked_win_rtts = 0;
+                self.extra_acked_win_idx = if self.extra_acked_win_idx == 0 { 1 } else { 0 };
+                self.extra_acked[self.extra_acked_win_idx] = 0;
+            }
+        }
+
+        // Compute how many bytes we expected to be delivered over this epoch
+        // Matches tcp_bbr.c:839-842
+        let mut expected_acked = if let Some(epoch_start) = self.ack_epoch_mstamp {
+            if now > epoch_start {
+                let epoch_us = (now - epoch_start).total_micros();
+                max_bandwidth * epoch_us / 1_000_000
+            } else {
+                0
+            }
+        } else {
+            0
+        };
+
+        // Reset the aggregation epoch if ACK rate is below expected rate or
+        // epoch has become too large (stale)
+        // Matches tcp_bbr.c:844-854
+        if self.ack_epoch_acked <= expected_acked
+            || self.ack_epoch_acked + newly_acked_bytes >= BBR_ACK_EPOCH_ACKED_RESET_THRESH
+        {
+            self.ack_epoch_acked = 0;
+            self.ack_epoch_mstamp = Some(now);
+            // The new epoch starts with no expected delivery
+            // Matches tcp_bbr.c:853
+            expected_acked = 0;
+        }
+
+        // Compute excess data delivered, beyond what was expected
+        // Matches tcp_bbr.c:856-862
+        self.ack_epoch_acked = (self.ack_epoch_acked + newly_acked_bytes).min(0xFFFFF);
+
+        let extra_acked = if self.ack_epoch_acked > expected_acked {
+            self.ack_epoch_acked - expected_acked
+        } else {
+            0
+        };
+
+        // Clamp by cwnd
+        let extra_acked = extra_acked.min(cwnd as u64);
+
+        // Update windowed max
+        if extra_acked > self.extra_acked[self.extra_acked_win_idx] {
+            self.extra_acked[self.extra_acked_win_idx] = extra_acked;
+        }
+    }
+
+    /// Find the cwnd increment based on estimate of ack aggregation
+    /// Matches Linux BBR bbr_ack_aggregation_cwnd() (tcp_bbr.c:457-470)
+    fn ack_aggregation_cwnd(&self, bw: u64, is_at_full_bandwidth: bool) -> usize {
+        if BBR_EXTRA_ACKED_GAIN == 0 || !is_at_full_bandwidth {
+            return 0;
+        }
+
+        // max_aggr_cwnd = bw * 100ms
+        // Matches tcp_bbr.c:462-463
+        let max_aggr_cwnd = (bw * BBR_EXTRA_ACKED_MAX_US / 1_000_000) as usize;
+
+        // aggr_cwnd = (gain * extra_acked) >> BBR_SCALE
+        // Matches tcp_bbr.c:464-465
+        let extra = self.extra_acked();
+        let aggr_cwnd = ((BBR_EXTRA_ACKED_GAIN as u64 * extra) >> BBR_SCALE) as usize;
+
+        // Clamp by max
+        // Matches tcp_bbr.c:466
+        aggr_cwnd.min(max_aggr_cwnd)
+    }
+}
+
+#[derive(Debug, Clone, Default)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+struct LossState {
+    lost_bytes: usize,
+}
+
+impl LossState {
+    pub fn reset(&mut self) {
+        self.lost_bytes = 0;
+    }
+
+    pub fn has_losses(&self) -> bool {
+        self.lost_bytes != 0
+    }
+}
+
+// The gain used for the STARTUP, equal to 2/ln(2).
+const K_DEFAULT_HIGH_GAIN: f32 = 2.885;
+// The newly derived CWND gain for STARTUP, 2.
+const K_DERIVED_HIGH_CWNDGAIN: f32 = 2.0;
+// The cycle of gains used during the ProbeBw stage.
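+// One 1.25 phase probes for more bandwidth, one 0.75 phase drains the queue the
+// probe may have built, and the remaining six 1.0 phases cruise at the estimated
+// bandwidth (mirrors bbr_pacing_gain[] in tcp_bbr.c).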
+const K_PACING_GAIN: [f32; 8] = [1.25, 0.75, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];
+
+const K_STARTUP_GROWTH_TARGET: f32 = 1.25;
+const K_ROUND_TRIPS_WITHOUT_GROWTH_BEFORE_EXITING_STARTUP: u8 = 3;
+
+// Pacing margin: pace at ~1% below estimated bandwidth to reduce queue buildup
+// This matches Linux BBR (tcp_bbr.c:147)
+const BBR_PACING_MARGIN_PERCENT: u8 = 1;
+
+// ACK aggregation constants
+// Gain factor for adding extra_acked to target cwnd
+// Matches tcp_bbr.c:196 (bbr_extra_acked_gain = BBR_UNIT = 256)
+const BBR_EXTRA_ACKED_GAIN: u32 = 256;
+// Window length of extra_acked window in round trips
+// Matches tcp_bbr.c:198
+const BBR_EXTRA_ACKED_WIN_RTTS: u32 = 5;
+// Max allowed value for ack_epoch_acked, after which sampling epoch is reset
+// Matches tcp_bbr.c:200
+const BBR_ACK_EPOCH_ACKED_RESET_THRESH: u64 = 1u64 << 20;
+// Time period for clamping cwnd increment due to ack aggregation (100ms in microseconds)
+// Matches tcp_bbr.c:202
+const BBR_EXTRA_ACKED_MAX_US: u64 = 100 * 1000;
+// BBR_SCALE for gain calculations
+// Matches tcp_bbr.c:77 (BBR_SCALE = 8, BBR_UNIT = 1 << 8 = 256)
+const BBR_SCALE: u32 = 8;
+
+const MAX_SEGMENT_SIZE: usize = 1460;
+
+const PROBE_RTT_BASED_ON_BDP: bool = true;
+const DRAIN_TO_TARGET: bool = true;
+
diff --git a/src/socket/tcp/congestion/bbr/bw_estimation.rs b/src/socket/tcp/congestion/bbr/bw_estimation.rs
new file mode 100644
index 000000000..3dc87a5ff
--- /dev/null
+++ b/src/socket/tcp/congestion/bbr/bw_estimation.rs
@@ -0,0 +1,163 @@
+use crate::time::{Duration, Instant};
+
+use super::min_max::MinMax;
+
+#[derive(Debug, Clone)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub(crate) struct BandwidthEstimation {
+    total_acked: u64,
+    prev_total_acked: u64,
+    acked_time: Option<Instant>,
+    prev_acked_time: Option<Instant>,
+    total_sent: u64,
+    prev_total_sent: u64,
+    sent_time: Option<Instant>,
+    prev_sent_time: Option<Instant>,
+    max_filter: MinMax,
+    acked_at_last_window: u64,
+    last_round: u64, // Track RTT rounds to detect measurement window boundaries
+}
+
+impl Default for BandwidthEstimation {
+    fn default() -> Self {
+        BandwidthEstimation {
+            total_acked: 0,
+            prev_total_acked: 0,
+            acked_time: None,
+            prev_acked_time: None,
+            total_sent: 0,
+            prev_total_sent: 0,
+            sent_time: None,
+            prev_sent_time: None,
+            max_filter: MinMax::new(10),
+            acked_at_last_window: 0,
+            last_round: 0,
+        }
+    }
+}
+
+impl BandwidthEstimation {
+    pub fn on_sent(&mut self, now: Instant, bytes: u64) {
+        // Only update prev_* when we haven't sent anything in this window yet
+        // This allows accumulating sent bytes over the window
+        if self.sent_time.is_none() {
+            self.prev_total_sent = self.total_sent;
+            self.prev_sent_time = None;
+        }
+
+        self.total_sent += bytes;
+        self.sent_time = Some(now);
+    }
+
+    pub fn on_ack(
+        &mut self,
+        now: Instant,
+        _sent: Instant,
+        bytes: u64,
+        round: u64,
+        app_limited: bool,
+    ) {
+        #[cfg(feature = "log")]
+        log::trace!(
+            "[BBR BW] on_ack called: round={} bytes={} last_round={}",
+            round,
+            bytes,
+            self.last_round
+        );
+
+        // Initialize on first ACK
+        if self.acked_time.is_none() {
+            self.prev_total_acked = 0;
+            self.prev_acked_time = Some(now);
+            self.total_acked = bytes;
+            self.acked_time = Some(now);
+            self.last_round = round;
+            return;
+        }
+
+        // Detect new round - this means we completed the previous round!
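+        // (Illustration: if the completed round ACKed 150_000 bytes over 10 ms,
+        // bw_from_delta() yields 150_000 * 1_000_000 / 10_000 us = 15_000_000 B/s,
+        // i.e. ~120 Mbit/s.)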
+        // Calculate bandwidth for the COMPLETED round and update filter ONCE per round
+        #[cfg(feature = "log")]
+        if round != self.last_round {
+            log::trace!(
+                "[BBR BW] Round change detected: {} -> {}",
+                self.last_round,
+                round
+            );
+        }
+
+        if round != self.last_round {
+            // Calculate bandwidth for the completed round using accumulated data
+            let completed_round_bw = if let Some(prev_acked_time) = self.prev_acked_time {
+                if self.acked_time.unwrap() > prev_acked_time {
+                    let delta_bytes = self.total_acked - self.prev_total_acked;
+                    let delta_time = self.acked_time.unwrap() - prev_acked_time;
+                    BandwidthEstimation::bw_from_delta(delta_bytes, delta_time).unwrap_or(0)
+                } else {
+                    0
+                }
+            } else {
+                0
+            };
+
+            // Update the MinMax filter ONCE per round with the completed round's bandwidth
+            if !app_limited && completed_round_bw > 0 {
+                let old_max = self.max_filter.get();
+                self.max_filter.update_max(self.last_round, completed_round_bw);
+                let new_max = self.max_filter.get();
+
+                #[cfg(feature = "log")]
+                log::debug!(
+                    "[BBR BW] Round {} complete: bw={} B/s ({:.3} Mbps) | old_max={} B/s ({:.3} Mbps) | new_max={} B/s ({:.3} Mbps)",
+                    self.last_round,
+                    completed_round_bw,
+                    (completed_round_bw as f64 * 8.0) / 1_000_000.0,
+                    old_max,
+                    (old_max as f64 * 8.0) / 1_000_000.0,
+                    new_max,
+                    (new_max as f64 * 8.0) / 1_000_000.0
+                );
+            }
+
+            // Now start the new round - reset accumulators
+            self.prev_total_acked = self.total_acked;
+            self.prev_acked_time = self.acked_time;
+            self.last_round = round;
+
+            // Also reset sent tracking for new round
+            if self.sent_time.is_some() {
+                self.prev_total_sent = self.total_sent;
+                self.prev_sent_time = self.sent_time;
+            }
+        }
+
+        // Accumulate bytes for this ACK
+        self.total_acked += bytes;
+        self.acked_time = Some(now);
+
+        // Note: MinMax filter is only updated once per round (above when round changes)
+        // get_estimate() returns the max bandwidth over the last 10 completed rounds
+        // This is correct BBR behavior - don't update filter on every ACK
+    }
+
+    pub fn bytes_acked_this_window(&self) -> u64 {
+        self.total_acked - self.acked_at_last_window
+    }
+
+    pub fn end_acks(&mut self, _current_round: u64, _app_limited: bool) {
+        self.acked_at_last_window = self.total_acked;
+    }
+
+    pub fn get_estimate(&self) -> u64 {
+        self.max_filter.get()
+    }
+
+    pub const fn bw_from_delta(bytes: u64, delta: Duration) -> Option<u64> {
+        let window_duration_micros = delta.total_micros();
+        if window_duration_micros == 0 {
+            return None;
+        }
+        let bytes_per_second = bytes * 1_000_000 / window_duration_micros;
+        Some(bytes_per_second)
+    }
+}
diff --git a/src/socket/tcp/congestion/bbr/min_max.rs b/src/socket/tcp/congestion/bbr/min_max.rs
new file mode 100644
index 000000000..b5ad98dac
--- /dev/null
+++ b/src/socket/tcp/congestion/bbr/min_max.rs
@@ -0,0 +1,154 @@
+/*
+ * Based on Google code released under BSD license here:
+ * https://groups.google.com/forum/#!topic/bbr-dev/3RTgkzi5ZD8
+ */
+
+/*
+ * Kathleen Nichols' algorithm for tracking the minimum (or maximum)
+ * value of a data stream over some fixed time interval. (E.g.,
+ * the minimum RTT over the past five minutes.) It uses constant
+ * space and constant time per update yet almost always delivers
+ * the same minimum as an implementation that has to keep all the
+ * data in the window.
+ *
+ * The algorithm keeps track of the best, 2nd best & 3rd best min
+ * values, maintaining an invariant that the measurement time of
+ * the n'th best >= n-1'th best.
It also makes sure that the three + * values are widely separated in the time window since that bounds + * the worse case error when that data is monotonically increasing + * over the window. + * + * Upon getting a new min, we can forget everything earlier because + * it has no value - the new min is <= everything else in the window + * by definition and it samples the most recent. So we restart fresh on + * every new min and overwrites 2nd & 3rd choices. The same property + * holds for 2nd & 3rd best. + */ + +#[cfg(feature = "defmt")] +use defmt::Format; + +#[derive(Debug, Copy, Clone, Default)] +#[cfg_attr(feature = "defmt", derive(Format))] +struct MinMaxSample { + /// round number, not a timestamp + time: u64, + value: u64, +} + +#[derive(Debug, Copy, Clone)] +#[cfg_attr(feature = "defmt", derive(Format))] +pub(crate) struct MinMax { + /// round count, not a timestamp + window: u64, + samples: [MinMaxSample; 3], +} + +impl MinMax { + pub fn new(round_window: u64) -> Self { + MinMax { + window: round_window, + samples: [Default::default(); 3], + } + } + + pub fn get(&self) -> u64 { + self.samples[0].value + } + + fn fill(&mut self, sample: MinMaxSample) { + self.samples.fill(sample); + } + + #[allow(dead_code)] + pub fn reset(&mut self) { + self.fill(Default::default()) + } + + /// update_min is also defined in the original source, but removed here since it is not used. + pub fn update_max(&mut self, current_round: u64, measurement: u64) { + let sample = MinMaxSample { + time: current_round, + value: measurement, + }; + + if self.samples[0].value == 0 /* uninitialised */ + || /* found new max? */ sample.value >= self.samples[0].value + || /* nothing left in window? */ sample.time - self.samples[2].time > self.window + { + self.fill(sample); /* forget earlier samples */ + return; + } + + if sample.value >= self.samples[1].value { + self.samples[2] = sample; + self.samples[1] = sample; + } else if sample.value >= self.samples[2].value { + self.samples[2] = sample; + } + + self.subwin_update(sample); + } + + /* As time advances, update the 1st, 2nd, and 3rd choices. */ + fn subwin_update(&mut self, sample: MinMaxSample) { + let dt = sample.time - self.samples[0].time; + if dt > self.window { + /* + * Passed entire window without a new sample so make 2nd + * choice the new sample & 3rd choice the new 2nd choice. + * we may have to iterate this since our 2nd choice + * may also be outside the window (we checked on entry + * that the third choice was in the window). + */ + self.samples[0] = self.samples[1]; + self.samples[1] = self.samples[2]; + self.samples[2] = sample; + if sample.time - self.samples[0].time > self.window { + self.samples[0] = self.samples[1]; + self.samples[1] = self.samples[2]; + self.samples[2] = sample; + } + } else if self.samples[1].time == self.samples[0].time && dt > self.window / 4 { + /* + * We've passed a quarter of the window without a new sample + * so take a 2nd choice from the 2nd quarter of the window. 
+ */ + self.samples[2] = sample; + self.samples[1] = sample; + } else if self.samples[2].time == self.samples[1].time && dt > self.window / 2 { + /* + * We've passed half the window without finding a new sample + * so take a 3rd choice from the last half of the window + */ + self.samples[2] = sample; + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test() { + let round = 25; + let mut min_max = MinMax::new(10); + min_max.update_max(round + 1, 100); + assert_eq!(100, min_max.get()); + min_max.update_max(round + 3, 120); + assert_eq!(120, min_max.get()); + min_max.update_max(round + 5, 160); + assert_eq!(160, min_max.get()); + min_max.update_max(round + 7, 100); + assert_eq!(160, min_max.get()); + min_max.update_max(round + 10, 100); + assert_eq!(160, min_max.get()); + min_max.update_max(round + 14, 100); + assert_eq!(160, min_max.get()); + min_max.update_max(round + 16, 100); + assert_eq!(100, min_max.get()); + min_max.update_max(round + 18, 130); + assert_eq!(130, min_max.get()); + } +} diff --git a/src/socket/tcp/congestion/cubic.rs b/src/socket/tcp/congestion/cubic.rs index 76b45039e..f555f3f72 100644 --- a/src/socket/tcp/congestion/cubic.rs +++ b/src/socket/tcp/congestion/cubic.rs @@ -56,7 +56,7 @@ impl Controller for Cubic { } } - fn on_ack(&mut self, _now: Instant, len: usize, _rtt: &crate::socket::tcp::RttEstimator) { + fn on_ack(&mut self, _now: Instant, len: usize, _rtt: &crate::socket::tcp::RttEstimator, _bytes_in_flight: usize) { // Slow start. if self.cwnd < self.ssthresh { self.cwnd = self diff --git a/src/socket/tcp/congestion/reno.rs b/src/socket/tcp/congestion/reno.rs index 8c0295487..0f7e8354e 100644 --- a/src/socket/tcp/congestion/reno.rs +++ b/src/socket/tcp/congestion/reno.rs @@ -27,7 +27,7 @@ impl Controller for Reno { self.cwnd } - fn on_ack(&mut self, _now: Instant, len: usize, _rtt: &RttEstimator) { + fn on_ack(&mut self, _now: Instant, len: usize, _rtt: &RttEstimator, _bytes_in_flight: usize) { let len = if self.cwnd < self.ssthresh { // Slow start. len diff --git a/tcp_bbr.c b/tcp_bbr.c new file mode 100644 index 000000000..e4963acd3 --- /dev/null +++ b/tcp_bbr.c @@ -0,0 +1,1199 @@ +/* Bottleneck Bandwidth and RTT (BBR) congestion control + * + * BBR congestion control computes the sending rate based on the delivery + * rate (throughput) estimated from ACKs. In a nutshell: + * + * On each ACK, update our model of the network path: + * bottleneck_bandwidth = windowed_max(delivered / elapsed, 10 round trips) + * min_rtt = windowed_min(rtt, 10 seconds) + * pacing_rate = pacing_gain * bottleneck_bandwidth + * cwnd = max(cwnd_gain * bottleneck_bandwidth * min_rtt, 4) + * + * The core algorithm does not react directly to packet losses or delays, + * although BBR may adjust the size of next send per ACK when loss is + * observed, or adjust the sending rate if it estimates there is a + * traffic policer, in order to keep the drop rate reasonable. + * + * Here is a state transition diagram for BBR: + * + * | + * V + * +---> STARTUP ----+ + * | | | + * | V | + * | DRAIN ----+ + * | | | + * | V | + * +---> PROBE_BW ----+ + * | ^ | | + * | | | | + * | +----+ | + * | | + * +---- PROBE_RTT <--+ + * + * A BBR flow starts in STARTUP, and ramps up its sending rate quickly. + * When it estimates the pipe is full, it enters DRAIN to drain the queue. + * In steady state a BBR flow only uses PROBE_BW and PROBE_RTT. 
+ * A long-lived BBR flow spends the vast majority of its time remaining + * (repeatedly) in PROBE_BW, fully probing and utilizing the pipe's bandwidth + * in a fair manner, with a small, bounded queue. *If* a flow has been + * continuously sending for the entire min_rtt window, and hasn't seen an RTT + * sample that matches or decreases its min_rtt estimate for 10 seconds, then + * it briefly enters PROBE_RTT to cut inflight to a minimum value to re-probe + * the path's two-way propagation delay (min_rtt). When exiting PROBE_RTT, if + * we estimated that we reached the full bw of the pipe then we enter PROBE_BW; + * otherwise we enter STARTUP to try to fill the pipe. + * + * BBR is described in detail in: + * "BBR: Congestion-Based Congestion Control", + * Neal Cardwell, Yuchung Cheng, C. Stephen Gunn, Soheil Hassas Yeganeh, + * Van Jacobson. ACM Queue, Vol. 14 No. 5, September-October 2016. + * + * There is a public e-mail list for discussing BBR development and testing: + * https://groups.google.com/forum/#!forum/bbr-dev + * + * NOTE: BBR might be used with the fq qdisc ("man tc-fq") with pacing enabled, + * otherwise TCP stack falls back to an internal pacing using one high + * resolution timer per TCP socket and may use more resources. + */ +#include +#include +#include +#include +#include +#include +#include +#include + +/* Scale factor for rate in pkt/uSec unit to avoid truncation in bandwidth + * estimation. The rate unit ~= (1500 bytes / 1 usec / 2^24) ~= 715 bps. + * This handles bandwidths from 0.06pps (715bps) to 256Mpps (3Tbps) in a u32. + * Since the minimum window is >=4 packets, the lower bound isn't + * an issue. The upper bound isn't an issue with existing technologies. + */ +#define BW_SCALE 24 +#define BW_UNIT (1 << BW_SCALE) + +#define BBR_SCALE 8 /* scaling factor for fractions in BBR (e.g. gains) */ +#define BBR_UNIT (1 << BBR_SCALE) + +/* BBR has the following modes for deciding how fast to send: */ +enum bbr_mode { + BBR_STARTUP, /* ramp up sending rate rapidly to fill pipe */ + BBR_DRAIN, /* drain any queue created during startup */ + BBR_PROBE_BW, /* discover, share bw: pace around estimated bw */ + BBR_PROBE_RTT, /* cut inflight to min to probe min_rtt */ +}; + +/* BBR congestion control block */ +struct bbr { + u32 min_rtt_us; /* min RTT in min_rtt_win_sec window */ + u32 min_rtt_stamp; /* timestamp of min_rtt_us */ + u32 probe_rtt_done_stamp; /* end time for BBR_PROBE_RTT mode */ + struct minmax bw; /* Max recent delivery rate in pkts/uS << 24 */ + u32 rtt_cnt; /* count of packet-timed rounds elapsed */ + u32 next_rtt_delivered; /* scb->tx.delivered at end of round */ + u64 cycle_mstamp; /* time of this cycle phase start */ + u32 mode:3, /* current bbr_mode in state machine */ + prev_ca_state:3, /* CA state on previous ACK */ + packet_conservation:1, /* use packet conservation? */ + round_start:1, /* start of packet-timed tx->ack round? */ + idle_restart:1, /* restarting after idle? */ + probe_rtt_round_done:1, /* a BBR_PROBE_RTT round at 4 pkts? */ + unused:13, + lt_is_sampling:1, /* taking long-term ("LT") samples now? */ + lt_rtt_cnt:7, /* round trips in long-term interval */ + lt_use_bw:1; /* use lt_bw as our bw estimate? 
*/ + u32 lt_bw; /* LT est delivery rate in pkts/uS << 24 */ + u32 lt_last_delivered; /* LT intvl start: tp->delivered */ + u32 lt_last_stamp; /* LT intvl start: tp->delivered_mstamp */ + u32 lt_last_lost; /* LT intvl start: tp->lost */ + u32 pacing_gain:10, /* current gain for setting pacing rate */ + cwnd_gain:10, /* current gain for setting cwnd */ + full_bw_reached:1, /* reached full bw in Startup? */ + full_bw_cnt:2, /* number of rounds without large bw gains */ + cycle_idx:3, /* current index in pacing_gain cycle array */ + has_seen_rtt:1, /* have we seen an RTT sample yet? */ + unused_b:5; + u32 prior_cwnd; /* prior cwnd upon entering loss recovery */ + u32 full_bw; /* recent bw, to estimate if pipe is full */ + + /* For tracking ACK aggregation: */ + u64 ack_epoch_mstamp; /* start of ACK sampling epoch */ + u16 extra_acked[2]; /* max excess data ACKed in epoch */ + u32 ack_epoch_acked:20, /* packets (S)ACKed in sampling epoch */ + extra_acked_win_rtts:5, /* age of extra_acked, in round trips */ + extra_acked_win_idx:1, /* current index in extra_acked array */ + unused_c:6; +}; + +#define CYCLE_LEN 8 /* number of phases in a pacing gain cycle */ + +/* Window length of bw filter (in rounds): */ +static const int bbr_bw_rtts = CYCLE_LEN + 2; +/* Window length of min_rtt filter (in sec): */ +static const u32 bbr_min_rtt_win_sec = 10; +/* Minimum time (in ms) spent at bbr_cwnd_min_target in BBR_PROBE_RTT mode: */ +static const u32 bbr_probe_rtt_mode_ms = 200; +/* Skip TSO below the following bandwidth (bits/sec): */ +static const int bbr_min_tso_rate = 1200000; + +/* Pace at ~1% below estimated bw, on average, to reduce queue at bottleneck. + * In order to help drive the network toward lower queues and low latency while + * maintaining high utilization, the average pacing rate aims to be slightly + * lower than the estimated bandwidth. This is an important aspect of the + * design. + */ +static const int bbr_pacing_margin_percent = 1; + +/* We use a high_gain value of 2/ln(2) because it's the smallest pacing gain + * that will allow a smoothly increasing pacing rate that will double each RTT + * and send the same number of packets per RTT that an un-paced, slow-starting + * Reno or CUBIC flow would: + */ +static const int bbr_high_gain = BBR_UNIT * 2885 / 1000 + 1; +/* The pacing gain of 1/high_gain in BBR_DRAIN is calculated to typically drain + * the queue created in BBR_STARTUP in a single round: + */ +static const int bbr_drain_gain = BBR_UNIT * 1000 / 2885; +/* The gain for deriving steady-state cwnd tolerates delayed/stretched ACKs: */ +static const int bbr_cwnd_gain = BBR_UNIT * 2; +/* The pacing_gain values for the PROBE_BW gain cycle, to discover/share bw: */ +static const int bbr_pacing_gain[] = { + BBR_UNIT * 5 / 4, /* probe for more available bw */ + BBR_UNIT * 3 / 4, /* drain queue and/or yield bw to other flows */ + BBR_UNIT, BBR_UNIT, BBR_UNIT, /* cruise at 1.0*bw to utilize pipe, */ + BBR_UNIT, BBR_UNIT, BBR_UNIT /* without creating excess queue... */ +}; +/* Randomize the starting gain cycling phase over N phases: */ +static const u32 bbr_cycle_rand = 7; + +/* Try to keep at least this many packets in flight, if things go smoothly. For + * smooth functioning, a sliding window protocol ACKing every other packet + * needs at least 4 packets in flight: + */ +static const u32 bbr_cwnd_min_target = 4; + +/* To estimate if BBR_STARTUP mode (i.e. high_gain) has filled pipe... 
*/ +/* If bw has increased significantly (1.25x), there may be more bw available: */ +static const u32 bbr_full_bw_thresh = BBR_UNIT * 5 / 4; +/* But after 3 rounds w/o significant bw growth, estimate pipe is full: */ +static const u32 bbr_full_bw_cnt = 3; + +/* "long-term" ("LT") bandwidth estimator parameters... */ +/* The minimum number of rounds in an LT bw sampling interval: */ +static const u32 bbr_lt_intvl_min_rtts = 4; +/* If lost/delivered ratio > 20%, interval is "lossy" and we may be policed: */ +static const u32 bbr_lt_loss_thresh = 50; +/* If 2 intervals have a bw ratio <= 1/8, their bw is "consistent": */ +static const u32 bbr_lt_bw_ratio = BBR_UNIT / 8; +/* If 2 intervals have a bw diff <= 4 Kbit/sec their bw is "consistent": */ +static const u32 bbr_lt_bw_diff = 4000 / 8; +/* If we estimate we're policed, use lt_bw for this many round trips: */ +static const u32 bbr_lt_bw_max_rtts = 48; + +/* Gain factor for adding extra_acked to target cwnd: */ +static const int bbr_extra_acked_gain = BBR_UNIT; +/* Window length of extra_acked window. */ +static const u32 bbr_extra_acked_win_rtts = 5; +/* Max allowed val for ack_epoch_acked, after which sampling epoch is reset */ +static const u32 bbr_ack_epoch_acked_reset_thresh = 1U << 20; +/* Time period for clamping cwnd increment due to ack aggregation */ +static const u32 bbr_extra_acked_max_us = 100 * 1000; + +static void bbr_check_probe_rtt_done(struct sock *sk); + +/* Do we estimate that STARTUP filled the pipe? */ +static bool bbr_full_bw_reached(const struct sock *sk) +{ + const struct bbr *bbr = inet_csk_ca(sk); + + return bbr->full_bw_reached; +} + +/* Return the windowed max recent bandwidth sample, in pkts/uS << BW_SCALE. */ +static u32 bbr_max_bw(const struct sock *sk) +{ + struct bbr *bbr = inet_csk_ca(sk); + + return minmax_get(&bbr->bw); +} + +/* Return the estimated bandwidth of the path, in pkts/uS << BW_SCALE. */ +static u32 bbr_bw(const struct sock *sk) +{ + struct bbr *bbr = inet_csk_ca(sk); + + return bbr->lt_use_bw ? bbr->lt_bw : bbr_max_bw(sk); +} + +/* Return maximum extra acked in past k-2k round trips, + * where k = bbr_extra_acked_win_rtts. + */ +static u16 bbr_extra_acked(const struct sock *sk) +{ + struct bbr *bbr = inet_csk_ca(sk); + + return max(bbr->extra_acked[0], bbr->extra_acked[1]); +} + +/* Return rate in bytes per second, optionally with a gain. + * The order here is chosen carefully to avoid overflow of u64. This should + * work for input rates of up to 2.9Tbit/sec and gain of 2.89x. + */ +static u64 bbr_rate_bytes_per_sec(struct sock *sk, u64 rate, int gain) +{ + unsigned int mss = tcp_sk(sk)->mss_cache; + + rate *= mss; + rate *= gain; + rate >>= BBR_SCALE; + rate *= USEC_PER_SEC / 100 * (100 - bbr_pacing_margin_percent); + return rate >> BW_SCALE; +} + +/* Convert a BBR bw and gain factor to a pacing rate in bytes per second. */ +static unsigned long bbr_bw_to_pacing_rate(struct sock *sk, u32 bw, int gain) +{ + u64 rate = bw; + + rate = bbr_rate_bytes_per_sec(sk, rate, gain); + rate = min_t(u64, rate, READ_ONCE(sk->sk_max_pacing_rate)); + return rate; +} + +/* Initialize pacing rate to: high_gain * init_cwnd / RTT. */ +static void bbr_init_pacing_rate_from_rtt(struct sock *sk) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + u64 bw; + u32 rtt_us; + + if (tp->srtt_us) { /* any RTT sample yet? 
*/ + rtt_us = max(tp->srtt_us >> 3, 1U); + bbr->has_seen_rtt = 1; + } else { /* no RTT sample yet */ + rtt_us = USEC_PER_MSEC; /* use nominal default RTT */ + } + bw = (u64)tcp_snd_cwnd(tp) * BW_UNIT; + do_div(bw, rtt_us); + WRITE_ONCE(sk->sk_pacing_rate, + bbr_bw_to_pacing_rate(sk, bw, bbr_high_gain)); +} + +/* Pace using current bw estimate and a gain factor. */ +static void bbr_set_pacing_rate(struct sock *sk, u32 bw, int gain) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + unsigned long rate = bbr_bw_to_pacing_rate(sk, bw, gain); + + if (unlikely(!bbr->has_seen_rtt && tp->srtt_us)) + bbr_init_pacing_rate_from_rtt(sk); + if (bbr_full_bw_reached(sk) || rate > READ_ONCE(sk->sk_pacing_rate)) + WRITE_ONCE(sk->sk_pacing_rate, rate); +} + +/* override sysctl_tcp_min_tso_segs */ +__bpf_kfunc static u32 bbr_min_tso_segs(struct sock *sk) +{ + return READ_ONCE(sk->sk_pacing_rate) < (bbr_min_tso_rate >> 3) ? 1 : 2; +} + +static u32 bbr_tso_segs_goal(struct sock *sk) +{ + struct tcp_sock *tp = tcp_sk(sk); + u32 segs, bytes; + + /* Sort of tcp_tso_autosize() but ignoring + * driver provided sk_gso_max_size. + */ + bytes = min_t(unsigned long, + READ_ONCE(sk->sk_pacing_rate) >> READ_ONCE(sk->sk_pacing_shift), + GSO_LEGACY_MAX_SIZE - 1 - MAX_TCP_HEADER); + segs = max_t(u32, bytes / tp->mss_cache, bbr_min_tso_segs(sk)); + + return min(segs, 0x7FU); +} + +/* Save "last known good" cwnd so we can restore it after losses or PROBE_RTT */ +static void bbr_save_cwnd(struct sock *sk) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + + if (bbr->prev_ca_state < TCP_CA_Recovery && bbr->mode != BBR_PROBE_RTT) + bbr->prior_cwnd = tcp_snd_cwnd(tp); /* this cwnd is good enough */ + else /* loss recovery or BBR_PROBE_RTT have temporarily cut cwnd */ + bbr->prior_cwnd = max(bbr->prior_cwnd, tcp_snd_cwnd(tp)); +} + +__bpf_kfunc static void bbr_cwnd_event(struct sock *sk, enum tcp_ca_event event) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + + if (event == CA_EVENT_TX_START && tp->app_limited) { + bbr->idle_restart = 1; + bbr->ack_epoch_mstamp = tp->tcp_mstamp; + bbr->ack_epoch_acked = 0; + /* Avoid pointless buffer overflows: pace at est. bw if we don't + * need more speed (we're restarting from idle and app-limited). + */ + if (bbr->mode == BBR_PROBE_BW) + bbr_set_pacing_rate(sk, bbr_bw(sk), BBR_UNIT); + else if (bbr->mode == BBR_PROBE_RTT) + bbr_check_probe_rtt_done(sk); + } +} + +/* Calculate bdp based on min RTT and the estimated bottleneck bandwidth: + * + * bdp = ceil(bw * min_rtt * gain) + * + * The key factor, gain, controls the amount of queue. While a small gain + * builds a smaller queue, it becomes more vulnerable to noise in RTT + * measurements (e.g., delayed ACKs or other ACK compression effects). This + * noise may cause BBR to under-estimate the rate. + */ +static u32 bbr_bdp(struct sock *sk, u32 bw, int gain) +{ + struct bbr *bbr = inet_csk_ca(sk); + u32 bdp; + u64 w; + + /* If we've never had a valid RTT sample, cap cwnd at the initial + * default. This should only happen when the connection is not using TCP + * timestamps and has retransmitted all of the SYN/SYNACK/data packets + * ACKed so far. In this case, an RTO can cut cwnd to 1, in which + * case we need to slow-start up toward something safe: TCP_INIT_CWND. + */ + if (unlikely(bbr->min_rtt_us == ~0U)) /* no valid RTT samples yet? 
*/ + return TCP_INIT_CWND; /* be safe: cap at default initial cwnd*/ + + w = (u64)bw * bbr->min_rtt_us; + + /* Apply a gain to the given value, remove the BW_SCALE shift, and + * round the value up to avoid a negative feedback loop. + */ + bdp = (((w * gain) >> BBR_SCALE) + BW_UNIT - 1) / BW_UNIT; + + return bdp; +} + +/* To achieve full performance in high-speed paths, we budget enough cwnd to + * fit full-sized skbs in-flight on both end hosts to fully utilize the path: + * - one skb in sending host Qdisc, + * - one skb in sending host TSO/GSO engine + * - one skb being received by receiver host LRO/GRO/delayed-ACK engine + * Don't worry, at low rates (bbr_min_tso_rate) this won't bloat cwnd because + * in such cases tso_segs_goal is 1. The minimum cwnd is 4 packets, + * which allows 2 outstanding 2-packet sequences, to try to keep pipe + * full even with ACK-every-other-packet delayed ACKs. + */ +static u32 bbr_quantization_budget(struct sock *sk, u32 cwnd) +{ + struct bbr *bbr = inet_csk_ca(sk); + + /* Allow enough full-sized skbs in flight to utilize end systems. */ + cwnd += 3 * bbr_tso_segs_goal(sk); + + /* Reduce delayed ACKs by rounding up cwnd to the next even number. */ + cwnd = (cwnd + 1) & ~1U; + + /* Ensure gain cycling gets inflight above BDP even for small BDPs. */ + if (bbr->mode == BBR_PROBE_BW && bbr->cycle_idx == 0) + cwnd += 2; + + return cwnd; +} + +/* Find inflight based on min RTT and the estimated bottleneck bandwidth. */ +static u32 bbr_inflight(struct sock *sk, u32 bw, int gain) +{ + u32 inflight; + + inflight = bbr_bdp(sk, bw, gain); + inflight = bbr_quantization_budget(sk, inflight); + + return inflight; +} + +/* With pacing at lower layers, there's often less data "in the network" than + * "in flight". With TSQ and departure time pacing at lower layers (e.g. fq), + * we often have several skbs queued in the pacing layer with a pre-scheduled + * earliest departure time (EDT). BBR adapts its pacing rate based on the + * inflight level that it estimates has already been "baked in" by previous + * departure time decisions. We calculate a rough estimate of the number of our + * packets that might be in the network at the earliest departure time for the + * next skb scheduled: + * in_network_at_edt = inflight_at_edt - (EDT - now) * bw + * If we're increasing inflight, then we want to know if the transmit of the + * EDT skb will push inflight above the target, so inflight_at_edt includes + * bbr_tso_segs_goal() from the skb departing at EDT. If decreasing inflight, + * then estimate if inflight will sink too low just before the EDT transmit. 
+ */ +static u32 bbr_packets_in_net_at_edt(struct sock *sk, u32 inflight_now) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + u64 now_ns, edt_ns, interval_us; + u32 interval_delivered, inflight_at_edt; + + now_ns = tp->tcp_clock_cache; + edt_ns = max(tp->tcp_wstamp_ns, now_ns); + interval_us = div_u64(edt_ns - now_ns, NSEC_PER_USEC); + interval_delivered = (u64)bbr_bw(sk) * interval_us >> BW_SCALE; + inflight_at_edt = inflight_now; + if (bbr->pacing_gain > BBR_UNIT) /* increasing inflight */ + inflight_at_edt += bbr_tso_segs_goal(sk); /* include EDT skb */ + if (interval_delivered >= inflight_at_edt) + return 0; + return inflight_at_edt - interval_delivered; +} + +/* Find the cwnd increment based on estimate of ack aggregation */ +static u32 bbr_ack_aggregation_cwnd(struct sock *sk) +{ + u32 max_aggr_cwnd, aggr_cwnd = 0; + + if (bbr_extra_acked_gain && bbr_full_bw_reached(sk)) { + max_aggr_cwnd = ((u64)bbr_bw(sk) * bbr_extra_acked_max_us) + / BW_UNIT; + aggr_cwnd = (bbr_extra_acked_gain * bbr_extra_acked(sk)) + >> BBR_SCALE; + aggr_cwnd = min(aggr_cwnd, max_aggr_cwnd); + } + + return aggr_cwnd; +} + +/* An optimization in BBR to reduce losses: On the first round of recovery, we + * follow the packet conservation principle: send P packets per P packets acked. + * After that, we slow-start and send at most 2*P packets per P packets acked. + * After recovery finishes, or upon undo, we restore the cwnd we had when + * recovery started (capped by the target cwnd based on estimated BDP). + * + * TODO(ycheng/ncardwell): implement a rate-based approach. + */ +static bool bbr_set_cwnd_to_recover_or_restore( + struct sock *sk, const struct rate_sample *rs, u32 acked, u32 *new_cwnd) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + u8 prev_state = bbr->prev_ca_state, state = inet_csk(sk)->icsk_ca_state; + u32 cwnd = tcp_snd_cwnd(tp); + + /* An ACK for P pkts should release at most 2*P packets. We do this + * in two steps. First, here we deduct the number of lost packets. + * Then, in bbr_set_cwnd() we slow start up toward the target cwnd. + */ + if (rs->losses > 0) + cwnd = max_t(s32, cwnd - rs->losses, 1); + + if (state == TCP_CA_Recovery && prev_state != TCP_CA_Recovery) { + /* Starting 1st round of Recovery, so do packet conservation. */ + bbr->packet_conservation = 1; + bbr->next_rtt_delivered = tp->delivered; /* start round now */ + /* Cut unused cwnd from app behavior, TSQ, or TSO deferral: */ + cwnd = tcp_packets_in_flight(tp) + acked; + } else if (prev_state >= TCP_CA_Recovery && state < TCP_CA_Recovery) { + /* Exiting loss recovery; restore cwnd saved before recovery. */ + cwnd = max(cwnd, bbr->prior_cwnd); + bbr->packet_conservation = 0; + } + bbr->prev_ca_state = state; + + if (bbr->packet_conservation) { + *new_cwnd = max(cwnd, tcp_packets_in_flight(tp) + acked); + return true; /* yes, using packet conservation */ + } + *new_cwnd = cwnd; + return false; +} + +/* Slow-start up toward target cwnd (if bw estimate is growing, or packet loss + * has drawn us down below target), or snap down to target if we're above it. 
+ */ +static void bbr_set_cwnd(struct sock *sk, const struct rate_sample *rs, + u32 acked, u32 bw, int gain) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + u32 cwnd = tcp_snd_cwnd(tp), target_cwnd = 0; + + if (!acked) + goto done; /* no packet fully ACKed; just apply caps */ + + if (bbr_set_cwnd_to_recover_or_restore(sk, rs, acked, &cwnd)) + goto done; + + target_cwnd = bbr_bdp(sk, bw, gain); + + /* Increment the cwnd to account for excess ACKed data that seems + * due to aggregation (of data and/or ACKs) visible in the ACK stream. + */ + target_cwnd += bbr_ack_aggregation_cwnd(sk); + target_cwnd = bbr_quantization_budget(sk, target_cwnd); + + /* If we're below target cwnd, slow start cwnd toward target cwnd. */ + if (bbr_full_bw_reached(sk)) /* only cut cwnd if we filled the pipe */ + cwnd = min(cwnd + acked, target_cwnd); + else if (cwnd < target_cwnd || tp->delivered < TCP_INIT_CWND) + cwnd = cwnd + acked; + cwnd = max(cwnd, bbr_cwnd_min_target); + +done: + tcp_snd_cwnd_set(tp, min(cwnd, tp->snd_cwnd_clamp)); /* apply global cap */ + if (bbr->mode == BBR_PROBE_RTT) /* drain queue, refresh min_rtt */ + tcp_snd_cwnd_set(tp, min(tcp_snd_cwnd(tp), bbr_cwnd_min_target)); +} + +/* End cycle phase if it's time and/or we hit the phase's in-flight target. */ +static bool bbr_is_next_cycle_phase(struct sock *sk, + const struct rate_sample *rs) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + bool is_full_length = + tcp_stamp_us_delta(tp->delivered_mstamp, bbr->cycle_mstamp) > + bbr->min_rtt_us; + u32 inflight, bw; + + /* The pacing_gain of 1.0 paces at the estimated bw to try to fully + * use the pipe without increasing the queue. + */ + if (bbr->pacing_gain == BBR_UNIT) + return is_full_length; /* just use wall clock time */ + + inflight = bbr_packets_in_net_at_edt(sk, rs->prior_in_flight); + bw = bbr_max_bw(sk); + + /* A pacing_gain > 1.0 probes for bw by trying to raise inflight to at + * least pacing_gain*BDP; this may take more than min_rtt if min_rtt is + * small (e.g. on a LAN). We do not persist if packets are lost, since + * a path with small buffers may not hold that much. + */ + if (bbr->pacing_gain > BBR_UNIT) + return is_full_length && + (rs->losses || /* perhaps pacing_gain*BDP won't fit */ + inflight >= bbr_inflight(sk, bw, bbr->pacing_gain)); + + /* A pacing_gain < 1.0 tries to drain extra queue we added if bw + * probing didn't find more bw. If inflight falls to match BDP then we + * estimate queue is drained; persisting would underutilize the pipe. + */ + return is_full_length || + inflight <= bbr_inflight(sk, bw, BBR_UNIT); +} + +static void bbr_advance_cycle_phase(struct sock *sk) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + + bbr->cycle_idx = (bbr->cycle_idx + 1) & (CYCLE_LEN - 1); + bbr->cycle_mstamp = tp->delivered_mstamp; +} + +/* Gain cycling: cycle pacing gain to converge to fair share of available bw. 
*/ +static void bbr_update_cycle_phase(struct sock *sk, + const struct rate_sample *rs) +{ + struct bbr *bbr = inet_csk_ca(sk); + + if (bbr->mode == BBR_PROBE_BW && bbr_is_next_cycle_phase(sk, rs)) + bbr_advance_cycle_phase(sk); +} + +static void bbr_reset_startup_mode(struct sock *sk) +{ + struct bbr *bbr = inet_csk_ca(sk); + + bbr->mode = BBR_STARTUP; +} + +static void bbr_reset_probe_bw_mode(struct sock *sk) +{ + struct bbr *bbr = inet_csk_ca(sk); + + bbr->mode = BBR_PROBE_BW; + bbr->cycle_idx = CYCLE_LEN - 1 - get_random_u32_below(bbr_cycle_rand); + bbr_advance_cycle_phase(sk); /* flip to next phase of gain cycle */ +} + +static void bbr_reset_mode(struct sock *sk) +{ + if (!bbr_full_bw_reached(sk)) + bbr_reset_startup_mode(sk); + else + bbr_reset_probe_bw_mode(sk); +} + +/* Start a new long-term sampling interval. */ +static void bbr_reset_lt_bw_sampling_interval(struct sock *sk) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + + bbr->lt_last_stamp = div_u64(tp->delivered_mstamp, USEC_PER_MSEC); + bbr->lt_last_delivered = tp->delivered; + bbr->lt_last_lost = tp->lost; + bbr->lt_rtt_cnt = 0; +} + +/* Completely reset long-term bandwidth sampling. */ +static void bbr_reset_lt_bw_sampling(struct sock *sk) +{ + struct bbr *bbr = inet_csk_ca(sk); + + bbr->lt_bw = 0; + bbr->lt_use_bw = 0; + bbr->lt_is_sampling = false; + bbr_reset_lt_bw_sampling_interval(sk); +} + +/* Long-term bw sampling interval is done. Estimate whether we're policed. */ +static void bbr_lt_bw_interval_done(struct sock *sk, u32 bw) +{ + struct bbr *bbr = inet_csk_ca(sk); + u32 diff; + + if (bbr->lt_bw) { /* do we have bw from a previous interval? */ + /* Is new bw close to the lt_bw from the previous interval? */ + diff = abs(bw - bbr->lt_bw); + if ((diff * BBR_UNIT <= bbr_lt_bw_ratio * bbr->lt_bw) || + (bbr_rate_bytes_per_sec(sk, diff, BBR_UNIT) <= + bbr_lt_bw_diff)) { + /* All criteria are met; estimate we're policed. */ + bbr->lt_bw = (bw + bbr->lt_bw) >> 1; /* avg 2 intvls */ + bbr->lt_use_bw = 1; + bbr->pacing_gain = BBR_UNIT; /* try to avoid drops */ + bbr->lt_rtt_cnt = 0; + return; + } + } + bbr->lt_bw = bw; + bbr_reset_lt_bw_sampling_interval(sk); +} + +/* Token-bucket traffic policers are common (see "An Internet-Wide Analysis of + * Traffic Policing", SIGCOMM 2016). BBR detects token-bucket policers and + * explicitly models their policed rate, to reduce unnecessary losses. We + * estimate that we're policed if we see 2 consecutive sampling intervals with + * consistent throughput and high packet loss. If we think we're being policed, + * set lt_bw to the "long-term" average delivery rate from those 2 intervals. + */ +static void bbr_lt_bw_sampling(struct sock *sk, const struct rate_sample *rs) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + u32 lost, delivered; + u64 bw; + u32 t; + + if (bbr->lt_use_bw) { /* already using long-term rate, lt_bw? */ + if (bbr->mode == BBR_PROBE_BW && bbr->round_start && + ++bbr->lt_rtt_cnt >= bbr_lt_bw_max_rtts) { + bbr_reset_lt_bw_sampling(sk); /* stop using lt_bw */ + bbr_reset_probe_bw_mode(sk); /* restart gain cycling */ + } + return; + } + + /* Wait for the first loss before sampling, to let the policer exhaust + * its tokens and estimate the steady-state rate allowed by the policer. + * Starting samples earlier includes bursts that over-estimate the bw. 
+ */ + if (!bbr->lt_is_sampling) { + if (!rs->losses) + return; + bbr_reset_lt_bw_sampling_interval(sk); + bbr->lt_is_sampling = true; + } + + /* To avoid underestimates, reset sampling if we run out of data. */ + if (rs->is_app_limited) { + bbr_reset_lt_bw_sampling(sk); + return; + } + + if (bbr->round_start) + bbr->lt_rtt_cnt++; /* count round trips in this interval */ + if (bbr->lt_rtt_cnt < bbr_lt_intvl_min_rtts) + return; /* sampling interval needs to be longer */ + if (bbr->lt_rtt_cnt > 4 * bbr_lt_intvl_min_rtts) { + bbr_reset_lt_bw_sampling(sk); /* interval is too long */ + return; + } + + /* End sampling interval when a packet is lost, so we estimate the + * policer tokens were exhausted. Stopping the sampling before the + * tokens are exhausted under-estimates the policed rate. + */ + if (!rs->losses) + return; + + /* Calculate packets lost and delivered in sampling interval. */ + lost = tp->lost - bbr->lt_last_lost; + delivered = tp->delivered - bbr->lt_last_delivered; + /* Is loss rate (lost/delivered) >= lt_loss_thresh? If not, wait. */ + if (!delivered || (lost << BBR_SCALE) < bbr_lt_loss_thresh * delivered) + return; + + /* Find average delivery rate in this sampling interval. */ + t = div_u64(tp->delivered_mstamp, USEC_PER_MSEC) - bbr->lt_last_stamp; + if ((s32)t < 1) + return; /* interval is less than one ms, so wait */ + /* Check if can multiply without overflow */ + if (t >= ~0U / USEC_PER_MSEC) { + bbr_reset_lt_bw_sampling(sk); /* interval too long; reset */ + return; + } + t *= USEC_PER_MSEC; + bw = (u64)delivered * BW_UNIT; + do_div(bw, t); + bbr_lt_bw_interval_done(sk, bw); +} + +/* Estimate the bandwidth based on how fast packets are delivered */ +static void bbr_update_bw(struct sock *sk, const struct rate_sample *rs) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + u64 bw; + + bbr->round_start = 0; + if (rs->delivered < 0 || rs->interval_us <= 0) + return; /* Not a valid observation */ + + /* See if we've reached the next RTT */ + if (!before(rs->prior_delivered, bbr->next_rtt_delivered)) { + bbr->next_rtt_delivered = tp->delivered; + bbr->rtt_cnt++; + bbr->round_start = 1; + bbr->packet_conservation = 0; + } + + bbr_lt_bw_sampling(sk, rs); + + /* Divide delivered by the interval to find a (lower bound) bottleneck + * bandwidth sample. Delivered is in packets and interval_us in uS and + * ratio will be <<1 for most connections. So delivered is first scaled. + */ + bw = div64_long((u64)rs->delivered * BW_UNIT, rs->interval_us); + + /* If this sample is application-limited, it is likely to have a very + * low delivered count that represents application behavior rather than + * the available network rate. Such a sample could drag down estimated + * bw, causing needless slow-down. Thus, to continue to send at the + * last measured network rate, we filter out app-limited samples unless + * they describe the path bw at least as well as our bw model. + * + * So the goal during app-limited phase is to proceed with the best + * network rate no matter how long. We automatically leave this + * phase when app writes faster than the network can deliver :) + */ + if (!rs->is_app_limited || bw >= bbr_max_bw(sk)) { + /* Incorporate new sample into our max bw filter. */ + minmax_running_max(&bbr->bw, bbr_bw_rtts, bbr->rtt_cnt, bw); + } +} + +/* Estimates the windowed max degree of ack aggregation. + * This is used to provision extra in-flight data to keep sending during + * inter-ACK silences. 
+ * + * Degree of ack aggregation is estimated as extra data acked beyond expected. + * + * max_extra_acked = "maximum recent excess data ACKed beyond max_bw * interval" + * cwnd += max_extra_acked + * + * Max extra_acked is clamped by cwnd and bw * bbr_extra_acked_max_us (100 ms). + * Max filter is an approximate sliding window of 5-10 (packet timed) round + * trips. + */ +static void bbr_update_ack_aggregation(struct sock *sk, + const struct rate_sample *rs) +{ + u32 epoch_us, expected_acked, extra_acked; + struct bbr *bbr = inet_csk_ca(sk); + struct tcp_sock *tp = tcp_sk(sk); + + if (!bbr_extra_acked_gain || rs->acked_sacked <= 0 || + rs->delivered < 0 || rs->interval_us <= 0) + return; + + if (bbr->round_start) { + bbr->extra_acked_win_rtts = min(0x1F, + bbr->extra_acked_win_rtts + 1); + if (bbr->extra_acked_win_rtts >= bbr_extra_acked_win_rtts) { + bbr->extra_acked_win_rtts = 0; + bbr->extra_acked_win_idx = bbr->extra_acked_win_idx ? + 0 : 1; + bbr->extra_acked[bbr->extra_acked_win_idx] = 0; + } + } + + /* Compute how many packets we expected to be delivered over epoch. */ + epoch_us = tcp_stamp_us_delta(tp->delivered_mstamp, + bbr->ack_epoch_mstamp); + expected_acked = ((u64)bbr_bw(sk) * epoch_us) / BW_UNIT; + + /* Reset the aggregation epoch if ACK rate is below expected rate or + * significantly large no. of ack received since epoch (potentially + * quite old epoch). + */ + if (bbr->ack_epoch_acked <= expected_acked || + (bbr->ack_epoch_acked + rs->acked_sacked >= + bbr_ack_epoch_acked_reset_thresh)) { + bbr->ack_epoch_acked = 0; + bbr->ack_epoch_mstamp = tp->delivered_mstamp; + expected_acked = 0; + } + + /* Compute excess data delivered, beyond what was expected. */ + bbr->ack_epoch_acked = min_t(u32, 0xFFFFF, + bbr->ack_epoch_acked + rs->acked_sacked); + extra_acked = bbr->ack_epoch_acked - expected_acked; + extra_acked = min(extra_acked, tcp_snd_cwnd(tp)); + if (extra_acked > bbr->extra_acked[bbr->extra_acked_win_idx]) + bbr->extra_acked[bbr->extra_acked_win_idx] = extra_acked; +} + +/* Estimate when the pipe is full, using the change in delivery rate: BBR + * estimates that STARTUP filled the pipe if the estimated bw hasn't changed by + * at least bbr_full_bw_thresh (25%) after bbr_full_bw_cnt (3) non-app-limited + * rounds. Why 3 rounds: 1: rwin autotuning grows the rwin, 2: we fill the + * higher rwin, 3: we get higher delivery rate samples. Or transient + * cross-traffic or radio noise can go away. CUBIC Hystart shares a similar + * design goal, but uses delay and inter-ACK spacing instead of bandwidth. + */ +static void bbr_check_full_bw_reached(struct sock *sk, + const struct rate_sample *rs) +{ + struct bbr *bbr = inet_csk_ca(sk); + u32 bw_thresh; + + if (bbr_full_bw_reached(sk) || !bbr->round_start || rs->is_app_limited) + return; + + bw_thresh = (u64)bbr->full_bw * bbr_full_bw_thresh >> BBR_SCALE; + if (bbr_max_bw(sk) >= bw_thresh) { + bbr->full_bw = bbr_max_bw(sk); + bbr->full_bw_cnt = 0; + return; + } + ++bbr->full_bw_cnt; + bbr->full_bw_reached = bbr->full_bw_cnt >= bbr_full_bw_cnt; +} + +/* If pipe is probably full, drain the queue and then enter steady-state. 
*/ +static void bbr_check_drain(struct sock *sk, const struct rate_sample *rs) +{ + struct bbr *bbr = inet_csk_ca(sk); + + if (bbr->mode == BBR_STARTUP && bbr_full_bw_reached(sk)) { + bbr->mode = BBR_DRAIN; /* drain queue we created */ + tcp_sk(sk)->snd_ssthresh = + bbr_inflight(sk, bbr_max_bw(sk), BBR_UNIT); + } /* fall through to check if in-flight is already small: */ + if (bbr->mode == BBR_DRAIN && + bbr_packets_in_net_at_edt(sk, tcp_packets_in_flight(tcp_sk(sk))) <= + bbr_inflight(sk, bbr_max_bw(sk), BBR_UNIT)) + bbr_reset_probe_bw_mode(sk); /* we estimate queue is drained */ +} + +static void bbr_check_probe_rtt_done(struct sock *sk) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + + if (!(bbr->probe_rtt_done_stamp && + after(tcp_jiffies32, bbr->probe_rtt_done_stamp))) + return; + + bbr->min_rtt_stamp = tcp_jiffies32; /* wait a while until PROBE_RTT */ + tcp_snd_cwnd_set(tp, max(tcp_snd_cwnd(tp), bbr->prior_cwnd)); + bbr_reset_mode(sk); +} + +/* The goal of PROBE_RTT mode is to have BBR flows cooperatively and + * periodically drain the bottleneck queue, to converge to measure the true + * min_rtt (unloaded propagation delay). This allows the flows to keep queues + * small (reducing queuing delay and packet loss) and achieve fairness among + * BBR flows. + * + * The min_rtt filter window is 10 seconds. When the min_rtt estimate expires, + * we enter PROBE_RTT mode and cap the cwnd at bbr_cwnd_min_target=4 packets. + * After at least bbr_probe_rtt_mode_ms=200ms and at least one packet-timed + * round trip elapsed with that flight size <= 4, we leave PROBE_RTT mode and + * re-enter the previous mode. BBR uses 200ms to approximately bound the + * performance penalty of PROBE_RTT's cwnd capping to roughly 2% (200ms/10s). + * + * Note that flows need only pay 2% if they are busy sending over the last 10 + * seconds. Interactive applications (e.g., Web, RPCs, video chunks) often have + * natural silences or low-rate periods within 10 seconds where the rate is low + * enough for long enough to drain its queue in the bottleneck. We pick up + * these min RTT measurements opportunistically with our min_rtt filter. :-) + */ +static void bbr_update_min_rtt(struct sock *sk, const struct rate_sample *rs) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + bool filter_expired; + + /* Track min RTT seen in the min_rtt_win_sec filter window: */ + filter_expired = after(tcp_jiffies32, + bbr->min_rtt_stamp + bbr_min_rtt_win_sec * HZ); + if (rs->rtt_us >= 0 && + (rs->rtt_us < bbr->min_rtt_us || + (filter_expired && !rs->is_ack_delayed))) { + bbr->min_rtt_us = rs->rtt_us; + bbr->min_rtt_stamp = tcp_jiffies32; + } + + if (bbr_probe_rtt_mode_ms > 0 && filter_expired && + !bbr->idle_restart && bbr->mode != BBR_PROBE_RTT) { + bbr->mode = BBR_PROBE_RTT; /* dip, drain queue */ + bbr_save_cwnd(sk); /* note cwnd so we can restore it */ + bbr->probe_rtt_done_stamp = 0; + } + + if (bbr->mode == BBR_PROBE_RTT) { + /* Ignore low rate samples during this mode. */ + tp->app_limited = + (tp->delivered + tcp_packets_in_flight(tp)) ? : 1; + /* Maintain min packets in flight for max(200 ms, 1 round). 
*/ + if (!bbr->probe_rtt_done_stamp && + tcp_packets_in_flight(tp) <= bbr_cwnd_min_target) { + bbr->probe_rtt_done_stamp = tcp_jiffies32 + + msecs_to_jiffies(bbr_probe_rtt_mode_ms); + bbr->probe_rtt_round_done = 0; + bbr->next_rtt_delivered = tp->delivered; + } else if (bbr->probe_rtt_done_stamp) { + if (bbr->round_start) + bbr->probe_rtt_round_done = 1; + if (bbr->probe_rtt_round_done) + bbr_check_probe_rtt_done(sk); + } + } + /* Restart after idle ends only once we process a new S/ACK for data */ + if (rs->delivered > 0) + bbr->idle_restart = 0; +} + +static void bbr_update_gains(struct sock *sk) +{ + struct bbr *bbr = inet_csk_ca(sk); + + switch (bbr->mode) { + case BBR_STARTUP: + bbr->pacing_gain = bbr_high_gain; + bbr->cwnd_gain = bbr_high_gain; + break; + case BBR_DRAIN: + bbr->pacing_gain = bbr_drain_gain; /* slow, to drain */ + bbr->cwnd_gain = bbr_high_gain; /* keep cwnd */ + break; + case BBR_PROBE_BW: + bbr->pacing_gain = (bbr->lt_use_bw ? + BBR_UNIT : + bbr_pacing_gain[bbr->cycle_idx]); + bbr->cwnd_gain = bbr_cwnd_gain; + break; + case BBR_PROBE_RTT: + bbr->pacing_gain = BBR_UNIT; + bbr->cwnd_gain = BBR_UNIT; + break; + default: + WARN_ONCE(1, "BBR bad mode: %u\n", bbr->mode); + break; + } +} + +static void bbr_update_model(struct sock *sk, const struct rate_sample *rs) +{ + bbr_update_bw(sk, rs); + bbr_update_ack_aggregation(sk, rs); + bbr_update_cycle_phase(sk, rs); + bbr_check_full_bw_reached(sk, rs); + bbr_check_drain(sk, rs); + bbr_update_min_rtt(sk, rs); + bbr_update_gains(sk); +} + +__bpf_kfunc static void bbr_main(struct sock *sk, u32 ack, int flag, const struct rate_sample *rs) +{ + struct bbr *bbr = inet_csk_ca(sk); + u32 bw; + + bbr_update_model(sk, rs); + + bw = bbr_bw(sk); + bbr_set_pacing_rate(sk, bw, bbr->pacing_gain); + bbr_set_cwnd(sk, rs, rs->acked_sacked, bw, bbr->cwnd_gain); +} + +__bpf_kfunc static void bbr_init(struct sock *sk) +{ + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + + bbr->prior_cwnd = 0; + tp->snd_ssthresh = TCP_INFINITE_SSTHRESH; + bbr->rtt_cnt = 0; + bbr->next_rtt_delivered = tp->delivered; + bbr->prev_ca_state = TCP_CA_Open; + bbr->packet_conservation = 0; + + bbr->probe_rtt_done_stamp = 0; + bbr->probe_rtt_round_done = 0; + bbr->min_rtt_us = tcp_min_rtt(tp); + bbr->min_rtt_stamp = tcp_jiffies32; + + minmax_reset(&bbr->bw, bbr->rtt_cnt, 0); /* init max bw to 0 */ + + bbr->has_seen_rtt = 0; + bbr_init_pacing_rate_from_rtt(sk); + + bbr->round_start = 0; + bbr->idle_restart = 0; + bbr->full_bw_reached = 0; + bbr->full_bw = 0; + bbr->full_bw_cnt = 0; + bbr->cycle_mstamp = 0; + bbr->cycle_idx = 0; + bbr_reset_lt_bw_sampling(sk); + bbr_reset_startup_mode(sk); + + bbr->ack_epoch_mstamp = tp->tcp_mstamp; + bbr->ack_epoch_acked = 0; + bbr->extra_acked_win_rtts = 0; + bbr->extra_acked_win_idx = 0; + bbr->extra_acked[0] = 0; + bbr->extra_acked[1] = 0; + + cmpxchg(&sk->sk_pacing_status, SK_PACING_NONE, SK_PACING_NEEDED); +} + +__bpf_kfunc static u32 bbr_sndbuf_expand(struct sock *sk) +{ + /* Provision 3 * cwnd since BBR may slow-start even during recovery. */ + return 3; +} + +/* In theory BBR does not need to undo the cwnd since it does not + * always reduce cwnd on losses (see bbr_main()). Keep it for now. 
+ */ +__bpf_kfunc static u32 bbr_undo_cwnd(struct sock *sk) +{ + struct bbr *bbr = inet_csk_ca(sk); + + bbr->full_bw = 0; /* spurious slow-down; reset full pipe detection */ + bbr->full_bw_cnt = 0; + bbr_reset_lt_bw_sampling(sk); + return tcp_snd_cwnd(tcp_sk(sk)); +} + +/* Entering loss recovery, so save cwnd for when we exit or undo recovery. */ +__bpf_kfunc static u32 bbr_ssthresh(struct sock *sk) +{ + bbr_save_cwnd(sk); + return tcp_sk(sk)->snd_ssthresh; +} + +static size_t bbr_get_info(struct sock *sk, u32 ext, int *attr, + union tcp_cc_info *info) +{ + if (ext & (1 << (INET_DIAG_BBRINFO - 1)) || + ext & (1 << (INET_DIAG_VEGASINFO - 1))) { + struct tcp_sock *tp = tcp_sk(sk); + struct bbr *bbr = inet_csk_ca(sk); + u64 bw = bbr_bw(sk); + + bw = bw * tp->mss_cache * USEC_PER_SEC >> BW_SCALE; + memset(&info->bbr, 0, sizeof(info->bbr)); + info->bbr.bbr_bw_lo = (u32)bw; + info->bbr.bbr_bw_hi = (u32)(bw >> 32); + info->bbr.bbr_min_rtt = bbr->min_rtt_us; + info->bbr.bbr_pacing_gain = bbr->pacing_gain; + info->bbr.bbr_cwnd_gain = bbr->cwnd_gain; + *attr = INET_DIAG_BBRINFO; + return sizeof(info->bbr); + } + return 0; +} + +__bpf_kfunc static void bbr_set_state(struct sock *sk, u8 new_state) +{ + struct bbr *bbr = inet_csk_ca(sk); + + if (new_state == TCP_CA_Loss) { + struct rate_sample rs = { .losses = 1 }; + + bbr->prev_ca_state = TCP_CA_Loss; + bbr->full_bw = 0; + bbr->round_start = 1; /* treat RTO like end of a round */ + bbr_lt_bw_sampling(sk, &rs); + } +} + +static struct tcp_congestion_ops tcp_bbr_cong_ops __read_mostly = { + .flags = TCP_CONG_NON_RESTRICTED, + .name = "bbr", + .owner = THIS_MODULE, + .init = bbr_init, + .cong_control = bbr_main, + .sndbuf_expand = bbr_sndbuf_expand, + .undo_cwnd = bbr_undo_cwnd, + .cwnd_event = bbr_cwnd_event, + .ssthresh = bbr_ssthresh, + .min_tso_segs = bbr_min_tso_segs, + .get_info = bbr_get_info, + .set_state = bbr_set_state, +}; + +BTF_KFUNCS_START(tcp_bbr_check_kfunc_ids) +BTF_ID_FLAGS(func, bbr_init) +BTF_ID_FLAGS(func, bbr_main) +BTF_ID_FLAGS(func, bbr_sndbuf_expand) +BTF_ID_FLAGS(func, bbr_undo_cwnd) +BTF_ID_FLAGS(func, bbr_cwnd_event) +BTF_ID_FLAGS(func, bbr_ssthresh) +BTF_ID_FLAGS(func, bbr_min_tso_segs) +BTF_ID_FLAGS(func, bbr_set_state) +BTF_KFUNCS_END(tcp_bbr_check_kfunc_ids) + +static const struct btf_kfunc_id_set tcp_bbr_kfunc_set = { + .owner = THIS_MODULE, + .set = &tcp_bbr_check_kfunc_ids, +}; + +static int __init bbr_register(void) +{ + int ret; + + BUILD_BUG_ON(sizeof(struct bbr) > ICSK_CA_PRIV_SIZE); + + ret = register_btf_kfunc_id_set(BPF_PROG_TYPE_STRUCT_OPS, &tcp_bbr_kfunc_set); + if (ret < 0) + return ret; + return tcp_register_congestion_control(&tcp_bbr_cong_ops); +} + +static void __exit bbr_unregister(void) +{ + tcp_unregister_congestion_control(&tcp_bbr_cong_ops); +} + +module_init(bbr_register); +module_exit(bbr_unregister); + +MODULE_AUTHOR("Van Jacobson "); +MODULE_AUTHOR("Neal Cardwell "); +MODULE_AUTHOR("Yuchung Cheng "); +MODULE_AUTHOR("Soheil Hassas Yeganeh "); +MODULE_LICENSE("Dual BSD/GPL"); +MODULE_DESCRIPTION("TCP BBR (Bottleneck Bandwidth and RTT)"); \ No newline at end of file