diff --git a/src/commands/set.rs b/src/commands/set.rs index a2304a6..d714d92 100644 --- a/src/commands/set.rs +++ b/src/commands/set.rs @@ -43,7 +43,7 @@ impl Set { Ok(Self { key, - value: Value::new(data), + value: Value::new(data, None), }) } } diff --git a/src/database.rs b/src/database.rs index 6e30da8..3999801 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,31 +1,47 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; use bytes::{BufMut, Bytes, BytesMut}; use byteyarn::Yarn; -use tokio::sync::Mutex; +use tokio::{ + sync::{Mutex, Notify}, + time::Instant, +}; use crate::Result; #[derive(Debug, Clone)] pub struct Database { - entries: Arc>>, + state: Arc>, + notify: Arc, +} + +#[derive(Debug, Default)] +pub struct DatabaseState { + entries: BTreeMap, + expirations: BTreeSet<(Instant, Yarn)>, } #[derive(Debug, Clone)] pub struct Value { data: Box<[u8]>, + pub expiration: Option, } impl Value { - pub fn new(data: Bytes) -> Self { + pub fn new(data: Bytes, expiration: Option) -> Self { Self { data: data.into_iter().collect(), + expiration, } } - pub fn from_string(data: String) -> Self { + pub fn from_string(data: String, expiration: Option) -> Self { Self { data: data.as_bytes().into(), + expiration, } } @@ -37,34 +53,108 @@ impl Value { impl Database { pub(crate) fn new() -> Self { + let state = Arc::default(); + Self { - entries: Arc::default(), + state, + notify: Arc::default(), } } pub async fn get(&self, key: &str) -> Option> { - let entries = self.entries.lock().await; + let state = self.state.lock().await; - entries.get(key).map(|v| v.data.clone()) + state.entries.get(key).map(|v| v.data.clone()) } pub async fn set(&self, key: String, value: Value) -> Result<()> { - let mut entries = self.entries.lock().await; + let mut state = self.state.lock().await; - entries.insert(key.into(), value); + let expiration = value.expiration.clone(); + + let key = Yarn::from(key.clone()); + + let previous = state.entries.insert(key.clone(), value); + + let mut notify = false; + + if let Some(previous_expiration) = previous.map(|v| v.expiration).flatten() { + if state + .expirations + .iter() + .next() + .is_some_and(|&(instant, ref next_key)| { + previous_expiration == instant && key == next_key + }) + { + notify = true; + } + + state + .expirations + .remove(&(previous_expiration, key.clone().into())); + } + + if let Some(expiration) = expiration { + if state + .expirations + .iter() + .next() + .is_none_or(|&(instant, _)| expiration > instant) + { + notify = true; + } + + state.expirations.insert((expiration, key.into())); + }; + + if notify { + self.notify.notify_one(); + } Ok(()) } pub async fn delete(&self, key: &str) -> Option> { - let mut entries = self.entries.lock().await; + let mut state = self.state.lock().await; - entries.remove(key).map(|v| v.data) + state.entries.remove(key).map(|v| v.data) } pub async fn has(&self, key: &str) -> bool { - let entries = self.entries.lock().await; + let state = self.state.lock().await; - entries.contains_key(key) + state.entries.contains_key(key) + } +} + +// TODO: Add shutdown stuff +pub async fn key_expiration_manager(db: Database) { + 'outer: loop { + let mut state_lock = db.state.lock().await; + let state = &mut *state_lock; + + let now = Instant::now(); + + while let Some((expiration, key)) = state.expirations.iter().next().as_ref() { + let expiration = *expiration; + + if expiration <= now { + state.entries.remove(key); + state.expirations.remove(&(expiration, key.clone())); + continue; + } + + drop(state_lock); + + tokio::select! { + _ = tokio::time::sleep_until(expiration) => (), + _ = db.notify.notified() => (), + } + continue 'outer; + } + + drop(state_lock); + db.notify.notified().await; } } diff --git a/src/main.rs b/src/main.rs index 8b88975..503a307 100644 --- a/src/main.rs +++ b/src/main.rs @@ -38,7 +38,9 @@ async fn main() -> Result<()> { let mut server = Server::new(&config).await?; - log::info!("The server is listening on {}:{}", config.host, config.port,); + // tokio::spawn(test()); + + log::info!("The server is listening on {}:{}", config.host, config.port); log::info!( "The maximum amount of concurrent connections is {}", config.max_connections @@ -48,3 +50,31 @@ async fn main() -> Result<()> { Ok(()) } + +/* async fn test() -> Result<()> { + let mut client = Client::new("127.0.0.1:6171").await?; + + let key = String::from("my-key"); + + client + .set( + &key, + Value::from_string( + "my-value".into(), + Some(Instant::now() + Duration::from_secs(5)), + ), + ) + .await?; + + assert!(client.has(&key).await?); + + let value = client.get(&key).await?; + + tokio::time::sleep(Duration::from_secs(6)).await; + + assert!(!client.has(&key).await?); + + let value = client.get(&key).await?; + + Ok(()) +} */ diff --git a/src/server.rs b/src/server.rs index 59d9b1c..292523d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,7 +6,7 @@ use tokio::{net::TcpListener, sync::Semaphore}; use crate::Result; use crate::config::ServerConfig; use crate::connection::Connection; -use crate::database::Database; +use crate::database::{Database, key_expiration_manager}; use crate::handler::Handler; #[derive(Debug)] @@ -25,8 +25,12 @@ impl Server { async fn _new(addr: Addr, max_connections: usize) -> Result { let listener = TcpListener::bind(addr).await?; + let db = Database::new(); + + tokio::spawn(key_expiration_manager(db.clone())); + Ok(Self { - db: Database::new(), + db, connection_limit: Arc::new(Semaphore::const_new(max_connections)), listener, }) @@ -48,7 +52,7 @@ impl Server { tokio::spawn(async move { log::debug!("Spawned a new connection handler: {addr}"); if let Err(e) = handler.run().await { - println!("Handler::run error: {e:?}"); + log::debug!("Handler::run error: {e:?}"); } log::debug!("Connection handler ended: {addr}");