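The two main protocols involved in a web server are the Hypertext Transfer Protocol (HTTP) and the Transmission Control Protocol (TCP).
Listening for TCP connections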
use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let _stream = stream.unwrap();
        println!("Connection established!");
    }
}

// When this port is accessed, the program prints:
// Connection established!
// Connection established!
// Connection established!
Reading the request
use std::{
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let http_request: Vec<_> = buf_reader
        .lines()
        .map(|result| result.unwrap())
        .take_while(|line| !line.is_empty())
        .collect();

    println!("Request: {:#?}", http_request);
}
Visiting the port now prints the request line and headers of each incoming request to the console.
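take_while and for loops can both iterate over the elements of a collection, but they serve different purposes and behave differently:
- for loop:
  - A for loop is a general-purpose iteration tool that visits every element of a collection.
  - Inside the loop body you can do anything you like with each element, such as processing, filtering, or transforming it.
  - A for loop is typically used to walk the whole collection rather than to select elements by a condition.
- take_while method:
  - take_while is an iterator adapter that yields elements only while a condition holds, and returns a new iterator.
  - It starts at the beginning and stops at the first element for which the condition is false; later elements are not yielded even if they would satisfy it.
  - It is typically used to process a leading portion of a collection rather than all of it.

In short, a for loop usually walks the entire collection, while take_while processes only the leading elements that satisfy the condition. In handle_connection, the condition stops reading at the first empty line, which marks the end of the HTTP request headers.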
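To make the contrast concrete, here is a minimal sketch that reads header lines from an in-memory request instead of a socket. The sample request text and the read_headers helper are illustrative assumptions, not part of the server above.

```rust
use std::io::BufRead;

/// Hypothetical helper: collects lines up to (not including) the first empty line.
fn read_headers<R: BufRead>(reader: R) -> Vec<String> {
    reader
        .lines()
        .map(|result| result.unwrap())
        // Stops at the blank line that ends the HTTP header section.
        .take_while(|line| !line.is_empty())
        .collect()
}

fn main() {
    // Made-up request; everything after the blank line is never collected.
    let raw = "GET / HTTP/1.1\r\nHost: 127.0.0.1:7878\r\n\r\nignored body";
    // A byte slice implements BufRead, so it can stand in for a TcpStream here.
    let headers = read_headers(raw.as_bytes());
    println!("{headers:#?}");
}
```

A plain for loop over the same lines would also visit the body unless it broke out of the loop by hand; take_while expresses that stopping rule directly in the iterator chain.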
Returning an actual HTML page
First, create a hello.html file in the project root:
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Hello!</h1>
    <p>Hi from Rust</p>
  </body>
</html>
Modify handle_connection so that it sends a response:
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let http_request: Vec<_> = buf_reader
        .lines()
        .map(|result| result.unwrap())
        .take_while(|line| !line.is_empty())
        .collect();

    let status_line = "HTTP/1.1 200 OK";
    let contents = fs::read_to_string("hello.html").unwrap();
    let length = contents.len();
    let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}", status_line, length, contents);
    stream.write_all(response.as_bytes()).unwrap();

    println!("Request: {:#?}", http_request);
    println!("Response: {:#?}", response);
}
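Validating the request and responding selectively
Customize the response based on the request, returning the HTML file only for requests to the / path. Modify the handle_connection function: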
fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    // Look only at the first line of the HTTP request;
    // next() takes the first item from the iterator.
    let request_line = buf_reader.lines().next().unwrap().unwrap();
    println!("Request: {:#?}", request_line);

    // let http_request: Vec<_> = buf_reader
    //     .lines()
    //     .map(|result| result.unwrap())
    //     .take_while(|line| !line.is_empty())
    //     .collect();

    if request_line == "GET / HTTP/1.1" {
        let status_line = "HTTP/1.1 200 OK";
        let contents = fs::read_to_string("hello.html").unwrap();
        let length = contents.len();
        let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}", status_line, length, contents);
        stream.write_all(response.as_bytes()).unwrap();
    } else {
        // other requests
    }
}
Add a 404.html page to the project root, to be shown when the requested path does not exist:
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
  </body>
</html>
Fill in the else branch:
else {
    let status_line = "HTTP/1.1 404 NOT FOUND";
    let contents = fs::read_to_string("404.html").unwrap();
    let length = contents.len();
    let response = format!(
        "{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"
    );
    stream.write_all(response.as_bytes()).unwrap();
}
Refactor the code to extract the duplicated logic and make the 404 handling cleaner:
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = if request_line == "GET / HTTP/1.1" {
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();
    let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}", status_line, length, contents);
    stream.write_all(response.as_bytes()).unwrap();
}
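The response string built in the code above follows a fixed layout: status line, a Content-Length header, a blank line, then the body. As a sketch of one possible further refactoring, the formatting could live in its own function so it can be checked without a socket. The build_response name is an assumption for illustration and does not appear in the original code.

```rust
/// Hypothetical helper: formats a status line and body into an HTTP response.
fn build_response(status_line: &str, contents: &str) -> String {
    // Content-Length counts bytes, which is what str::len returns.
    let length = contents.len();
    format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}")
}

fn main() {
    let response = build_response("HTTP/1.1 200 OK", "<h1>Hello!</h1>");
    println!("{response:?}");
}
```

With such a helper, handle_connection would only need to choose the status line and file name, read the file, and write the returned string to the stream.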
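Building a multithreaded server
The server currently processes requests one at a time: it does not handle a second connection until the first request has finished processing.
Simulating a slow request
While a slow request that sleeps is being handled, no other request is processed.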
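A minimal sketch of how such a slow request can be simulated is shown here. It mirrors the match used in the server code later in this article, which sleeps for a fixed 10 seconds on /sleep; the route function and its delay parameter are assumptions added so the sketch can be tried with a short pause.

```rust
use std::{thread, time::Duration};

/// Hypothetical router: maps a request line to (status line, file name).
fn route(request_line: &str, delay: Duration) -> (&'static str, &'static str) {
    match request_line {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            // Blocks the calling thread; in a single-threaded server this
            // stalls every connection queued behind it.
            thread::sleep(delay);
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    }
}

fn main() {
    let (status, file) = route("GET /sleep HTTP/1.1", Duration::from_millis(50));
    println!("{status} -> {file}");
}
```

Opening /sleep in one browser tab and / in another against the single-threaded server makes the problem visible: the second tab waits until the sleep ends.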
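Improving throughput with a thread pool
A thread pool is a group of pre-allocated threads that are waiting and ready to handle tasks. Capping the number of threads in the pool keeps a flood of requests from exhausting the server's resources.
Spawning a thread per request
Modify the main function: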
// Requires `use std::thread;` alongside the existing imports.
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}
Managing threads with a thread pool
The definition of thread::spawn in the standard library:
#[stable(feature = "rust1", since = "1.0.0")]
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static,
{
    Builder::new().spawn(f).expect("failed to spawn thread")
}
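The meaning of each bound in the where clause:
- F: FnOnce() -> T: the generic parameter F must be a closure (or function) that takes no arguments (FnOnce()) and whose return type is T. In other words, F is something that can be called once and returns a T.
- F: Send + 'static: F must implement the Send trait, which marks types that can be safely transferred to another thread. The 'static bound means F must not hold any borrowed reference that lives shorter than 'static, so the closure stays valid for as long as the new thread runs, potentially for the rest of the program.
- T: Send + 'static: likewise, the return type T must implement Send and satisfy the 'static bound.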
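The three bounds can be seen at work in a small sketch. The sum_on_thread function is a made-up example, not part of the server; it moves an owned Vec into the closure so that the closure satisfies both Send and 'static.

```rust
use std::thread;

/// Hypothetical helper: sums a vector on another thread.
fn sum_on_thread(numbers: Vec<i32>) -> i32 {
    // `move` transfers ownership of `numbers` into the closure, so the closure
    // borrows nothing from this stack frame and satisfies `F: 'static`.
    // Vec<i32> is Send, so the closure is Send; the i32 result plays the role of T.
    let handle = thread::spawn(move || numbers.iter().sum::<i32>());
    // join returns Result<T, _>; Err means the spawned thread panicked.
    handle.join().unwrap()
}

fn main() {
    println!("{}", sum_on_thread(vec![1, 2, 3]));
}
```

Without move, the closure would borrow numbers from the enclosing function, and the compiler would reject it because that borrow cannot satisfy the 'static bound.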
Modeled on the parameter and return types of spawn, write the following thread pool file:
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();
        // A channel can have many senders but only one receiver, so wrap the
        // receiver in Arc<Mutex<..>> to share it across the worker threads.
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();
            println!("Worker {id} got a job; executing.");
            job();
        });

        Worker { id, thread }
    }
}
The main function code:
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

use hello::ThreadPool;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();
    println!("request: {}", request_line);

    // Use match to handle several request lines.
    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(10));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();
    let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}", status_line, length, contents);
    stream.write_all(response.as_bytes()).unwrap();
}
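ThreadPool::new creates the whole group of worker threads up front. Each worker thread blocks in recv right after it starts, because nothing has been sent on the channel yet, and it stays blocked until a job becomes available. When the pool's execute method is called, the job is sent down the channel, and the workers compete for the receiver's lock to take the next job from the queue.
Because Worker::new spawns a new thread, each worker's loop runs concurrently with the caller, so the for loop can go on creating Worker instances without blocking the main thread. When a worker finishes a job, no new worker is created: the same thread, with the same id, returns to the top of its loop and waits for the next job.
Figure: worker output when a job is sent and after the job finishes.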
Suppose new is implemented with while let instead of let:
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");
                job();
            }
        });

        Worker { id, thread }
    }
}
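In the version that uses let job = receiver.lock().unwrap().recv().unwrap();, the temporary values in the expression on the right-hand side of let are dropped as soon as the let statement ends, so the MutexGuard is released before job() runs. With while let (as with if let and match), temporaries are not dropped until the end of the associated block. In the while let version the lock is therefore held until job() returns, which prevents other threads from acquiring the lock and taking jobs.
As a result, only one worker can run a job at a time; in the test run, only the first thread did any work.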
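The difference in temporary lifetimes can be observed directly with try_lock. The following is a small sketch under the assumption that a Mutex<Vec<u32>> stands in for the shared receiver; the two function names are made up for illustration.

```rust
use std::sync::Mutex;

/// Returns true if the mutex is free right after a `let` statement that locked it.
fn free_after_let(queue: &Mutex<Vec<u32>>) -> bool {
    // The MutexGuard is a temporary; it is dropped when this statement ends.
    let _job = queue.lock().unwrap().pop();
    let free = queue.try_lock().is_ok();
    free
}

/// Returns true if the mutex is still locked inside a `while let` body.
fn locked_inside_while_let(queue: &Mutex<Vec<u32>>) -> bool {
    let mut locked = false;
    // The guard temporary lives until the end of each loop iteration,
    // so try_lock inside the body fails instead of acquiring the lock.
    while let Some(_job) = queue.lock().unwrap().pop() {
        locked = queue.try_lock().is_err();
    }
    locked
}

fn main() {
    let queue = Mutex::new(vec![1, 2, 3]);
    println!("free after let: {}", free_after_let(&queue));
    println!("locked inside while let: {}", locked_inside_while_let(&queue));
}
```

In the worker, job() sits where the try_lock call is in this sketch, which is why the while let version keeps every other worker waiting on the lock.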
Thread termination and cleanup
Implementing the Drop trait for the thread pool
Below is a first version, which has a problem:
impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            // join waits for the worker thread to finish.
            worker.thread.join().unwrap();
        }
    }
}
Because join takes ownership of the thread handle, compiling this version produces the following error:
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src\lib.rs:67:13
   |
67 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
   |             |
   |             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
   |
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
So that drop can take ownership of the thread handle, modify the Worker struct:
struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}
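With the field wrapped in Option, the Option::take method moves the inner value out, leaving None in its place, which gives us ownership of the handle.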
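A standalone sketch of the same pattern follows. The Worker struct here is a simplified stand-in whose thread returns a number, and join_worker is a hypothetical helper; both exist only for this illustration.

```rust
use std::thread::{self, JoinHandle};

/// Simplified stand-in for the Worker struct in the text.
struct Worker {
    thread: Option<JoinHandle<u32>>,
}

/// Joins the worker's thread through a mutable reference, if not already joined.
fn join_worker(worker: &mut Worker) -> Option<u32> {
    // take() swaps None into the field and hands back the owned JoinHandle,
    // so join() can consume it even though we only hold `&mut Worker`.
    worker.thread.take().map(|handle| handle.join().unwrap())
}

fn main() {
    let mut worker = Worker { thread: Some(thread::spawn(|| 42)) };
    println!("{:?}", join_worker(&mut worker));
    // The second call finds None, so joining twice is harmless.
    println!("{:?}", join_worker(&mut worker));
}
```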
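Signaling the receiver to shut down
Implementing Drop alone does not make shutdown work: each worker thread waits for jobs forever and never exits, so join never returns. The sender has to be dropped inside drop so that the channel closes. Modify the ThreadPool struct: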
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
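The reason for wrapping the sender in Option is that dropping the last Sender closes the channel, which makes recv return Err on the receiving side. A minimal sketch of that behavior, using a single receiver thread and plain numbers instead of jobs (the count_until_disconnect function is an assumption for illustration):

```rust
use std::sync::mpsc;
use std::thread;

/// Sends `jobs` numbers to one receiver thread, drops the sender, and
/// returns how many values the thread received before the channel closed.
fn count_until_disconnect(jobs: u32) -> u32 {
    let (sender, receiver) = mpsc::channel::<u32>();
    let mut sender = Some(sender);

    let worker = thread::spawn(move || {
        let mut received = 0;
        // recv() returns Err once every Sender is dropped and the queue is empty.
        while receiver.recv().is_ok() {
            received += 1;
        }
        received
    });

    for n in 0..jobs {
        sender.as_ref().unwrap().send(n).unwrap();
    }
    // Taking the sender out of the Option and dropping it closes the channel.
    drop(sender.take());
    // Without the drop above, this join would block forever.
    worker.join().unwrap()
}

fn main() {
    println!("received {} values", count_until_disconnect(3));
}
```

Values already queued are still delivered after the sender is dropped, so closing the channel does not lose pending work.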
The complete code is as follows. main.rs:
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

use hello::ThreadPool;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    // Accept only two requests, then let the pool shut down.
    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();
    println!("request: {}", request_line);

    // Use match to handle several request lines.
    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(10));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();
    let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}", status_line, length, contents);
    stream.write_all(response.as_bytes()).unwrap();
}
lib.rs:
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();
        // A channel can have many senders but only one receiver, so wrap the
        // receiver in Arc<Mutex<..>> to share it across the worker threads.
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender: Some(sender) }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        // self.sender.send(job).unwrap();
        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // println!("Worker {id} waiting.");
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {id} got a job; executing.");
                    job();
                }
                Err(_) => {
                    println!("Worker {id} disconnected; shutting down.");
                    break;
                }
            }
        });

        Worker { id, thread: Some(thread) }
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Drop the sender explicitly so that the workers' recv() calls return Err.
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            // join waits for the worker to finish its current job and exit.
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}