Lab 6: Chat Server
Due March 31, 11:59pm
Welcome to the final lab! As discussed in class, we'll be building a little chat server out of the pieces we've built in the labs so far.
For details on the big picture of the lab, consult the class 6 notes.
Grading Rubric
- Code is formatted properly using cargo fmt: 5%
- Code passes cargo clippy without warnings: 10%
- Code passes our tests: 60%
- Responses in questionnaire.md: 25%
Connections
In the last lecture, we talked about how each client connection has a name associated with it, which is sent after the initial TCP connection is established.
To simplify this lab a bit, we've provided the base code for how each "connection" is actually represented in the program.
To use this in your program, create a new file at src/connection.rs and paste the following in:
Filename: src/connection.rs
use crate::{frame::Frame::*, rw::FrameReader};
use std::{net::TcpStream, thread::sleep, time::Duration};

pub struct Connection {
    pub reader: FrameReader<TcpStream>,
    pub name: String,
    pub id: u32,
}

impl Connection {
    // Returns None unless the first frame read is the array [JOIN, name].
    pub fn establish(reader: TcpStream, id: u32) -> Option<Self> {
        let mut reader = FrameReader::new(reader);
        sleep(Duration::from_millis(50));
        let name =
            if let [Simple("JOIN"), Simple(name)] = reader.read_frame().ok()?.frame().as_array()? {
                name.to_string()
            } else {
                return None;
            };
        Some(Connection { reader, name, id })
    }
}
Then, add the following to your src/main.rs:
Filename: src/main.rs
mod connection;
use connection::Connection;
Creating a Server type
As we discussed in class, there are several things that the server needs to keep track of:
- a list of the currently connected clients (each one is a Connection),
- the TcpListener, which allows us to accept incoming connections,
- a message queue, where each message is already encoded into bytes (what type should this be?).
We'll also assign unique IDs to each connection, which we can do by keeping track of a next_id: u32 value (matching the id: u32 that Connection::establish takes), which tells us what to assign to the next incoming connection.
To keep track of these nicely, we'll create a Server struct with these fields, as well as a constructor called new.
The constructor should take an address as a &str (e.g. "127.0.0.1:6379"), create a TcpListener on the address, set it to nonblocking mode with .set_nonblocking(true), and then return a new Server value that starts with no connections, an initial next_id of 0, and an empty message queue.
Hint: If anything in the constructor is fallible, i.e. returns a Result, is it your responsibility to handle the error? If you think the answer is no, then you can make your constructor fallible as well by returning a Result and propagating any possible errors to the caller.
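To make the description above concrete, here is a minimal sketch of one possible shape for the struct and a fallible constructor. The field names, the placeholder Connection type, and the choice of VecDeque<Vec<u8>> for the queue are assumptions made for illustration, not the required design; next_id is typed u32 here so it can be passed straight to Connection::establish.

```rust
use std::collections::VecDeque;
use std::io;
use std::net::TcpListener;

/// Placeholder for the lab's `Connection` type, so this sketch compiles on its own.
pub struct Connection;

pub struct Server {
    connections: Vec<Connection>,
    listener: TcpListener,
    // Assumption: a double-ended queue of already-encoded messages.
    messages: VecDeque<Vec<u8>>,
    next_id: u32,
}

impl Server {
    /// Fallible constructor: errors from `bind` and `set_nonblocking`
    /// are propagated to the caller with `?`.
    pub fn new(addr: &str) -> io::Result<Self> {
        let listener = TcpListener::bind(addr)?;
        listener.set_nonblocking(true)?;
        Ok(Server {
            connections: Vec::new(),
            listener,
            messages: VecDeque::new(),
            next_id: 0,
        })
    }
}
```

With this shape, main can decide what to do with a failed bind (for example, the port already being in use) instead of the constructor panicking.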
Then, our main function at src/main.rs will be fairly simple:
- Create a new Server value using "127.0.0.1:6379".
- Inside of an infinite loop: accept clients, read frames, broadcast messages, repeat.
And that's our event loop!
To make things easier to reason about, let's abstract each of these possible events into a method of Server.
Accepting Clients
To accept clients, let's create an accept_clients method on Server, which takes &mut self and returns io::Result<()>.
Inside the function, we only want to handle the clients that are already waiting, and more importantly, we do not want to block and wait for clients that aren't even there yet.
To do this, we'll need the nonblocking function discussed in class, which converts WouldBlock errors (which we get because we called .set_nonblocking(true)) into Nones to make handling easier for us.
You can paste this into your src/main.rs:
use std::io;

fn nonblocking<T>(res: io::Result<T>) -> io::Result<Option<T>> {
    match res {
        Ok(value) => Ok(Some(value)),
        // WouldBlock means "nothing ready yet", which is not a real error.
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}
Then, we can use a while let loop in combination with self.listener.accept and nonblocking to repeatedly accept incoming connections until accepting would block, at which point we want to stop looping and return from the function.
Piecing this logic together is tricky, so if you find yourself completely stuck you should reach out and ask for help.
In each iteration of the loop, we should then have access to the TcpStream and SocketAddr for the incoming connection.
Edit: you'll also need to ensure that the TcpStream you get from .accept() is set to nonblocking mode.
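As a rough illustration of the loop shape only, the sketch below collects every client that is already waiting and stops as soon as accept would block. The free function accept_waiting is a hypothetical name used so the example runs on its own; in the lab this logic lives inside accept_clients and goes on to call Connection::establish.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream};

fn nonblocking<T>(res: io::Result<T>) -> io::Result<Option<T>> {
    match res {
        Ok(value) => Ok(Some(value)),
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

/// Hypothetical helper: accepts every client that is already waiting, then returns.
fn accept_waiting(listener: &TcpListener) -> io::Result<Vec<(TcpStream, SocketAddr)>> {
    let mut accepted = Vec::new();
    // The loop ends as soon as `accept` reports WouldBlock (mapped to None).
    // Any other error is propagated to the caller by `?`.
    while let Some((stream, addr)) = nonblocking(listener.accept())? {
        // The accepted stream needs nonblocking mode as well.
        stream.set_nonblocking(true)?;
        accepted.push((stream, addr));
    }
    Ok(accepted)
}
```

Calling accept_waiting on a nonblocking listener with no clients waiting returns an empty vector immediately rather than blocking.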
From here, you should attempt to establish the connection (i.e. wait for them to send over their name) by calling Connection::establish, passing in the TcpStream and the current next_id that the server is storing.
If the connection is successfully returned, then there are three things we need to take care of:
- enqueue a message saying that the user has joined,
- add the Connection to the server's list of connections,
- update the next_id to the next integer.
If the connection failed to return, then we should just print an error like "{addr} timed out during connection", where addr is the SocketAddr.
To enqueue a message, let's write a helper method for that too, which will make things a lot easier if you decide to make your server multithreaded next week.
The message we'll send is:
&Slice(&[Simple("JOIN"), Simple(&conn.name)])
where conn is the Connection value.
Make sure you add
use frame::Frame::*;
at the top so you can write the frames as things like Simple, instead of having to say Frame::Simple.
Enqueuing Messages
To enqueue messages into the message queue, we'll start by creating an enqueue_message method for Server that takes &mut self and a &Frame<'_>.
This method is extremely straightforward: since each "message" in the message queue is the byte-encoded version of the Frame, we need something to encode it into.
To do this, we'll just create a new Vec<u8>.
Since Vec<u8> implements the Write trait, it's compatible with the WriteFrame trait we created in lab 4.
Using that, write the Frame into the Vec<u8>.
Then, push the buffer encoding the frame to the back of the message queue using .push_back(buf).
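The sketch below shows the "encode into a Vec<u8>, then push_back" idea in isolation. Because the WriteFrame trait from lab 4 isn't available here, write_line is a hypothetical stand-in that just writes some text followed by a newline; it is not the lab's frame encoding, and enqueue_line is likewise an illustrative name.

```rust
use std::collections::VecDeque;
use std::io::{self, Write};

/// Hypothetical stand-in for encoding a frame: writes `text` plus a newline.
/// The real lab code would call the `WriteFrame` method from lab 4 instead.
fn write_line<W: Write>(writer: &mut W, text: &str) -> io::Result<()> {
    writer.write_all(text.as_bytes())?;
    writer.write_all(b"\n")
}

fn enqueue_line(queue: &mut VecDeque<Vec<u8>>, text: &str) -> io::Result<()> {
    // Vec<u8> implements Write, so it can serve as the encoding buffer.
    let mut buf: Vec<u8> = Vec::new();
    write_line(&mut buf, text)?;
    // Newest messages go to the back; broadcasting pops from the front.
    queue.push_back(buf);
    Ok(())
}
```

Encoding once at enqueue time means the same bytes can later be written to every connection without re-encoding the frame per client.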
Reading Frames
At this point, we know how to accept new clients, and so the next task is to see what they're sending us.
To do this, begin by adding a read_frames method to Server, taking just &mut self.
This method will proceed as follows:
- Loop through all connections.
- For each connection conn, call .read_frame() on its reader to get a Result<Guard<'_>, ReadError>.
- If the result is Ok, then we'll handle the frame (described below). If the result is an error because the client's message is pending, do nothing and continue looping. Otherwise, enqueue the message &Slice(&[Simple("LEAVE"), Simple(&conn.name)]) and remove the connection from the list. Also, if the disconnect wasn't intentional, print that an error occurred.
One major problem with this is that when we iterate over the Vec<Connection>, we cannot remove things from it as we go.
To overcome this, we can use the .retain_mut(...) method that Vec<T> provides, which allows us to look at each element in a vector as a &mut T and decide whether we want to retain it in the vector by returning a bool.
So instead of writing your code like this:
for conn in connections.iter_mut() {
    // stuff here
}
We will instead be writing the following:
connections.retain_mut(|conn| {
    // stuff here
})
The difference is that in the second example, the inner part must return/evaluate to a bool, where true means "keep this element in the vector" and false means "remove".
This leads to one other problem, however: if we call self.connections.retain_mut(...), then self is mutably borrowed for the duration of retain_mut, which is a problem because we need to be able to do other things with self (such as enqueueing messages) inside of the closure.
To overcome this, we can transfer ownership of the Vec<Connection> out of self and put in a dummy vector temporarily, do the retain_mut stuff, and then put the vector back into self with the following:
// Move ownership to temporary variable so we can
// mutate `connections` without using `self`
let mut connections = std::mem::take(&mut self.connections);
// Mutate connections
connections.retain_mut(|conn| {
    // inner logic goes here
});
// Put the mutated vec back into self
self.connections = connections;
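Here is the same take / retain_mut / put-back pattern as a small runnable sketch. Conn, the closed flag, record, and prune are all hypothetical stand-ins: record plays the role of any &mut self method (such as enqueue_message) that we want to call from inside the closure.

```rust
/// Toy stand-in for the lab's `Connection`.
struct Conn {
    name: String,
    closed: bool,
}

struct Server {
    connections: Vec<Conn>,
    log: Vec<String>,
}

impl Server {
    // Stand-in for any method that needs `&mut self`, such as enqueueing a message.
    fn record(&mut self, line: String) {
        self.log.push(line);
    }

    fn prune(&mut self) {
        // Take the Vec out of `self`, leaving an empty Vec behind.
        let mut connections = std::mem::take(&mut self.connections);
        connections.retain_mut(|conn| {
            if conn.closed {
                // `self` is free to use here because `connections` is a local.
                self.record(format!("LEAVE {}", conn.name));
                false // remove this element
            } else {
                true // keep this element
            }
        });
        // Put the filtered Vec back into `self`.
        self.connections = connections;
    }
}
```

If the take and put-back lines are dropped and the closure is passed to self.connections.retain_mut directly, the call to self.record is rejected by the borrow checker, which is exactly the problem described above.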
Handling Frames
We also need to handle frames sent to the server from the client.
For this we'll make a handle_frame method that takes &mut self, the name associated with the connection (&str), and the &Frame<'_>.
If the frame represents a message, i.e. is an array (consider using your .as_array() method) of two elements with a simple string "MSG" as its first element and a simple string (we'll call it message) as its second element, then we should enqueue the following message:
&Slice(&[Simple("MSG"), Slice(&[Simple(name), Simple(": "), Simple(message)])])
Otherwise, it's an error which you can just report to the console with a print statement.
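The matching step can be sketched with a slice pattern, in the same style that Connection::establish uses for JOIN. The Frame enum below is a cut-down stand-in with only the two variants that appear in this handout, and chat_text is a hypothetical helper name; your real Frame and .as_array() from earlier labs may differ in their details.

```rust
/// Stand-in for the lab's `Frame` type, reduced to the two variants used here.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Frame<'a> {
    Simple(&'a str),
    Slice(&'a [Frame<'a>]),
}

impl<'a> Frame<'a> {
    fn as_array(&self) -> Option<&'a [Frame<'a>]> {
        match *self {
            Frame::Slice(items) => Some(items),
            Frame::Simple(_) => None,
        }
    }
}

/// Hypothetical helper: returns the text of a `["MSG", message]` frame, if it is one.
fn chat_text<'a>(frame: &Frame<'a>) -> Option<&'a str> {
    match frame.as_array()? {
        // Exactly two elements: the literal "MSG", then any simple string.
        [Frame::Simple("MSG"), Frame::Simple(message)] => Some(*message),
        _ => None,
    }
}
```

In handle_frame, the Some case is where the outgoing MSG frame gets enqueued, and the None case is where the error gets printed.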
Checking if a connection is pending
For this lab, we'll define a connection as pending if one of two conditions is true about the returned ReadError:
- The ReadError was the Io variant, where calling .kind() on the error returns io::ErrorKind::WouldBlock.
- The ReadError was the Parse variant, the inner ParseError was the Cursor variant, and calling .not_enough_data() on the inner CursorError returns true.
You can avoid writing a lot of match statements by nesting patterns here, as we saw in the lab 1 questionnaire.
The first scenario tells us that the client hasn't sent any data over, so there is nothing to do for them yet; the second means that some of the bytes they were sending have been delivered, but not all of them. Neither of these is considered an error; it just means we have to check in on them later.
It may be handy to make this a method on ReadError directly as is_pending.
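To show what nesting patterns buys us, here is a sketch of is_pending over simplified stand-in error types. The variant names Io, Parse, and Cursor and the not_enough_data method come from this handout; the needs_more field and the Invalid variant are made up so the example compiles on its own, and your real types will have different contents.

```rust
use std::io;

// Stand-ins for the lab's error types; the real definitions may differ.
struct CursorError {
    needs_more: bool,
}

impl CursorError {
    fn not_enough_data(&self) -> bool {
        self.needs_more
    }
}

enum ParseError {
    Cursor(CursorError),
    Invalid,
}

enum ReadError {
    Io(io::Error),
    Parse(ParseError),
}

impl ReadError {
    fn is_pending(&self) -> bool {
        match self {
            // No data has arrived yet.
            ReadError::Io(e) => e.kind() == io::ErrorKind::WouldBlock,
            // Part of a frame has arrived; the nested pattern reaches the CursorError.
            ReadError::Parse(ParseError::Cursor(e)) => e.not_enough_data(),
            // Everything else is a real error.
            _ => false,
        }
    }
}
```

The nested pattern ReadError::Parse(ParseError::Cursor(e)) replaces what would otherwise be two matches, one inside the other.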
Checking if the client intentionally disconnected
When a client disconnects intentionally (e.g. the client presses ctrl + C), they'll send a special message telling us that the connection was closed on their end intentionally.
To determine if this is the case, we can check if the ReadError is the Io variant, and calling .kind() on the inner io::Error returns io::ErrorKind::WriteZero.
Anything else means that the connection terminated under other conditions, in which case an error should be printed.
It may be handy to make this a method on ReadError directly as is_exhausted.
Broadcasting Messages
In the event loop, once we've finished reading the frames, we need to broadcast any messages we've been planning on sending.
To do this, we'll add a broadcast_messages method to Server that just takes &mut self and returns io::Result<()>. It repeatedly pops messages from the front of the message queue and writes each one to every connection.
Hint: the while let syntax mentioned above might come in handy here.
One problem that you might encounter is that we want to write the encoded bytes back to the TcpStream objects in each FrameReader in each Connection, but we cannot access it because it's hidden inside of the FrameReader.
To overcome this, write a method for FrameReader that gives you mutable access to the inner reader value, and use this to access the TcpStream in the FrameReader in each Connection and then do .write_all(&message)? where message is the Vec<u8> that you popped from the queue.
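The sketch below puts the two pieces together: an accessor for the wrapped value and a while let loop that drains the queue. FrameReader here is a stripped-down stand-in, get_mut is a hypothetical method name (modeled on BufReader::get_mut from the standard library), and the function is generic over any Write so it can be tried with Vec<u8> instead of a real TcpStream.

```rust
use std::collections::VecDeque;
use std::io::{self, Write};

/// Stand-in for the lab's `FrameReader`, keeping only the wrapped value.
struct FrameReader<R> {
    inner: R,
}

impl<R> FrameReader<R> {
    /// Hypothetical accessor name, modeled on `BufReader::get_mut`.
    fn get_mut(&mut self) -> &mut R {
        &mut self.inner
    }
}

fn broadcast<W: Write>(
    queue: &mut VecDeque<Vec<u8>>,
    readers: &mut [FrameReader<W>],
) -> io::Result<()> {
    // Drain the queue front to back so messages go out in order.
    while let Some(message) = queue.pop_front() {
        for reader in readers.iter_mut() {
            reader.get_mut().write_all(&message)?;
        }
    }
    Ok(())
}
```

Note that the ? returns on the first failed write, so in this sketch one broken connection stops the broadcast for everyone after it; whether that is the behavior you want is a design choice worth thinking about.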
Client
You can get the client by cloning this repo: https://github.com/William103/chat-client
Then you can run it with cargo run to test out your server.
Feedback
Please make sure that each member fills out this short feedback form individually.
Submitting
Once you're finished, you can push your changes to your git repository after ensuring that:
- cargo fmt has been run
- cargo clippy passes
- cargo test passes
Congratulations on finishing!