Files
archive/src/client.rs

189 lines
4.5 KiB
Rust

use bytes::{Buf, Bytes, BytesMut};
use tokio::net::{TcpStream, ToSocketAddrs};
use crate::{
Result,
buffer::ArchiveBuf,
commands::{
delete::Delete, expire::Expire, get::Get, has::Has, m_get::MGet, m_set::MSet,
persist::Persist, set::Set, ttl::Ttl,
},
connection::Connection,
errors::AppError,
};
pub struct Client {
connection: Connection,
}
impl Client {
pub async fn new<Addr: ToSocketAddrs>(addr: Addr) -> Result<Self> {
let socket = TcpStream::connect(addr).await?;
let connection = Connection::new(socket);
Ok(Self { connection })
}
pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>> {
let mut bytes = BytesMut::new();
let cmd = Get::new(key.to_owned());
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
let value = r.try_get_option(ArchiveBuf::try_get_bytes)?;
Ok(value)
}
pub async fn set(
&mut self,
key: &str,
data: &[u8],
expiration_secs: Option<u64>,
) -> Result<()> {
let mut buf = BytesMut::new();
let cmd = Set::new(key.to_owned(), data.into(), expiration_secs);
cmd.put(&mut buf)?;
self.connection.write(buf.into()).await?;
let mut r = self.get_response().await?;
if !r.try_get_bool()? {
return Err(AppError::InvalidCommandResponse);
}
Ok(())
}
pub async fn delete(&mut self, key: &str) -> Result<Option<Bytes>> {
let mut bytes = BytesMut::new();
let cmd = Delete::new(key.to_owned());
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
let value = r.try_get_option(ArchiveBuf::try_get_bytes)?;
Ok(value)
}
pub async fn has(&mut self, key: &str) -> Result<bool> {
let mut bytes = BytesMut::new();
let cmd = Has::new(key.to_owned());
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
let has = r.try_get_bool()?;
Ok(has)
}
pub async fn ttl(&mut self, key: &str) -> Result<Option<u64>> {
let mut bytes = BytesMut::new();
let cmd = Ttl::new(key.to_owned());
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
let ttl = r.try_get_option(Bytes::try_get_u64)?;
Ok(ttl)
}
pub async fn expire(&mut self, key: &str, seconds: u64) -> Result<bool> {
let mut bytes = BytesMut::new();
let cmd = Expire::new(key.to_owned(), seconds);
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
Ok(r.try_get_bool()?)
}
pub async fn persist(&mut self, key: &str) -> Result<bool> {
let mut bytes = BytesMut::new();
let cmd = Persist::new(key.to_owned());
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
Ok(r.try_get_bool()?)
}
pub async fn m_set(
&mut self,
keys: Vec<&str>,
data: Vec<&[u8]>,
expirations: Vec<Option<u64>>,
) -> Result<()> {
let mut bytes = BytesMut::new();
let len = keys.len().min(data.len()).min(expirations.len());
let mut sets = Vec::with_capacity(len);
for i in 0..len {
sets.push(Set::new(keys[i].to_owned(), data[i].into(), expirations[i]));
}
let cmd = MSet::new(sets);
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
if !r.try_get_bool()? {
return Err(AppError::InvalidCommandResponse);
}
Ok(())
}
pub async fn m_get(&mut self, keys: Vec<String>) -> Result<Vec<Option<Bytes>>> {
let mut bytes = BytesMut::new();
let gets: Vec<Get> = keys.into_iter().map(Get::new).collect();
let cmd = MGet::new(gets);
cmd.put(&mut bytes)?;
self.connection.write(bytes.into()).await?;
let mut r = self.get_response().await?;
let values = r.try_get_vec(|b| b.try_get_option(ArchiveBuf::try_get_bytes))?;
Ok(values)
}
async fn get_response(&mut self) -> Result<Bytes> {
self.connection
.read_bytes()
.await?
.ok_or(AppError::NoResponse)
}
}