From 7a6b1b695420cf588895e33796e99836413e2b23 Mon Sep 17 00:00:00 2001 From: 409 <409dev@protonmail.com> Date: Thu, 12 Jun 2025 15:52:01 +0200 Subject: [PATCH] basic `get` / `set` / `delete` / `has` --- .gitignore | 1 + Cargo.lock | 370 ++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 9 ++ src/client.rs | 137 +++++++++++++++++ src/command.rs | 139 +++++++++++++++++ src/connection.rs | 51 +++++++ src/database.rs | 67 +++++++++ src/errors.rs | 21 +++ src/handler.rs | 25 ++++ src/main.rs | 44 ++++++ src/server.rs | 52 +++++++ 11 files changed, 916 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 src/client.rs create mode 100644 src/command.rs create mode 100644 src/connection.rs create mode 100644 src/database.rs create mode 100644 src/errors.rs create mode 100644 src/handler.rs create mode 100644 src/main.rs create mode 100644 src/server.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..159ba12 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,370 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + +[[package]] +name = "archive" +version = "0.1.0" +dependencies = [ + "bytes", + "thiserror", + "tokio", +] + +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "backtrace" +version = "0.3.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + +[[package]] +name = "bitflags" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" + +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + +[[package]] +name = "cfg-if" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" + +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "libc" +version = "0.2.172" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" + +[[package]] +name = "lock_api" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "memchr" +version = "2.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" + +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" +dependencies = [ + "libc", + "wasi", + "windows-sys 0.59.0", +] + +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + +[[package]] +name = "parking_lot" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "proc-macro2" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" +dependencies = [ + "bitflags", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "syn" +version = "2.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6397daf94fa90f058bd0fd88429dd9e5738999cca8d701813c80723add80462" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio" +version = "1.45.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys 0.52.0", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..4c99235 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "archive" +version = "0.1.0" +edition = "2024" + +[dependencies] +bytes = "1.10.1" +thiserror = "2.0.12" +tokio = { version = "1.45.1", features = ["full"] } diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..4f74ca8 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,137 @@ +use bytes::{Buf, BufMut as _, Bytes, BytesMut}; +use tokio::net::{TcpStream, ToSocketAddrs}; + +use crate::{Result, connection::Connection, database::Value, errors::AppError}; + +pub struct Client { + connection: Connection, +} + +impl Client { + pub async fn new(addr: Addr) -> Result { + let socket = TcpStream::connect(addr).await?; + + let connection = Connection::new(socket); + + Ok(Self { connection }) + } + + pub async fn get(&mut self, key: &str) -> Result> { + let mut bytes = BytesMut::new(); + + bytes.put_u16(3); + bytes.put_slice(b"get"); + + let key_length: u16 = key + .len() + .try_into() + .map_err(|_| AppError::KeyLength(key.len()))?; + + bytes.put_u16(key_length); + bytes.put_slice(key.as_bytes()); + + self.connection.write(bytes.into()).await?; + + let mut r = self.get_response().await?; + let response = match r.try_get_u8()? { + 0 => None, + 1 => { + let len = r.try_get_u32()? as usize; + + if r.remaining() < len { + return Err(AppError::InvalidCommandResponse); + } + + Some(r.copy_to_bytes(len)) + } + _ => return Err(AppError::InvalidCommandResponse), + }; + + Ok(response) + } + + pub async fn set(&mut self, key: &str, value: Value) -> Result<()> { + let mut bytes = BytesMut::new(); + + bytes.put_u16(3); + bytes.put_slice(b"set"); + + let key_length: u16 = key + .len() + .try_into() + .map_err(|_| AppError::KeyLength(key.len()))?; + + bytes.put_u16(key_length); + bytes.put_slice(key.as_bytes()); + + value.write_to_bytes(&mut bytes); + + self.connection.write(bytes.into()).await?; + + let mut r = self.get_response().await?; + + match r.try_get_u8()? { + 1 => return Ok(()), + _ => return Err(AppError::InvalidCommandResponse), + } + } + + pub async fn delete(&mut self, key: &str) -> Result> { + let mut bytes = BytesMut::new(); + + bytes.put_u16(6); + bytes.put_slice(b"delete"); + + let key_length: u16 = key + .len() + .try_into() + .map_err(|_| AppError::KeyLength(key.len()))?; + + bytes.put_u16(key_length); + bytes.put_slice(key.as_bytes()); + + self.connection.write(bytes.into()).await?; + + let mut r = self.get_response().await?; + + let response = match r.try_get_u8()? { + 1 => { + let len = r.try_get_u32()?; + let bytes = r.copy_to_bytes(len as usize); + + Some(bytes) + } + 0 => None, + _ => return Err(AppError::InvalidCommandResponse), + }; + + Ok(response) + } + + pub async fn has(&mut self, key: &str) -> Result { + let mut bytes = BytesMut::new(); + bytes.put_u16(3); + bytes.put_slice(b"has"); + + let key_length: u16 = key + .len() + .try_into() + .map_err(|_| AppError::KeyLength(key.len()))?; + + bytes.put_u16(key_length); + bytes.put_slice(key.as_bytes()); + + self.connection.write(bytes.into()).await?; + + let mut r = self.get_response().await?; + + Ok(r.try_get_u8()? == 1) + } + + async fn get_response(&mut self) -> Result { + self.connection + .read_bytes() + .await? + .ok_or(AppError::NoResponse) + } +} diff --git a/src/command.rs b/src/command.rs new file mode 100644 index 0000000..bd65632 --- /dev/null +++ b/src/command.rs @@ -0,0 +1,139 @@ +use std::io::{Cursor, Read}; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; + +use crate::{ + Result, + connection::Connection, + database::{Database, Value}, + errors::AppError, +}; + +#[derive(Debug)] +pub enum Command { + Get { key: String }, + Set { key: String, value: Value }, + Delete { key: String }, + Has { key: String }, +} + +impl Command { + pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> { + match self { + Command::Get { ref key } => { + let value = db.get(&key).await; + + let mut buf = BytesMut::new(); + match value { + Some(v) => { + buf.put_u8(1); + buf.put_u32(v.len() as u32); + buf.put_slice(&v); + } + None => { + buf.put_u8(0); + } + } + + connection.write(buf.into()).await?; + } + Command::Set { key, value } => { + db.set(key.clone(), value.clone()).await?; + + connection.write(Bytes::from_static(&[1])).await?; + } + Command::Delete { ref key } => { + let value = db.delete(key).await; + + let mut buf = BytesMut::new(); + match value { + Some(v) => { + buf.put_u8(1); + buf.put_u32(v.len() as u32); + buf.put_slice(&v); + } + None => buf.put_u8(0), + } + + connection.write(buf.into()).await?; + } + Command::Has { ref key } => { + let value = db.has(key).await; + + let buf = Bytes::copy_from_slice(&[if value { 1 } else { 0 }]); + + connection.write(buf.into()).await?; + } + } + + Ok(()) + } + + pub fn parse(bytes: &BytesMut) -> Result<(Self, u64)> { + let mut buffer = Cursor::new(&bytes[..]); + + let name = read_string(&mut buffer)?; + + Self::parse_inner(name, &mut buffer) + } + + fn parse_inner(command_name: String, bytes: &mut Cursor<&[u8]>) -> Result<(Self, u64)> { + let command = match command_name.as_str() { + "get" => { + let key = read_string(bytes)?; + + Self::Get { key } + } + "set" => { + let key = read_string(bytes)?; + let data = read_bytes(bytes)?; + + Self::Set { + key, + value: Value::new(data), + } + } + "delete" => { + let key = read_string(bytes)?; + + Self::Delete { key } + } + "has" => { + let key = read_string(bytes)?; + + Self::Has { key } + } + _ => return Err(AppError::UnknownCommand(command_name)), + }; + + Ok((command, bytes.position())) + } +} + +fn read_string(buffer: &mut Cursor<&[u8]>) -> Result { + let length = buffer.try_get_u16()? as usize; + + if buffer.remaining() < length { + return Err(AppError::IncompleteCommandBuffer); + } + + let mut contents = Vec::with_capacity(length); + + for _ in 0..length { + contents.push(buffer.try_get_u8()?); + } + + let string = String::from_utf8(contents)?; + + Ok(string) +} + +fn read_bytes(buffer: &mut Cursor<&[u8]>) -> Result { + let len = buffer.try_get_u32()? as usize; + + if buffer.remaining() < len { + return Err(AppError::IncompleteCommandBuffer); + } + + Ok(buffer.copy_to_bytes(len)) +} diff --git a/src/connection.rs b/src/connection.rs new file mode 100644 index 0000000..792c598 --- /dev/null +++ b/src/connection.rs @@ -0,0 +1,51 @@ +use bytes::{Buf, Bytes, BytesMut}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, BufWriter}, + net::TcpStream, +}; + +use crate::{Result, command::Command}; + +#[derive(Debug)] +pub struct Connection { + stream: BufWriter, + buffer: BytesMut, +} + +impl Connection { + pub fn new(stream: TcpStream) -> Self { + Self { + stream: BufWriter::new(stream), + buffer: BytesMut::with_capacity(4096), + } + } + + pub async fn read_command(&mut self) -> Result> { + loop { + if let Ok((command, length)) = Command::parse(&self.buffer) { + self.buffer.advance(length as usize); + + return Ok(Some(command)); + } + + if self.stream.read_buf(&mut self.buffer).await? == 0 { + return Ok(None); + } + } + } + + pub async fn read_bytes(&mut self) -> Result> { + if self.stream.read_buf(&mut self.buffer).await? == 0 { + return Ok(None); + } + + return Ok(Some(self.buffer.split().into())); + } + + pub async fn write(&mut self, mut bytes: Bytes) -> Result<()> { + self.stream.write_buf(&mut bytes).await?; + self.stream.flush().await?; + + Ok(()) + } +} diff --git a/src/database.rs b/src/database.rs new file mode 100644 index 0000000..15067ed --- /dev/null +++ b/src/database.rs @@ -0,0 +1,67 @@ +use std::{collections::HashMap, sync::Arc}; + +use bytes::{BufMut, Bytes, BytesMut}; +use tokio::sync::Mutex; + +use crate::Result; + +#[derive(Debug, Clone)] +pub struct Database { + entries: Arc>>, +} + +#[derive(Debug, Clone)] +pub struct Value { + data: Bytes, +} + +impl Value { + pub fn new(data: Bytes) -> Self { + Self { data } + } + + pub fn from_string(data: String) -> Self { + Self { + data: Bytes::from(data), + } + } + + pub fn write_to_bytes(&self, bytes: &mut BytesMut) { + bytes.put_u32(self.data.len() as u32); + bytes.put_slice(&self.data); + } +} + +impl Database { + pub(crate) fn new() -> Self { + Self { + entries: Arc::default(), + } + } + + pub async fn get(&self, key: &str) -> Option { + let entries = self.entries.lock().await; + + entries.get(key).map(|v| v.data.clone()) + } + + pub async fn set(&self, key: String, value: Value) -> Result<()> { + let mut entries = self.entries.lock().await; + + entries.insert(key, value); + + Ok(()) + } + + pub async fn delete(&self, key: &str) -> Option { + let mut entries = self.entries.lock().await; + + entries.remove(key).map(|v| v.data) + } + + pub async fn has(&self, key: &str) -> bool { + let entries = self.entries.lock().await; + + entries.contains_key(key) + } +} diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 0000000..01b6771 --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,21 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum AppError { + #[error("An IO error occurred")] + Io(#[from] std::io::Error), + #[error("A TryGetError occurred")] + TryGet(#[from] bytes::TryGetError), + #[error("The buffer is missing data for a complete command")] + IncompleteCommandBuffer, + #[error("A Utf8Error occurred")] + FromUtf8(#[from] std::string::FromUtf8Error), + #[error("The command {0} was not recognized")] + UnknownCommand(String), + #[error("The specified key's length {0} exceeds the limit")] + KeyLength(usize), + #[error("Received no response")] + NoResponse, + #[error("Expected a different response for the executed command")] + InvalidCommandResponse, +} diff --git a/src/handler.rs b/src/handler.rs new file mode 100644 index 0000000..d791415 --- /dev/null +++ b/src/handler.rs @@ -0,0 +1,25 @@ +use crate::{Result, connection::Connection, database::Database}; + +#[derive(Debug)] +pub struct Handler { + db: Database, + connection: Connection, +} + +impl Handler { + pub fn new(db: Database, connection: Connection) -> Self { + Self { db, connection } + } + + pub async fn run(&mut self) -> Result<()> { + while let Ok(command) = self.connection.read_command().await { + let Some(command) = command else { + break; + }; + + command.execute(&self.db, &mut self.connection).await?; + } + + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..131aaf3 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,44 @@ +use bytes::Bytes; +use client::Client; +use database::Value; +use errors::AppError; +use server::Server; + +pub mod client; +pub mod command; +pub mod connection; +pub mod database; +pub mod errors; +pub mod handler; +pub mod server; + +pub type Result = std::result::Result; + +#[tokio::main] +async fn main() -> Result<()> { + let mut server = Server::new("127.0.0.1:6171").await?; + + // Testing + for i in 0..256 { + tokio::spawn(client(format!("client-{}", i + 1))); + } + + server.run().await?; + + Ok(()) +} + +// Test stuff +async fn client(v: String) -> Result<()> { + let mut client = Client::new("127.0.0.1:6171").await?; + + client + .set(&v, Value::from_string(format!("{v}'s value"))) + .await?; + assert_eq!( + client.get(&v).await.unwrap().unwrap(), + Bytes::from(format!("{v}'s value")) + ); + + Ok(()) +} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..98c448b --- /dev/null +++ b/src/server.rs @@ -0,0 +1,52 @@ +use std::sync::Arc; + +use tokio::net::ToSocketAddrs; +use tokio::{net::TcpListener, sync::Semaphore}; + +use crate::Result; +use crate::connection::Connection; +use crate::database::Database; +use crate::handler::Handler; + +#[derive(Debug)] +pub struct Server { + db: Database, + listener: TcpListener, + connection_limit: Arc, +} + +impl Server { + const MAX_CONNECTIONS: usize = 256; + + pub async fn new(addr: ToSocketAddrs) -> Result { + let listener = TcpListener::bind(addr).await?; + + Ok(Self { + db: Database::new(), + connection_limit: Arc::new(Semaphore::const_new(Self::MAX_CONNECTIONS)), + listener, + }) + } + + pub async fn run(&mut self) -> Result<()> { + loop { + let permit = Arc::clone(&self.connection_limit) + .acquire_owned() + .await + .unwrap(); + + let socket = self.listener.accept().await?.0; + + let connection = Connection::new(socket); + let mut handler = Handler::new(self.db.clone(), connection); + + tokio::spawn(async move { + if let Err(e) = handler.run().await { + println!("Handler::run error: {e:?}"); + } + + drop(permit); + }); + } + } +}