Compare commits

...

10 Commits

Author SHA1 Message Date
409
bd19ddd6cb ttl command 2025-06-17 02:12:15 +02:00
409
20e3fbd5d3 read expiration from Set command + tests 2025-06-17 01:52:11 +02:00
409
28b42c786c basic expiration (values still need to serialize expiration) 2025-06-16 20:51:48 +02:00
409
39dd27378a basic logging 2025-06-16 18:11:48 +02:00
409
10837dac35 add ServerConfig with env variables 2025-06-16 18:03:42 +02:00
409
0c619fbc94 remove test from main.rs 2025-06-16 17:45:38 +02:00
409
2931cf2927 improve database memory usage 2025-06-16 17:41:09 +02:00
409
34818ce050 refactor commands 2025-06-12 19:04:52 +02:00
409
06a503f67d fix Server::new syntax error 2025-06-12 17:47:57 +02:00
409
a48496058c add logo 2025-06-12 15:53:45 +02:00
18 changed files with 947 additions and 193 deletions

343
Cargo.lock generated
View File

@@ -17,11 +17,74 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "aho-corasick"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
dependencies = [
"memchr",
]
[[package]]
name = "anstream"
version = "0.6.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd"
[[package]]
name = "anstyle-parse"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882"
dependencies = [
"anstyle",
"once_cell_polyfill",
"windows-sys 0.59.0",
]
[[package]]
name = "archive"
version = "0.1.0"
dependencies = [
"bon",
"bytes",
"byteyarn",
"env_logger",
"log",
"thiserror",
"tokio",
]
@@ -53,24 +116,179 @@ version = "2.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967"
[[package]]
name = "bon"
version = "3.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f61138465baf186c63e8d9b6b613b508cd832cba4ce93cf37ce5f096f91ac1a6"
dependencies = [
"bon-macros",
"rustversion",
]
[[package]]
name = "bon-macros"
version = "3.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40d1dad34aa19bf02295382f08d9bc40651585bd497266831d40ee6296fb49ca"
dependencies = [
"darling",
"ident_case",
"prettyplease",
"proc-macro2",
"quote",
"rustversion",
"syn",
]
[[package]]
name = "buf-trait"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21eaafc770e8c073d6c3facafe7617e774305d4954aa6351b9c452eb37ee17b4"
dependencies = [
"zerocopy",
]
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
[[package]]
name = "byteyarn"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93e51d26468a15ea59f8525e0c13dc405db43e644a0b1e6d44346c72cf4cf7b"
dependencies = [
"buf-trait",
]
[[package]]
name = "cfg-if"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268"
[[package]]
name = "colorchoice"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "darling"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
dependencies = [
"darling_core",
"darling_macro",
]
[[package]]
name = "darling_core"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim",
"syn",
]
[[package]]
name = "darling_macro"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
dependencies = [
"darling_core",
"quote",
"syn",
]
[[package]]
name = "env_filter"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0"
dependencies = [
"log",
"regex",
]
[[package]]
name = "env_logger"
version = "0.11.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f"
dependencies = [
"anstream",
"anstyle",
"env_filter",
"jiff",
"log",
]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "gimli"
version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "ident_case"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "jiff"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49"
dependencies = [
"jiff-static",
"log",
"portable-atomic",
"portable-atomic-util",
"serde",
]
[[package]]
name = "jiff-static"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "libc"
version = "0.2.172"
@@ -87,6 +305,12 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "memchr"
version = "2.7.5"
@@ -122,6 +346,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "once_cell_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad"
[[package]]
name = "parking_lot"
version = "0.12.4"
@@ -151,6 +381,31 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "portable-atomic"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483"
[[package]]
name = "portable-atomic-util"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
dependencies = [
"portable-atomic",
]
[[package]]
name = "prettyplease"
version = "0.2.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6837b9e10d61f45f987d50808f83d1ee3d206c66acf650c3e4ae2e1f6ddedf55"
dependencies = [
"proc-macro2",
"syn",
]
[[package]]
name = "proc-macro2"
version = "1.0.95"
@@ -178,18 +433,73 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "rustc-demangle"
version = "0.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f"
[[package]]
name = "rustversion"
version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d"
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "serde"
version = "1.0.219"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.219"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.5"
@@ -215,6 +525,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "strsim"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "syn"
version = "2.0.102"
@@ -281,6 +597,12 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "wasi"
version = "0.11.1+wasi-snapshot-preview1"
@@ -368,3 +690,24 @@ name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "zerocopy"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"byteorder",
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

View File

@@ -4,6 +4,10 @@ version = "0.1.0"
edition = "2024"
[dependencies]
bon = "3.6.4"
bytes = "1.10.1"
byteyarn = "0.5.1"
env_logger = "0.11.8"
log = "0.4.27"
thiserror = "2.0.12"
tokio = { version = "1.45.1", features = ["full"] }

BIN
logo.webp Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 78 KiB

View File

@@ -1,7 +1,7 @@
use bytes::{Buf, BufMut as _, Bytes, BytesMut};
use tokio::net::{TcpStream, ToSocketAddrs};
use crate::{Result, connection::Connection, database::Value, errors::AppError};
use crate::{Result, connection::Connection, errors::AppError};
pub struct Client {
connection: Connection,
@@ -50,7 +50,12 @@ impl Client {
Ok(response)
}
pub async fn set(&mut self, key: &str, value: Value) -> Result<()> {
pub async fn set(
&mut self,
key: &str,
data: &[u8],
expiration_secs: Option<u64>,
) -> Result<()> {
let mut bytes = BytesMut::new();
bytes.put_u16(3);
@@ -64,7 +69,18 @@ impl Client {
bytes.put_u16(key_length);
bytes.put_slice(key.as_bytes());
value.write_to_bytes(&mut bytes);
bytes.put_u32(data.len() as u32);
bytes.put_slice(data);
match expiration_secs {
Some(seconds) => {
bytes.put_u8(1);
bytes.put_u64(seconds);
}
None => {
bytes.put_u8(0);
}
}
self.connection.write(bytes.into()).await?;
@@ -128,6 +144,32 @@ impl Client {
Ok(r.try_get_u8()? == 1)
}
pub async fn ttl(&mut self, key: &str) -> Result<Option<u64>> {
let mut bytes = BytesMut::new();
bytes.put_u16(3);
bytes.put_slice(b"ttl");
let key_length: u16 = key
.len()
.try_into()
.map_err(|_| AppError::KeyLength(key.len()))?;
bytes.put_u16(key_length);
bytes.put_slice(key.as_bytes());
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
let ttl = match r.try_get_u8()? {
1 => Some(r.try_get_u64()?),
0 => None,
_ => return Err(AppError::InvalidCommandResponse),
};
Ok(ttl)
}
async fn get_response(&mut self) -> Result<Bytes> {
self.connection
.read_bytes()

View File

@@ -1,139 +0,0 @@
use std::io::{Cursor, Read};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crate::{
Result,
connection::Connection,
database::{Database, Value},
errors::AppError,
};
#[derive(Debug)]
pub enum Command {
Get { key: String },
Set { key: String, value: Value },
Delete { key: String },
Has { key: String },
}
impl Command {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
match self {
Command::Get { ref key } => {
let value = db.get(&key).await;
let mut buf = BytesMut::new();
match value {
Some(v) => {
buf.put_u8(1);
buf.put_u32(v.len() as u32);
buf.put_slice(&v);
}
None => {
buf.put_u8(0);
}
}
connection.write(buf.into()).await?;
}
Command::Set { key, value } => {
db.set(key.clone(), value.clone()).await?;
connection.write(Bytes::from_static(&[1])).await?;
}
Command::Delete { ref key } => {
let value = db.delete(key).await;
let mut buf = BytesMut::new();
match value {
Some(v) => {
buf.put_u8(1);
buf.put_u32(v.len() as u32);
buf.put_slice(&v);
}
None => buf.put_u8(0),
}
connection.write(buf.into()).await?;
}
Command::Has { ref key } => {
let value = db.has(key).await;
let buf = Bytes::copy_from_slice(&[if value { 1 } else { 0 }]);
connection.write(buf.into()).await?;
}
}
Ok(())
}
pub fn parse(bytes: &BytesMut) -> Result<(Self, u64)> {
let mut buffer = Cursor::new(&bytes[..]);
let name = read_string(&mut buffer)?;
Self::parse_inner(name, &mut buffer)
}
fn parse_inner(command_name: String, bytes: &mut Cursor<&[u8]>) -> Result<(Self, u64)> {
let command = match command_name.as_str() {
"get" => {
let key = read_string(bytes)?;
Self::Get { key }
}
"set" => {
let key = read_string(bytes)?;
let data = read_bytes(bytes)?;
Self::Set {
key,
value: Value::new(data),
}
}
"delete" => {
let key = read_string(bytes)?;
Self::Delete { key }
}
"has" => {
let key = read_string(bytes)?;
Self::Has { key }
}
_ => return Err(AppError::UnknownCommand(command_name)),
};
Ok((command, bytes.position()))
}
}
fn read_string(buffer: &mut Cursor<&[u8]>) -> Result<String> {
let length = buffer.try_get_u16()? as usize;
if buffer.remaining() < length {
return Err(AppError::IncompleteCommandBuffer);
}
let mut contents = Vec::with_capacity(length);
for _ in 0..length {
contents.push(buffer.try_get_u8()?);
}
let string = String::from_utf8(contents)?;
Ok(string)
}
fn read_bytes(buffer: &mut Cursor<&[u8]>) -> Result<Bytes> {
let len = buffer.try_get_u32()? as usize;
if buffer.remaining() < len {
return Err(AppError::IncompleteCommandBuffer);
}
Ok(buffer.copy_to_bytes(len))
}

42
src/commands/delete.rs Normal file
View File

@@ -0,0 +1,42 @@
use std::io::Cursor;
use bytes::{Buf as _, BufMut as _, BytesMut};
use crate::{Result, connection::Connection, database::Database, errors::AppError};
#[derive(Debug, Clone)]
pub struct Delete {
key: String,
}
impl Delete {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
let value = db.delete(&self.key).await;
let mut buf = BytesMut::new();
match value {
Some(v) => {
buf.put_u8(1);
buf.put_u32(v.len() as u32);
buf.put_slice(&v);
}
None => buf.put_u8(0),
}
connection.write(buf.into()).await?;
Ok(())
}
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> {
let key_length = bytes.try_get_u16()? as usize;
if bytes.remaining() < key_length {
return Err(AppError::IncompleteCommandBuffer);
}
let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?;
Ok(Self { key })
}
}

44
src/commands/get.rs Normal file
View File

@@ -0,0 +1,44 @@
use std::io::Cursor;
use bytes::{Buf as _, BufMut as _, BytesMut};
use crate::{Result, connection::Connection, database::Database, errors::AppError};
#[derive(Debug, Clone)]
pub struct Get {
key: String,
}
impl Get {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
let value = db.get(&self.key).await;
let mut buf = BytesMut::new();
match value {
Some(v) => {
buf.put_u8(1);
buf.put_u32(v.len() as u32);
buf.put_slice(&v);
}
None => {
buf.put_u8(0);
}
}
connection.write(buf.into()).await?;
Ok(())
}
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> {
let key_length = bytes.try_get_u16()? as usize;
if bytes.remaining() < key_length {
return Err(AppError::IncompleteCommandBuffer);
}
let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?;
Ok(Self { key })
}
}

34
src/commands/has.rs Normal file
View File

@@ -0,0 +1,34 @@
use std::io::Cursor;
use bytes::{Buf as _, Bytes};
use crate::{Result, connection::Connection, database::Database, errors::AppError};
#[derive(Debug, Clone)]
pub struct Has {
key: String,
}
impl Has {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
let value = db.has(&self.key).await;
let buf = Bytes::copy_from_slice(&[if value { 1 } else { 0 }]);
connection.write(buf.into()).await?;
Ok(())
}
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> {
let key_length = bytes.try_get_u16()? as usize;
if bytes.remaining() < key_length {
return Err(AppError::IncompleteCommandBuffer);
}
let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?;
Ok(Self { key })
}
}

64
src/commands/mod.rs Normal file
View File

@@ -0,0 +1,64 @@
mod delete;
mod get;
mod has;
mod set;
mod ttl;
use std::io::Cursor;
use bytes::{Buf, BytesMut};
use delete::Delete;
use get::Get;
use has::Has;
use set::Set;
use ttl::Ttl;
use crate::{Result, connection::Connection, database::Database, errors::AppError};
#[derive(Debug)]
pub enum Command {
Get(Get),
Set(Set),
Delete(Delete),
Has(Has),
Ttl(Ttl),
}
impl Command {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
match self {
Command::Get(get) => get.execute(db, connection).await,
Command::Set(set) => set.execute(db, connection).await,
Command::Delete(delete) => delete.execute(db, connection).await,
Command::Has(has) => has.execute(db, connection).await,
Command::Ttl(ttl) => ttl.execute(db, connection).await,
}
}
pub fn parse(bytes: &BytesMut) -> Result<(Self, u64)> {
let mut buffer = Cursor::new(&bytes[..]);
let name_length = buffer.try_get_u16()? as usize;
if buffer.remaining() < name_length {
return Err(AppError::IncompleteCommandBuffer);
}
let name = String::from_utf8(buffer.copy_to_bytes(name_length).to_vec())?;
Self::parse_inner(name, &mut buffer)
}
fn parse_inner(command_name: String, bytes: &mut Cursor<&[u8]>) -> Result<(Self, u64)> {
let command = match command_name.to_lowercase().as_str() {
"get" => Self::Get(Get::parse(bytes)?),
"set" => Self::Set(Set::parse(bytes)?),
"delete" => Self::Delete(Delete::parse(bytes)?),
"has" => Self::Has(Has::parse(bytes)?),
"ttl" => Self::Ttl(Ttl::parse(bytes)?),
_ => return Err(AppError::UnknownCommand(command_name)),
};
Ok((command, bytes.position()))
}
}

56
src/commands/set.rs Normal file
View File

@@ -0,0 +1,56 @@
use std::{io::Cursor, time::Duration};
use bytes::{Buf as _, Bytes};
use tokio::time::Instant;
use crate::{
Result,
connection::Connection,
database::{Database, Value},
errors::AppError,
};
#[derive(Debug, Clone)]
pub struct Set {
key: String,
value: Value,
}
impl Set {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
db.set(self.key, self.value).await?;
connection.write(Bytes::from_static(&[1])).await?;
Ok(())
}
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> {
let key_length = bytes.try_get_u16()? as usize;
if bytes.remaining() < key_length {
return Err(AppError::IncompleteCommandBuffer);
}
let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?;
let value_length = bytes.try_get_u32()? as usize;
if bytes.remaining() < value_length {
return Err(AppError::IncompleteCommandBuffer);
}
let data = bytes.copy_to_bytes(value_length);
let expiration: Option<Instant> = match bytes.try_get_u8()? {
1 => Some(Instant::now() + Duration::from_secs(bytes.try_get_u64()?)),
0 => None,
_ => return Err(AppError::UnexpectedCommandData),
};
Ok(Self {
key,
value: Value::new(data, expiration),
})
}
}

42
src/commands/ttl.rs Normal file
View File

@@ -0,0 +1,42 @@
use std::io::Cursor;
use bytes::{Buf as _, BufMut, Bytes, BytesMut};
use crate::{Result, connection::Connection, database::Database, errors::AppError};
#[derive(Debug, Clone)]
pub struct Ttl {
key: String,
}
impl Ttl {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
let ttl = db.ttl(&self.key).await;
let Some(ttl) = ttl else {
connection.write(Bytes::from_static(&[0])).await?;
return Ok(());
};
let mut buf = BytesMut::new();
buf.put_u8(1);
buf.put_u64(ttl);
connection.write(buf.into()).await?;
Ok(())
}
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> {
let key_length = bytes.try_get_u16()? as usize;
if bytes.remaining() < key_length {
return Err(AppError::IncompleteCommandBuffer);
}
let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?;
Ok(Self { key })
}
}

11
src/config.rs Normal file
View File

@@ -0,0 +1,11 @@
use bon::Builder;
#[derive(Debug, Builder, Clone)]
pub struct ServerConfig {
#[builder(default = String::from("0.0.0.0"))]
pub host: String,
#[builder(default = 6171)]
pub port: u16,
#[builder(default = 256)]
pub max_connections: usize,
}

View File

@@ -4,7 +4,7 @@ use tokio::{
net::TcpStream,
};
use crate::{Result, command::Command};
use crate::{Result, commands::Command};
#[derive(Debug)]
pub struct Connection {

View File

@@ -1,28 +1,48 @@
use std::{collections::HashMap, sync::Arc};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};
use bytes::{BufMut, Bytes, BytesMut};
use tokio::sync::Mutex;
use byteyarn::Yarn;
use tokio::{
sync::{Mutex, Notify},
time::Instant,
};
use crate::Result;
#[derive(Debug, Clone)]
pub struct Database {
entries: Arc<Mutex<HashMap<String, Value>>>,
state: Arc<Mutex<DatabaseState>>,
notify: Arc<Notify>,
}
#[derive(Debug, Default)]
pub struct DatabaseState {
entries: BTreeMap<Yarn, Value>,
expirations: BTreeSet<(Instant, Yarn)>,
shutdown: bool,
}
#[derive(Debug, Clone)]
pub struct Value {
data: Bytes,
data: Box<[u8]>,
pub expiration: Option<Instant>,
}
impl Value {
pub fn new(data: Bytes) -> Self {
Self { data }
pub fn new(data: Bytes, expiration: Option<Instant>) -> Self {
Self {
data: data.into_iter().collect(),
expiration,
}
}
pub fn from_string(data: String) -> Self {
pub fn from_string(data: String, expiration: Option<Instant>) -> Self {
Self {
data: Bytes::from(data),
data: data.as_bytes().into(),
expiration,
}
}
@@ -34,34 +54,128 @@ impl Value {
impl Database {
pub(crate) fn new() -> Self {
let state = Arc::default();
Self {
entries: Arc::default(),
state,
notify: Arc::default(),
}
}
pub async fn get(&self, key: &str) -> Option<Bytes> {
let entries = self.entries.lock().await;
pub async fn get(&self, key: &str) -> Option<Box<[u8]>> {
let state = self.state.lock().await;
entries.get(key).map(|v| v.data.clone())
state.entries.get(key).map(|v| v.data.clone())
}
pub async fn set(&self, key: String, value: Value) -> Result<()> {
let mut entries = self.entries.lock().await;
let mut state = self.state.lock().await;
entries.insert(key, value);
let expiration = value.expiration.clone();
let key = Yarn::from(key.clone());
let previous = state.entries.insert(key.clone(), value);
let mut notify = false;
if let Some(previous_expiration) = previous.map(|v| v.expiration).flatten() {
if state
.expirations
.iter()
.next()
.is_some_and(|&(instant, ref next_key)| {
previous_expiration == instant && key == next_key
})
{
notify = true;
}
state
.expirations
.remove(&(previous_expiration, key.clone().into()));
}
if let Some(expiration) = expiration {
if state
.expirations
.iter()
.next()
.is_none_or(|&(instant, _)| expiration > instant)
{
notify = true;
}
state.expirations.insert((expiration, key.into()));
};
if notify {
self.notify.notify_one();
}
Ok(())
}
pub async fn delete(&self, key: &str) -> Option<Bytes> {
let mut entries = self.entries.lock().await;
pub async fn delete(&self, key: &str) -> Option<Box<[u8]>> {
let mut state = self.state.lock().await;
entries.remove(key).map(|v| v.data)
state.entries.remove(key).map(|v| v.data)
}
pub async fn has(&self, key: &str) -> bool {
let entries = self.entries.lock().await;
let state = self.state.lock().await;
entries.contains_key(key)
state.entries.contains_key(key)
}
pub async fn ttl(&self, key: &str) -> Option<u64> {
self.state
.lock()
.await
.entries
.get(key)
.map(|v| v.expiration.map(|e| (e - Instant::now()).as_secs()))
.flatten()
}
pub async fn shutdown(&mut self) {
self.state.lock().await.shutdown = true;
self.notify.notify_one();
}
}
pub async fn key_expiration_manager(db: Database) {
'outer: loop {
let mut state_lock = db.state.lock().await;
let state = &mut *state_lock;
if state.shutdown {
break;
}
let now = Instant::now();
while let Some((expiration, key)) = state.expirations.iter().next().as_ref() {
let expiration = *expiration;
if expiration <= now {
state.entries.remove(key);
state.expirations.remove(&(expiration, key.clone()));
continue;
}
drop(state_lock);
tokio::select! {
_ = tokio::time::sleep_until(expiration) => (),
_ = db.notify.notified() => (),
}
continue 'outer;
}
drop(state_lock);
db.notify.notified().await;
}
log::debug!("key_expiration_manager has finished");
}

View File

@@ -18,4 +18,6 @@ pub enum AppError {
NoResponse,
#[error("Expected a different response for the executed command")]
InvalidCommandResponse,
#[error("The binary command data is not structured correctly")]
UnexpectedCommandData,
}

View File

@@ -1,11 +1,14 @@
use bytes::Bytes;
use client::Client;
use database::Value;
use config::ServerConfig;
use errors::AppError;
use server::Server;
use tokio::{signal::ctrl_c, sync::oneshot};
#[cfg(test)]
pub mod tests;
pub mod client;
pub mod command;
pub mod commands;
pub mod config;
pub mod connection;
pub mod database;
pub mod errors;
@@ -16,29 +19,47 @@ pub type Result<T> = std::result::Result<T, AppError>;
#[tokio::main]
async fn main() -> Result<()> {
let mut server = Server::new("127.0.0.1:6171").await?;
env_logger::builder()
.format_target(false)
.filter_level(log::LevelFilter::Info)
.parse_default_env()
.init();
// Testing
for i in 0..256 {
tokio::spawn(client(format!("client-{}", i + 1)));
}
let config = ServerConfig::builder()
.maybe_host(std::env::var("SERVER_HOST").ok())
.maybe_port(
std::env::var("SERVER_PORT")
.ok()
.map(|v| v.parse().ok())
.flatten(),
)
.maybe_max_connections(
std::env::var("MAX_CONNECTIONS")
.ok()
.map(|v| v.parse().ok())
.flatten(),
)
.build();
server.run().await?;
let mut server = Server::new(&config).await?;
Ok(())
}
// Test stuff
async fn client(v: String) -> Result<()> {
let mut client = Client::new("127.0.0.1:6171").await?;
client
.set(&v, Value::from_string(format!("{v}'s value")))
.await?;
assert_eq!(
client.get(&v).await.unwrap().unwrap(),
Bytes::from(format!("{v}'s value"))
log::info!("The server is listening on {}:{}", config.host, config.port);
log::info!(
"The maximum amount of concurrent connections is {}",
config.max_connections
);
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
tokio::spawn(async move {
if ctrl_c().await.is_ok() {
let _ = shutdown_sender.send(());
}
});
server.run(shutdown_receiver).await?;
log::info!("Goodbye");
Ok(())
}

View File

@@ -1,11 +1,14 @@
use std::sync::Arc;
use tokio::net::ToSocketAddrs;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::{net::TcpListener, sync::Semaphore};
use crate::Result;
use crate::config::ServerConfig;
use crate::connection::Connection;
use crate::database::Database;
use crate::database::{Database, key_expiration_manager};
use crate::handler::Handler;
#[derive(Debug)]
@@ -13,37 +16,64 @@ pub struct Server {
db: Database,
listener: TcpListener,
connection_limit: Arc<Semaphore>,
expiration_manager_handle: JoinHandle<()>,
}
impl Server {
const MAX_CONNECTIONS: usize = 256;
pub async fn new(config: &ServerConfig) -> Result<Self> {
let addr = format!("{}:{}", config.host, config.port);
Self::_new(addr, config.max_connections).await
}
pub async fn new<Addr: ToSocketAddrs>(addr: ToSocketAddrs) -> Result<Self> {
async fn _new<Addr: ToSocketAddrs>(addr: Addr, max_connections: usize) -> Result<Self> {
let listener = TcpListener::bind(addr).await?;
let db = Database::new();
let expiration_manager_handle = tokio::spawn(key_expiration_manager(db.clone()));
Ok(Self {
db: Database::new(),
connection_limit: Arc::new(Semaphore::const_new(Self::MAX_CONNECTIONS)),
db,
connection_limit: Arc::new(Semaphore::const_new(max_connections)),
listener,
expiration_manager_handle,
})
}
pub async fn run(&mut self) -> Result<()> {
pub async fn run(&mut self, mut shutdown: oneshot::Receiver<()>) -> Result<()> {
let shutdown = &mut shutdown;
loop {
let permit = Arc::clone(&self.connection_limit)
.acquire_owned()
.await
.unwrap();
let socket = self.listener.accept().await?.0;
let Some(socket) = ({
tokio::select! {
socket = self.listener.accept() => Some(socket?.0),
_ = &mut *shutdown => None,
}
}) else {
log::info!("Shutting down");
self.db.shutdown().await;
let _ = (&mut self.expiration_manager_handle).await;
return Ok(());
};
let addr = socket.peer_addr()?;
let connection = Connection::new(socket);
let mut handler = Handler::new(self.db.clone(), connection);
tokio::spawn(async move {
log::debug!("Spawned a new connection handler: {addr}");
if let Err(e) = handler.run().await {
println!("Handler::run error: {e:?}");
log::debug!("Handler::run error: {e:?}");
}
log::debug!("Connection handler ended: {addr}");
drop(permit);
});

44
src/tests.rs Normal file
View File

@@ -0,0 +1,44 @@
use std::time::Duration;
use tokio::sync::oneshot;
use crate::{client::Client, config::ServerConfig, server::Server};
#[tokio::test]
async fn expiration() -> Result<(), Box<dyn std::error::Error>> {
let config = ServerConfig::builder().host("127.0.0.1".into()).build();
let mut server = Server::new(&config).await?;
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let server_handle = tokio::spawn(async move { server.run(shutdown_rx).await });
let mut client = client("127.0.0.1:6171").await?;
client
.set("test-key", "test-value".as_bytes(), Some(3))
.await
.unwrap();
assert!(client.has("test-key").await.unwrap());
assert_eq!(client.ttl("test-key").await.unwrap(), Some(2));
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(client.has("test-key").await.unwrap());
assert_eq!(client.ttl("test-key").await.unwrap(), Some(1));
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(!client.has("test-key").await.unwrap());
shutdown_tx.send(()).unwrap();
server_handle.await??;
Ok(())
}
async fn client<A: tokio::net::ToSocketAddrs>(
addr: A,
) -> Result<Client, Box<dyn std::error::Error>> {
Ok(Client::new(addr).await?)
}