refactor option write / reads

This commit is contained in:
2025-07-01 16:42:30 +02:00
parent 02aaef1560
commit 87eb32eb5d
9 changed files with 128 additions and 84 deletions

View File

@@ -1,25 +1,86 @@
use bytes::{Buf, Bytes}; use bytes::{Buf, BufMut, Bytes};
use crate::{Result, errors::AppError}; use crate::{Result, errors::AppError};
pub fn try_get_string<B: Buf>(buf: &mut B) -> Result<String> { pub trait ArchiveBuf<B: Buf> {
let len = buf.try_get_u16()? as usize; fn try_get_string(&mut self) -> Result<String>;
fn try_get_bytes(&mut self) -> Result<Bytes>;
if buf.remaining() <= len { fn try_get_bool(&mut self) -> Result<bool>;
return Err(AppError::IncompleteBuffer); fn try_get_option<T, F, E>(&mut self, f: F) -> Result<Option<T>>
} where
T: Sized,
Ok(String::from_utf8(buf.copy_to_bytes(len).to_vec())?) F: FnOnce(&mut B) -> std::result::Result<T, E>,
AppError: From<E>;
} }
pub fn try_get_bytes<B: Buf>(buf: &mut B) -> Result<Bytes> { pub trait ArchiveBufMut<B: Buf> {
let len = buf.try_get_u32()? as usize; fn put_bytes_with_length<T: AsRef<[u8]>>(&mut self, bytes: T);
fn put_option<T, F>(&mut self, value: Option<T>, f: F)
where
T: Sized,
F: FnOnce(&mut B, T);
}
if buf.remaining() < len { impl<B: Buf> ArchiveBuf<B> for B {
return Err(AppError::IncompleteBuffer); fn try_get_string(&mut self) -> Result<String> {
let len = self.try_get_u16()? as usize;
if self.remaining() < len {
return Err(AppError::IncompleteBuffer);
}
Ok(String::from_utf8(self.copy_to_bytes(len).to_vec())?)
} }
let data = buf.copy_to_bytes(len); fn try_get_bytes(&mut self) -> Result<Bytes> {
let len = self.try_get_u32()? as usize;
Ok(data) if self.remaining() < len {
return Err(AppError::IncompleteBuffer);
}
let data = self.copy_to_bytes(len);
Ok(data)
}
fn try_get_bool(&mut self) -> Result<bool> {
Ok(self.try_get_u8()? == 1)
}
fn try_get_option<T, F, E>(&mut self, f: F) -> Result<Option<T>>
where
T: Sized,
F: FnOnce(&mut B) -> std::result::Result<T, E>,
AppError: From<E>,
{
if !self.try_get_bool()? {
return Ok(None);
}
Ok(Some(f(self)?))
}
}
impl<B: Buf + BufMut> ArchiveBufMut<B> for B {
fn put_bytes_with_length<T: AsRef<[u8]>>(&mut self, bytes: T) {
let bytes = bytes.as_ref();
self.put_u32(bytes.len() as u32);
self.put_slice(bytes);
}
fn put_option<T, F>(&mut self, value: Option<T>, f: F)
where
T: Sized,
F: FnOnce(&mut B, T),
{
let Some(value) = value else {
self.put_u8(0);
return;
};
self.put_u8(1);
f(self, value);
}
} }

View File

@@ -1,8 +1,13 @@
use std::io::Cursor; use std::io::Cursor;
use bytes::{BufMut as _, BytesMut}; use bytes::BytesMut;
use crate::{Result, buffer::try_get_string, connection::Connection, database::Database}; use crate::{
Result,
buffer::{ArchiveBuf as _, ArchiveBufMut},
connection::Connection,
database::Database,
};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Delete { pub struct Delete {
@@ -14,22 +19,15 @@ impl Delete {
let value = db.delete(&self.key).await; let value = db.delete(&self.key).await;
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
match value { buf.put_option(value, ArchiveBufMut::put_bytes_with_length);
Some(v) => {
buf.put_u8(1);
buf.put_u32(v.len() as u32);
buf.put_slice(&v);
}
None => buf.put_u8(0),
}
connection.write(buf.into()).await?; connection.write(buf.into()).await?;
Ok(()) Ok(())
} }
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> { pub fn parse(buf: &mut Cursor<&[u8]>) -> Result<Self> {
let key = try_get_string(bytes)?; let key = buf.try_get_string()?;
Ok(Self { key }) Ok(Self { key })
} }

View File

@@ -2,7 +2,7 @@ use std::io::Cursor;
use bytes::{Buf as _, Bytes}; use bytes::{Buf as _, Bytes};
use crate::{Result, buffer::try_get_string, connection::Connection, database::Database}; use crate::{Result, buffer::ArchiveBuf as _, connection::Connection, database::Database};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Expire { pub struct Expire {
@@ -12,18 +12,18 @@ pub struct Expire {
impl Expire { impl Expire {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> { pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
let value = db.expire(&self.key, self.seconds).await?; let success = db.expire(&self.key, self.seconds).await?;
connection connection
.write(Bytes::from_static(if value { &[1] } else { &[0] })) .write(Bytes::from_static(if success { &[1] } else { &[0] }))
.await?; .await?;
Ok(()) Ok(())
} }
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> { pub fn parse(buf: &mut Cursor<&[u8]>) -> Result<Self> {
let key = try_get_string(bytes)?; let key = buf.try_get_string()?;
let seconds = bytes.try_get_u64()?; let seconds = buf.try_get_u64()?;
Ok(Self { key, seconds }) Ok(Self { key, seconds })
} }

View File

@@ -1,8 +1,13 @@
use std::io::Cursor; use std::io::Cursor;
use bytes::{BufMut as _, BytesMut}; use bytes::BytesMut;
use crate::{Result, buffer::try_get_string, connection::Connection, database::Database}; use crate::{
Result,
buffer::{ArchiveBuf as _, ArchiveBufMut},
connection::Connection,
database::Database,
};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Get { pub struct Get {
@@ -14,24 +19,15 @@ impl Get {
let value = db.get(&self.key).await; let value = db.get(&self.key).await;
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
match value { buf.put_option(value, ArchiveBufMut::put_bytes_with_length);
Some(v) => {
buf.put_u8(1);
buf.put_u32(v.len() as u32);
buf.put_slice(&v);
}
None => {
buf.put_u8(0);
}
}
connection.write(buf.into()).await?; connection.write(buf.into()).await?;
Ok(()) Ok(())
} }
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> { pub fn parse(buf: &mut Cursor<&[u8]>) -> Result<Self> {
let key = try_get_string(bytes)?; let key = buf.try_get_string()?;
Ok(Self { key }) Ok(Self { key })
} }

View File

@@ -2,7 +2,7 @@ use std::io::Cursor;
use bytes::Bytes; use bytes::Bytes;
use crate::{Result, buffer::try_get_string, connection::Connection, database::Database}; use crate::{Result, buffer::ArchiveBuf as _, connection::Connection, database::Database};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Has { pub struct Has {
@@ -20,8 +20,8 @@ impl Has {
Ok(()) Ok(())
} }
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> { pub fn parse(buf: &mut Cursor<&[u8]>) -> Result<Self> {
let key = try_get_string(bytes)?; let key = buf.try_get_string()?;
Ok(Self { key }) Ok(Self { key })
} }

View File

@@ -18,7 +18,7 @@ use set::Set;
use ttl::Ttl; use ttl::Ttl;
use crate::{ use crate::{
Result, buffer::try_get_string, connection::Connection, database::Database, errors::AppError, Result, buffer::ArchiveBuf as _, connection::Connection, database::Database, errors::AppError,
}; };
#[derive(Debug)] #[derive(Debug)]
@@ -46,11 +46,12 @@ impl Command {
} }
pub fn parse(bytes: &BytesMut) -> Result<(Self, u64)> { pub fn parse(bytes: &BytesMut) -> Result<(Self, u64)> {
let mut buffer = Cursor::new(&bytes[..]); let mut buf = Cursor::new(&bytes[..]);
let name = try_get_string(&mut buffer)?; let name = buf.try_get_string()?;
println!("Command name: {name}, buf: {buf:?}");
Self::parse_inner(name, &mut buffer) Self::parse_inner(name, &mut buf)
} }
fn parse_inner(command_name: String, bytes: &mut Cursor<&[u8]>) -> Result<(Self, u64)> { fn parse_inner(command_name: String, bytes: &mut Cursor<&[u8]>) -> Result<(Self, u64)> {

View File

@@ -2,7 +2,7 @@ use std::io::Cursor;
use bytes::Bytes; use bytes::Bytes;
use crate::{Result, buffer::try_get_string, connection::Connection, database::Database}; use crate::{Result, buffer::ArchiveBuf as _, connection::Connection, database::Database};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Persist { pub struct Persist {
@@ -20,8 +20,8 @@ impl Persist {
Ok(()) Ok(())
} }
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> { pub fn parse(buf: &mut Cursor<&[u8]>) -> Result<Self> {
let key = try_get_string(bytes)?; let key = buf.try_get_string()?;
Ok(Self { key }) Ok(Self { key })
} }

View File

@@ -1,12 +1,6 @@
use std::io::Cursor; use std::io::Cursor;
use crate::{ use crate::{Result, buffer::ArchiveBuf as _, connection::Connection, database::Database};
Result,
buffer::{try_get_bytes, try_get_string},
connection::Connection,
database::Database,
errors::AppError,
};
use bytes::{Buf as _, Bytes}; use bytes::{Buf as _, Bytes};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -25,16 +19,12 @@ impl Set {
Ok(()) Ok(())
} }
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> { pub fn parse(buf: &mut Cursor<&[u8]>) -> Result<Self> {
let key = try_get_string(bytes)?; let key = buf.try_get_string()?;
let data = try_get_bytes(bytes)?; let data = buf.try_get_bytes()?;
let expiration: Option<u64> = match bytes.try_get_u8()? { let expiration = buf.try_get_option(Cursor::try_get_u64)?;
1 => Some(bytes.try_get_u64()?),
0 => None,
_ => return Err(AppError::UnexpectedCommandData),
};
Ok(Self { Ok(Self {
key, key,

View File

@@ -1,8 +1,13 @@
use std::io::Cursor; use std::io::Cursor;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, BytesMut};
use crate::{Result, buffer::try_get_string, connection::Connection, database::Database}; use crate::{
Result,
buffer::{ArchiveBuf as _, ArchiveBufMut as _},
connection::Connection,
database::Database,
};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Ttl { pub struct Ttl {
@@ -13,23 +18,16 @@ impl Ttl {
pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> { pub async fn execute(self, db: &Database, connection: &mut Connection) -> Result<()> {
let ttl = db.ttl(&self.key).await; let ttl = db.ttl(&self.key).await;
let Some(ttl) = ttl else {
connection.write(Bytes::from_static(&[0])).await?;
return Ok(());
};
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
buf.put_option(ttl, BytesMut::put_u64);
buf.put_u8(1);
buf.put_u64(ttl);
connection.write(buf.into()).await?; connection.write(buf.into()).await?;
Ok(()) Ok(())
} }
pub fn parse(bytes: &mut Cursor<&[u8]>) -> Result<Self> { pub fn parse(buf: &mut Cursor<&[u8]>) -> Result<Self> {
let key = try_get_string(bytes)?; let key = buf.try_get_string()?;
Ok(Self { key }) Ok(Self { key })
} }