当前位置:首页 > 科技  > 软件

异步Rust:构建实时消息代理服务器

来源: 责编: 时间:2024-02-01 12:44:22 308观看
导读在本文中,我们将深入研究使用Rust构建实时消息代理服务器,展示其强大的并发特性。我们将使用Warp作为web服务器,并使用Tokio来管理异步任务。此外,我们将创建一个WebSocket客户端来测试代理服务器的功能。设计图如下:图片

在本文中,我们将深入研究使用Rust构建实时消息代理服务器,展示其强大的并发特性。我们将使用Warp作为web服务器,并使用Tokio来管理异步任务。此外,我们将创建一个WebSocket客户端来测试代理服务器的功能。1hY28资讯网——每日最新资讯28at.com

设计图如下:1hY28资讯网——每日最新资讯28at.com

图片图片1hY28资讯网——每日最新资讯28at.com

1hY28资讯网——每日最新资讯28at.com

构建消息代理服务器

消息代理服务器允许客户端为主题生成事件并订阅它们。它使用Warp作为HTTP和WebSocket服务器,使用Tokio作为异步运行时。1hY28资讯网——每日最新资讯28at.com

使用以下命令创建一个Rust项目:1hY28资讯网——每日最新资讯28at.com

cargo new real-ime-message

在Cargo.toml文件中加入以下依赖项:1hY28资讯网——每日最新资讯28at.com

[dependencies]futures-util = "0.3.30"tokio = {version = "1.35.1", features = ["full"]}tokio-tungstenite = "0.21.0"url = "2.5.0"warp = "0.3.6"

在src/main.rs文件中定义一个Broker结构体:1hY28资讯网——每日最新资讯28at.com

use std::{    collections::{HashMap, VecDeque},    sync::Arc,};use futures_util::{SinkExt, StreamExt};use tokio::sync::{    mpsc::{self, UnboundedSender},    RwLock,};use warp::{filters::ws::Message, Filter};type Topic = String;type Event = String;type WsSender = UnboundedSender<warp::ws::Message>;struct Broker {    events: Arc<RwLock<HashMap<Topic, VecDeque<Event>>>>,    subscribers: Arc<RwLock<HashMap<Topic, Vec<WsSender>>>>,}
  • events:存储每个主题的事件。
  • subscribers:跟踪每个主题的订阅者。

创建一个新的Broker实例:1hY28资讯网——每日最新资讯28at.com

impl Broker {    fn new() -> Self {        Broker {            events: Arc::new(RwLock::new(HashMap::new())),            subscribers: Arc::new(RwLock::new(HashMap::new())),        }    }}

定义发布事件的方法produce:1hY28资讯网——每日最新资讯28at.com

impl Broker {    ......    async fn produce(&self, topic: Topic, event: Event) {        let mut events = self.events.write().await;        events            .entry(topic.clone())            .or_default()            .push_back(event.clone());        // 异步通知所有订阅者        let subscribers_list;        {            let subscribers = self.subscribers.read().await;            subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default();        }        for ws_sender in subscribers_list {            // 将事件发送到WebSocket客户端            let _ = ws_sender.send(warp::ws::Message::text(event.clone()));        }    }}

1hY28资讯网——每日最新资讯28at.com

这个方法主要是将事件添加到相应的主题,然后将新事件通知所有订阅者。1hY28资讯网——每日最新资讯28at.com

定义subscribe方法,来管理新的订阅:1hY28资讯网——每日最新资讯28at.com

impl Broker {    ......    pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) {        let (ws_sender, mut ws_receiver) = socket.split();        let (tx, mut rx) = mpsc::unbounded_channel::<Message>();        {            let mut subs = self.subscribers.write().await;            subs.entry(topic).or_default().push(tx);        }        tokio::task::spawn(async move {            while let Some(result) = ws_receiver.next().await {                match result {                    Ok(message) => {                        // 处理有效的消息                        if message.is_text() {                            println!(                                "Received message from client: {}",                                message.to_str().unwrap()                            );                        }                    }                    Err(e) => {                        // 处理错误                        eprintln!("WebSocket error: {:?}", e);                        break;                    }                }            }            println!("WebSocket connection closed");        });        tokio::task::spawn(async move {            let mut sender = ws_sender;            while let Some(msg) = rx.recv().await {                let _ = sender.send(msg).await;            }        });    }}

1hY28资讯网——每日最新资讯28at.com

这个方法主要是将WebSocket拆分为发送方和接收方,将订阅者添加到订阅者列表中,处理传入的WebSocket消息。1hY28资讯网——每日最新资讯28at.com

main函数代码如下:1hY28资讯网——每日最新资讯28at.com

#[tokio::main]async fn main() {    let broker = Arc::new(Broker::new());    let broker_clone1 = Arc::clone(&broker);    let broker_clone2 = Arc::clone(&broker);    let produce = warp::path!("produce" / String)        .and(warp::post())        .and(warp::body::json())        .and(warp::any().map(move || Arc::clone(&broker_clone1)))        .and_then(            move |topic: String, event: Event, broker_clone2: Arc<Broker>| async move {                broker_clone2.produce(topic, event).await;                Ok::<_, warp::Rejection>(warp::reply())            },        );    let subscribe = warp::path!("subscribe" / String).and(warp::ws()).map(        move |topic: String, ws: warp::ws::Ws| {            let broker_clone3 = Arc::clone(&broker_clone2);            ws.on_upgrade(move |socket| async move {                broker_clone3.subscribe(topic.clone(), socket).await;            })        },    );    let routes = produce.or(subscribe);    println!("Broker server running at http://127.0.0.1:3030");    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;}

1hY28资讯网——每日最新资讯28at.com

1hY28资讯网——每日最新资讯28at.com

实现WebSocket客户端

WebSocket客户端将模拟一个订阅主题和接收消息的真实用户。1hY28资讯网——每日最新资讯28at.com

在src/bin目录下,创建一个ws_cli.rs文件。在文件中定义websocket_client函数,建立WebSocket连接并管理消息:1hY28资讯网——每日最新资讯28at.com

use futures_util::{sink::SinkExt, stream::StreamExt};use std::sync::Arc;use tokio::sync::RwLock;use tokio::time::{sleep, Duration};use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};use url::Url;async fn websocket_client(topic_url: &str) {    // 解析要连接WebSocket服务器的URL    let url = Url::parse(topic_url).expect("Invalid URL");    // 连接到WebSocket服务器    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");    println!("WebSocket client connected");    let (mut write, mut read) = ws_stream.split();    let message = Arc::new(RwLock::new(String::new()));    let message_1 = message.clone();    // 生成一个任务来处理传入的消息    tokio::spawn(async move {        let msg_lock = message_1.clone();        while let Some(message) = read.next().await {            match message {                Ok(msg) => {                    let mut ms = msg_lock.write().await;                    *ms = msg.to_text().unwrap().to_string();                    println!("Received message: {}", msg.to_text().unwrap());                }                Err(e) => {                    eprintln!("Error receiving message: {:?}", e);                    break;                }            }        }    });    // 发送消息    loop {        let msg_lock = message.clone();        let ms = msg_lock.read().await;        if let Err(e) = write.send(Message::Text(ms.to_string())).await {            eprintln!("Error sending message: {:?}", e);            break;        }        sleep(Duration::from_secs(5)).await;    }}

1hY28资讯网——每日最新资讯28at.com

main函数代码如下:1hY28资讯网——每日最新资讯28at.com

#[tokio::main]async fn main() {    websocket_client("ws://127.0.0.1:3030/subscribe/newtopic").await;}

1hY28资讯网——每日最新资讯28at.com

1hY28资讯网——每日最新资讯28at.com

测试

执行如下命令运行消息代理服务器:1hY28资讯网——每日最新资讯28at.com

cargo run --bin real-ime-message

1hY28资讯网——每日最新资讯28at.com

执行结果:1hY28资讯网——每日最新资讯28at.com

Broker server running at http://127.0.0.1:3030

1hY28资讯网——每日最新资讯28at.com

1hY28资讯网——每日最新资讯28at.com

然后打开一个新的命令行,执行如下命令运行WebSocket客户端:1hY28资讯网——每日最新资讯28at.com

cargo run --bin ws_cli

1hY28资讯网——每日最新资讯28at.com

执行结果:1hY28资讯网——每日最新资讯28at.com

WebSocket client connected

1hY28资讯网——每日最新资讯28at.com

1hY28资讯网——每日最新资讯28at.com

向http://127.0.0.1:3030/produce/newtopic接口发送post请求,如图:1hY28资讯网——每日最新资讯28at.com

图片图片1hY28资讯网——每日最新资讯28at.com

1hY28资讯网——每日最新资讯28at.com

客户端接收到消息:1hY28资讯网——每日最新资讯28at.com

WebSocket client connectedReceived message: This is a new event

1hY28资讯网——每日最新资讯28at.com

1hY28资讯网——每日最新资讯28at.com

1hY28资讯网——每日最新资讯28at.com

总结

我们已经探索了在Rust中创建一个简单的消息代理,并使用WebSocket客户端对其进行测试。这个例子突出了Rust在构建高效、并发的网络应用程序方面的能力。1hY28资讯网——每日最新资讯28at.com

1hY28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-70392-0.html异步Rust:构建实时消息代理服务器

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 分享 15 个 HTML 新特性,大多数人可能不知道,建议尽早使用上

下一篇: PHP 高性能的事件循环库 Revolt

标签:
  • 热门焦点
  • Redmi Pad评测:红米充满野心的一次尝试

    从Note系列到K系列,从蓝牙耳机到笔记本电脑,红米不知不觉之间也已经形成了自己颇有竞争力的产品体系,在中端和次旗舰市场上甚至要比小米新机的表现来得更好,正所谓“大丈夫生居
  • 小米平板5 Pro 12.4简评:多专多能 兼顾影音娱乐的大屏利器

    疫情带来了网课,网课盘活了安卓平板,安卓平板市场虽然中途停滞了几年,但好的一点就是停滞的这几年行业又有了新的发展方向,例如超窄边框、高刷新率、多摄镜头组合等,这就让安卓
  • 7月安卓手机性价比榜:努比亚+红魔两款新机入榜

    7月登场的新机有努比亚Z50S Pro和红魔8S Pro,除了三星之外目前唯二的两款搭载超频版骁龙8Gen2处理器的产品,而且努比亚和红魔也一贯有着不错的性价比,所以在本次的性价比榜单
  • 线程通讯的三种方法!通俗易懂

    线程通信是指多个线程之间通过某种机制进行协调和交互,例如,线程等待和通知机制就是线程通讯的主要手段之一。 在 Java 中,线程等待和通知的实现手段有以下几种方式:Object 类下
  • 为什么你不应该使用Div作为可点击元素

    按钮是为任何网络应用程序提供交互性的最常见方式。但我们经常倾向于使用其他HTML元素,如 div span 等作为 clickable 元素。但通过这样做,我们错过了许多内置浏览器的功能。
  • 得物宠物生意「狂飙」,发力“它经济”

    作者|花花小萌主近日,得物宣布正式上线宠物鉴别,通过得物App内的&ldquo;在线鉴别&rdquo;,可找到鉴别宠物的选项。通过上传自家宠物的部位细节,就能收获拥有专业资质认证的得物鉴
  • 腾讯盖楼,字节拆墙

    来源 | 光子星球撰文 | 吴坤谚编辑 | 吴先之&ldquo;想重温暴刷深渊、30+技能搭配暴搓到爽的游戏体验吗?一起上晶核,即刻暴打!&rdquo;曾凭借直播腾讯旗下代理格斗游戏《DNF》一
  • iQOO Neo8 Pro抢先上架:首发天玑9200+ 安卓性能之王

    经过了一段时间的密集爆料,昨日iQOO官方如期对外宣布:将于5月23日推出全新的iQOO Neo8系列新品,官方称这是一款拥有旗舰级性能调校的作品。随着发布时
  • 苹果MacBook Pro 2021测试:仍不支持平滑滚动

    据10月30日9to5 Mac 消息报道,苹果新的 14 英寸和 16 英寸 MacBook Pro 2021 上市后获得了不错的评价,亮点包括行业领先的性能,令人印象深刻的电池续航,精美丰
Top