use std::sync::Arc; use tokio::net::ToSocketAddrs; use tokio::sync::oneshot; use tokio::task::JoinHandle; use tokio::{net::TcpListener, sync::Semaphore}; use crate::Result; use crate::config::ServerConfig; use crate::connection::Connection; use crate::database::{Database, key_expiration_manager}; use crate::handler::Handler; #[derive(Debug)] pub struct Server { db: Database, listener: TcpListener, connection_limit: Arc, expiration_manager_handle: JoinHandle<()>, } impl Server { pub async fn new(config: &ServerConfig) -> Result { let addr = format!("{}:{}", config.host, config.port); Self::_new(addr, config.max_connections).await } async fn _new(addr: Addr, max_connections: usize) -> Result { let listener = TcpListener::bind(addr).await?; let db = Database::new(); let expiration_manager_handle = tokio::spawn(key_expiration_manager(db.clone())); Ok(Self { db, connection_limit: Arc::new(Semaphore::const_new(max_connections)), listener, expiration_manager_handle, }) } pub async fn run(&mut self, mut shutdown: oneshot::Receiver<()>) -> Result<()> { let shutdown = &mut shutdown; loop { let permit = Arc::clone(&self.connection_limit) .acquire_owned() .await .unwrap(); let Some(socket) = ({ tokio::select! { socket = self.listener.accept() => Some(socket?.0), _ = &mut *shutdown => None, } }) else { log::info!("Shutting down"); self.db.shutdown().await; let _ = (&mut self.expiration_manager_handle).await; return Ok(()); }; let addr = socket.peer_addr()?; let connection = Connection::new(socket); let mut handler = Handler::new(self.db.clone(), connection); tokio::spawn(async move { log::debug!("Spawned a new connection handler: {addr}"); if let Err(e) = handler.run().await { log::debug!("Handler::run error: {e:?}"); } log::debug!("Connection handler ended: {addr}"); drop(permit); }); } } }