Skip to content

Commit 9262c9e

Browse files
committed
Implement writing statistics data to a writer
1 parent ce14147 commit 9262c9e

File tree

2 files changed

+119
-59
lines changed

2 files changed

+119
-59
lines changed

src/stats/mod.rs

Lines changed: 110 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
#![allow(dead_code)]
55
#![allow(unused_variables)]
66

7+
use core::fmt::Debug;
78
use std::{
9+
io::Write,
810
net::IpAddr,
911
sync::{
10-
LazyLock,
11-
atomic::{AtomicI32, AtomicPtr, AtomicU64, Ordering},
12+
atomic::{AtomicI32, AtomicPtr, AtomicU64, Ordering}, Arc, LazyLock, Mutex
1213
},
1314
};
1415

@@ -17,7 +18,7 @@ use chrono::{DateTime, Duration, FixedOffset, Utc};
1718
use papaya::{Guard, HashMap};
1819
use sarlacc::Intern;
1920
use seize::Collector;
20-
use tracing::{info, instrument};
21+
use tracing::{error, info, instrument};
2122

2223
/// The TTL for IP tracking entries, after which they are considered stale and removed.
2324
const IP_TRACKING_TTL: chrono::TimeDelta = Duration::days(1);
@@ -40,34 +41,44 @@ struct IpInfo {
4041
type Counts = HashMap<(Intern<Authority>, Intern<Authority>, Intern<Authority>), AtomicU64>;
4142

4243
/// The counts for all of the possible webring redirects
43-
#[derive(Debug)]
44-
struct AggregatedStats {
44+
struct AggregatedStats<W: Write + Send + 'static> {
4545
/// The collector for the atomic data that we're handling
46-
collector: Collector,
46+
collector: Arc<Collector>,
4747
/// The last date that a redirect was tracked (hopefully today)
4848
today: AtomicI32,
4949
/// (From, To, Started From) → Count
5050
/// Invariant: This MUST ALWAYS be a valid pointer
5151
counter: AtomicPtr<Counts>,
52+
/// The writer for the statistics output file
53+
output: Arc<Mutex<W>>,
5254
}
5355

54-
impl AggregatedStats {
56+
impl<W: Write + Send + 'static> Debug for AggregatedStats<W> {
57+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58+
f.debug_struct("AggregatedStats")
59+
.field("today", &self.today.load(Ordering::Relaxed))
60+
.finish_non_exhaustive()
61+
}
62+
}
63+
64+
/// Convert the current time into the days since the epoch
65+
fn mk_num(time: DateTime<Utc>) -> i32 {
66+
time.date_naive().to_epoch_days()
67+
}
68+
69+
impl<W: Write + Send + 'static> AggregatedStats<W> {
5570
/// Create a new `AggregatedStats`
56-
fn new(now: DateTime<Utc>) -> Self {
71+
fn new(now: DateTime<Utc>, writer: W) -> Self {
5772
let counter: Counts = HashMap::default();
5873

5974
AggregatedStats {
60-
today: AtomicI32::new(Self::mk_num(now)),
75+
today: AtomicI32::new(mk_num(now)),
6176
counter: AtomicPtr::new(Box::into_raw(Box::new(counter))),
62-
collector: Collector::new(),
77+
collector: Arc::new(Collector::new()),
78+
output: Arc::new(Mutex::new(writer)),
6379
}
6480
}
6581

66-
/// Convert the current time into the days since the epoch
67-
fn mk_num(time: DateTime<Utc>) -> i32 {
68-
time.date_naive().to_epoch_days()
69-
}
70-
7182
/// Retrieve the current counter from a guard
7283
fn counter<'a>(&'a self, guard: &'a impl Guard) -> &'a Counts {
7384
// SAFETY: The counter is guaranteed to be a valid pointer and we are using Acquire ordering to synchronize-with its initialization
@@ -76,40 +87,71 @@ impl AggregatedStats {
7687

7788
/// Retrieve the current counter from a guard while updating it if the current time is a new calendar date
7889
fn maybe_update_counter<'a>(&'a self, now: DateTime<Utc>, guard: &'a impl Guard) -> &'a Counts {
79-
let now = AggregatedStats::mk_num(now);
90+
let now = mk_num(now);
8091

8192
let prev_day = self.today.swap(now, Ordering::Relaxed);
8293

8394
if prev_day != now {
8495
let new_counter: *mut Counts = Box::into_raw(Box::new(HashMap::new()));
8596

86-
// Release to synchronize-with `counter`. We don't need Acquire because we won't read the previous pointer.
87-
let prev = guard.swap(&self.counter, new_counter, Ordering::Release);
88-
// SAFETY: The pointer can no longer be accessed now that it has been swapped into `prev`, and `Box::from_raw` is the correct way to drop the pointer.
89-
unsafe {
90-
self.collector
91-
.retire(prev, |ptr, _| drop(Box::from_raw(ptr)));
92-
}
97+
// We need this guard to go into our task so it needs to be owned
98+
let guard_owned = self.collector.enter();
99+
100+
// Release to synchronize-with `counter` and Acquire to ensure that we can see the initialization of the previous one so that we can properly access and write it.
101+
let prev_ptr = guard_owned.swap(&self.counter, new_counter, Ordering::AcqRel);
102+
103+
let output = Arc::clone(&self.output);
104+
105+
// Allow it to be moved to our task
106+
let prev_ptr = prev_ptr as usize;
107+
108+
let this_collector = Arc::clone(&self.collector);
109+
110+
tokio::task::spawn_blocking(move || {
111+
let mut output = output.lock().unwrap();
112+
113+
let prev_ptr = prev_ptr as *mut Counts;
114+
// SAFETY: Since this pointer hasn't been retired yet, we have access to it until we do retire it.
115+
let prev = unsafe { &*prev_ptr }.pin();
116+
117+
for ((from, to, started_from), count) in &prev {
118+
let count = count.load(Ordering::Relaxed);
119+
if let Err(e) = output.write_fmt(format_args!("{prev_day},{from},{to},{started_from},{count}\n")) {
120+
error!("Error writing statistics: {e}");
121+
}
122+
}
123+
124+
// SAFETY: The pointer can no longer be accessed from a new location since we previously overwrote the atomic pointer, and `Box::from_raw` is the correct way to drop the pointer. This task is also finished with its access to it.
125+
unsafe { this_collector.retire(prev_ptr, |ptr, _| drop(Box::from_raw(ptr))) }
126+
});
93127
}
94128

95129
self.counter(guard)
96130
}
97131
}
98132

99133
/// Statistics tracking for the webring
100-
#[derive(Debug)]
101-
pub struct Stats {
134+
pub struct Stats<W: Write + Send + 'static> {
102135
/// Aggregated statistics
103-
aggregated: AggregatedStats,
136+
aggregated: AggregatedStats<W>,
104137
/// Map of IP information keyed by IP address
105138
ip_tracking: HashMap<IpAddr, IpInfo>,
106139
}
107140

108-
impl Stats {
141+
impl<W: Write + Send + 'static> Debug for Stats<W> {
142+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143+
f.debug_struct("Stats")
144+
.field("aggregated", &self.aggregated)
145+
.field("ip_tracking", &self.ip_tracking)
146+
.finish()
147+
}
148+
}
149+
150+
impl<W: Write + Send> Stats<W> {
109151
/// Creates a new instance of `Stats`.
110-
pub fn new(now: DateTime<Utc>) -> Stats {
152+
pub fn new(now: DateTime<Utc>, writer: W) -> Stats<W> {
111153
Stats {
112-
aggregated: AggregatedStats::new(now),
154+
aggregated: AggregatedStats::new(now, writer),
113155
ip_tracking: HashMap::new(),
114156
}
115157
}
@@ -144,8 +186,7 @@ impl Stats {
144186

145187
let guard = self.aggregated.collector.enter();
146188
let pinned_map = self.aggregated.maybe_update_counter(now, &guard).pin();
147-
let counter =
148-
pinned_map.get_or_insert((from, to, ip_info.started_from), AtomicU64::new(0));
189+
let counter = pinned_map.get_or_insert((from, to, ip_info.started_from), AtomicU64::new(0));
149190

150191
counter.fetch_add(1, Ordering::Relaxed);
151192
}
@@ -176,13 +217,11 @@ impl Stats {
176217
self.aggregated
177218
.counter(&guard)
178219
.pin()
179-
.get(
180-
&(
181-
Intern::new(entry.0.parse::<Authority>().unwrap()),
182-
Intern::new(entry.1.parse::<Authority>().unwrap()),
183-
Intern::new(entry.2.parse::<Authority>().unwrap()),
184-
),
185-
)
220+
.get(&(
221+
Intern::new(entry.0.parse::<Authority>().unwrap()),
222+
Intern::new(entry.1.parse::<Authority>().unwrap()),
223+
Intern::new(entry.2.parse::<Authority>().unwrap()),
224+
),)
186225
.map_or(0, |v| v.load(Ordering::Relaxed)),
187226
count,
188227
"{self:#?}\n{entry:?}"
@@ -192,11 +231,13 @@ impl Stats {
192231

193232
#[cfg(test)]
194233
mod tests {
195-
use std::net::IpAddr;
234+
use std::{collections::HashSet, net::IpAddr, str::from_utf8};
196235

197236
use axum::http::uri::Authority;
198237
use chrono::{DateTime, Duration, NaiveDate, Utc};
238+
use indoc::indoc;
199239
use sarlacc::Intern;
240+
use tokio::sync::mpsc::{self, UnboundedReceiver};
200241

201242
use crate::stats::IP_TRACKING_TTL;
202243

@@ -218,9 +259,31 @@ mod tests {
218259
t(timestamp).with_timezone(&TIMEZONE).date_naive()
219260
}
220261

262+
struct TestWriter(mpsc::UnboundedSender<Vec<u8>>);
263+
264+
impl std::io::Write for TestWriter {
265+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
266+
self.0.send(buf.to_owned()).unwrap();
267+
Ok(buf.len())
268+
}
269+
270+
fn flush(&mut self) -> std::io::Result<()> {
271+
Ok(())
272+
}
273+
}
274+
275+
async fn assert_same_data(rx: &mut UnboundedReceiver<Vec<u8>>, expected: &str) {
276+
let mut data = Vec::new();
277+
while data.len() != expected.len() {
278+
data.extend(rx.recv().await.unwrap());
279+
}
280+
assert_eq!(from_utf8(&data).unwrap().split('\n').collect::<HashSet<_>>(), expected.split('\n').collect::<HashSet<_>>());
281+
}
282+
221283
#[tokio::test]
222284
async fn test_stat_tracking() {
223-
let stats = Stats::new(t(0));
285+
let (tx, mut rx) = mpsc::unbounded_channel();
286+
let stats = Stats::new(t(0), TestWriter(tx));
224287

225288
stats.redirected_impl(a("0.0.0.0"), i("a.com"), i("b.com"), t(0));
226289
stats.redirected_impl(a("0.0.0.0"), i("b.com"), i("c.com"), t(1));
@@ -269,7 +332,14 @@ mod tests {
269332

270333
let day = Duration::days(1);
271334

335+
assert!(rx.is_empty());
272336
stats.redirected_impl(a("0.0.0.0"), i("b.com"), i("c.com"), t(1) + day);
337+
assert_same_data(&mut rx, indoc! {"
338+
0,a.com,b.com,a.com,2
339+
0,b.com,c.com,a.com,1
340+
0,b.com,homepage.com,a.com,1
341+
0,homepage.com,c.com,a.com,1
342+
"}).await;
273343
stats.redirected_impl(a("0.0.0.0"), i("c.com"), i("a.com"), t(2) + day);
274344

275345
assert_eq!(stats.aggregated.counter(&guard).len(), 2);

src/webring.rs

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
//! Ring behavior and data structures
22
33
use std::{
4-
net::IpAddr,
5-
path::{Path, PathBuf},
6-
sync::{
7-
Arc, OnceLock, RwLock, RwLockReadGuard,
8-
atomic::{AtomicBool, Ordering},
9-
},
4+
io::{empty, Empty}, net::IpAddr, path::{Path, PathBuf}, sync::{
5+
atomic::{AtomicBool, Ordering}, Arc, OnceLock, RwLock, RwLockReadGuard
6+
}
107
};
118

129
use axum::http::{Uri, uri::Authority};
13-
use chrono::{DateTime, TimeDelta, Utc};
10+
use chrono::{TimeDelta, Utc};
1411
use futures::{StreamExt, future::join, stream::FuturesUnordered};
1512
use indexmap::IndexMap;
1613
use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher as _};
@@ -184,7 +181,7 @@ pub struct Webring {
184181
/// Discord notifier for notifying members of issues with their sites
185182
notifier: Option<Arc<DiscordNotifier>>,
186183
/// Statistics collected about the ring
187-
stats: Arc<Stats>,
184+
stats: Arc<Stats<Empty>>,
188185
/// Current configuration of the webring, used for detecting changes when reloading
189186
config: Arc<AsyncRwLock<Option<Config>>>,
190187
}
@@ -199,7 +196,7 @@ impl Webring {
199196
members: RwLock::new(member_map_from_config_table(&config.members)),
200197
static_dir_path: config.webring.static_dir.clone(),
201198
homepage: AsyncRwLock::new(None),
202-
stats: Arc::new(Stats::new(Utc::now())),
199+
stats: Arc::new(Stats::new(Utc::now(), empty())),
203200
file_watcher: OnceLock::default(),
204201
base_address: config.webring.base_url(),
205202
notifier: config
@@ -644,11 +641,10 @@ mod tests {
644641
use std::{
645642
collections::HashSet,
646643
fs::{File, OpenOptions},
647-
io::Write as _,
644+
io::{empty, Write as _},
648645
path::PathBuf,
649646
sync::{
650-
Arc, OnceLock, RwLock,
651-
atomic::{AtomicBool, Ordering},
647+
atomic::{AtomicBool, Ordering}, Arc, OnceLock, RwLock
652648
},
653649
time::Duration,
654650
};
@@ -687,7 +683,7 @@ mod tests {
687683
base_address: Intern::default(),
688684
base_authority: Intern::new("ring.purduehackers.com".parse().unwrap()),
689685
notifier: None,
690-
stats: Arc::new(Stats::new(Utc::now())),
686+
stats: Arc::new(Stats::new(Utc::now(), empty())),
691687
config: Arc::new(AsyncRwLock::new(None)),
692688
}
693689
}
@@ -818,8 +814,6 @@ mod tests {
818814
assert_eq!(*inner, expected);
819815
}
820816

821-
let today = Utc::now().with_timezone(&TIMEZONE).date_naive();
822-
823817
webring.assert_next(
824818
"https://hrovnyak.gitlab.io/bruh/bruh/bruh?bruh=bruh",
825819
Ok("kasad.com"),
@@ -1016,8 +1010,6 @@ mod tests {
10161010
.unwrap();
10171011
let webring = Webring::new(&config);
10181012

1019-
let today = Utc::now().with_timezone(&TIMEZONE).date_naive();
1020-
10211013
let uri = webring
10221014
.random_page(None, "0.0.0.0".parse().unwrap())
10231015
.unwrap();
@@ -1041,8 +1033,6 @@ mod tests {
10411033
"# }).unwrap();
10421034
let webring = Webring::new(&config);
10431035

1044-
let today = Utc::now().with_timezone(&TIMEZONE).date_naive();
1045-
10461036
let uri = webring
10471037
.random_page(
10481038
Some(&"clementine.viridian.page".parse().unwrap()),

0 commit comments

Comments
 (0)