Building a Chat Application with WebSocket and Message Queue
In this blog post, we’ll take a look at how to build a chat application using WebSockets and message queues. We’ll be using Go as our programming language and Amazon Simple Queue Service (SQS) as our message queue.
This is adapted from a WebSocket tutorial where the server echoes the user’s message. Our chat application will instead allow users to communicate with each other.
Architecture
We’ll build a simple chat server written in Go that allows multiple clients to connect and send messages to each other in real-time. It uses WebSockets to handle the communication between the server and the clients. The server maintains a list of connected clients and broadcasts incoming messages to all connected clients.
One way to scale the chat application when the number of simultaneous connections exceeds the hardware limitations of a single server is to use a message queue to distribute messages between multiple instances of the chat server running on different machines.
In this architecture, each instance of the chat server would handle a subset of the total incoming connections and maintain its own list of connected clients. When a client sends a message to the server, the server would publish the message to a message queue, such as Amazon Simple Queue Service (SQS) or Azure Service Bus, instead of broadcasting it directly to all connected clients.
Every instance of the chat server, including the one that published the message, would consume from the message queue. When an instance receives a message from the message queue, it would broadcast the message to all clients connected to that instance. One caveat: a standard SQS queue hands each message to only one consumer, so if several instances poll the same queue, each message reaches just one of them. For every instance to see every message, each instance needs its own queue, for example one subscribed to a shared Amazon SNS topic. The code in this post uses a single queue to keep things simple.
This architecture allows the chat application to scale horizontally by adding more instances of the chat server as needed to handle additional connections. With a queue per instance, messages are distributed between all instances of the server, so all connected clients receive messages regardless of which instance they are connected to.
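The fan-out described above, where every server instance sees every message, can be sketched in memory with Go channels standing in for per-instance queues. This is only an illustration under stated assumptions: `Topic`, `Subscribe`, and `Publish` are hypothetical names, not part of any AWS SDK, and no network or persistence is involved.

```go
package main

import (
	"fmt"
	"sync"
)

// Topic is a hypothetical in-memory stand-in for a fan-out broker.
// Each subscriber gets its own buffered channel, playing the role
// of a per-instance queue.
type Topic struct {
	mu   sync.Mutex
	subs []chan string
}

// Subscribe registers a new consumer and returns its private queue.
func (t *Topic) Subscribe(buffer int) <-chan string {
	t.mu.Lock()
	defer t.mu.Unlock()
	ch := make(chan string, buffer)
	t.subs = append(t.subs, ch)
	return ch
}

// Publish copies msg into every subscriber's queue and reports how
// many copies were delivered. A full queue is skipped rather than
// blocking the publisher.
func (t *Topic) Publish(msg string) int {
	t.mu.Lock()
	defer t.mu.Unlock()
	delivered := 0
	for _, ch := range t.subs {
		select {
		case ch <- msg:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	topic := &Topic{}
	instanceA := topic.Subscribe(8)
	instanceB := topic.Subscribe(8)

	topic.Publish("hello")

	// Each instance receives its own copy of the message.
	fmt.Println("A got:", <-instanceA)
	fmt.Println("B got:", <-instanceB)
}
```

The point of the sketch is that delivery is per subscriber: adding a third instance means adding a third queue, not sharing an existing one.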
The Server
Let’s take a look at the server-side code. We’ll start by defining a Server struct that will hold the state of our server:
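type Server struct {
	clients      map[*Client]bool // set of currently connected clients
	mu           sync.Mutex       // guards clients
	messageQueue *sqs.SQS         // SQS client used to send and receive messages
}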
The clients field is a map that will keep track of all the connected clients. The mu field is a mutex that we’ll use to synchronize access to the clients map, since it is touched by several goroutines at once. The messageQueue field holds an SQS client to send and receive messages.
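As a rough illustration of why the mutex is needed, the sketch below guards a map with `sync.Mutex` while many goroutines add and remove entries concurrently. `Registry`, `Add`, `Remove`, `Len`, and `churn` are hypothetical names introduced for this example, and plain integer IDs stand in for `*Client` values.

```go
package main

import (
	"fmt"
	"sync"
)

// Registry is a hypothetical, simplified version of the server's
// client set: a map protected by a mutex.
type Registry struct {
	mu      sync.Mutex
	clients map[int]bool
}

func NewRegistry() *Registry {
	return &Registry{clients: make(map[int]bool)}
}

// Add records a client ID.
func (r *Registry) Add(id int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.clients[id] = true
}

// Remove forgets a client ID.
func (r *Registry) Remove(id int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.clients, id)
}

// Len reports how many clients are currently registered.
func (r *Registry) Len() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.clients)
}

// churn starts n goroutines; each adds its own ID and the
// even-numbered ones remove themselves again. It returns the
// number of IDs left once all goroutines have finished.
func churn(n int) int {
	reg := NewRegistry()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			reg.Add(id)
			if id%2 == 0 {
				reg.Remove(id)
			}
		}(i)
	}
	wg.Wait()
	return reg.Len()
}

func main() {
	fmt.Println("clients remaining:", churn(100))
}
```

Without the lock, concurrent writes to a Go map are a data race and can crash the program, which is why every access to the clients map in the server goes through mu.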
Next, we’ll add a Run method to the Server struct. This method will start a goroutine that receives messages from an SQS queue and broadcasts them to all connected clients:
func (s *Server) Run() {
	queueURL := os.Getenv("QUEUE_URL")
	// Start a goroutine to receive messages from the SQS queue
	go func() {
		for {
			// Receive messages from the SQS queue using long polling
			output, err := s.messageQueue.ReceiveMessage(&sqs.ReceiveMessageInput{
				QueueUrl:            aws.String(queueURL),
				MaxNumberOfMessages: aws.Int64(10),
				WaitTimeSeconds:     aws.Int64(20),
			})
			if err != nil {
				fmt.Println("Failed to receive messages from SQS queue:", err)
				continue
			}
			// Snapshot the client set so the lock is not held while sending
			s.mu.Lock()
			clients := make([]*Client, 0, len(s.clients))
			for c := range s.clients {
				clients = append(clients, c)
			}
			s.mu.Unlock()
			// Broadcast received messages to all connected clients
			for _, msg := range output.Messages {
				if msg.Body == nil {
					continue
				}
				for _, c := range clients {
					select {
					case c.send <- []byte(*msg.Body):
					case <-c.quit:
						// Client has disconnected; skip it instead of blocking forever
					}
				}
			}
			// Delete received messages from the SQS queue
			for _, msg := range output.Messages {
				_, err = s.messageQueue.DeleteMessage(&sqs.DeleteMessageInput{
					QueueUrl:      aws.String(queueURL),
					ReceiptHandle: msg.ReceiptHandle,
				})
				if err != nil {
					fmt.Println("Failed to delete message from SQS queue:", err)
				}
			}
		}
	}()
}
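Run reads the queue URL from the QUEUE_URL environment variable and starts a goroutine that receives messages from the SQS queue using long polling: each ReceiveMessage call waits up to 20 seconds for messages and returns at most 10 of them. When messages are received, we broadcast them to all connected clients by sending them on each client’s send channel. Finally, we delete the received messages from the SQS queue so that they are not delivered again.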
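The receive, broadcast, delete cycle can be exercised without AWS by putting a small interface in front of the queue. The sketch below is an assumption-laden illustration, not the SQS API: `Queue`, `Message`, `fakeQueue`, and `PollOnce` are hypothetical names, and the in-memory fake does not model SQS visibility timeouts (messages simply stay pending until deleted).

```go
package main

import "fmt"

// Message mirrors the two SQS message fields the loop relies on.
type Message struct {
	Body          string
	ReceiptHandle string
}

// Queue is a hypothetical abstraction over the queue operations
// that the polling loop needs.
type Queue interface {
	Receive(max int) []Message
	Delete(receiptHandle string)
}

// fakeQueue is an in-memory Queue for experiments and tests.
type fakeQueue struct {
	pending []Message
}

// Receive returns a copy of up to max pending messages.
func (q *fakeQueue) Receive(max int) []Message {
	if max > len(q.pending) {
		max = len(q.pending)
	}
	return append([]Message(nil), q.pending[:max]...)
}

// Delete removes the message with the given receipt handle, if any.
func (q *fakeQueue) Delete(receiptHandle string) {
	for i, m := range q.pending {
		if m.ReceiptHandle == receiptHandle {
			q.pending = append(q.pending[:i], q.pending[i+1:]...)
			return
		}
	}
}

// PollOnce receives one batch, passes every body to broadcast, and
// only then deletes the batch. It returns the number of messages handled.
func PollOnce(q Queue, broadcast func(body string)) int {
	msgs := q.Receive(10)
	for _, m := range msgs {
		broadcast(m.Body)
	}
	for _, m := range msgs {
		q.Delete(m.ReceiptHandle)
	}
	return len(msgs)
}

func main() {
	q := &fakeQueue{pending: []Message{
		{Body: "hi", ReceiptHandle: "r1"},
		{Body: "there", ReceiptHandle: "r2"},
	}}
	n := PollOnce(q, func(body string) { fmt.Println("broadcast:", body) })
	fmt.Println("handled:", n, "still pending:", len(q.pending))
}
```

Deleting only after the broadcast step means a crash between the two leaves the message in the queue to be retried, at the cost of possible duplicates.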
Next, we’ll add a method to handle new clients when they connect to our server:
// HandleNewClient registers a connection, relays its messages to SQS, and cleans up on disconnect
func (s *Server) HandleNewClient(conn *websocket.Conn) {
	// Close the connection when this handler returns
	defer conn.Close()
	// Create a new Client object
	client := &Client{
		conn: conn,
		// Buffered so a brief stall in the writer does not block the broadcaster
		send: make(chan []byte, 16),
		quit: make(chan struct{}),
	}
	// Add the new client to the server's clients map
	s.mu.Lock()
	s.clients[client] = true
	s.mu.Unlock()
	// Start a goroutine to handle outgoing messages for this client
	go func() {
		for {
			select {
			case msg := <-client.send:
				if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
					// Closing the connection makes the read loop below return
					conn.Close()
					return
				}
			case <-client.quit:
				return
			}
		}
	}()
	queueURL := os.Getenv("QUEUE_URL")
	// Handle incoming messages for this client
	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			break
		}
		// Print the message to the console
		fmt.Printf("%s sent: %s\n", conn.RemoteAddr(), string(msg))
		// Send the message to the SQS queue
		_, err = s.messageQueue.SendMessage(&sqs.SendMessageInput{
			MessageBody: aws.String(string(msg)),
			QueueUrl:    aws.String(queueURL),
		})
		if err != nil {
			fmt.Println("Failed to send message to SQS queue:", err)
			continue
		}
	}
	// Signal the goroutine to exit and remove the client from the server's clients map when the connection is closed
	close(client.quit)
	s.mu.Lock()
	delete(s.clients, client)
	s.mu.Unlock()
}
In this code, we create a new Client object for the connected client and add it to the server’s clients map. Then, we start a goroutine that handles outgoing messages for this client by reading from the client’s send channel and writing the messages to the websocket connection.
Next, we enter a loop that reads incoming messages from the websocket connection and sends them to the SQS queue.
Finally, when the connection is closed, we signal the goroutine to exit by closing the client’s quit channel and remove the client from the server’s clients map.
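The writer goroutine and its quit channel follow a common Go pattern that can be tried in isolation. In the sketch below an `io.Writer` stands in for the WebSocket connection; `writeLoop` and `runDemo` are hypothetical names introduced for this example, and the extra `done` channel exists only so the demo can wait for the goroutine to finish.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// writeLoop copies messages from send to w until quit is closed or
// a write fails. It closes done when it exits.
func writeLoop(w io.Writer, send <-chan []byte, quit <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case msg := <-send:
			if _, err := w.Write(msg); err != nil {
				return
			}
		case <-quit:
			return
		}
	}
}

// runDemo sends two messages through writeLoop, shuts it down via
// quit, and returns everything that was written.
func runDemo() string {
	var out bytes.Buffer
	send := make(chan []byte)
	quit := make(chan struct{})
	done := make(chan struct{})
	go writeLoop(&out, send, quit, done)

	send <- []byte("hello ")
	send <- []byte("world")

	// Closing quit is a broadcast: every receiver on the channel unblocks.
	close(quit)
	<-done
	return out.String()
}

func main() {
	fmt.Println(runDemo()) // prints "hello world"
}
```

Closing a channel rather than sending a value on it means the signal is seen by any number of goroutines and never blocks the sender, which is why quit is closed and not written to.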
The Client
Now that we have our server-side code, let’s take a look at the Client type it relies on. This is not code that runs in the browser; it is the server’s record of one connected client. We’ll define a Client struct that holds the state of each connection:
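type Client struct {
	conn *websocket.Conn // underlying WebSocket connection
	send chan []byte     // outgoing messages waiting to be written to conn
	quit chan struct{}   // closed when the connection is closed
}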
The conn field is a pointer to the websocket connection. The send field is a channel that we’ll use to send outgoing messages. The quit field is a channel that we’ll use to signal when the connection is closed.
Putting It All Together
Now that we have our server and client code, let’s put it all together in our main function:
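package main

// Imports for the whole file. The AWS SDK for Go v1 and gorilla/websocket
// are assumed here, based on the types and calls used in the snippets above.
import (
	"fmt"
	"log"
	"net/http"
	"os"
	"sync"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/gorilla/websocket"
)

// upgrader upgrades HTTP connections to WebSocket connections with default options
var upgrader = websocket.Upgrader{}

func main() {
	// Create a new SQS client
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
	}))
	sqsClient := sqs.New(sess)
	// Create a new Server object; the zero value of sync.Mutex is ready to use
	server := &Server{
		clients:      make(map[*Client]bool),
		messageQueue: sqsClient,
	}
	// Start the server
	server.Run()
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			// Upgrade has already written an HTTP error response to the client
			log.Println("Failed to upgrade connection:", err)
			return
		}
		// Call the server's HandleNewClient method to handle the new connection
		server.HandleNewClient(conn)
	})
	// ListenAndServe only returns on failure, so report the error and exit
	log.Fatal(http.ListenAndServe(":8080", nil))
}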
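In this code, we create a new Server object and call its Run method to start the background goroutine that polls SQS. Then, we set up an HTTP handler for the /ws endpoint that upgrades incoming HTTP connections to WebSocket connections and hands each one to HandleNewClient. Finally, we start the HTTP server on port 8080.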
That’s it! We now have a fully functional chat application that uses websockets and message queues.