Compare commits

..

20 Commits

Author SHA1 Message Date
409
0b0449e995 README.md better line breaks 2025-06-17 23:28:51 +02:00
409
e05a9d3d11 update README.md 2025-06-17 23:28:19 +02:00
409
4d45b5c4bc update compose.yaml 2025-06-17 23:26:44 +02:00
409
3d2d9d30ee docker 2025-06-17 23:08:32 +02:00
409
4d4ccc1d14 cli 2025-06-17 22:31:34 +02:00
409
7c0acc3ecf refactor set command 2025-06-17 21:21:16 +02:00
409
2c0942fee1 also parse LOG_LEVEL env variable 2025-06-17 21:14:11 +02:00
409
0a9c8f81aa persist command 2025-06-17 21:11:09 +02:00
409
c51c90b597 expire command 2025-06-17 21:00:40 +02:00
409
8ac4dac2f0 add README.md 2025-06-17 02:17:13 +02:00
409
bd19ddd6cb ttl command 2025-06-17 02:12:15 +02:00
409
20e3fbd5d3 read expiration from Set command + tests 2025-06-17 01:52:11 +02:00
409
28b42c786c basic expiration (values still need to serialize expiration) 2025-06-16 20:51:48 +02:00
409
39dd27378a basic logging 2025-06-16 18:11:48 +02:00
409
10837dac35 add ServerConfig with env variables 2025-06-16 18:03:42 +02:00
409
0c619fbc94 remove test from main.rs 2025-06-16 17:45:38 +02:00
409
2931cf2927 improve database memory usage 2025-06-16 17:41:09 +02:00
409
34818ce050 refactor commands 2025-06-12 19:04:52 +02:00
409
06a503f67d fix Server::new syntax error 2025-06-12 17:47:57 +02:00
409
a48496058c add logo 2025-06-12 15:53:45 +02:00
26 changed files with 1435 additions and 216 deletions

397
Cargo.lock generated
View File

@@ -17,11 +17,76 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "aho-corasick"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
dependencies = [
"memchr",
]
[[package]]
name = "anstream"
version = "0.6.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd"
[[package]]
name = "anstyle-parse"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882"
dependencies = [
"anstyle",
"once_cell_polyfill",
"windows-sys 0.59.0",
]
[[package]]
name = "archive"
version = "0.1.0"
dependencies = [
"bon",
"bytes",
"byteyarn",
"clap",
"env_logger",
"log",
"shell-words",
"thiserror",
"tokio",
]
@@ -53,24 +118,225 @@ version = "2.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967"
[[package]]
name = "bon"
version = "3.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f61138465baf186c63e8d9b6b613b508cd832cba4ce93cf37ce5f096f91ac1a6"
dependencies = [
"bon-macros",
"rustversion",
]
[[package]]
name = "bon-macros"
version = "3.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40d1dad34aa19bf02295382f08d9bc40651585bd497266831d40ee6296fb49ca"
dependencies = [
"darling",
"ident_case",
"prettyplease",
"proc-macro2",
"quote",
"rustversion",
"syn",
]
[[package]]
name = "buf-trait"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21eaafc770e8c073d6c3facafe7617e774305d4954aa6351b9c452eb37ee17b4"
dependencies = [
"zerocopy",
]
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
[[package]]
name = "byteyarn"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93e51d26468a15ea59f8525e0c13dc405db43e644a0b1e6d44346c72cf4cf7b"
dependencies = [
"buf-trait",
]
[[package]]
name = "cfg-if"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268"
[[package]]
name = "clap"
version = "4.5.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40b6887a1d8685cebccf115538db5c0efe625ccac9696ad45c409d96566e910f"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.5.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0c66c08ce9f0c698cbce5c0279d0bb6ac936d8674174fe48f736533b964f59e"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim",
]
[[package]]
name = "clap_derive"
version = "4.5.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2c7947ae4cc3d851207c1adb5b5e260ff0cca11446b1d6d1423788e442257ce"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675"
[[package]]
name = "colorchoice"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "darling"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
dependencies = [
"darling_core",
"darling_macro",
]
[[package]]
name = "darling_core"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim",
"syn",
]
[[package]]
name = "darling_macro"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
dependencies = [
"darling_core",
"quote",
"syn",
]
[[package]]
name = "env_filter"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0"
dependencies = [
"log",
"regex",
]
[[package]]
name = "env_logger"
version = "0.11.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f"
dependencies = [
"anstream",
"anstyle",
"env_filter",
"jiff",
"log",
]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "gimli"
version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "ident_case"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "jiff"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49"
dependencies = [
"jiff-static",
"log",
"portable-atomic",
"portable-atomic-util",
"serde",
]
[[package]]
name = "jiff-static"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "libc"
version = "0.2.172"
@@ -87,6 +353,12 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "memchr"
version = "2.7.5"
@@ -122,6 +394,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "once_cell_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad"
[[package]]
name = "parking_lot"
version = "0.12.4"
@@ -151,6 +429,31 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "portable-atomic"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483"
[[package]]
name = "portable-atomic-util"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
dependencies = [
"portable-atomic",
]
[[package]]
name = "prettyplease"
version = "0.2.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6837b9e10d61f45f987d50808f83d1ee3d206c66acf650c3e4ae2e1f6ddedf55"
dependencies = [
"proc-macro2",
"syn",
]
[[package]]
name = "proc-macro2"
version = "1.0.95"
@@ -178,18 +481,79 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "rustc-demangle"
version = "0.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f"
[[package]]
name = "rustversion"
version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d"
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "serde"
version = "1.0.219"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.219"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "shell-words"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde"
[[package]]
name = "signal-hook-registry"
version = "1.4.5"
@@ -215,6 +579,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "strsim"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "syn"
version = "2.0.102"
@@ -281,6 +651,12 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "wasi"
version = "0.11.1+wasi-snapshot-preview1"
@@ -368,3 +744,24 @@ name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "zerocopy"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"byteorder",
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

View File

@@ -2,8 +2,24 @@
name = "archive"
version = "0.1.0"
edition = "2024"
authors = ["409"]
default-run = "archive-server"
[dependencies]
bon = "3.6.4"
bytes = "1.10.1"
byteyarn = "0.5.1"
clap = { version = "4.5.40", features = ["derive"] }
env_logger = "0.11.8"
log = "0.4.27"
shell-words = "1.1.0"
thiserror = "2.0.12"
tokio = { version = "1.45.1", features = ["full"] }
[[bin]]
name = "archive-server"
path = "src/bin/server.rs"
[[bin]]
name = "archive-cli"
path = "src/bin/cli.rs"

17
Dockerfile Normal file
View File

@@ -0,0 +1,17 @@
FROM rust:bullseye AS builder
WORKDIR /usr/src/archive
COPY Cargo.toml .
RUN mkdir -p src/bin && echo "fn main() {}" > src/bin/server.rs
RUN cargo build --release --bin archive-server
COPY src src
RUN touch src/main.rs
RUN cargo build --release --bin archive-server
RUN strip target/release/archive-server
FROM debian:12.11
RUN apt-get update
COPY --from=builder /usr/src/archive/target/release/archive-server /usr/local/bin/archive-server
ENTRYPOINT ["archive-server"]

34
README.md Normal file
View File

@@ -0,0 +1,34 @@
![Logo](logo.webp)
# ArcHIVE
A lightweight in-memory database written in Rust
## Usage
### CLI
Download a binary or compile it yourself using `cargo build --release --bin archive-cli`
To compile the CLI and install it globally run `cargo install --path .`
### Server
#### Docker
The repository contains a simple `compose.yaml`
#### Binary
Alternatively you can run the server binary directly
Download a binary or compile it yourself using `cargo build --release --bin archive-server`
#### Environment variables
This is a list of the server's environment variables and their default values:
- SERVER_HOST=0.0.0.0
- SERVER_PORT=6171
- MAX_CONNECTIONS=256
- LOG_LEVEL=info

12
compose.yaml Normal file
View File

@@ -0,0 +1,12 @@
services:
archive-server:
container_name: 'archive-server'
image: 'git.409dev.buzz/409/archive:latest'
build: '.'
ports:
- '6171:6171/tcp'
environment:
- 'SERVER_HOST=0.0.0.0'
- 'SERVER_PORT=6171'
- 'MAX_CONNECTIONS=256'
- 'LOG_LEVEL=info'

BIN
logo.webp Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 78 KiB

123
src/bin/cli.rs Normal file
View File

@@ -0,0 +1,123 @@
use std::io::{Write as _, stdin};
use archive::{Result, client::Client};
use clap::Parser;
#[derive(Debug, Parser)]
#[command(version, about, long_about = None)]
struct Args {
#[arg(long, default_value = "127.0.0.1")]
host: String,
#[arg(short, long, default_value_t = 6171)]
port: u16,
}
#[derive(Debug, Parser)]
#[command(help_template = "{subcommands}")]
enum Commands {
Get {
key: String,
},
Set {
key: String,
value: String,
expiration: Option<u64>,
},
Delete {
key: String,
},
Has {
key: String,
},
Ttl {
key: String,
},
Expire {
key: String,
seconds: u64,
},
Persist {
key: String,
},
#[command(aliases = &["exit", "q"])]
Quit,
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
let mut client = Client::new(&format!("{}:{}", args.host, args.port)).await?;
print_welcome();
let stdin = stdin();
let mut input = String::new();
loop {
input.clear();
print!("> ");
std::io::stdout().flush()?;
stdin.read_line(&mut input)?;
let Ok(mut words) = shell_words::split(&input) else {
continue;
};
words.insert(0, "".into());
let command = match Commands::try_parse_from(words.clone()) {
Ok(command) => command,
Err(e) => {
println!("{}", e.render());
continue;
}
};
match command {
Commands::Get { key } => {
let value = client.get(&key).await?;
println!("{value:?}");
}
Commands::Set {
key,
value,
expiration,
} => {
client.set(&key, value.as_bytes(), expiration).await?;
println!("1");
}
Commands::Delete { key } => {
let value = client.delete(&key).await?;
println!("{value:?}");
}
Commands::Has { key } => {
let value = client.has(&key).await?;
println!("{value:?}");
}
Commands::Ttl { key } => {
let value = client.ttl(&key).await?;
println!("{value:?}");
}
Commands::Expire { key, seconds } => {
let value = client.expire(&key, seconds).await?;
println!("{value:?}");
}
Commands::Persist { key } => {
let value = client.persist(&key).await?;
println!("{value:?}");
}
Commands::Quit => break,
}
}
println!("{input}");
Ok(())
}
fn print_welcome() {
let version = env!("CARGO_PKG_VERSION");
println!("archive-cli {version}");
}

53
src/bin/server.rs Normal file
View File

@@ -0,0 +1,53 @@
use archive::Result;
use archive::config::ServerConfig;
use archive::server::Server;
use tokio::{signal::ctrl_c, sync::oneshot};
#[tokio::main]
async fn main() -> Result<()> {
env_logger::builder()
.format_target(false)
.filter_level(log::LevelFilter::Info)
.parse_env("LOG_LEVEL")
.parse_default_env()
.init();
let config = ServerConfig::builder()
.maybe_host(std::env::var("SERVER_HOST").ok())
.maybe_port(
std::env::var("SERVER_PORT")
.ok()
.map(|v| v.parse().ok())
.flatten(),
)
.maybe_max_connections(
std::env::var("MAX_CONNECTIONS")
.ok()
.map(|v| v.parse().ok())
.flatten(),
)
.build();
let mut server = Server::new(&config).await?;
log::info!("The server is listening on {}:{}", config.host, config.port);
log::info!(
"The maximum amount of concurrent connections is {}",
config.max_connections
);
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
tokio::spawn(async move {
if ctrl_c().await.is_ok() {
let _ = shutdown_sender.send(());
}
});
server.run(shutdown_receiver).await?;
log::info!("Goodbye");
Ok(())
}

View File

@@ -1,7 +1,7 @@
use bytes::{Buf, BufMut as _, Bytes, BytesMut};
use tokio::net::{TcpStream, ToSocketAddrs};
use crate::{Result, connection::Connection, database::Value, errors::AppError};
use crate::{Result, connection::Connection, errors::AppError};
pub struct Client {
connection: Connection,
@@ -50,7 +50,12 @@ impl Client {
Ok(response)
}
pub async fn set(&mut self, key: &str, value: Value) -> Result<()> {
pub async fn set(
&mut self,
key: &str,
data: &[u8],
expiration_secs: Option<u64>,
) -> Result<()> {
let mut bytes = BytesMut::new();
bytes.put_u16(3);
@@ -64,7 +69,18 @@ impl Client {
bytes.put_u16(key_length);
bytes.put_slice(key.as_bytes());
value.write_to_bytes(&mut bytes);
bytes.put_u32(data.len() as u32);
bytes.put_slice(data);
match expiration_secs {
Some(seconds) => {
bytes.put_u8(1);
bytes.put_u64(seconds);
}
None => {
bytes.put_u8(0);
}
}
self.connection.write(bytes.into()).await?;
@@ -128,6 +144,86 @@ impl Client {
Ok(r.try_get_u8()? == 1)
}
pub async fn ttl(&mut self, key: &str) -> Result<Option<u64>> {
let mut bytes = BytesMut::new();
bytes.put_u16(3);
bytes.put_slice(b"ttl");
let key_length: u16 = key
.len()
.try_into()
.map_err(|_| AppError::KeyLength(key.len()))?;
bytes.put_u16(key_length);
bytes.put_slice(key.as_bytes());
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
let ttl = match r.try_get_u8()? {
1 => Some(r.try_get_u64()?),
0 => None,
_ => return Err(AppError::InvalidCommandResponse),
};
Ok(ttl)
}
pub async fn expire(&mut self, key: &str, seconds: u64) -> Result<bool> {
let mut bytes = BytesMut::new();
bytes.put_u16(6);
bytes.put_slice(b"expire");
let key_length: u16 = key
.len()
.try_into()
.map_err(|_| AppError::KeyLength(key.len()))?;
bytes.put_u16(key_length);
bytes.put_slice(key.as_bytes());
bytes.put_u64(seconds);
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
let success = match r.try_get_u8()? {
1 => true,
0 => false,
_ => return Err(AppError::InvalidCommandResponse),
};
Ok(success)
}
pub async fn persist(&mut self, key: &str) -> Result<bool> {
let mut bytes = BytesMut::new();
bytes.put_u16(7);
bytes.put_slice(b"persist");
let key_length: u16 = key
.len()
.try_into()
.map_err(|_| AppError::KeyLength(key.len()))?;
bytes.put_u16(key_length);
bytes.put_slice(key.as_bytes());
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
let success = match r.try_get_u8()? {
1 => true,
0 => false,
_ => return Err(AppError::InvalidCommandResponse),
};
Ok(success)
}
async fn get_response(&mut self) -> Result<Bytes> {
self.connection
.read_bytes()

View File

@@ -1,139 +0,0 @@
use std::io::{Cursor, Read};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crate::{
Result,
connection::Connection,
database::{Database, Value},
errors::AppError,
};
#[derive(Debug)]
pub enum Command {
Get { key: String },
Set { key: String, value: Value },
Delete { key: String },
Has { key: String },
}
impl Command {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
match self {
Command::Get { ref key } => {
let value = db.get(&key).await;
let mut buf = BytesMut::new();
match value {
Some(v) => {
buf.put_u8(1);
buf.put_u32(v.len() as u32);
buf.put_slice(&v);
}
None => {
buf.put_u8(0);
}
}
connection.write(buf.into()).await?;
}
Command::Set { key, value } => {
db.set(key.clone(), value.clone()).await?;
connection.write(Bytes::from_static(&[1])).await?;
}
Command::Delete { ref key } => {
let value = db.delete(key).await;
let mut buf = BytesMut::new();
match value {
Some(v) => {
buf.put_u8(1);
buf.put_u32(v.len() as u32);
buf.put_slice(&v);
}
None => buf.put_u8(0),
}
connection.write(buf.into()).await?;
}
Command::Has { ref key } => {
let value = db.has(key).await;
let buf = Bytes::copy_from_slice(&[if value { 1 } else { 0 }]);
connection.write(buf.into()).await?;
}
}
Ok(())
}
pub fn parse(bytes: &BytesMut) -> Result<(Self, u64)> {
let mut buffer = Cursor::new(&bytes[..]);
let name = read_string(&mut buffer)?;
Self::parse_inner(name, &mut buffer)
}
fn parse_inner(command_name: String, bytes: &mut Cursor<&[u8]>) -> Result<(Self, u64)> {
let command = match command_name.as_str() {
"get" => {
let key = read_string(bytes)?;
Self::Get { key }
}
"set" => {
let key = read_string(bytes)?;
let data = read_bytes(bytes)?;
Self::Set {
key,
value: Value::new(data),
}
}
"delete" => {
let key = read_string(bytes)?;
Self::Delete { key }
}
"has" => {
let key = read_string(bytes)?;
Self::Has { key }
}
_ => return Err(AppError::UnknownCommand(command_name)),
};
Ok((command, bytes.position()))
}
}
fn read_string(buffer: &mut Cursor<&[u8]>) -> Result<String> {
let length = buffer.try_get_u16()? as usize;
if buffer.remaining() < length {
return Err(AppError::IncompleteCommandBuffer);
}
let mut contents = Vec::with_capacity(length);
for _ in 0..length {
contents.push(buffer.try_get_u8()?);
}
let string = String::from_utf8(contents)?;
Ok(string)
}
fn read_bytes(buffer: &mut Cursor<&[u8]>) -> Result<Bytes> {
let len = buffer.try_get_u32()? as usize;
if buffer.remaining() < len {
return Err(AppError::IncompleteCommandBuffer);
}
Ok(buffer.copy_to_bytes(len))
}

42
src/commands/delete.rs Normal file
View File

@@ -0,0 +1,42 @@
use std::io::Cursor;
use bytes::{Buf as _, BufMut as _, BytesMut};
use crate::{Result, connection::Connection, database::Database, errors::AppError};
#[derive(Debug, Clone)]
pub struct Delete {
key: String,
}
impl Delete {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
let value = db.delete(&self.key).await;
let mut buf = BytesMut::new();
match value {
Some(v) => {
buf.put_u8(1);
buf.put_u32(v.len() as u32);
buf.put_slice(&v);
}
None => buf.put_u8(0),
}
connection.write(buf.into()).await?;
Ok(())
}
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> {
let key_length = bytes.try_get_u16()? as usize;
if bytes.remaining() < key_length {
return Err(AppError::IncompleteCommandBuffer);
}
let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?;
Ok(Self { key })
}
}

37
src/commands/expire.rs Normal file
View File

@@ -0,0 +1,37 @@
use std::io::Cursor;
use bytes::{Buf as _, Bytes};
use crate::{Result, connection::Connection, database::Database, errors::AppError};
#[derive(Debug, Clone)]
pub struct Expire {
key: String,
seconds: u64,
}
impl Expire {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
let value = db.expire(&self.key, self.seconds).await?;
connection
.write(Bytes::from_static(if value { &[1] } else { &[0] }))
.await?;
Ok(())
}
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> {
let key_length = bytes.try_get_u16()? as usize;
if bytes.remaining() < key_length {
return Err(AppError::IncompleteCommandBuffer);
}
let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?;
let seconds = bytes.try_get_u64()?;
Ok(Self { key, seconds })
}
}

44
src/commands/get.rs Normal file
View File

@@ -0,0 +1,44 @@
use std::io::Cursor;
use bytes::{Buf as _, BufMut as _, BytesMut};
use crate::{Result, connection::Connection, database::Database, errors::AppError};
#[derive(Debug, Clone)]
pub struct Get {
key: String,
}
impl Get {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
let value = db.get(&self.key).await;
let mut buf = BytesMut::new();
match value {
Some(v) => {
buf.put_u8(1);
buf.put_u32(v.len() as u32);
buf.put_slice(&v);
}
None => {
buf.put_u8(0);
}
}
connection.write(buf.into()).await?;
Ok(())
}
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> {
let key_length = bytes.try_get_u16()? as usize;
if bytes.remaining() < key_length {
return Err(AppError::IncompleteCommandBuffer);
}
let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?;
Ok(Self { key })
}
}

34
src/commands/has.rs Normal file
View File

@@ -0,0 +1,34 @@
use std::io::Cursor;
use bytes::{Buf as _, Bytes};
use crate::{Result, connection::Connection, database::Database, errors::AppError};
#[derive(Debug, Clone)]
pub struct Has {
key: String,
}
impl Has {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
let value = db.has(&self.key).await;
let buf = Bytes::copy_from_slice(&[if value { 1 } else { 0 }]);
connection.write(buf.into()).await?;
Ok(())
}
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> {
let key_length = bytes.try_get_u16()? as usize;
if bytes.remaining() < key_length {
return Err(AppError::IncompleteCommandBuffer);
}
let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?;
Ok(Self { key })
}
}

74
src/commands/mod.rs Normal file
View File

@@ -0,0 +1,74 @@
mod delete;
mod expire;
mod get;
mod has;
mod persist;
mod set;
mod ttl;
use std::io::Cursor;
use bytes::{Buf, BytesMut};
use delete::Delete;
use expire::Expire;
use get::Get;
use has::Has;
use persist::Persist;
use set::Set;
use ttl::Ttl;
use crate::{Result, connection::Connection, database::Database, errors::AppError};
#[derive(Debug)]
pub enum Command {
Get(Get),
Set(Set),
Delete(Delete),
Has(Has),
Ttl(Ttl),
Expire(Expire),
Persist(Persist),
}
impl Command {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
match self {
Command::Get(get) => get.execute(db, connection).await,
Command::Set(set) => set.execute(db, connection).await,
Command::Delete(delete) => delete.execute(db, connection).await,
Command::Has(has) => has.execute(db, connection).await,
Command::Ttl(ttl) => ttl.execute(db, connection).await,
Command::Expire(expire) => expire.execute(db, connection).await,
Command::Persist(persist) => persist.execute(db, connection).await,
}
}
pub fn parse(bytes: &BytesMut) -> Result<(Self, u64)> {
let mut buffer = Cursor::new(&bytes[..]);
let name_length = buffer.try_get_u16()? as usize;
if buffer.remaining() < name_length {
return Err(AppError::IncompleteCommandBuffer);
}
let name = String::from_utf8(buffer.copy_to_bytes(name_length).to_vec())?;
Self::parse_inner(name, &mut buffer)
}
fn parse_inner(command_name: String, bytes: &mut Cursor<&[u8]>) -> Result<(Self, u64)> {
let command = match command_name.to_lowercase().as_str() {
"get" => Self::Get(Get::parse(bytes)?),
"set" => Self::Set(Set::parse(bytes)?),
"delete" => Self::Delete(Delete::parse(bytes)?),
"has" => Self::Has(Has::parse(bytes)?),
"ttl" => Self::Ttl(Ttl::parse(bytes)?),
"expire" => Self::Expire(Expire::parse(bytes)?),
"persist" => Self::Persist(Persist::parse(bytes)?),
_ => return Err(AppError::UnknownCommand(command_name)),
};
Ok((command, bytes.position()))
}
}

34
src/commands/persist.rs Normal file
View File

@@ -0,0 +1,34 @@
use std::io::Cursor;
use bytes::{Buf as _, Bytes};
use crate::{Result, connection::Connection, database::Database, errors::AppError};
#[derive(Debug, Clone)]
pub struct Persist {
key: String,
}
impl Persist {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
let value = db.persist(&self.key).await?;
connection
.write(Bytes::from_static(if value { &[1] } else { &[0] }))
.await?;
Ok(())
}
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> {
let key_length = bytes.try_get_u16()? as usize;
if bytes.remaining() < key_length {
return Err(AppError::IncompleteCommandBuffer);
}
let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?;
Ok(Self { key })
}
}

51
src/commands/set.rs Normal file
View File

@@ -0,0 +1,51 @@
use std::io::Cursor;
use crate::{Result, connection::Connection, database::Database, errors::AppError};
use bytes::{Buf as _, Bytes};
#[derive(Debug, Clone)]
pub struct Set {
key: String,
data: Box<[u8]>,
expiration: Option<u64>,
}
impl Set {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
db.set(self.key, self.data, self.expiration).await?;
connection.write(Bytes::from_static(&[1])).await?;
Ok(())
}
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> {
let key_length = bytes.try_get_u16()? as usize;
if bytes.remaining() < key_length {
return Err(AppError::IncompleteCommandBuffer);
}
let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?;
let value_length = bytes.try_get_u32()? as usize;
if bytes.remaining() < value_length {
return Err(AppError::IncompleteCommandBuffer);
}
let data = bytes.copy_to_bytes(value_length);
let expiration: Option<u64> = match bytes.try_get_u8()? {
1 => Some(bytes.try_get_u64()?),
0 => None,
_ => return Err(AppError::UnexpectedCommandData),
};
Ok(Self {
key,
data: (*data).into(),
expiration,
})
}
}

42
src/commands/ttl.rs Normal file
View File

@@ -0,0 +1,42 @@
use std::io::Cursor;
use bytes::{Buf as _, BufMut, Bytes, BytesMut};
use crate::{Result, connection::Connection, database::Database, errors::AppError};
#[derive(Debug, Clone)]
pub struct Ttl {
key: String,
}
impl Ttl {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
let ttl = db.ttl(&self.key).await;
let Some(ttl) = ttl else {
connection.write(Bytes::from_static(&[0])).await?;
return Ok(());
};
let mut buf = BytesMut::new();
buf.put_u8(1);
buf.put_u64(ttl);
connection.write(buf.into()).await?;
Ok(())
}
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> {
let key_length = bytes.try_get_u16()? as usize;
if bytes.remaining() < key_length {
return Err(AppError::IncompleteCommandBuffer);
}
let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?;
Ok(Self { key })
}
}

11
src/config.rs Normal file
View File

@@ -0,0 +1,11 @@
use bon::Builder;
#[derive(Debug, Builder, Clone)]
pub struct ServerConfig {
#[builder(default = String::from("0.0.0.0"))]
pub host: String,
#[builder(default = 6171)]
pub port: u16,
#[builder(default = 256)]
pub max_connections: usize,
}

View File

@@ -4,7 +4,7 @@ use tokio::{
net::TcpStream,
};
use crate::{Result, command::Command};
use crate::{Result, commands::Command};
#[derive(Debug)]
pub struct Connection {

View File

@@ -1,28 +1,46 @@
use std::{collections::HashMap, sync::Arc};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
time::Duration,
};
use bytes::{BufMut, Bytes, BytesMut};
use tokio::sync::Mutex;
use bytes::{BufMut, BytesMut};
use byteyarn::Yarn;
use tokio::{
sync::{Mutex, Notify},
time::Instant,
};
use crate::Result;
#[derive(Debug, Clone)]
pub struct Database {
entries: Arc<Mutex<HashMap<String, Value>>>,
state: Arc<Mutex<DatabaseState>>,
notify: Arc<Notify>,
}
#[derive(Debug, Default)]
pub struct DatabaseState {
entries: BTreeMap<Yarn, Value>,
expirations: BTreeSet<(Instant, Yarn)>,
shutdown: bool,
}
#[derive(Debug, Clone)]
pub struct Value {
data: Bytes,
data: Box<[u8]>,
pub expiration: Option<Instant>,
}
impl Value {
pub fn new(data: Bytes) -> Self {
Self { data }
pub fn new(data: Box<[u8]>, expiration: Option<Instant>) -> Self {
Self { data, expiration }
}
pub fn from_string(data: String) -> Self {
pub fn from_string(data: String, expiration: Option<Instant>) -> Self {
Self {
data: Bytes::from(data),
data: data.as_bytes().into(),
expiration,
}
}
@@ -34,34 +52,197 @@ impl Value {
impl Database {
pub(crate) fn new() -> Self {
let state = Arc::default();
Self {
entries: Arc::default(),
state,
notify: Arc::default(),
}
}
pub async fn get(&self, key: &str) -> Option<Bytes> {
let entries = self.entries.lock().await;
pub async fn get(&self, key: &str) -> Option<Box<[u8]>> {
let state = self.state.lock().await;
entries.get(key).map(|v| v.data.clone())
state.entries.get(key).map(|v| v.data.clone())
}
pub async fn set(&self, key: String, value: Value) -> Result<()> {
let mut entries = self.entries.lock().await;
pub async fn set(&self, key: String, data: Box<[u8]>, expiration: Option<u64>) -> Result<()> {
let mut state = self.state.lock().await;
entries.insert(key, value);
let expiration = expiration.map(|seconds| Instant::now() + Duration::from_secs(seconds));
let key = Yarn::from(key.clone());
let value = Value::new(data, expiration);
let previous = state.entries.insert(key.clone(), value);
let mut notify = false;
if let Some(previous_expiration) = previous.map(|v| v.expiration).flatten() {
if state
.expirations
.iter()
.next()
.is_some_and(|&(instant, ref next_key)| {
previous_expiration == instant && key == next_key
})
{
notify = true;
}
state
.expirations
.remove(&(previous_expiration, key.clone().into()));
}
if let Some(expiration) = expiration {
if state
.expirations
.iter()
.next()
.is_none_or(|&(instant, _)| expiration > instant)
{
notify = true;
}
state.expirations.insert((expiration, key.into()));
};
if notify {
self.notify.notify_one();
}
Ok(())
}
pub async fn delete(&self, key: &str) -> Option<Bytes> {
let mut entries = self.entries.lock().await;
pub async fn delete(&self, key: &str) -> Option<Box<[u8]>> {
let mut state = self.state.lock().await;
entries.remove(key).map(|v| v.data)
state.entries.remove(key).map(|v| v.data)
}
pub async fn has(&self, key: &str) -> bool {
let entries = self.entries.lock().await;
let state = self.state.lock().await;
entries.contains_key(key)
state.entries.contains_key(key)
}
pub async fn ttl(&self, key: &str) -> Option<u64> {
self.state
.lock()
.await
.entries
.get(key)
.map(|v| v.expiration.map(|e| (e - Instant::now()).as_secs()))
.flatten()
}
pub async fn expire(&self, key: &str, seconds: u64) -> Result<bool> {
let mut state = self.state.lock().await;
let key = Yarn::copy(key);
let expiration = Instant::now() + Duration::from_secs(seconds);
let notify =
state
.expirations
.iter()
.next()
.is_none_or(|&(instant, ref next_expiration_key)| {
next_expiration_key == &key || instant > expiration
});
let Some(value) = state.entries.get_mut(&key) else {
return Ok(false);
};
let previous_expiration = value.expiration.take();
value.expiration = Some(expiration);
if let Some(previous_expiration) = previous_expiration {
state
.expirations
.remove(&(previous_expiration, key.clone()));
};
state.expirations.insert((expiration, key));
if notify {
self.notify.notify_one();
}
Ok(true)
}
pub async fn persist(&self, key: &str) -> Result<bool> {
let mut state = self.state.lock().await;
let key = Yarn::copy(key);
let notify = state
.expirations
.iter()
.next()
.is_some_and(|&(_, ref next_expiration_key)| next_expiration_key == &key);
let Some(value) = state.entries.get_mut(&key) else {
return Ok(false);
};
match value.expiration.take() {
Some(expiration) => {
state.expirations.remove(&(expiration, key));
}
None => return Ok(false),
}
if notify {
self.notify.notify_one();
}
Ok(true)
}
pub async fn shutdown(&mut self) {
self.state.lock().await.shutdown = true;
self.notify.notify_one();
}
}
pub async fn key_expiration_manager(db: Database) {
'outer: loop {
let mut state_lock = db.state.lock().await;
let state = &mut *state_lock;
if state.shutdown {
break;
}
let now = Instant::now();
while let Some((expiration, key)) = state.expirations.iter().next().as_ref() {
let expiration = *expiration;
if expiration <= now {
state.entries.remove(key);
state.expirations.remove(&(expiration, key.clone()));
continue;
}
drop(state_lock);
tokio::select! {
_ = tokio::time::sleep_until(expiration) => (),
_ = db.notify.notified() => (),
}
continue 'outer;
}
drop(state_lock);
db.notify.notified().await;
}
log::debug!("key_expiration_manager has finished");
}

View File

@@ -18,4 +18,6 @@ pub enum AppError {
NoResponse,
#[error("Expected a different response for the executed command")]
InvalidCommandResponse,
#[error("The binary command data is not structured correctly")]
UnexpectedCommandData,
}

15
src/lib.rs Normal file
View File

@@ -0,0 +1,15 @@
use errors::AppError;
#[cfg(test)]
pub mod tests;
pub mod client;
pub mod commands;
pub mod config;
pub mod connection;
pub mod database;
pub mod errors;
pub mod handler;
pub mod server;
pub type Result<T> = std::result::Result<T, AppError>;

View File

@@ -1,44 +0,0 @@
use bytes::Bytes;
use client::Client;
use database::Value;
use errors::AppError;
use server::Server;
pub mod client;
pub mod command;
pub mod connection;
pub mod database;
pub mod errors;
pub mod handler;
pub mod server;
pub type Result<T> = std::result::Result<T, AppError>;
#[tokio::main]
async fn main() -> Result<()> {
let mut server = Server::new("127.0.0.1:6171").await?;
// Testing
for i in 0..256 {
tokio::spawn(client(format!("client-{}", i + 1)));
}
server.run().await?;
Ok(())
}
// Test stuff
async fn client(v: String) -> Result<()> {
let mut client = Client::new("127.0.0.1:6171").await?;
client
.set(&v, Value::from_string(format!("{v}'s value")))
.await?;
assert_eq!(
client.get(&v).await.unwrap().unwrap(),
Bytes::from(format!("{v}'s value"))
);
Ok(())
}

View File

@@ -1,11 +1,14 @@
use std::sync::Arc;
use tokio::net::ToSocketAddrs;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::{net::TcpListener, sync::Semaphore};
use crate::Result;
use crate::config::ServerConfig;
use crate::connection::Connection;
use crate::database::Database;
use crate::database::{Database, key_expiration_manager};
use crate::handler::Handler;
#[derive(Debug)]
@@ -13,37 +16,64 @@ pub struct Server {
db: Database,
listener: TcpListener,
connection_limit: Arc<Semaphore>,
expiration_manager_handle: JoinHandle<()>,
}
impl Server {
const MAX_CONNECTIONS: usize = 256;
pub async fn new(config: &ServerConfig) -> Result<Self> {
let addr = format!("{}:{}", config.host, config.port);
Self::_new(addr, config.max_connections).await
}
pub async fn new<Addr: ToSocketAddrs>(addr: ToSocketAddrs) -> Result<Self> {
async fn _new<Addr: ToSocketAddrs>(addr: Addr, max_connections: usize) -> Result<Self> {
let listener = TcpListener::bind(addr).await?;
let db = Database::new();
let expiration_manager_handle = tokio::spawn(key_expiration_manager(db.clone()));
Ok(Self {
db: Database::new(),
connection_limit: Arc::new(Semaphore::const_new(Self::MAX_CONNECTIONS)),
db,
connection_limit: Arc::new(Semaphore::const_new(max_connections)),
listener,
expiration_manager_handle,
})
}
pub async fn run(&mut self) -> Result<()> {
pub async fn run(&mut self, mut shutdown: oneshot::Receiver<()>) -> Result<()> {
let shutdown = &mut shutdown;
loop {
let permit = Arc::clone(&self.connection_limit)
.acquire_owned()
.await
.unwrap();
let socket = self.listener.accept().await?.0;
let Some(socket) = ({
tokio::select! {
socket = self.listener.accept() => Some(socket?.0),
_ = &mut *shutdown => None,
}
}) else {
log::info!("Shutting down");
self.db.shutdown().await;
let _ = (&mut self.expiration_manager_handle).await;
return Ok(());
};
let addr = socket.peer_addr()?;
let connection = Connection::new(socket);
let mut handler = Handler::new(self.db.clone(), connection);
tokio::spawn(async move {
log::debug!("Spawned a new connection handler: {addr}");
if let Err(e) = handler.run().await {
println!("Handler::run error: {e:?}");
log::debug!("Handler::run error: {e:?}");
}
log::debug!("Connection handler ended: {addr}");
drop(permit);
});

57
src/tests.rs Normal file
View File

@@ -0,0 +1,57 @@
use std::time::Duration;
use tokio::sync::oneshot;
use crate::{client::Client, config::ServerConfig, server::Server};
#[tokio::test]
async fn expiration() -> Result<(), Box<dyn std::error::Error>> {
let config = ServerConfig::builder().host("127.0.0.1".into()).build();
let mut server = Server::new(&config).await?;
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let server_handle = tokio::spawn(async move { server.run(shutdown_rx).await });
let mut client = client("127.0.0.1:6171").await?;
client
.set("test-key", b"test-value", Some(3))
.await
.unwrap();
assert!(client.has("test-key").await.unwrap());
assert_eq!(client.ttl("test-key").await.unwrap(), Some(2));
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(client.has("test-key").await.unwrap());
assert_eq!(client.ttl("test-key").await.unwrap(), Some(1));
assert!(client.expire("test-key", 2).await?);
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(!client.has("test-key").await.unwrap());
assert!(!client.expire("test-key", 10).await?);
client.set("test-key", b"test-value", Some(2)).await?;
assert_eq!(client.ttl("test-key").await?, Some(1));
assert!(client.persist("test-key").await?);
tokio::time::sleep(Duration::from_secs(2)).await;
assert_eq!(client.get("test-key").await?, Some("test-value".into()));
shutdown_tx.send(()).unwrap();
server_handle.await??;
Ok(())
}
async fn client<A: tokio::net::ToSocketAddrs>(
addr: A,
) -> Result<Client, Box<dyn std::error::Error>> {
Ok(Client::new(addr).await?)
}