How to use Redis Pub/Sub in Go Chat Application (Part 3)
In the third part of this tutorial series, we will add Redis Pub/Sub to our existing chat application (build in the previous parts). With the use of Redis Pub/Sub, we can scale our application by running multiple instances at the same time.
Posts overview
- Basic chat application with Go + Vue.JS
- Multi-room & 1 one 1 chats.
- Using Redis Pub/Sub for scalability (This page)
- Adding authentication and allow users to log-in
Preconditions
To follow along you should have completed part 1 and part 2 or grab the source from here.
What is Redis Pub/Sub?
Reds Pub/Sub is the Redis implementation of the Publish–subscribe pattern. This is a so-called “messaging pattern”, where senders of messages (publishers) don’t send their messages directly to receivers (subscribers) but publish their messages in a “channel”. Subscribers choose to subscribe to specific channels and will receive these published messages.
When we run multiple instances of the same application we can leverage these Pub/Sub channels to not only notify clients connected to the same instance but notify all clients connected to any instance.
Diagram of pub/sub subscriptions.
For our application every chat message is sent through a room, therefore we can use these rooms to publish and subscribe within their own channel. So for each (running) room, there will be a pub/sub channel (illustrated by the Room channels in the diagram above).
We would like to have a list of all the online users on each server as well, to be able to start a private chat for example. For this, we will use a “general” channel, where the ChatServer can publish and subscribe.
Ok, let’s start coding!
Step 1: Adding a persistence layer
Because Pub/Sub won’t playback missed messages we need some sort of persistence. If we scale our application after the service is running, the new instance needs a way to get all the existing data (rooms and users).
For this we will add a database, in this post we will keep it simple and use an SQLite database. Depending on your use case you ought to use a different database engine. To make this swap easy we will use the Repository Pattern.
Install the needed package with:
go get github.com/mattn/go-sqlite3
// config/database.go package config import ( "database/sql" "log" _ "github.com/mattn/go-sqlite3" ) func InitDB() *sql.DB { db, err := sql.Open("sqlite3", "./chatdb.db") if err != nil { log.Fatal(err) } sqlStmt := ` CREATE TABLE IF NOT EXISTS room ( id VARCHAR(255) NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL, private TINYINT NULL ); ` _, err = db.Exec(sqlStmt) if err != nil { log.Fatal("%q: %s\n", err, sqlStmt) } sqlStmt = ` CREATE TABLE IF NOT EXISTS user ( id VARCHAR(255) NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL ); ` _, err = db.Exec(sqlStmt) if err != nil { log.Fatal("%q: %s\n", err, sqlStmt) } return db }
// main.go .. import ( ... "github.com/jeroendk/chatApplication/config" "github.com/jeroendk/chatApplication/repository" ) func main() { ... db := config.InitDB() defer db.Close() }
The code above will initialize the database when starting the Go application.
Room Repository
Next, we will add two repository files, first the roomRepository. To be able to use the room model in all our packages, we will create an interface for it in the (new) models package. We add an interface for our roomRepository as well, this makes swapping out the implementation easier.
// models/room.go package models type Room interface { GetId() string GetName() string GetPrivate() bool } type RoomRepository interface { AddRoom(room Room) FindRoomByName(name string) Room }
// repository/roomRepository.go package repository import ( "database/sql" "github.com/jeroendk/chatApplication/models" ) type Room struct { Id string Name string Private bool } func (room *Room) GetId() string { return room.Id } func (room *Room) GetName() string { return room.Name } func (room *Room) GetPrivate() bool { return room.Private } type RoomRepository struct { Db *sql.DB } func (repo *RoomRepository) AddRoom(room models.Room) { stmt, err := repo.Db.Prepare("INSERT INTO room(id, name, private) values(?,?,?)") checkErr(err) _, err = stmt.Exec(room.GetId(), room.GetName(), room.GetPrivate()) checkErr(err) } func (repo *RoomRepository) FindRoomByName(name string) models.Room { row := repo.Db.QueryRow("SELECT id, name, private FROM room where name = ? LIMIT 1", name) var room Room if err := row.Scan(&room.Id, &room.Name, &room.Private); err != nil { if err == sql.ErrNoRows { return nil } panic(err) } return &room } func checkErr(err error) { if err != nil { panic(err) } }
The repository file has two methods, one for adding a new room and one for finding a room based on the given name.
User Repository
We will do the same thing for the users, add the interfaces and create a repository:
// models/user.go package models type User interface { GetId() string GetName() string } type UserRepository interface { AddUser(user User) RemoveUser(user User) FindUserById(ID string) User GetAllUsers() []User }
package repository import ( "database/sql" "log" "github.com/jeroendk/chatApplication/models" ) type User struct { Id string `json:"id"` Name string `json:"name"` } func (user *User) GetId() string { return user.Id } func (user *User) GetName() string { return user.Name } type UserRepository struct { Db *sql.DB } func (repo *UserRepository) AddUser(user models.User) { stmt, err := repo.Db.Prepare("INSERT INTO user(id, name) values(?,?)") checkErr(err) _, err = stmt.Exec(user.GetId(), user.GetName()) checkErr(err) } func (repo *UserRepository) RemoveUser(user models.User) { stmt, err := repo.Db.Prepare("DELETE FROM user WHERE id = ?") checkErr(err) _, err = stmt.Exec(user.GetId()) checkErr(err) } func (repo *UserRepository) FindUserById(ID string) models.User { row := repo.Db.QueryRow("SELECT id, name FROM user where id = ? LIMIT 1", ID) var user User if err := row.Scan(&user.Id, &user.Name); err != nil { if err == sql.ErrNoRows { return nil } panic(err) } return &user } func (repo *UserRepository) GetAllUsers() []models.User { rows, err := repo.Db.Query("SELECT id, name FROM user") if err != nil { log.Fatal(err) } var users []models.User defer rows.Close() for rows.Next() { var user User rows.Scan(&user.Id, &user.Name) users = append(users, &user) } return users }
The user repository has four methods:
- AddUser, to add new users to the database.
- RemoveUser, to remove a user from the database.
- FindUserById, to find one user by a given ID.
- GetAllUsers, to retrieve all users from the database.
Updating existing code to use interfaces
Before we can proceed further, we first need to update some existing code to comply with the new interfaces.
Message
// message.go import ( ... "github.com/jeroendk/chatApplication/models" ) ... type Message struct { Action string `json:"action"` Message string `json:"message"` Target *Room `json:"target"` Sender models.User `json:"sender"` // Use model.User interface } ... // UnmarshalJSON custom unmarshel to create a Client instance for Sender func (message *Message) UnmarshalJSON(data []byte) error { type Alias Message msg := &struct { Sender Client `json:"sender"` *Alias }{ Alias: (*Alias)(message), } if err := json.Unmarshal(data, &msg); err != nil { return err } message.Sender = &msg.Sender return nil }
Client
// client.go import ( ... "github.com/jeroendk/chatApplication/models" ) // Change the type sender from Client to the User interface. func (client *Client) joinRoom(roomName string, sender models.User) { ... } func (client *Client) notifyRoomJoined(room *Room, sender models.User) { ... } // Add the GetId method to make Client compatible with model.User interface func (client *Client) GetId() string { return client.ID.String() }
Room
// room.go // Add the GetPrivate method to make Room compatible with model.Room interface func (room *Room) GetPrivate() bool { return room.Private }
Step 2: Using the repositories
Currently, the chatServer is responsible for keeping track of the users and rooms. It does so by putting these entities in a map (clients & rooms). We will keep doing this but on top write both entities to the database.
For starters, add the two repositories as property in the struct and set them in the NewWebsocketServer method. We add a new property as well, “users” to keep track of all the users. The clients property is dedicated to actual clients, with an active WebSocket connection (this is in preparation for the Pub/Sub logic).
// chatServer.go import ( "github.com/jeroendk/chatApplication/models" ) type WsServer struct { ... users []models.User roomRepository models.RoomRepository userRepository models.UserRepository } func NewWebsocketServer(roomRepository models.RoomRepository, userRepository models.UserRepository) *WsServer { wsServer := &WsServer{ clients: make(map[*Client]bool), register: make(chan *Client), unregister: make(chan *Client), rooms: make(map[*Room]bool), roomRepository: roomRepository, userRepository: userRepository, } // Add users from database to server wsServer.users = userRepository.GetAllUsers() return wsServer }
When creating a new instance of the WsServer, all the users are loaded from the database.
The next step is to change te call to NewWebsocketServer in main.go and include the two repositories
// main.go ... wsServer := NewWebsocketServer(&repository.RoomRepository{Db: db}, &repository.UserRepository{Db: db})
Using the room repository
Now that we have access to the repository we can use it inside the chatServer methods. First, we will update all existing methods to use the userRepository. Below are the modified methods, within the new code is marked with a comment.
// chatServer.go func (server *WsServer) registerClient(client *Client) { // NEW: Add user to the repo server.userRepository.AddUser(client) // Existing actions server.notifyClientJoined(client) server.listOnlineClients(client) server.clients[client] = true // NEW: Add user to the user slice server.users = append(server.users, message.Sender) } func (server *WsServer) unregisterClient(client *Client) { if _, ok := server.clients[client]; ok { delete(server.clients, client) server.notifyClientLeft(client) // NEW: Remove user from slice for i, user := range server.users { if user.GetId() == message.Sender.GetId() { server.users[i] = server.users[len(server.users)-1] server.users = server.users[:len(server.users)-1] } } // NEW: Remove user from repo server.userRepository.RemoveUser(client) } } func (server *WsServer) listOnlineClients(client *Client) { // NEW: Use the users slice instead of the client map for _, user := range server.users { message := &Message{ Action: UserJoinedAction, Sender: user, } client.send <- message.encode() } }
After adding the above all the online users should be saved in the database. When a user disconnects he/she will be removed from the database.
Using the user repository
Next up are the rooms. We don’t need all the rooms when we start the server. Therefore we only try to look for it in the repository when we can’t find it in the local map.
// chatServer.go func (server *WsServer) findRoomByName(name string) *Room { var foundRoom *Room for room := range server.rooms { if room.GetName() == name { foundRoom = room break } } // NEW: if there is no room, try to create it from the repo if foundRoom == nil { // Try to run the room from the repository, if it is found. foundRoom = server.runRoomFromRepository(name) } return foundRoom } // NEW: Try to find a room in the repo, if found Run it. func (server *WsServer) runRoomFromRepository(name string) *Room { var room *Room dbRoom := server.roomRepository.FindRoomByName(name) if dbRoom != nil { room = NewRoom(dbRoom.GetName(), dbRoom.GetPrivate()) room.ID, _ = uuid.Parse(dbRoom.GetId()) go room.RunRoom() server.rooms[room] = true } return room } func (server *WsServer) createRoom(name string, private bool) *Room { room := NewRoom(name, private) // NEW: Add room to repo server.roomRepository.AddRoom(room) go room.RunRoom() server.rooms[room] = true return room }
That’s it, in the next step we will finally add the Pub/Sub integration.
Step 3: Redis Pub/Sub
Now, with everything in place, we can start to add the publishing and subscribing to Redis Pub/Sub channels.
First, install the Redis package:
go mod init go get github.com/go-redis/redis/v8
Then make sure you have a Redis container at your disposal. You can create one with docker & docker-compose for example:
# docker-compose.yml version: '3.5' services: redis: image: "redis:alpine" ports: - "6364:6379"
Then start it with docker-compose up.
With your Redis container up and running lets create a connection within our application. For this we will create a new file called redis.go and lets put it in the config folder with our database connection.
// config/redis.go package config import "github.com/go-redis/redis/v8" var Redis *redis.Client func CreateRedisClient() { opt, err := redis.ParseURL("redis://localhost:6364/0") if err != nil { panic(err) } redis := redis.NewClient(opt) Redis = redis }
Then initialize the connection from your main.go
// main.go func main() { ... config.CreateRedisClient() ... }
There are a total of 4 different messages we want to send through the Pub/Sub channels.
- Chat messages
- User joined notification
- User left notification
- Private chat invitation
Chat messages
Sending chat messages inside a room is the job of our room.go. It is actually quite easy to integrate the Pub/Sub channels in this logic.
First, we will add two new methods, for publishing in a channel and subscribing to a channel:
// room.go package main import ( "fmt" "log" "github.com/jeroendk/chatApplication/config" "github.com/google/uuid" "context" ) var ctx = context.Background() ... func (room *Room) publishRoomMessage(message []byte) { err := config.Redis.Publish(ctx, room.GetName(), message).Err() if err != nil { log.Println(err) } } func (room *Room) subscribeToRoomMessages() { pubsub := config.Redis.Subscribe(ctx, room.GetName()) ch := pubsub.Channel() for msg := range ch { room.broadcastToClientsInRoom([]byte(msg.Payload)) } }
Then we will change the existing calls to broadcastToClientsInRoom, instead, they will use the new publish method. Also, start listing to the Pub/Sub subscription when starting the room.
// room.go func (room *Room) RunRoom() { // subscribe to pub/sub messages inside a new goroutine go room.subscribeToRoomMessages() for { select { ... case message := <-room.broadcast: room.publishRoomMessage(message.encode()) } } } func (room *Room) notifyClientJoined(client *Client) { ... room.publishRoomMessage(message.encode()) }
User joined & left
Next, lets publish when users join & leave and subscribe to these events inside the chatServer.go
// chatServer.go package main import ( "encoding/json" "log" "github.com/google/uuid" "github.com/jeroendk/chatApplication/config" "github.com/jeroendk/chatApplication/models" ) const PubSubGeneralChannel = "general" // Publish userJoined message in pub/sub func (server *WsServer) publishClientJoined(client *Client) { message := &Message{ Action: UserJoinedAction, Sender: client, } if err := config.Redis.Publish(ctx, PubSubGeneralChannel, message.encode()).Err(); err != nil { log.Println(err) } } // Publish userleft message in pub/sub func (server *WsServer) publishClientLeft(client *Client) { message := &Message{ Action: UserLeftAction, Sender: client, } if err := config.Redis.Publish(ctx, PubSubGeneralChannel, message.encode()).Err(); err != nil { log.Println(err) } } // Listen to pub/sub general channels func (server *WsServer) listenPubSubChannel() { pubsub := config.Redis.Subscribe(ctx, PubSubGeneralChannel) ch := pubsub.Channel() for msg := range ch { var message Message if err := json.Unmarshal([]byte(msg.Payload), &message); err != nil { log.Printf("Error on unmarshal JSON message %s", err) return } switch message.Action { case UserJoinedAction: server.handleUserJoined(message) case UserLeftAction: server.handleUserLeft(message) } } } func (server *WsServer) handleUserJoined(message Message) { // Add the user to the slice server.users = append(server.users, message.Sender) server.broadcastToClients(message.encode()) } func (server *WsServer) handleUserLeft(message Message) { // Remove the user from the slice for i, user := range server.users { if user.GetId() == message.Sender.GetId() { server.users[i] = server.users[len(server.users)-1] server.users = server.users[:len(server.users)-1] } } server.broadcastToClients(message.encode()) }
publishClientJoined & publishClientLeft will replace notifyClientJoined & notifyClientLeft.
Then again start listening to the Channel and make sure the publish methods above are correctly used:
// chatServer.go func (server *WsServer) Run() { go server.listenPubSubChannel() ... } func (server *WsServer) registerClient(client *Client) { // Add user to the repo server.userRepository.AddUser(client) // Publish user in PubSub server.publishClientJoined(client) server.listOnlineClients(client) server.clients[client] = true } func (server *WsServer) unregisterClient(client *Client) { if _, ok := server.clients[client]; ok { delete(server.clients, client) // Remove user from repo server.userRepository.RemoveUser(client) // Publish user left in PubSub server.publishClientLeft(client) } }
Private chat
Almost done, the last piece of the puzzle is to let our users start private chats with each other while they are connected with different servers.
Star by changing the logic of client.go
// client.go import ( ... "github.com/jeroendk/chatApplication/config" ... ) func (client *Client) handleJoinRoomPrivateMessage(message Message) { // instead of searching for a client, search for User by the given ID. target := client.wsServer.findUserByID(message.Message) if target == nil { return } // create unique room name combined to the two IDs roomName := message.Message + client.ID.String() // Join room joinedRoom := client.joinRoom(roomName, target) // Instead of instantaneously joining the target client. // Let the target client join with a invite request over pub/sub if joinedRoom != nil { client.inviteTargetUser(target, joinedRoom) } } // JoinRoom now returns a room or nil func (client *Client) joinRoom(roomName string, sender models.User) *Room { room := client.wsServer.findRoomByName(roomName) if room == nil { room = client.wsServer.createRoom(roomName, sender != nil) } // Don't allow to join private rooms through public room message if sender == nil && room.Private { return nil } if !client.isInRoom(room) { client.rooms[room] = true room.register <- client client.notifyRoomJoined(room, sender) } return room } // Send out invite message over pub/sub in the general channel. func (client *Client) inviteTargetUser(target models.User, room *Room) { inviteMessage := &Message{ Action: JoinRoomPrivateAction, Message: target.GetId(), Target: room, Sender: client, } if err := config.Redis.Publish(ctx, PubSubGeneralChannel, inviteMessage.encode()).Err(); err != nil { log.Println(err) } }
So our client is once again able to start a private chat. All we have to do now is to make sure the target client will join as well.
Add the code below to your chatServer.go. The first part adds one extra case in the Switch, to handle private chat invitations.
// chatServer.go func (server *WsServer) listenPubSubChannel() { ... switch message.Action { ... case JoinRoomPrivateAction: server.handleUserJoinPrivate(message) } } func (server *WsServer) handleUserJoinPrivate(message Message) { // Find client for given user, if found add the user to the room. targetClient := server.findClientByID(message.Message) if targetClient != nil { targetClient.joinRoom(message.Target.GetName(), message.Sender) } } // Add the findUserByID method used by client.go func (server *WsServer) findUserByID(ID string) models.User { var foundUser models.User for _, client := range server.users { if client.GetId() == ID { foundUser = client break } } return foundUser }
Result
To test the new set-up, you can start multiple instances of your application on different ports. make sure your Javascript WebSocket actually connects to the correct server. You can change the connection string as follows:
serverUrl: "ws://" + location.host + "/ws",
Then:
go run ./ --addr=:8080 go run ./ --addr=:8090
Done! You finished your Pub/Sub chat application in Go. Stay tuned for the last part of this series. There we will allow users to log-in before they can participate in a chat.
If you want your users to automatically reconnect after a short outage of some sort, check this out.
Feel free to leave a comment when you have suggestions or questions!
The final source code of this part van be found here:
https://github.com/jeroendk/go-vuejs-chat/tree/v3.0