|
| 1 | +// SPDX-License-Identifier: Apache-2.0 |
| 2 | +// Copyright Open Network Fabric Authors |
| 3 | + |
| 4 | +//! Local IO |
| 5 | +
|
| 6 | +#![allow(unused)] |
| 7 | + |
| 8 | +use futures::future::join_all; |
| 9 | +use tokio::{io, spawn}; |
| 10 | + |
| 11 | +use crate::PktQueue; |
| 12 | +use interface_manager::interface::TapDevice; |
| 13 | + |
| 14 | +use net::buffer::PacketBufferMut; |
| 15 | +use net::interface::InterfaceIndex; |
| 16 | +use net::packet::Packet; |
| 17 | + |
| 18 | +use std::cell::RefCell; |
| 19 | +use std::collections::HashMap; |
| 20 | +use std::sync::Arc; |
| 21 | + |
| 22 | +use tokio::sync::Mutex; |
| 23 | + |
| 24 | +#[allow(unused)] |
| 25 | +use tracing::{debug, error, warn}; |
| 26 | + |
| 27 | +/// A mapping between an interface and a tap, and the tap itself |
| 28 | +struct TapMapping { |
| 29 | + tapid: InterfaceIndex, |
| 30 | + ifid: InterfaceIndex, |
| 31 | + tap: Mutex<TapDevice>, |
| 32 | +} |
| 33 | +impl TapMapping { |
| 34 | + fn new(tapid: InterfaceIndex, ifid: InterfaceIndex, tap: TapDevice) -> Self { |
| 35 | + Self { |
| 36 | + tapid, |
| 37 | + ifid, |
| 38 | + tap: Mutex::new(tap), |
| 39 | + } |
| 40 | + } |
| 41 | +} |
| 42 | + |
| 43 | +/// Collection of mappings doubly indexed. We need to cope with the case that |
| 44 | +/// tap ids clash with interfaceids, which could happen with distinct network namespaces. |
| 45 | +/// We need distinct indices because the action after a lookup depends on the 'type' of key. |
| 46 | +struct TapMappingDb { |
| 47 | + by_tapid: HashMap<InterfaceIndex, Arc<TapMapping>>, |
| 48 | + by_ifid: HashMap<InterfaceIndex, Arc<TapMapping>>, |
| 49 | +} |
| 50 | +impl TapMappingDb { |
| 51 | + fn add_mapping(&mut self, tapid: InterfaceIndex, ifid: InterfaceIndex, tap: TapDevice) { |
| 52 | + let mapping = Arc::new(TapMapping::new(tapid, ifid, tap)); |
| 53 | + self.by_tapid.insert(tapid, Arc::clone(&mapping)); |
| 54 | + self.by_ifid.insert(ifid, mapping); |
| 55 | + } |
| 56 | + fn get_by_ifid(&self, id: InterfaceIndex) -> Option<&TapMapping> { |
| 57 | + self.by_ifid.get(&id).map(|m| &**m) |
| 58 | + } |
| 59 | + fn get_by_tapid(&self, id: InterfaceIndex) -> Option<&TapMapping> { |
| 60 | + self.by_tapid.get(&id).map(|m| &**m) |
| 61 | + } |
| 62 | +} |
| 63 | + |
| 64 | +struct IoManager<Buf: PacketBufferMut> { |
| 65 | + puntq: PktQueue<Buf>, // for the time being only 2 queues. May need more with multiple PktIo stages |
| 66 | + injectq: PktQueue<Buf>, |
| 67 | + pub(crate) mapdb: TapMappingDb, |
| 68 | +} |
| 69 | +impl<Buf: PacketBufferMut> IoManager<Buf> { |
| 70 | + // handle incoming packets: fetch packets punted by pipeline and write them to the corresponding tap |
| 71 | + async fn handle_incoming_pkts(&self) { |
| 72 | + loop { |
| 73 | + if let Some(packet) = self.puntq.pop() { |
| 74 | + let Some(ifindex) = packet.get_meta().iif else { |
| 75 | + warn!("Packet has no incoming interface annotation. Dropping it..."); |
| 76 | + return; |
| 77 | + }; |
| 78 | + debug!("Got a packet from interface {ifindex}"); |
| 79 | + |
| 80 | + // lookup tap from the id of the interface it was received |
| 81 | + let Some(tap) = self.mapdb.get_by_ifid(ifindex).map(|m| &m.tap) else { |
| 82 | + warn!("Could not find tap for interface {ifindex}. Dropping packet..."); |
| 83 | + return; |
| 84 | + }; |
| 85 | + |
| 86 | + debug!("Will write packet to tap"); |
| 87 | + match packet.serialize() { |
| 88 | + Err(e) => error!("Failed to serialize packet prior to tap write: {e}"), |
| 89 | + Ok(buf) => match tap.lock().await.write(buf).await { |
| 90 | + Err(e) => error!("Failed to write buffer to tap: {e}"), |
| 91 | + Ok(()) => debug!("Wrote packet to tap!"), |
| 92 | + }, |
| 93 | + } |
| 94 | + } |
| 95 | + } |
| 96 | + } |
| 97 | + |
| 98 | + // handle outgoing traffic. We read from tap and push to injection queue |
| 99 | + async fn send_outgoing_packet(&self, m: &TapMapping) { |
| 100 | + let mut buffer = Buf::create(); |
| 101 | + match m.tap.lock().await.read(&mut buffer).await { |
| 102 | + Ok(size) => match Packet::new(buffer) { |
| 103 | + Err(e) => error!("Failed to build packet from buffer: {e}"), |
| 104 | + Ok(mut packet) => { |
| 105 | + debug!( |
| 106 | + "Got buffer of size {size} from tap {}. Target oif: {}", |
| 107 | + m.tapid, m.ifid |
| 108 | + ); |
| 109 | + packet.get_meta_mut().oif = Some(m.ifid); |
| 110 | + |
| 111 | + // inject |
| 112 | + match self.injectq.push(Box::new(packet)) { |
| 113 | + Ok(()) => debug!("Injected packet from tap"), |
| 114 | + Err(dropped) => warn!("Could not inject packet {dropped}"), |
| 115 | + } |
| 116 | + } |
| 117 | + }, |
| 118 | + Err(e) => error!("Failure reading packet from tap: {e}"), |
| 119 | + } |
| 120 | + } |
| 121 | + |
| 122 | + async fn handle_outgoing_pkts(&self, injectq: PktQueue<Buf>) { |
| 123 | + // unsure if this is the best way to do this. |
| 124 | + loop { |
| 125 | + let x = self |
| 126 | + .mapdb |
| 127 | + .by_tapid |
| 128 | + .values() |
| 129 | + .map(|m| self.send_outgoing_packet(m)); |
| 130 | + join_all(x).await; |
| 131 | + } |
| 132 | + } |
| 133 | +} |
0 commit comments