forked from 409/chat-app
websockets + send messages as json
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -338,6 +338,7 @@ dependencies = [
|
|||||||
"axum_typed_multipart",
|
"axum_typed_multipart",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"serde",
|
"serde",
|
||||||
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
"validify",
|
"validify",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -12,3 +12,4 @@ tokio = { version = "1.45.1", features = ["full"] }
|
|||||||
axum-valid = { path = "../axum-valid", features = ["basic", "typed_multipart", "validify"], default-features = false }
|
axum-valid = { path = "../axum-valid", features = ["basic", "typed_multipart", "validify"], default-features = false }
|
||||||
validify = "2.0.0"
|
validify = "2.0.0"
|
||||||
axum_typed_multipart = "0.16.2"
|
axum_typed_multipart = "0.16.2"
|
||||||
|
serde_json = "1.0.140"
|
||||||
|
|||||||
12
src/main.rs
12
src/main.rs
@@ -1,19 +1,19 @@
|
|||||||
|
mod message;
|
||||||
mod send_message_handler;
|
mod send_message_handler;
|
||||||
|
mod state;
|
||||||
mod websockets;
|
mod websockets;
|
||||||
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use axum::{
|
use axum::{
|
||||||
Router,
|
Router,
|
||||||
routing::{get, post},
|
routing::{get, post},
|
||||||
};
|
};
|
||||||
use tokio::{net::TcpListener, sync::Mutex};
|
use state::AppState;
|
||||||
|
use tokio::{net::TcpListener, sync::broadcast::channel};
|
||||||
pub type MyState = Arc<Mutex<Vec<String>>>;
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
let state = MyState::default();
|
let (sender, _) = channel(32);
|
||||||
|
let state = AppState::new(sender);
|
||||||
|
|
||||||
let router: Router = Router::new()
|
let router: Router = Router::new()
|
||||||
.route("/message", post(send_message_handler::handler))
|
.route("/message", post(send_message_handler::handler))
|
||||||
|
|||||||
14
src/message.rs
Normal file
14
src/message.rs
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct ChatMessage {
|
||||||
|
pub sender_id: u32,
|
||||||
|
pub content: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChatMessage {
|
||||||
|
pub fn new(sender_id: u32, content: String) -> Self {
|
||||||
|
Self { sender_id, content }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -3,25 +3,26 @@ use axum_typed_multipart::{TryFromMultipart, TypedMultipart};
|
|||||||
use axum_valid::ValidifiedByRef;
|
use axum_valid::ValidifiedByRef;
|
||||||
use validify::Validify;
|
use validify::Validify;
|
||||||
|
|
||||||
use crate::MyState;
|
use crate::{message::ChatMessage, state::AppState};
|
||||||
|
|
||||||
#[derive(Validify, TryFromMultipart)]
|
#[derive(Validify, TryFromMultipart)]
|
||||||
|
#[try_from_multipart(rename_all = "kebab-case")]
|
||||||
pub struct SendMessageData {
|
pub struct SendMessageData {
|
||||||
|
client_id: u32,
|
||||||
#[modify(trim)]
|
#[modify(trim)]
|
||||||
content: String,
|
content: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handler(
|
pub async fn handler(
|
||||||
State(state): State<MyState>,
|
State(state): State<AppState>,
|
||||||
ValidifiedByRef(TypedMultipart(data)): ValidifiedByRef<TypedMultipart<SendMessageData>>,
|
ValidifiedByRef(TypedMultipart(data)): ValidifiedByRef<TypedMultipart<SendMessageData>>,
|
||||||
) -> StatusCode {
|
) -> StatusCode {
|
||||||
let mut messages = state.lock().await;
|
let mut messages = state.messages.lock().await;
|
||||||
|
|
||||||
// println!("{}", &data.content);
|
let message = ChatMessage::new(data.client_id, data.content);
|
||||||
|
messages.push(message.clone());
|
||||||
|
|
||||||
messages.push(data.content);
|
let _ = state.broadcast_sender.send(message);
|
||||||
|
|
||||||
dbg!(&messages);
|
|
||||||
|
|
||||||
StatusCode::OK
|
StatusCode::OK
|
||||||
}
|
}
|
||||||
|
|||||||
22
src/state.rs
Normal file
22
src/state.rs
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tokio::sync::{Mutex, broadcast::Sender};
|
||||||
|
|
||||||
|
use crate::message::ChatMessage;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct AppState {
|
||||||
|
pub messages: Arc<Mutex<Vec<ChatMessage>>>,
|
||||||
|
pub next_client_id: Arc<Mutex<u32>>,
|
||||||
|
pub broadcast_sender: Sender<ChatMessage>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AppState {
|
||||||
|
pub fn new(websocket_sender: Sender<ChatMessage>) -> Self {
|
||||||
|
Self {
|
||||||
|
messages: Arc::default(),
|
||||||
|
next_client_id: Arc::default(),
|
||||||
|
broadcast_sender: websocket_sender,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,8 +1,6 @@
|
|||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::{
|
extract::{
|
||||||
WebSocketUpgrade,
|
State, WebSocketUpgrade,
|
||||||
ws::{Message, WebSocket},
|
ws::{Message, WebSocket},
|
||||||
},
|
},
|
||||||
response::Response,
|
response::Response,
|
||||||
@@ -12,22 +10,45 @@ use futures_util::{
|
|||||||
SinkExt as _, StreamExt,
|
SinkExt as _, StreamExt,
|
||||||
stream::{SplitSink, SplitStream},
|
stream::{SplitSink, SplitStream},
|
||||||
};
|
};
|
||||||
|
use tokio::sync::broadcast::Receiver;
|
||||||
|
|
||||||
pub async fn websocket_handler(upgrade: WebSocketUpgrade) -> Response {
|
use crate::{message::ChatMessage, state::AppState};
|
||||||
upgrade.on_upgrade(handler)
|
|
||||||
|
pub async fn websocket_handler(
|
||||||
|
upgrade: WebSocketUpgrade,
|
||||||
|
State(state): State<AppState>,
|
||||||
|
) -> Response {
|
||||||
|
upgrade.on_upgrade(move |websocket| handler(websocket, state))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handler(socket: WebSocket) {
|
async fn handler(mut socket: WebSocket, state: AppState) {
|
||||||
|
let mut next_client_id = state.next_client_id.lock().await;
|
||||||
|
|
||||||
|
*next_client_id += 1;
|
||||||
|
let _ = socket.send((*next_client_id).to_string().into()).await;
|
||||||
|
|
||||||
|
drop(next_client_id);
|
||||||
|
|
||||||
let (sender, receiver) = socket.split();
|
let (sender, receiver) = socket.split();
|
||||||
|
|
||||||
|
let broadcast_receiver = state.broadcast_sender.subscribe();
|
||||||
|
|
||||||
tokio::spawn(receive(receiver));
|
tokio::spawn(receive(receiver));
|
||||||
tokio::spawn(send(sender));
|
tokio::spawn(send(sender, broadcast_receiver));
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send(mut sender: SplitSink<WebSocket, Message>) {
|
async fn send(
|
||||||
loop {
|
mut sender: SplitSink<WebSocket, Message>,
|
||||||
let _ = sender.send("Hello client!".into());
|
mut broadcast_receiver: Receiver<ChatMessage>,
|
||||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
) {
|
||||||
|
while let Ok(message) = broadcast_receiver.recv().await {
|
||||||
|
let Ok(json_message) = serde_json::to_string(&message) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
if sender.send(json_message.into()).await.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user