basic get / set / delete / has
This commit is contained in:
137
src/client.rs
Normal file
137
src/client.rs
Normal file
@@ -0,0 +1,137 @@
|
||||
use bytes::{Buf, BufMut as _, Bytes, BytesMut};
|
||||
use tokio::net::{TcpStream, ToSocketAddrs};
|
||||
|
||||
use crate::{Result, connection::Connection, database::Value, errors::AppError};
|
||||
|
||||
pub struct Client {
|
||||
connection: Connection,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub async fn new<Addr: ToSocketAddrs>(addr: Addr) -> Result<Self> {
|
||||
let socket = TcpStream::connect(addr).await?;
|
||||
|
||||
let connection = Connection::new(socket);
|
||||
|
||||
Ok(Self { connection })
|
||||
}
|
||||
|
||||
pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>> {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
bytes.put_u16(3);
|
||||
bytes.put_slice(b"get");
|
||||
|
||||
let key_length: u16 = key
|
||||
.len()
|
||||
.try_into()
|
||||
.map_err(|_| AppError::KeyLength(key.len()))?;
|
||||
|
||||
bytes.put_u16(key_length);
|
||||
bytes.put_slice(key.as_bytes());
|
||||
|
||||
self.connection.write(bytes.into()).await?;
|
||||
|
||||
let mut r = self.get_response().await?;
|
||||
let response = match r.try_get_u8()? {
|
||||
0 => None,
|
||||
1 => {
|
||||
let len = r.try_get_u32()? as usize;
|
||||
|
||||
if r.remaining() < len {
|
||||
return Err(AppError::InvalidCommandResponse);
|
||||
}
|
||||
|
||||
Some(r.copy_to_bytes(len))
|
||||
}
|
||||
_ => return Err(AppError::InvalidCommandResponse),
|
||||
};
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub async fn set(&mut self, key: &str, value: Value) -> Result<()> {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
bytes.put_u16(3);
|
||||
bytes.put_slice(b"set");
|
||||
|
||||
let key_length: u16 = key
|
||||
.len()
|
||||
.try_into()
|
||||
.map_err(|_| AppError::KeyLength(key.len()))?;
|
||||
|
||||
bytes.put_u16(key_length);
|
||||
bytes.put_slice(key.as_bytes());
|
||||
|
||||
value.write_to_bytes(&mut bytes);
|
||||
|
||||
self.connection.write(bytes.into()).await?;
|
||||
|
||||
let mut r = self.get_response().await?;
|
||||
|
||||
match r.try_get_u8()? {
|
||||
1 => return Ok(()),
|
||||
_ => return Err(AppError::InvalidCommandResponse),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete(&mut self, key: &str) -> Result<Option<Bytes>> {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
bytes.put_u16(6);
|
||||
bytes.put_slice(b"delete");
|
||||
|
||||
let key_length: u16 = key
|
||||
.len()
|
||||
.try_into()
|
||||
.map_err(|_| AppError::KeyLength(key.len()))?;
|
||||
|
||||
bytes.put_u16(key_length);
|
||||
bytes.put_slice(key.as_bytes());
|
||||
|
||||
self.connection.write(bytes.into()).await?;
|
||||
|
||||
let mut r = self.get_response().await?;
|
||||
|
||||
let response = match r.try_get_u8()? {
|
||||
1 => {
|
||||
let len = r.try_get_u32()?;
|
||||
let bytes = r.copy_to_bytes(len as usize);
|
||||
|
||||
Some(bytes)
|
||||
}
|
||||
0 => None,
|
||||
_ => return Err(AppError::InvalidCommandResponse),
|
||||
};
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub async fn has(&mut self, key: &str) -> Result<bool> {
|
||||
let mut bytes = BytesMut::new();
|
||||
bytes.put_u16(3);
|
||||
bytes.put_slice(b"has");
|
||||
|
||||
let key_length: u16 = key
|
||||
.len()
|
||||
.try_into()
|
||||
.map_err(|_| AppError::KeyLength(key.len()))?;
|
||||
|
||||
bytes.put_u16(key_length);
|
||||
bytes.put_slice(key.as_bytes());
|
||||
|
||||
self.connection.write(bytes.into()).await?;
|
||||
|
||||
let mut r = self.get_response().await?;
|
||||
|
||||
Ok(r.try_get_u8()? == 1)
|
||||
}
|
||||
|
||||
async fn get_response(&mut self) -> Result<Bytes> {
|
||||
self.connection
|
||||
.read_bytes()
|
||||
.await?
|
||||
.ok_or(AppError::NoResponse)
|
||||
}
|
||||
}
|
||||
139
src/command.rs
Normal file
139
src/command.rs
Normal file
@@ -0,0 +1,139 @@
|
||||
use std::io::{Cursor, Read};
|
||||
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
|
||||
use crate::{
|
||||
Result,
|
||||
connection::Connection,
|
||||
database::{Database, Value},
|
||||
errors::AppError,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Command {
|
||||
Get { key: String },
|
||||
Set { key: String, value: Value },
|
||||
Delete { key: String },
|
||||
Has { key: String },
|
||||
}
|
||||
|
||||
impl Command {
|
||||
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
|
||||
match self {
|
||||
Command::Get { ref key } => {
|
||||
let value = db.get(&key).await;
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
match value {
|
||||
Some(v) => {
|
||||
buf.put_u8(1);
|
||||
buf.put_u32(v.len() as u32);
|
||||
buf.put_slice(&v);
|
||||
}
|
||||
None => {
|
||||
buf.put_u8(0);
|
||||
}
|
||||
}
|
||||
|
||||
connection.write(buf.into()).await?;
|
||||
}
|
||||
Command::Set { key, value } => {
|
||||
db.set(key.clone(), value.clone()).await?;
|
||||
|
||||
connection.write(Bytes::from_static(&[1])).await?;
|
||||
}
|
||||
Command::Delete { ref key } => {
|
||||
let value = db.delete(key).await;
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
match value {
|
||||
Some(v) => {
|
||||
buf.put_u8(1);
|
||||
buf.put_u32(v.len() as u32);
|
||||
buf.put_slice(&v);
|
||||
}
|
||||
None => buf.put_u8(0),
|
||||
}
|
||||
|
||||
connection.write(buf.into()).await?;
|
||||
}
|
||||
Command::Has { ref key } => {
|
||||
let value = db.has(key).await;
|
||||
|
||||
let buf = Bytes::copy_from_slice(&[if value { 1 } else { 0 }]);
|
||||
|
||||
connection.write(buf.into()).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn parse(bytes: &BytesMut) -> Result<(Self, u64)> {
|
||||
let mut buffer = Cursor::new(&bytes[..]);
|
||||
|
||||
let name = read_string(&mut buffer)?;
|
||||
|
||||
Self::parse_inner(name, &mut buffer)
|
||||
}
|
||||
|
||||
fn parse_inner(command_name: String, bytes: &mut Cursor<&[u8]>) -> Result<(Self, u64)> {
|
||||
let command = match command_name.as_str() {
|
||||
"get" => {
|
||||
let key = read_string(bytes)?;
|
||||
|
||||
Self::Get { key }
|
||||
}
|
||||
"set" => {
|
||||
let key = read_string(bytes)?;
|
||||
let data = read_bytes(bytes)?;
|
||||
|
||||
Self::Set {
|
||||
key,
|
||||
value: Value::new(data),
|
||||
}
|
||||
}
|
||||
"delete" => {
|
||||
let key = read_string(bytes)?;
|
||||
|
||||
Self::Delete { key }
|
||||
}
|
||||
"has" => {
|
||||
let key = read_string(bytes)?;
|
||||
|
||||
Self::Has { key }
|
||||
}
|
||||
_ => return Err(AppError::UnknownCommand(command_name)),
|
||||
};
|
||||
|
||||
Ok((command, bytes.position()))
|
||||
}
|
||||
}
|
||||
|
||||
fn read_string(buffer: &mut Cursor<&[u8]>) -> Result<String> {
|
||||
let length = buffer.try_get_u16()? as usize;
|
||||
|
||||
if buffer.remaining() < length {
|
||||
return Err(AppError::IncompleteCommandBuffer);
|
||||
}
|
||||
|
||||
let mut contents = Vec::with_capacity(length);
|
||||
|
||||
for _ in 0..length {
|
||||
contents.push(buffer.try_get_u8()?);
|
||||
}
|
||||
|
||||
let string = String::from_utf8(contents)?;
|
||||
|
||||
Ok(string)
|
||||
}
|
||||
|
||||
fn read_bytes(buffer: &mut Cursor<&[u8]>) -> Result<Bytes> {
|
||||
let len = buffer.try_get_u32()? as usize;
|
||||
|
||||
if buffer.remaining() < len {
|
||||
return Err(AppError::IncompleteCommandBuffer);
|
||||
}
|
||||
|
||||
Ok(buffer.copy_to_bytes(len))
|
||||
}
|
||||
51
src/connection.rs
Normal file
51
src/connection.rs
Normal file
@@ -0,0 +1,51 @@
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt, BufWriter},
|
||||
net::TcpStream,
|
||||
};
|
||||
|
||||
use crate::{Result, command::Command};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Connection {
|
||||
stream: BufWriter<TcpStream>,
|
||||
buffer: BytesMut,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn new(stream: TcpStream) -> Self {
|
||||
Self {
|
||||
stream: BufWriter::new(stream),
|
||||
buffer: BytesMut::with_capacity(4096),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn read_command(&mut self) -> Result<Option<Command>> {
|
||||
loop {
|
||||
if let Ok((command, length)) = Command::parse(&self.buffer) {
|
||||
self.buffer.advance(length as usize);
|
||||
|
||||
return Ok(Some(command));
|
||||
}
|
||||
|
||||
if self.stream.read_buf(&mut self.buffer).await? == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn read_bytes(&mut self) -> Result<Option<Bytes>> {
|
||||
if self.stream.read_buf(&mut self.buffer).await? == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
return Ok(Some(self.buffer.split().into()));
|
||||
}
|
||||
|
||||
pub async fn write(&mut self, mut bytes: Bytes) -> Result<()> {
|
||||
self.stream.write_buf(&mut bytes).await?;
|
||||
self.stream.flush().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
67
src/database.rs
Normal file
67
src/database.rs
Normal file
@@ -0,0 +1,67 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::Result;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Database {
|
||||
entries: Arc<Mutex<HashMap<String, Value>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Value {
|
||||
data: Bytes,
|
||||
}
|
||||
|
||||
impl Value {
|
||||
pub fn new(data: Bytes) -> Self {
|
||||
Self { data }
|
||||
}
|
||||
|
||||
pub fn from_string(data: String) -> Self {
|
||||
Self {
|
||||
data: Bytes::from(data),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write_to_bytes(&self, bytes: &mut BytesMut) {
|
||||
bytes.put_u32(self.data.len() as u32);
|
||||
bytes.put_slice(&self.data);
|
||||
}
|
||||
}
|
||||
|
||||
impl Database {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
entries: Arc::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get(&self, key: &str) -> Option<Bytes> {
|
||||
let entries = self.entries.lock().await;
|
||||
|
||||
entries.get(key).map(|v| v.data.clone())
|
||||
}
|
||||
|
||||
pub async fn set(&self, key: String, value: Value) -> Result<()> {
|
||||
let mut entries = self.entries.lock().await;
|
||||
|
||||
entries.insert(key, value);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete(&self, key: &str) -> Option<Bytes> {
|
||||
let mut entries = self.entries.lock().await;
|
||||
|
||||
entries.remove(key).map(|v| v.data)
|
||||
}
|
||||
|
||||
pub async fn has(&self, key: &str) -> bool {
|
||||
let entries = self.entries.lock().await;
|
||||
|
||||
entries.contains_key(key)
|
||||
}
|
||||
}
|
||||
21
src/errors.rs
Normal file
21
src/errors.rs
Normal file
@@ -0,0 +1,21 @@
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum AppError {
|
||||
#[error("An IO error occurred")]
|
||||
Io(#[from] std::io::Error),
|
||||
#[error("A TryGetError occurred")]
|
||||
TryGet(#[from] bytes::TryGetError),
|
||||
#[error("The buffer is missing data for a complete command")]
|
||||
IncompleteCommandBuffer,
|
||||
#[error("A Utf8Error occurred")]
|
||||
FromUtf8(#[from] std::string::FromUtf8Error),
|
||||
#[error("The command {0} was not recognized")]
|
||||
UnknownCommand(String),
|
||||
#[error("The specified key's length {0} exceeds the limit")]
|
||||
KeyLength(usize),
|
||||
#[error("Received no response")]
|
||||
NoResponse,
|
||||
#[error("Expected a different response for the executed command")]
|
||||
InvalidCommandResponse,
|
||||
}
|
||||
25
src/handler.rs
Normal file
25
src/handler.rs
Normal file
@@ -0,0 +1,25 @@
|
||||
use crate::{Result, connection::Connection, database::Database};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Handler {
|
||||
db: Database,
|
||||
connection: Connection,
|
||||
}
|
||||
|
||||
impl Handler {
|
||||
pub fn new(db: Database, connection: Connection) -> Self {
|
||||
Self { db, connection }
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) -> Result<()> {
|
||||
while let Ok(command) = self.connection.read_command().await {
|
||||
let Some(command) = command else {
|
||||
break;
|
||||
};
|
||||
|
||||
command.execute(&self.db, &mut self.connection).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
44
src/main.rs
Normal file
44
src/main.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
use bytes::Bytes;
|
||||
use client::Client;
|
||||
use database::Value;
|
||||
use errors::AppError;
|
||||
use server::Server;
|
||||
|
||||
pub mod client;
|
||||
pub mod command;
|
||||
pub mod connection;
|
||||
pub mod database;
|
||||
pub mod errors;
|
||||
pub mod handler;
|
||||
pub mod server;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, AppError>;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let mut server = Server::new("127.0.0.1:6171").await?;
|
||||
|
||||
// Testing
|
||||
for i in 0..256 {
|
||||
tokio::spawn(client(format!("client-{}", i + 1)));
|
||||
}
|
||||
|
||||
server.run().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Test stuff
|
||||
async fn client(v: String) -> Result<()> {
|
||||
let mut client = Client::new("127.0.0.1:6171").await?;
|
||||
|
||||
client
|
||||
.set(&v, Value::from_string(format!("{v}'s value")))
|
||||
.await?;
|
||||
assert_eq!(
|
||||
client.get(&v).await.unwrap().unwrap(),
|
||||
Bytes::from(format!("{v}'s value"))
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
52
src/server.rs
Normal file
52
src/server.rs
Normal file
@@ -0,0 +1,52 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::net::ToSocketAddrs;
|
||||
use tokio::{net::TcpListener, sync::Semaphore};
|
||||
|
||||
use crate::Result;
|
||||
use crate::connection::Connection;
|
||||
use crate::database::Database;
|
||||
use crate::handler::Handler;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Server {
|
||||
db: Database,
|
||||
listener: TcpListener,
|
||||
connection_limit: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
const MAX_CONNECTIONS: usize = 256;
|
||||
|
||||
pub async fn new<Addr: ToSocketAddrs>(addr: ToSocketAddrs) -> Result<Self> {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
|
||||
Ok(Self {
|
||||
db: Database::new(),
|
||||
connection_limit: Arc::new(Semaphore::const_new(Self::MAX_CONNECTIONS)),
|
||||
listener,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) -> Result<()> {
|
||||
loop {
|
||||
let permit = Arc::clone(&self.connection_limit)
|
||||
.acquire_owned()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let socket = self.listener.accept().await?.0;
|
||||
|
||||
let connection = Connection::new(socket);
|
||||
let mut handler = Handler::new(self.db.clone(), connection);
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handler.run().await {
|
||||
println!("Handler::run error: {e:?}");
|
||||
}
|
||||
|
||||
drop(permit);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user