From 34818ce0507108362c9775c2233d6e4338de5196 Mon Sep 17 00:00:00 2001 From: 409 <409dev@protonmail.com> Date: Thu, 12 Jun 2025 19:04:52 +0200 Subject: [PATCH] refactor commands --- src/command.rs | 139 ----------------------------------------- src/commands/delete.rs | 42 +++++++++++++ src/commands/get.rs | 44 +++++++++++++ src/commands/has.rs | 34 ++++++++++ src/commands/mod.rs | 64 +++++++++++++++++++ src/commands/set.rs | 49 +++++++++++++++ src/connection.rs | 2 +- src/main.rs | 7 +-- 8 files changed, 236 insertions(+), 145 deletions(-) delete mode 100644 src/command.rs create mode 100644 src/commands/delete.rs create mode 100644 src/commands/get.rs create mode 100644 src/commands/has.rs create mode 100644 src/commands/mod.rs create mode 100644 src/commands/set.rs diff --git a/src/command.rs b/src/command.rs deleted file mode 100644 index bd65632..0000000 --- a/src/command.rs +++ /dev/null @@ -1,139 +0,0 @@ -use std::io::{Cursor, Read}; - -use bytes::{Buf, BufMut, Bytes, BytesMut}; - -use crate::{ - Result, - connection::Connection, - database::{Database, Value}, - errors::AppError, -}; - -#[derive(Debug)] -pub enum Command { - Get { key: String }, - Set { key: String, value: Value }, - Delete { key: String }, - Has { key: String }, -} - -impl Command { - pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> { - match self { - Command::Get { ref key } => { - let value = db.get(&key).await; - - let mut buf = BytesMut::new(); - match value { - Some(v) => { - buf.put_u8(1); - buf.put_u32(v.len() as u32); - buf.put_slice(&v); - } - None => { - buf.put_u8(0); - } - } - - connection.write(buf.into()).await?; - } - Command::Set { key, value } => { - db.set(key.clone(), value.clone()).await?; - - connection.write(Bytes::from_static(&[1])).await?; - } - Command::Delete { ref key } => { - let value = db.delete(key).await; - - let mut buf = BytesMut::new(); - match value { - Some(v) => { - buf.put_u8(1); - buf.put_u32(v.len() as u32); - buf.put_slice(&v); - } - None => buf.put_u8(0), - } - - connection.write(buf.into()).await?; - } - Command::Has { ref key } => { - let value = db.has(key).await; - - let buf = Bytes::copy_from_slice(&[if value { 1 } else { 0 }]); - - connection.write(buf.into()).await?; - } - } - - Ok(()) - } - - pub fn parse(bytes: &BytesMut) -> Result<(Self, u64)> { - let mut buffer = Cursor::new(&bytes[..]); - - let name = read_string(&mut buffer)?; - - Self::parse_inner(name, &mut buffer) - } - - fn parse_inner(command_name: String, bytes: &mut Cursor<&[u8]>) -> Result<(Self, u64)> { - let command = match command_name.as_str() { - "get" => { - let key = read_string(bytes)?; - - Self::Get { key } - } - "set" => { - let key = read_string(bytes)?; - let data = read_bytes(bytes)?; - - Self::Set { - key, - value: Value::new(data), - } - } - "delete" => { - let key = read_string(bytes)?; - - Self::Delete { key } - } - "has" => { - let key = read_string(bytes)?; - - Self::Has { key } - } - _ => return Err(AppError::UnknownCommand(command_name)), - }; - - Ok((command, bytes.position())) - } -} - -fn read_string(buffer: &mut Cursor<&[u8]>) -> Result { - let length = buffer.try_get_u16()? as usize; - - if buffer.remaining() < length { - return Err(AppError::IncompleteCommandBuffer); - } - - let mut contents = Vec::with_capacity(length); - - for _ in 0..length { - contents.push(buffer.try_get_u8()?); - } - - let string = String::from_utf8(contents)?; - - Ok(string) -} - -fn read_bytes(buffer: &mut Cursor<&[u8]>) -> Result { - let len = buffer.try_get_u32()? as usize; - - if buffer.remaining() < len { - return Err(AppError::IncompleteCommandBuffer); - } - - Ok(buffer.copy_to_bytes(len)) -} diff --git a/src/commands/delete.rs b/src/commands/delete.rs new file mode 100644 index 0000000..9c7b269 --- /dev/null +++ b/src/commands/delete.rs @@ -0,0 +1,42 @@ +use std::io::Cursor; + +use bytes::{Buf as _, BufMut as _, BytesMut}; + +use crate::{Result, connection::Connection, database::Database, errors::AppError}; + +#[derive(Debug, Clone)] +pub struct Delete { + key: String, +} + +impl Delete { + pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> { + let value = db.delete(&self.key).await; + + let mut buf = BytesMut::new(); + match value { + Some(v) => { + buf.put_u8(1); + buf.put_u32(v.len() as u32); + buf.put_slice(&v); + } + None => buf.put_u8(0), + } + + connection.write(buf.into()).await?; + + Ok(()) + } + + pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result { + let key_length = bytes.try_get_u16()? as usize; + + if bytes.remaining() < key_length { + return Err(AppError::IncompleteCommandBuffer); + } + + let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?; + + Ok(Self { key }) + } +} diff --git a/src/commands/get.rs b/src/commands/get.rs new file mode 100644 index 0000000..e2dcead --- /dev/null +++ b/src/commands/get.rs @@ -0,0 +1,44 @@ +use std::io::Cursor; + +use bytes::{Buf as _, BufMut as _, BytesMut}; + +use crate::{Result, connection::Connection, database::Database, errors::AppError}; + +#[derive(Debug, Clone)] +pub struct Get { + key: String, +} + +impl Get { + pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> { + let value = db.get(&self.key).await; + + let mut buf = BytesMut::new(); + match value { + Some(v) => { + buf.put_u8(1); + buf.put_u32(v.len() as u32); + buf.put_slice(&v); + } + None => { + buf.put_u8(0); + } + } + + connection.write(buf.into()).await?; + + Ok(()) + } + + pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result { + let key_length = bytes.try_get_u16()? as usize; + + if bytes.remaining() < key_length { + return Err(AppError::IncompleteCommandBuffer); + } + + let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?; + + Ok(Self { key }) + } +} diff --git a/src/commands/has.rs b/src/commands/has.rs new file mode 100644 index 0000000..e2e442b --- /dev/null +++ b/src/commands/has.rs @@ -0,0 +1,34 @@ +use std::io::Cursor; + +use bytes::{Buf as _, Bytes}; + +use crate::{Result, connection::Connection, database::Database, errors::AppError}; + +#[derive(Debug, Clone)] +pub struct Has { + key: String, +} + +impl Has { + pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> { + let value = db.has(&self.key).await; + + let buf = Bytes::copy_from_slice(&[if value { 1 } else { 0 }]); + + connection.write(buf.into()).await?; + + Ok(()) + } + + pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result { + let key_length = bytes.try_get_u16()? as usize; + + if bytes.remaining() < key_length { + return Err(AppError::IncompleteCommandBuffer); + } + + let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?; + + Ok(Self { key }) + } +} diff --git a/src/commands/mod.rs b/src/commands/mod.rs new file mode 100644 index 0000000..5552ffa --- /dev/null +++ b/src/commands/mod.rs @@ -0,0 +1,64 @@ +pub mod delete; +mod get; +pub mod has; +pub mod set; + +use std::io::Cursor; + +use bytes::{Buf, BytesMut}; +use delete::Delete; +use get::Get; +use has::Has; +use set::Set; + +use crate::{ + Result, + connection::Connection, + database::{Database, Value}, + errors::AppError, +}; + +#[derive(Debug)] +pub enum Command { + Get(Get), + Set(Set), + Delete(Delete), + Has(Has), +} + +impl Command { + pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> { + match self { + Command::Get(get) => get.execute(db, connection).await, + Command::Set(set) => set.execute(db, connection).await, + Command::Delete(delete) => delete.execute(db, connection).await, + Command::Has(has) => has.execute(db, connection).await, + } + } + + pub fn parse(bytes: &BytesMut) -> Result<(Self, u64)> { + let mut buffer = Cursor::new(&bytes[..]); + + let name_length = buffer.try_get_u16()? as usize; + + if buffer.remaining() < name_length { + return Err(AppError::IncompleteCommandBuffer); + } + + let name = String::from_utf8(buffer.copy_to_bytes(name_length).to_vec())?; + + Self::parse_inner(name, &mut buffer) + } + + fn parse_inner(command_name: String, bytes: &mut Cursor<&[u8]>) -> Result<(Self, u64)> { + let command = match command_name.to_lowercase().as_str() { + "get" => Self::Get(Get::parse(bytes)?), + "set" => Self::Set(Set::parse(bytes)?), + "delete" => Self::Delete(Delete::parse(bytes)?), + "has" => Self::Has(Has::parse(bytes)?), + _ => return Err(AppError::UnknownCommand(command_name)), + }; + + Ok((command, bytes.position())) + } +} diff --git a/src/commands/set.rs b/src/commands/set.rs new file mode 100644 index 0000000..a2304a6 --- /dev/null +++ b/src/commands/set.rs @@ -0,0 +1,49 @@ +use std::io::Cursor; + +use bytes::{Buf as _, Bytes}; + +use crate::{ + Result, + connection::Connection, + database::{Database, Value}, + errors::AppError, +}; + +#[derive(Debug, Clone)] +pub struct Set { + key: String, + value: Value, +} + +impl Set { + pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> { + db.set(self.key, self.value).await?; + + connection.write(Bytes::from_static(&[1])).await?; + + Ok(()) + } + + pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result { + let key_length = bytes.try_get_u16()? as usize; + + if bytes.remaining() < key_length { + return Err(AppError::IncompleteCommandBuffer); + } + + let key = String::from_utf8(bytes.copy_to_bytes(key_length).to_vec())?; + + let value_length = bytes.try_get_u32()? as usize; + + if bytes.remaining() < value_length { + return Err(AppError::IncompleteCommandBuffer); + } + + let data = bytes.copy_to_bytes(value_length); + + Ok(Self { + key, + value: Value::new(data), + }) + } +} diff --git a/src/connection.rs b/src/connection.rs index 792c598..d902460 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -4,7 +4,7 @@ use tokio::{ net::TcpStream, }; -use crate::{Result, command::Command}; +use crate::{Result, commands::Command}; #[derive(Debug)] pub struct Connection { diff --git a/src/main.rs b/src/main.rs index 131aaf3..5b5de04 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ use errors::AppError; use server::Server; pub mod client; -pub mod command; +pub mod commands; pub mod connection; pub mod database; pub mod errors; @@ -18,10 +18,7 @@ pub type Result = std::result::Result; async fn main() -> Result<()> { let mut server = Server::new("127.0.0.1:6171").await?; - // Testing - for i in 0..256 { - tokio::spawn(client(format!("client-{}", i + 1))); - } + tokio::spawn(client("client-1".into())); server.run().await?;