Skip to content

Commit c43d954

Browse files
committed
feat(dataplane): pipeline rework for pkt injection
Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
1 parent b292b70 commit c43d954

File tree

6 files changed

+65
-59
lines changed

6 files changed

+65
-59
lines changed

dataplane/src/packet_processor/egress.rs

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,16 @@
44
//! Implements an egress stage
55
66
use std::net::IpAddr;
7+
#[allow(unused)]
78
use tracing::{debug, error, trace, warn};
89

9-
use net::eth::Eth;
1010
use net::eth::ethtype::EthType;
1111
use net::eth::mac::{DestinationMac, SourceMac};
1212
use net::{
1313
buffer::PacketBufferMut,
1414
headers::{TryIpv4, TryIpv6},
1515
};
16+
use net::{eth::Eth, headers::TryIp};
1617

1718
use net::headers::TryEthMut;
1819
use net::interface::InterfaceIndex;
@@ -131,10 +132,8 @@ impl Egress {
131132
/* do lookup on the adjacency table */
132133
let Some(adj) = atable.get_adjacency(addr, ifindex) else {
133134
warn!("{nfi}: missing L2 info for {addr}");
134-
135-
/* Todo: Trigger ARP */
136-
137-
packet.done(DoneReason::MissL2resolution);
135+
packet.get_meta_mut().set_need_arp_nd(true);
136+
packet.done(DoneReason::MissL2resolution); // temporary until ARP/ND ready
138137
return None;
139138
};
140139
/* get the mac from the adjacency */
@@ -152,25 +151,23 @@ impl Egress {
152151
}
153152
}
154153

155-
fn resolve_next_mac<Buf: PacketBufferMut>(
154+
fn resolve_next_mac_ip<Buf: PacketBufferMut>(
156155
&self,
157156
ifindex: InterfaceIndex,
157+
dst_ip: IpAddr,
158158
packet: &mut Packet<Buf>,
159159
) -> Option<DestinationMac> {
160-
let nfi = &self.name;
161160
// if packet was annotated with a next-hop address, try to resolve it using the
162161
// adjacency table. Otherwise, that means that the packet is directly connected
163162
// to us (on the same subnet). So, fetch the destination IP address and try to
164163
// resolve it with the adjacency table as well. If that fails, that's where the
165164
// ARP/ND would need to be triggered.
166-
if let Some(nh_addr) = packet.get_meta().nh_addr {
167-
self.get_adj_mac(packet, nh_addr, ifindex)
168-
} else if let Some(destination) = packet.ip_destination() {
169-
self.get_adj_mac(packet, destination, ifindex)
165+
let target = if let Some(nh_addr) = packet.get_meta().nh_addr {
166+
nh_addr
170167
} else {
171-
warn!("{nfi}: could not determine packet destination IP address");
172-
None
173-
}
168+
dst_ip
169+
};
170+
self.get_adj_mac(packet, target, ifindex)
174171
}
175172

176173
#[inline]
@@ -180,11 +177,27 @@ impl Egress {
180177
packet.done(DoneReason::RouteFailure);
181178
return;
182179
};
180+
let mut dst_mac = None;
181+
182+
/* if Ipv4/Ipv6, resolve destination mac. We can't rely on the current Eth mac since
183+
we may have not overwritten it. FIXME(fredi): we should probably do that */
184+
if packet.try_ip().is_some() {
185+
let Some(dst_ip) = packet.ip_destination() else {
186+
warn!("Failed to retrieve packet destination IP address");
187+
packet.done(DoneReason::Malformed);
188+
return;
189+
};
190+
/* resolve destination mac */
191+
dst_mac = self.resolve_next_mac_ip(oif, dst_ip, packet);
192+
if dst_mac.is_none() {
193+
warn!("Failed to resolve mac for packet to {dst_ip}");
194+
return;
195+
}
196+
}
183197

184-
/* resolve destination mac */
185-
let Some(dst_mac) = self.resolve_next_mac(oif, packet) else {
186-
// we could not figure out the destination MAC.
187-
// resolve_next_mac() already calls packet.done()
198+
/* if frame is non-ip and we don't know the dst mac, we drop the packet for the time being */
199+
let Some(dst_mac) = dst_mac else {
200+
packet.done(DoneReason::MissL2resolution);
188201
return;
189202
};
190203

dataplane/src/packet_processor/ingress.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@ impl Ingress {
4848
let ifname = &interface.name;
4949
match &interface.attachment {
5050
Some(Attachment::VRF(fibkey)) => {
51+
// we should actually drop this
5152
if packet.try_ip().is_none() {
52-
warn!("{nfi}: Processing of non-ip traffic on {ifname} is not supported");
53-
packet.done(DoneReason::NotIp);
53+
packet.get_meta_mut().set_local(true);
54+
packet.done(DoneReason::Delivered);
5455
return;
5556
}
5657
let vrfid = fibkey.as_u32();
@@ -86,18 +87,21 @@ impl Ingress {
8687
}
8788

8889
#[tracing::instrument(level = "trace")]
89-
fn interface_ingress_eth_bcast<Buf: PacketBufferMut>(
90+
fn interface_ingress_eth_bcast_mcast<Buf: PacketBufferMut>(
9091
&self,
91-
interface: &Interface,
92+
_interface: &Interface,
9293
packet: &mut Packet<Buf>,
9394
) {
94-
let nfi = self.name();
9595
packet.get_meta_mut().set_l2bcast(true);
96-
packet.done(DoneReason::Unhandled);
97-
warn!(
98-
"{nfi}: Processing of broadcast ethernet frames is not supported (interface {ifname})",
99-
ifname = interface.name
100-
);
96+
packet.get_meta_mut().set_local(true);
97+
if packet.try_ip().is_none()
98+
|| packet
99+
.ip_destination()
100+
.unwrap_or_else(|| unreachable!())
101+
.is_multicast()
102+
{
103+
packet.done(DoneReason::Delivered);
104+
}
101105
}
102106

103107
#[tracing::instrument(level = "trace")]
@@ -116,8 +120,8 @@ impl Ingress {
116120
None => packet.done(DoneReason::NotEthernet),
117121
Some(eth) => {
118122
let dmac = eth.destination().inner();
119-
if dmac.is_broadcast() {
120-
self.interface_ingress_eth_bcast(interface, packet);
123+
if dmac.is_broadcast() || dmac.is_multicast() {
124+
self.interface_ingress_eth_bcast_mcast(interface, packet);
121125
} else if dmac == if_mac {
122126
self.interface_ingress_eth_ucast_local(interface, packet);
123127
} else {
@@ -126,7 +130,7 @@ impl Ingress {
126130
}
127131
}
128132
} else {
129-
unreachable!();
133+
unreachable!("We should not get packet without mac");
130134
}
131135
}
132136

@@ -164,9 +168,7 @@ impl<Buf: PacketBufferMut> NetworkFunction<Buf> for Ingress {
164168
if !packet.is_done() {
165169
if let Some(iftable) = self.iftr.enter() {
166170
match packet.get_meta().iif {
167-
None => {
168-
warn!("no incoming interface for packet");
169-
}
171+
None => warn!("Packet has no iff"),
170172
Some(iif) => match iftable.get_interface(iif) {
171173
None => {
172174
warn!("{nfi}: unknown incoming interface {iif}");
@@ -177,6 +179,8 @@ impl<Buf: PacketBufferMut> NetworkFunction<Buf> for Ingress {
177179
}
178180
},
179181
}
182+
} else {
183+
warn!("Unable to read interface table!");
180184
}
181185
}
182186
packet.enforce()

dataplane/src/packet_processor/ipforward.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,9 @@ impl IpForwarder {
144144
packet.done(DoneReason::Malformed);
145145
}
146146
None => {
147-
/* send to kernel, among other options */
148147
debug!("Packet should be delivered to kernel...");
149-
/*
150-
We can't re-inject packet on ingress, so let's disable this to avoid churn
151-
packet.get_meta_mut().oif = Some(packet.get_meta().iif);
152-
*/
153-
packet.done(DoneReason::Unhandled);
148+
packet.get_meta_mut().set_local(true);
149+
packet.done(DoneReason::Delivered);
154150
}
155151
}
156152
}
@@ -383,7 +379,7 @@ impl<Buf: PacketBufferMut> NetworkFunction<Buf> for IpForwarder {
383379
let vrfid = packet.get_meta_mut().vrf.take();
384380
if let Some(vrfid) = vrfid {
385381
self.forward_packet(&mut packet, vrfid);
386-
} else {
382+
} else if !packet.get_meta().local() {
387383
warn!("{}: missing information to handle packet", self.name);
388384
}
389385
}

dataplane/src/packet_processor/mod.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,11 @@ pub(crate) fn start_router<Buf: PacketBufferMut>(
7373
let natallocator_factory = natallocatorw.get_reader_factory();
7474

7575
// build pkt io stages
76-
let pktio_ip: PktIo<Buf> = PktIo::new(0, 10_000usize).set_name("pkt-io-ip ");
77-
let pktio_egress: PktIo<Buf> = PktIo::new(10_000usize, 0).set_name("pkt-io-egress");
76+
let pktio: PktIo<Buf> = PktIo::new(10_000usize, 10_000usize).set_name("pkt-io-ip ");
7877

7978
// queues to expose
80-
let puntq = pktio_ip.get_puntq().unwrap_or_else(|| unreachable!());
81-
let injectq = pktio_egress.get_injectq().unwrap_or_else(|| unreachable!());
79+
let puntq = pktio.get_puntq().unwrap_or_else(|| unreachable!());
80+
let injectq = pktio.get_injectq().unwrap_or_else(|| unreachable!());
8281

8382
let pipeline_builder = move || {
8483
// Build network functions
@@ -94,24 +93,22 @@ pub(crate) fn start_router<Buf: PacketBufferMut>(
9493
let stats_stage = Stats::new("stats", writer.clone());
9594
let flow_lookup_nf = LookupNF::new(flow_table.clone());
9695
let flow_expirations_nf = ExpirationsNF::new(flow_table.clone());
97-
let pktio_ip_worker = pktio_ip.clone();
98-
let pktio_egress_worker = pktio_egress.clone();
96+
let pktio_worker = pktio.clone();
9997

10098
// Build the pipeline for a router. The composition of the pipeline (in stages) is currently
10199
// hard-coded. In any pipeline, the Stats and ExpirationsNF stages should go last
102100
DynPipeline::new()
103101
.add_stage(dumper1)
104102
.add_stage(stage_ingress)
105103
.add_stage(iprouter1)
106-
.add_stage(pktio_ip_worker)
107104
.add_stage(dst_vpcd_lookup)
108105
.add_stage(flow_lookup_nf)
109106
.add_stage(stateless_nat)
110107
.add_stage(stateful_nat)
111108
.add_stage(iprouter2)
112-
.add_stage(pktio_egress_worker)
113109
.add_stage(stage_egress)
114110
.add_stage(dumper2)
111+
.add_stage(pktio_worker)
115112
.add_stage(flow_expirations_nf)
116113
.add_stage(stats_stage)
117114
};

pkt-io/src/nf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ impl<Buf: PacketBufferMut> NetworkFunction<Buf> for PktIo<Buf> {
140140
match &self.puntq {
141141
None => Some(packet),
142142
Some(puntq) => {
143-
if packet.get_meta().local() && !packet.is_done() {
143+
if packet.get_meta().local() {
144144
match puntq.push(Box::new(packet)) {
145145
Ok(()) => None, // punted!
146146
Err(mut packet) => {

pkt-meta/src/dst_vpcd_lookup/mod.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -193,17 +193,13 @@ impl<Buf: PacketBufferMut> NetworkFunction<Buf> for DstVpcdLookup {
193193
input: Input,
194194
) -> impl Iterator<Item = Packet<Buf>> + 'a {
195195
input.filter_map(|mut packet| {
196-
if let Some(tablesr) = &self.tablesr.enter() {
197-
if !packet.is_done() {
198-
// FIXME: ideally, we'd `enter` once for the whole batch. However,
199-
// this requires boxing the closures, which may be worse than
200-
// calling `enter` per packet? ... if not uglier
201-
196+
if !packet.get_meta().local() && !packet.is_done() && packet.try_ip().is_some() {
197+
if let Some(tablesr) = &self.tablesr.enter() {
202198
self.process_packet(tablesr, &mut packet);
199+
} else {
200+
error!("{}: failed to read vpcd tables", self.name);
201+
packet.done(DoneReason::InternalFailure);
203202
}
204-
} else {
205-
error!("{}: failed to read vpcd tables", self.name);
206-
packet.done(DoneReason::InternalFailure);
207203
}
208204
packet.enforce()
209205
})

0 commit comments

Comments
 (0)