basic expiration (values still need to serialize expiration)

This commit is contained in:
2025-06-16 20:51:48 +02:00
parent 39dd27378a
commit 28b42c786c
4 changed files with 143 additions and 19 deletions

View File

@@ -43,7 +43,7 @@ impl Set {
Ok(Self {
key,
value: Value::new(data),
value: Value::new(data, None),
})
}
}

View File

@@ -1,31 +1,47 @@
use std::{collections::BTreeMap, sync::Arc};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};
use bytes::{BufMut, Bytes, BytesMut};
use byteyarn::Yarn;
use tokio::sync::Mutex;
use tokio::{
sync::{Mutex, Notify},
time::Instant,
};
use crate::Result;
#[derive(Debug, Clone)]
pub struct Database {
entries: Arc<Mutex<BTreeMap<Yarn, Value>>>,
state: Arc<Mutex<DatabaseState>>,
notify: Arc<Notify>,
}
#[derive(Debug, Default)]
pub struct DatabaseState {
entries: BTreeMap<Yarn, Value>,
expirations: BTreeSet<(Instant, Yarn)>,
}
#[derive(Debug, Clone)]
pub struct Value {
data: Box<[u8]>,
pub expiration: Option<Instant>,
}
impl Value {
pub fn new(data: Bytes) -> Self {
pub fn new(data: Bytes, expiration: Option<Instant>) -> Self {
Self {
data: data.into_iter().collect(),
expiration,
}
}
pub fn from_string(data: String) -> Self {
pub fn from_string(data: String, expiration: Option<Instant>) -> Self {
Self {
data: data.as_bytes().into(),
expiration,
}
}
@@ -37,34 +53,108 @@ impl Value {
impl Database {
pub(crate) fn new() -> Self {
let state = Arc::default();
Self {
entries: Arc::default(),
state,
notify: Arc::default(),
}
}
pub async fn get(&self, key: &str) -> Option<Box<[u8]>> {
let entries = self.entries.lock().await;
let state = self.state.lock().await;
entries.get(key).map(|v| v.data.clone())
state.entries.get(key).map(|v| v.data.clone())
}
pub async fn set(&self, key: String, value: Value) -> Result<()> {
let mut entries = self.entries.lock().await;
let mut state = self.state.lock().await;
entries.insert(key.into(), value);
let expiration = value.expiration.clone();
let key = Yarn::from(key.clone());
let previous = state.entries.insert(key.clone(), value);
let mut notify = false;
if let Some(previous_expiration) = previous.map(|v| v.expiration).flatten() {
if state
.expirations
.iter()
.next()
.is_some_and(|&(instant, ref next_key)| {
previous_expiration == instant && key == next_key
})
{
notify = true;
}
state
.expirations
.remove(&(previous_expiration, key.clone().into()));
}
if let Some(expiration) = expiration {
if state
.expirations
.iter()
.next()
.is_none_or(|&(instant, _)| expiration > instant)
{
notify = true;
}
state.expirations.insert((expiration, key.into()));
};
if notify {
self.notify.notify_one();
}
Ok(())
}
pub async fn delete(&self, key: &str) -> Option<Box<[u8]>> {
let mut entries = self.entries.lock().await;
let mut state = self.state.lock().await;
entries.remove(key).map(|v| v.data)
state.entries.remove(key).map(|v| v.data)
}
pub async fn has(&self, key: &str) -> bool {
let entries = self.entries.lock().await;
let state = self.state.lock().await;
entries.contains_key(key)
state.entries.contains_key(key)
}
}
// TODO: Add shutdown stuff
pub async fn key_expiration_manager(db: Database) {
'outer: loop {
let mut state_lock = db.state.lock().await;
let state = &mut *state_lock;
let now = Instant::now();
while let Some((expiration, key)) = state.expirations.iter().next().as_ref() {
let expiration = *expiration;
if expiration <= now {
state.entries.remove(key);
state.expirations.remove(&(expiration, key.clone()));
continue;
}
drop(state_lock);
tokio::select! {
_ = tokio::time::sleep_until(expiration) => (),
_ = db.notify.notified() => (),
}
continue 'outer;
}
drop(state_lock);
db.notify.notified().await;
}
}

View File

@@ -38,7 +38,9 @@ async fn main() -> Result<()> {
let mut server = Server::new(&config).await?;
log::info!("The server is listening on {}:{}", config.host, config.port,);
// tokio::spawn(test());
log::info!("The server is listening on {}:{}", config.host, config.port);
log::info!(
"The maximum amount of concurrent connections is {}",
config.max_connections
@@ -48,3 +50,31 @@ async fn main() -> Result<()> {
Ok(())
}
/* async fn test() -> Result<()> {
let mut client = Client::new("127.0.0.1:6171").await?;
let key = String::from("my-key");
client
.set(
&key,
Value::from_string(
"my-value".into(),
Some(Instant::now() + Duration::from_secs(5)),
),
)
.await?;
assert!(client.has(&key).await?);
let value = client.get(&key).await?;
tokio::time::sleep(Duration::from_secs(6)).await;
assert!(!client.has(&key).await?);
let value = client.get(&key).await?;
Ok(())
} */

View File

@@ -6,7 +6,7 @@ use tokio::{net::TcpListener, sync::Semaphore};
use crate::Result;
use crate::config::ServerConfig;
use crate::connection::Connection;
use crate::database::Database;
use crate::database::{Database, key_expiration_manager};
use crate::handler::Handler;
#[derive(Debug)]
@@ -25,8 +25,12 @@ impl Server {
async fn _new<Addr: ToSocketAddrs>(addr: Addr, max_connections: usize) -> Result<Self> {
let listener = TcpListener::bind(addr).await?;
let db = Database::new();
tokio::spawn(key_expiration_manager(db.clone()));
Ok(Self {
db: Database::new(),
db,
connection_limit: Arc::new(Semaphore::const_new(max_connections)),
listener,
})
@@ -48,7 +52,7 @@ impl Server {
tokio::spawn(async move {
log::debug!("Spawned a new connection handler: {addr}");
if let Err(e) = handler.run().await {
println!("Handler::run error: {e:?}");
log::debug!("Handler::run error: {e:?}");
}
log::debug!("Connection handler ended: {addr}");