mset / mget commands (cli currently only supports mget)

This commit is contained in:
2025-07-01 18:33:13 +02:00
parent cff5c37a40
commit c527bb0072
14 changed files with 448 additions and 43 deletions

View File

@@ -1,9 +1,13 @@
use bytes::{Buf, BufMut as _, Bytes, BytesMut};
use bytes::{Buf, Bytes, BytesMut};
use tokio::net::{TcpStream, ToSocketAddrs};
use crate::{
Result,
buffer::{ArchiveBuf, ArchiveBufMut as _},
buffer::ArchiveBuf,
commands::{
delete::Delete, expire::Expire, get::Get, has::Has, m_get::MGet, m_set::MSet,
persist::Persist, set::Set, ttl::Ttl,
},
connection::Connection,
errors::AppError,
};
@@ -24,8 +28,8 @@ impl Client {
pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>> {
let mut bytes = BytesMut::new();
bytes.put_short_string("get")?;
bytes.put_short_string(key)?;
let cmd = Get::new(key.to_owned());
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
@@ -42,16 +46,12 @@ impl Client {
data: &[u8],
expiration_secs: Option<u64>,
) -> Result<()> {
let mut bytes = BytesMut::new();
let mut buf = BytesMut::new();
bytes.put_short_string("set")?;
bytes.put_short_string(key)?;
let cmd = Set::new(key.to_owned(), data.into(), expiration_secs);
cmd.put(&mut buf)?;
bytes.put_bytes_with_length(data);
bytes.put_option(expiration_secs, BytesMut::put_u64);
self.connection.write(bytes.into()).await?;
self.connection.write(buf.into()).await?;
let mut r = self.get_response().await?;
@@ -65,8 +65,8 @@ impl Client {
pub async fn delete(&mut self, key: &str) -> Result<Option<Bytes>> {
let mut bytes = BytesMut::new();
bytes.put_short_string("delete")?;
bytes.put_short_string(key)?;
let cmd = Delete::new(key.to_owned());
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
@@ -80,8 +80,8 @@ impl Client {
pub async fn has(&mut self, key: &str) -> Result<bool> {
let mut bytes = BytesMut::new();
bytes.put_short_string("has")?;
bytes.put_short_string(key)?;
let cmd = Has::new(key.to_owned());
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
@@ -95,8 +95,8 @@ impl Client {
pub async fn ttl(&mut self, key: &str) -> Result<Option<u64>> {
let mut bytes = BytesMut::new();
bytes.put_short_string("ttl")?;
bytes.put_short_string(key)?;
let cmd = Ttl::new(key.to_owned());
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
@@ -110,10 +110,8 @@ impl Client {
pub async fn expire(&mut self, key: &str, seconds: u64) -> Result<bool> {
let mut bytes = BytesMut::new();
bytes.put_short_string("expire")?;
bytes.put_short_string(key)?;
bytes.put_u64(seconds);
let cmd = Expire::new(key.to_owned(), seconds);
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
@@ -124,8 +122,9 @@ impl Client {
pub async fn persist(&mut self, key: &str) -> Result<bool> {
let mut bytes = BytesMut::new();
bytes.put_short_string("persist")?;
bytes.put_short_string(key)?;
let cmd = Persist::new(key.to_owned());
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
@@ -134,6 +133,52 @@ impl Client {
Ok(r.try_get_bool()?)
}
pub async fn m_set(
&mut self,
keys: Vec<&str>,
data: Vec<&[u8]>,
expirations: Vec<Option<u64>>,
) -> Result<()> {
let mut bytes = BytesMut::new();
let len = keys.len().min(data.len()).min(expirations.len());
let mut sets = Vec::with_capacity(len);
for i in 0..len {
sets.push(Set::new(keys[i].to_owned(), data[i].into(), expirations[i]));
}
let cmd = MSet::new(sets);
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
if !r.try_get_bool()? {
return Err(AppError::InvalidCommandResponse);
}
Ok(())
}
pub async fn m_get(&mut self, keys: Vec<String>) -> Result<Vec<Option<Bytes>>> {
let mut bytes = BytesMut::new();
let gets: Vec<Get> = keys.into_iter().map(Get::new).collect();
let cmd = MGet::new(gets);
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
let values = r.try_get_vec(|b| b.try_get_option(ArchiveBuf::try_get_bytes))?;
Ok(values)
}
async fn get_response(&mut self) -> Result<Bytes> {
self.connection
.read_bytes()