How to use Redis Pub/Sub in a Go Chat Application (Part 3)

In the third part of this tutorial series, we will add Redis Pub/Sub to our existing chat application (built in the previous parts). With Redis Pub/Sub, we can scale our application by running multiple instances at the same time.

Preconditions

To follow along, you should have completed part 1 and part 2, or grab the source from here.

What is Redis Pub/Sub?

Redis Pub/Sub is the Redis implementation of the publish–subscribe pattern. This is a so-called “messaging pattern”, where senders of messages (publishers) don’t send their messages directly to receivers (subscribers) but publish them to a “channel”. Subscribers choose which channels to subscribe to and receive the messages published there.

When we run multiple instances of the same application we can leverage these Pub/Sub channels to not only notify clients connected to the same instance but notify all clients connected to any instance.
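To make the pattern concrete before we bring in Redis, here is a minimal in-process sketch. The Broker type is hypothetical and only mimics the idea with Go channels; it is not how Redis is implemented and it is not part of the chat application. Think of each subscriber as one running instance of our server.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a toy, in-process stand-in for Redis Pub/Sub (hypothetical type,
// used only to illustrate the pattern).
type Broker struct {
	mu          sync.RWMutex
	subscribers map[string][]chan string
}

func NewBroker() *Broker {
	return &Broker{subscribers: make(map[string][]chan string)}
}

// Subscribe registers interest in a named channel and returns a Go channel on
// which published messages will arrive.
func (b *Broker) Subscribe(channel string) <-chan string {
	ch := make(chan string, 8) // buffered so Publish does not block in this demo
	b.mu.Lock()
	b.subscribers[channel] = append(b.subscribers[channel], ch)
	b.mu.Unlock()
	return ch
}

// Publish delivers the message to every current subscriber of the channel and
// returns how many subscribers received it. Messages published while nobody
// is subscribed are simply lost.
func (b *Broker) Publish(channel, message string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, ch := range b.subscribers[channel] {
		ch <- message
	}
	return len(b.subscribers[channel])
}

func main() {
	broker := NewBroker()
	instanceA := broker.Subscribe("room-1")
	instanceB := broker.Subscribe("room-1")
	other := broker.Subscribe("room-2")

	delivered := broker.Publish("room-1", "hello")
	fmt.Println("delivered to", delivered, "subscribers")
	fmt.Println("A got:", <-instanceA)
	fmt.Println("B got:", <-instanceB)
	fmt.Println("room-2 pending:", len(other))
}
```

The publisher never needs to know who is listening: both subscribers of "room-1" get the message, while the subscriber of "room-2" gets nothing.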

Diagram of pub/sub subscriptions.

In our application every chat message is sent through a room, so each room can publish and subscribe within its own channel. For each (running) room there will be a Pub/Sub channel (illustrated by the Room channels in the diagram above).

We would like to have a list of all the online users on each server as well, to be able to start a private chat for example. For this, we will use a “general” channel, where the ChatServer can publish and subscribe.

Ok, let’s start coding!

Step 1: Adding a persistence layer

Because Pub/Sub won’t play back missed messages, we need some sort of persistence. If we scale our application after the service is running, the new instance needs a way to get all the existing data (rooms and users).

For this we will add a database. In this post we will keep it simple and use an SQLite database; depending on your use case you may want to use a different database engine. To make this swap easy we will use the Repository Pattern.

Install the needed package with:

go get github.com/mattn/go-sqlite3
// config/database.go
package config

import (
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3"
)

func InitDB() *sql.DB {
	db, err := sql.Open("sqlite3", "./chatdb.db")
	if err != nil {
		log.Fatal(err)
	}

	sqlStmt := `	
	CREATE TABLE IF NOT EXISTS room (
		id VARCHAR(255) NOT NULL PRIMARY KEY,
		name VARCHAR(255) NOT NULL,
		private TINYINT NULL
	);
	`
	_, err = db.Exec(sqlStmt)
	if err != nil {
		log.Fatalf("%q: %s\n", err, sqlStmt)
	}

	sqlStmt = `	
	CREATE TABLE IF NOT EXISTS user (
		id VARCHAR(255) NOT NULL PRIMARY KEY,
		name VARCHAR(255) NOT NULL
	);
	`
	_, err = db.Exec(sqlStmt)
	if err != nil {
		log.Fatalf("%q: %s\n", err, sqlStmt)
	}

	return db
}
// main.go
...
import (
	...
	"github.com/jeroendk/chatApplication/config"
	"github.com/jeroendk/chatApplication/repository"
)

func main() {
	...
	db := config.InitDB()
	defer db.Close()
}

The code above will initialize the database when starting the Go application.

Room Repository

Next, we will add two repository files, starting with the roomRepository. To be able to use the room model in all our packages, we will create an interface for it in the (new) models package. We add an interface for our roomRepository as well, which makes swapping out the implementation easier.

// models/room.go
package models

type Room interface {
	GetId() string
	GetName() string
	GetPrivate() bool
}

type RoomRepository interface {
	AddRoom(room Room)
	FindRoomByName(name string) Room
}
// repository/roomRepository.go

package repository

import (
	"database/sql"

	"github.com/jeroendk/chatApplication/models"
)

type Room struct {
	Id      string
	Name    string
	Private bool
}

func (room *Room) GetId() string {
	return room.Id
}

func (room *Room) GetName() string {
	return room.Name
}

func (room *Room) GetPrivate() bool {
	return room.Private
}

type RoomRepository struct {
	Db *sql.DB
}

func (repo *RoomRepository) AddRoom(room models.Room) {
	stmt, err := repo.Db.Prepare("INSERT INTO room(id, name, private) values(?,?,?)")
	checkErr(err)
	// Release the prepared statement once the insert has run.
	defer stmt.Close()

	_, err = stmt.Exec(room.GetId(), room.GetName(), room.GetPrivate())
	checkErr(err)
}

func (repo *RoomRepository) FindRoomByName(name string) models.Room {

	row := repo.Db.QueryRow("SELECT id, name, private FROM room where name = ? LIMIT 1", name)

	var room Room

	if err := row.Scan(&room.Id, &room.Name, &room.Private); err != nil {
		if err == sql.ErrNoRows {
			return nil
		}
		panic(err)
	}

	return &room

}

func checkErr(err error) {
	if err != nil {
		panic(err)
	}
}

The repository file has two methods, one for adding a new room and one for finding a room based on the given name.
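To illustrate why the interface makes swapping easier, here is a minimal sketch of an alternative implementation that keeps rooms in a map. The names MemoryRoomRepository and memoryRoom are made up for this example, and the interfaces are repeated locally so the snippet runs on its own; in the application you would use the ones from the models package.

```go
package main

import (
	"fmt"
	"sync"
)

// Room and RoomRepository mirror the interfaces from models/room.go.
type Room interface {
	GetId() string
	GetName() string
	GetPrivate() bool
}

type RoomRepository interface {
	AddRoom(room Room)
	FindRoomByName(name string) Room
}

// memoryRoom is a hypothetical value type used only in this sketch.
type memoryRoom struct {
	id, name string
	private  bool
}

func (r *memoryRoom) GetId() string    { return r.id }
func (r *memoryRoom) GetName() string  { return r.name }
func (r *memoryRoom) GetPrivate() bool { return r.private }

// MemoryRoomRepository keeps rooms in a map instead of a database.
type MemoryRoomRepository struct {
	mu    sync.RWMutex
	rooms map[string]Room // keyed by room name
}

func NewMemoryRoomRepository() *MemoryRoomRepository {
	return &MemoryRoomRepository{rooms: make(map[string]Room)}
}

func (repo *MemoryRoomRepository) AddRoom(room Room) {
	repo.mu.Lock()
	defer repo.mu.Unlock()
	repo.rooms[room.GetName()] = room
}

// FindRoomByName returns nil when no room with that name exists, like the
// SQLite-backed repository does.
func (repo *MemoryRoomRepository) FindRoomByName(name string) Room {
	repo.mu.RLock()
	defer repo.mu.RUnlock()
	room, ok := repo.rooms[name]
	if !ok {
		return nil
	}
	return room
}

// Compile-time check that the type satisfies the interface.
var _ RoomRepository = (*MemoryRoomRepository)(nil)

func main() {
	var repo RoomRepository = NewMemoryRoomRepository()
	repo.AddRoom(&memoryRoom{id: "1", name: "lobby"})
	fmt.Println(repo.FindRoomByName("lobby").GetId())
	fmt.Println(repo.FindRoomByName("missing") == nil)
}
```

Code that only depends on the RoomRepository interface does not need to change when the storage behind it does, which is also handy in tests.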

User Repository

We will do the same thing for the users: add the interfaces and create a repository.

// models/user.go
package models

type User interface {
	GetId() string
	GetName() string
}

type UserRepository interface {
	AddUser(user User)
	RemoveUser(user User)
	FindUserById(ID string) User
	GetAllUsers() []User
}
// repository/userRepository.go
package repository

import (
	"database/sql"
	"log"

	"github.com/jeroendk/chatApplication/models"
)

type User struct {
	Id   string `json:"id"`
	Name string `json:"name"`
}

func (user *User) GetId() string {
	return user.Id
}

func (user *User) GetName() string {
	return user.Name
}

type UserRepository struct {
	Db *sql.DB
}

func (repo *UserRepository) AddUser(user models.User) {
	stmt, err := repo.Db.Prepare("INSERT INTO user(id, name) values(?,?)")
	checkErr(err)
	// Release the prepared statement once the insert has run.
	defer stmt.Close()

	_, err = stmt.Exec(user.GetId(), user.GetName())
	checkErr(err)
}

func (repo *UserRepository) RemoveUser(user models.User) {
	stmt, err := repo.Db.Prepare("DELETE FROM user WHERE id = ?")
	checkErr(err)
	// Release the prepared statement once the delete has run.
	defer stmt.Close()

	_, err = stmt.Exec(user.GetId())
	checkErr(err)
}

func (repo *UserRepository) FindUserById(ID string) models.User {

	row := repo.Db.QueryRow("SELECT id, name FROM user where id = ? LIMIT 1", ID)

	var user User

	if err := row.Scan(&user.Id, &user.Name); err != nil {
		if err == sql.ErrNoRows {
			return nil
		}
		panic(err)
	}

	return &user

}

func (repo *UserRepository) GetAllUsers() []models.User {
	rows, err := repo.Db.Query("SELECT id, name FROM user")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	var users []models.User
	for rows.Next() {
		var user User
		// Skip rows that cannot be scanned instead of appending empty users.
		if err := rows.Scan(&user.Id, &user.Name); err != nil {
			log.Println(err)
			continue
		}
		users = append(users, &user)
	}

	return users
}

The user repository has four methods:

  1. AddUser, to add new users to the database.
  2. RemoveUser, to remove a user from the database.
  3. FindUserById, to find one user by a given ID.
  4. GetAllUsers, to retrieve all users from the database.

Updating existing code to use interfaces

Before we can proceed further, we first need to update some existing code to comply with the new interfaces.

Message

// message.go
import (
	...
	"github.com/jeroendk/chatApplication/models"
)

... 

type Message struct {
	Action  string      `json:"action"`
	Message string      `json:"message"`
	Target  *Room       `json:"target"`
	Sender  models.User `json:"sender"` // Use the models.User interface
}

...

// UnmarshalJSON is a custom unmarshaler that creates a Client instance for Sender
func (message *Message) UnmarshalJSON(data []byte) error {
	type Alias Message
	msg := &struct {
		Sender Client `json:"sender"`
		*Alias
	}{
		Alias: (*Alias)(message),
	}
	if err := json.Unmarshal(data, &msg); err != nil {
		return err
	}
	message.Sender = &msg.Sender
	return nil
}
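
The custom UnmarshalJSON is needed because encoding/json cannot decode a JSON object into an interface-typed field such as models.User; it needs a concrete type to fill. The sketch below reproduces the technique with simplified stand-in types (a Client with plain string fields), so the type definitions are assumptions and not the application's real ones.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// User mirrors the models.User interface.
type User interface {
	GetId() string
	GetName() string
}

// Client is a simplified stand-in for the application's Client type.
type Client struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

func (c *Client) GetId() string   { return c.ID }
func (c *Client) GetName() string { return c.Name }

type Message struct {
	Action string `json:"action"`
	Sender User   `json:"sender"`
}

// UnmarshalJSON decodes "sender" into a concrete Client and then stores it in
// the interface-typed Sender field.
func (m *Message) UnmarshalJSON(data []byte) error {
	// Alias has the same fields as Message but none of its methods, so
	// decoding into it does not call this method again.
	type Alias Message
	aux := &struct {
		Sender Client `json:"sender"` // shadows Alias.Sender during decoding
		*Alias
	}{Alias: (*Alias)(m)}

	if err := json.Unmarshal(data, aux); err != nil {
		return err
	}
	m.Sender = &aux.Sender
	return nil
}

// decode is a small helper for the demo below.
func decode(payload string) (Message, error) {
	var m Message
	err := json.Unmarshal([]byte(payload), &m)
	return m, err
}

func main() {
	m, err := decode(`{"action":"user-join","sender":{"id":"42","name":"Ann"}}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(m.Action, m.Sender.GetId(), m.Sender.GetName())
}
```

The outer Sender field sits at a shallower depth than the one embedded through Alias, so the decoder writes the JSON object into the concrete Client. All other fields still land in the original message through the embedded pointer.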

Client

// client.go
import (
    ...
	"github.com/jeroendk/chatApplication/models"
)


// Change the type of sender from Client to the models.User interface.
func (client *Client) joinRoom(roomName string, sender models.User) {
  ...
}

func (client *Client) notifyRoomJoined(room *Room, sender models.User) {
  ...
}

// Add the GetId method to make Client compatible with the models.User interface
func (client *Client) GetId() string {
	return client.ID.String()
}

Room

// room.go

// Add the GetPrivate method to make Room compatible with the models.Room interface
func (room *Room) GetPrivate() bool {
	return room.Private
}

Step 2: Using the repositories

Currently, the chatServer is responsible for keeping track of the users and rooms. It does so by putting these entities in a map (clients & rooms). We will keep doing this, but in addition write both entities to the database.

For starters, add the two repositories as properties in the struct and set them in the NewWebsocketServer method. We also add a new property, “users”, to keep track of all the users. The clients property is dedicated to actual clients with an active WebSocket connection (this is in preparation for the Pub/Sub logic).

// chatServer.go
import (	
	"github.com/jeroendk/chatApplication/models"
)

type WsServer struct {
	...
	users          []models.User
	roomRepository models.RoomRepository
	userRepository models.UserRepository
}

func NewWebsocketServer(roomRepository models.RoomRepository, userRepository models.UserRepository) *WsServer {
	wsServer := &WsServer{
		clients:        make(map[*Client]bool),
		register:       make(chan *Client),
		unregister:     make(chan *Client),
		rooms:          make(map[*Room]bool),
		roomRepository: roomRepository,
		userRepository: userRepository,
	}

	// Add users from database to server
	wsServer.users = userRepository.GetAllUsers()

	return wsServer
}

When creating a new instance of the WsServer, all the users are loaded from the database.

The next step is to change the call to NewWebsocketServer in main.go and include the two repositories:

// main.go
...
wsServer := NewWebsocketServer(&repository.RoomRepository{Db: db}, &repository.UserRepository{Db: db})

Using the user repository

Now that we have access to the repository we can use it inside the chatServer methods. First, we will update all existing methods to use the userRepository. Below are the modified methods; the new code is marked with a comment.

// chatServer.go

func (server *WsServer) registerClient(client *Client) {
	// NEW: Add user to the repo
	server.userRepository.AddUser(client)

	// Existing actions
	server.notifyClientJoined(client)
	server.listOnlineClients(client)
	server.clients[client] = true

	// NEW: Add user to the user slice
	server.users = append(server.users, client)
}

func (server *WsServer) unregisterClient(client *Client) {
	if _, ok := server.clients[client]; ok {
		delete(server.clients, client)
		server.notifyClientLeft(client)

		// NEW: Remove user from slice
		for i, user := range server.users {
			if user.GetId() == client.GetId() {
				server.users[i] = server.users[len(server.users)-1]
				server.users = server.users[:len(server.users)-1]
				break // stop after the removal; the slice has changed
			}
		}

		// NEW: Remove user from repo
		server.userRepository.RemoveUser(client)
	}
}

func (server *WsServer) listOnlineClients(client *Client) {
	// NEW: Use the users slice instead of the client map
	for _, user := range server.users {
		message := &Message{
			Action: UserJoinedAction,
			Sender: user,
		}
		client.send <- message.encode()
	}
}

After adding the above, all the online users should be saved in the database. When a user disconnects, they are removed from the database.
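
The removal in unregisterClient uses a common Go idiom: overwrite the element with the last one and shrink the slice by one. As a small sketch, this could be pulled into a helper. The name removeUserByID and the user type are made up for this example, and the interface is repeated so the snippet runs on its own.

```go
package main

import "fmt"

// User mirrors the models.User interface.
type User interface {
	GetId() string
	GetName() string
}

// user is a hypothetical concrete type for the demo.
type user struct{ id, name string }

func (u *user) GetId() string   { return u.id }
func (u *user) GetName() string { return u.name }

// removeUserByID removes the first user with the given ID by overwriting it
// with the last element and shrinking the slice by one. The order of the
// remaining users is not preserved.
func removeUserByID(users []User, id string) []User {
	for i, u := range users {
		if u.GetId() == id {
			last := len(users) - 1
			users[i] = users[last]
			users[last] = nil // drop the reference so it can be garbage collected
			return users[:last]
		}
	}
	return users
}

func main() {
	users := []User{&user{"1", "Ann"}, &user{"2", "Bob"}, &user{"3", "Cid"}}
	users = removeUserByID(users, "1")
	for _, u := range users {
		fmt.Println(u.GetId(), u.GetName())
	}
}
```

Returning as soon as the user is found matters here, because continuing to iterate after the slice has been shortened can lead to surprising results.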

Using the room repository

Next up are the rooms. We don’t need all the rooms when we start the server. Therefore we only look for a room in the repository when we can’t find it in the local map.

// chatServer.go

func (server *WsServer) findRoomByName(name string) *Room {
	var foundRoom *Room
	for room := range server.rooms {
		if room.GetName() == name {
			foundRoom = room
			break
		}
	}
	
	// NEW: if there is no room, try to create it from the repo
	if foundRoom == nil {
		// Try to run the room from the repository, if it is found.
		foundRoom = server.runRoomFromRepository(name)
	}

	return foundRoom
}

// NEW: Try to find a room in the repo, if found Run it.
func (server *WsServer) runRoomFromRepository(name string) *Room {
	var room *Room
	dbRoom := server.roomRepository.FindRoomByName(name)
	if dbRoom != nil {
		room = NewRoom(dbRoom.GetName(), dbRoom.GetPrivate())
		room.ID, _ = uuid.Parse(dbRoom.GetId())

		go room.RunRoom()
		server.rooms[room] = true
	}

	return room
}

func (server *WsServer) createRoom(name string, private bool) *Room {
	room := NewRoom(name, private)
	// NEW: Add room to repo
	server.roomRepository.AddRoom(room)

	go room.RunRoom()
	server.rooms[room] = true

	return room
}

That’s it, in the next step we will finally add the Pub/Sub integration.
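
The lookup order used by findRoomByName (memory first, repository only on a miss) can be shown in isolation. The sketch below is deliberately simplified: roomLookup and newDemoLookup are hypothetical names, rooms are reduced to a name and an ID string, and a plain function stands in for the repository.

```go
package main

import "fmt"

// roomLookup is a hypothetical, simplified stand-in for the server's rooms
// map combined with its room repository.
type roomLookup struct {
	local     map[string]string                      // rooms held in memory
	fromStore func(name string) (id string, ok bool) // stand-in for the repository
	storeHits int                                    // how often the store was consulted
}

// find checks the in-memory map first and only falls back to the store on a
// miss. A room found in the store is kept in memory for later lookups.
func (l *roomLookup) find(name string) (string, bool) {
	if id, ok := l.local[name]; ok {
		return id, true
	}
	l.storeHits++
	id, ok := l.fromStore(name)
	if ok {
		l.local[name] = id
	}
	return id, ok
}

// newDemoLookup returns a lookup whose store contains a single room.
func newDemoLookup() *roomLookup {
	persisted := map[string]string{"lobby": "room-1"}
	return &roomLookup{
		local: make(map[string]string),
		fromStore: func(name string) (string, bool) {
			id, ok := persisted[name]
			return id, ok
		},
	}
}

func main() {
	lookup := newDemoLookup()
	lookup.find("lobby") // miss in memory, loaded from the store
	lookup.find("lobby") // served from memory
	_, found := lookup.find("unknown")
	fmt.Println("store lookups:", lookup.storeHits, "unknown found:", found)
}
```

Only the first lookup of a room touches the store, which is why a new instance does not have to load every room at start-up.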

Step 3: Redis Pub/Sub

Now, with everything in place, we can start to add the publishing and subscribing to Redis Pub/Sub channels.

First, install the Redis package:

go mod init
go get github.com/go-redis/redis/v8

Then make sure you have a Redis container at your disposal. You can create one with docker & docker-compose for example:

# docker-compose.yml
version: '3.5'

services:
  redis:
    image: "redis:alpine"
    ports:
      - "6364:6379"

Then start it with docker-compose up.

With your Redis container up and running, let’s create a connection within our application. For this we will create a new file called redis.go and put it in the config folder, next to our database connection.

// config/redis.go

package config

import "github.com/go-redis/redis/v8"

var Redis *redis.Client

func CreateRedisClient() {
	opt, err := redis.ParseURL("redis://localhost:6364/0")
	if err != nil {
		panic(err)
	}

	Redis = redis.NewClient(opt)
}
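
The connection URL is hard-coded above. If you run several instances in different environments, you may prefer to read it from the environment. The sketch below is one possible way to do that with the standard library only; the variable name REDIS_URL and the helper redisURL are assumptions for this example, and this sketch only accepts the redis and rediss schemes.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
)

// defaultRedisURL is the address used in this tutorial.
const defaultRedisURL = "redis://localhost:6364/0"

// redisURL returns the value of REDIS_URL (an assumed variable name) or the
// tutorial default, after a basic sanity check of the URL scheme. The getenv
// parameter makes the function easy to test; pass os.Getenv in real code.
func redisURL(getenv func(string) string) (string, error) {
	raw := getenv("REDIS_URL")
	if raw == "" {
		raw = defaultRedisURL
	}
	u, err := url.Parse(raw)
	if err != nil {
		return "", fmt.Errorf("invalid Redis URL: %w", err)
	}
	if u.Scheme != "redis" && u.Scheme != "rediss" {
		return "", fmt.Errorf("unsupported Redis URL scheme %q", u.Scheme)
	}
	return raw, nil
}

func main() {
	target, err := redisURL(os.Getenv)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("connecting to", target)
}
```

The returned string could then be handed to redis.ParseURL in CreateRedisClient instead of the literal.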

Then initialize the connection from your main.go:

// main.go

func main() {
	...
	config.CreateRedisClient()
	...
}

There are four different kinds of messages we want to send through the Pub/Sub channels:

  • Chat messages
  • User joined notification
  • User left notification
  • Private chat invitation

Chat messages

Sending chat messages inside a room is the job of our room.go. It is actually quite easy to integrate the Pub/Sub channels in this logic.

First, we will add two new methods, for publishing in a channel and subscribing to a channel:

// room.go
package main
import (
	"fmt"
	"log"
	"github.com/jeroendk/chatApplication/config"
	"github.com/google/uuid"
	"context"
)

var ctx = context.Background()

...
func (room *Room) publishRoomMessage(message []byte) {
	err := config.Redis.Publish(ctx, room.GetName(), message).Err()

	if err != nil {
		log.Println(err)
	}
}

func (room *Room) subscribeToRoomMessages() {
	pubsub := config.Redis.Subscribe(ctx, room.GetName())

	ch := pubsub.Channel()

	for msg := range ch {
		room.broadcastToClientsInRoom([]byte(msg.Payload))
	}
}

Then we will change the existing calls to broadcastToClientsInRoom so that they use the new publish method instead. We also start listening to the Pub/Sub subscription when starting the room.

// room.go 
func (room *Room) RunRoom() {
	// subscribe to pub/sub messages inside a new goroutine
	go room.subscribeToRoomMessages()

	for {
		select {
		...
		case message := <-room.broadcast:
			room.publishRoomMessage(message.encode())
		}
	}
}

func (room *Room) notifyClientJoined(client *Client) {
	...
	room.publishRoomMessage(message.encode())
}

User joined & left

Next, let’s publish when users join & leave and subscribe to these events inside chatServer.go:

// chatServer.go
package main

import (
	"encoding/json"
	"log"

	"github.com/google/uuid"
	"github.com/jeroendk/chatApplication/config"
	"github.com/jeroendk/chatApplication/models"
)

const PubSubGeneralChannel = "general"

// Publish userJoined message in pub/sub
func (server *WsServer) publishClientJoined(client *Client) {

	message := &Message{
		Action: UserJoinedAction,
		Sender: client,
	}

	if err := config.Redis.Publish(ctx, PubSubGeneralChannel, message.encode()).Err(); err != nil {
		log.Println(err)
	}
}

// Publish userleft message in pub/sub
func (server *WsServer) publishClientLeft(client *Client) {

	message := &Message{
		Action: UserLeftAction,
		Sender: client,
	}

	if err := config.Redis.Publish(ctx, PubSubGeneralChannel, message.encode()).Err(); err != nil {
		log.Println(err)
	}
}

// Listen to pub/sub general channels
func (server *WsServer) listenPubSubChannel() {

	pubsub := config.Redis.Subscribe(ctx, PubSubGeneralChannel)
	ch := pubsub.Channel()
	for msg := range ch {

		var message Message
		if err := json.Unmarshal([]byte(msg.Payload), &message); err != nil {
			log.Printf("Error on unmarshal JSON message %s", err)
			// Skip the malformed message but keep listening for new ones.
			continue
		}

		switch message.Action {
		case UserJoinedAction:
			server.handleUserJoined(message)
		case UserLeftAction:
			server.handleUserLeft(message)		
		}
	}
}

func (server *WsServer) handleUserJoined(message Message) {
	// Add the user to the slice
	server.users = append(server.users, message.Sender)
	server.broadcastToClients(message.encode())
}

func (server *WsServer) handleUserLeft(message Message) {
	// Remove the user from the slice
	for i, user := range server.users {
		if user.GetId() == message.Sender.GetId() {
			server.users[i] = server.users[len(server.users)-1]
			server.users = server.users[:len(server.users)-1]
			break // stop after the removal; the slice has changed
		}
	}
	server.broadcastToClients(message.encode())
}

publishClientJoined & publishClientLeft will replace notifyClientJoined & notifyClientLeft.

Then start listening to the channel in Run and make sure the publish methods above are used in the right places:

// chatServer.go
func (server *WsServer) Run() {
	go server.listenPubSubChannel()
	...
}

func (server *WsServer) registerClient(client *Client) {
	// Add user to the repo
	server.userRepository.AddUser(client)

	// Publish user in PubSub
	server.publishClientJoined(client)

	server.listOnlineClients(client)
	server.clients[client] = true
}

func (server *WsServer) unregisterClient(client *Client) {
	if _, ok := server.clients[client]; ok {
		delete(server.clients, client)

		// Remove user from repo
		server.userRepository.RemoveUser(client)

		// Publish user left in PubSub
		server.publishClientLeft(client)
	}
}
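
One thing to keep in mind: listenPubSubChannel runs in its own goroutine and appends to server.users, while listOnlineClients reads the same slice. Assuming those calls happen in a different goroutine (the one executing Run), the slice is shared without synchronization. The sketch below shows one possible way to guard such a list with a mutex. The userList type is hypothetical and stores only IDs to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// userList is a hypothetical wrapper around the list of online users.
type userList struct {
	mu  sync.RWMutex
	ids []string
}

func (l *userList) add(id string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.ids = append(l.ids, id)
}

func (l *userList) remove(id string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for i, existing := range l.ids {
		if existing == id {
			l.ids = append(l.ids[:i], l.ids[i+1:]...)
			return
		}
	}
}

// snapshot returns a copy so callers can iterate without holding the lock.
func (l *userList) snapshot() []string {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return append([]string(nil), l.ids...)
}

// fillConcurrently adds n users from n goroutines to exercise the locking.
func fillConcurrently(l *userList, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			l.add(fmt.Sprintf("user-%d", i))
		}(i)
	}
	wg.Wait()
}

func main() {
	var list userList
	fillConcurrently(&list, 100)
	list.remove("user-7")
	fmt.Println("online users:", len(list.snapshot()))
}
```

Running your own code with go run -race is a quick way to find out whether this kind of protection is needed in your set-up.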

Private chat

Almost done, the last piece of the puzzle is to let our users start private chats with each other while they are connected to different servers.

Start by changing the logic of client.go:

// client.go


import (	
 	...
	"github.com/jeroendk/chatApplication/config"
	...
)

func (client *Client) handleJoinRoomPrivateMessage(message Message) {
	// Instead of searching for a client, search for a User by the given ID.
	target := client.wsServer.findUserByID(message.Message)
	if target == nil {
		return
	}

	// Create a unique room name by combining the two IDs.
	roomName := message.Message + client.ID.String()

	// Join room
	joinedRoom := client.joinRoom(roomName, target)

	// Instead of instantaneously joining the target client,
	// let the target client join with an invite request over pub/sub.
	if joinedRoom != nil {
		client.inviteTargetUser(target, joinedRoom)
	}
}

// JoinRoom now returns a room or nil
func (client *Client) joinRoom(roomName string, sender models.User) *Room {

	room := client.wsServer.findRoomByName(roomName)
	if room == nil {
		room = client.wsServer.createRoom(roomName, sender != nil)
	}

	// Don't allow to join private rooms through public room message
	if sender == nil && room.Private {
		return nil
	}

	if !client.isInRoom(room) {
		client.rooms[room] = true
		room.register <- client
		client.notifyRoomJoined(room, sender)
	}
	return room
}

// Send out invite message over pub/sub in the general channel.
func (client *Client) inviteTargetUser(target models.User, room *Room) {
	inviteMessage := &Message{
		Action:  JoinRoomPrivateAction,
		Message: target.GetId(),
		Target:  room,
		Sender:  client,
	}

	if err := config.Redis.Publish(ctx, PubSubGeneralChannel, inviteMessage.encode()).Err(); err != nil {
		log.Println(err)
	}
}

So our client is once again able to start a private chat. All we have to do now is to make sure the target client will join as well.
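
As written, the private room name is the target ID followed by the inviting client's ID, so the name depends on who sends the invite. If you would rather have both users end up in the same room no matter who starts the chat, one option is to order the two IDs before joining them. The helper below is a sketch of that idea; the function name and the ":" separator are assumptions and not part of the original code.

```go
package main

import "fmt"

// privateRoomName builds the same name regardless of which user starts the
// chat by always putting the lexicographically smaller ID first.
func privateRoomName(idA, idB string) string {
	if idA > idB {
		idA, idB = idB, idA
	}
	return idA + ":" + idB
}

func main() {
	ann := "1f0c"
	bob := "9ab2"
	fmt.Println(privateRoomName(ann, bob))
	fmt.Println(privateRoomName(bob, ann))
}
```

Both calls print the same name, so a second invite in the opposite direction would find the existing room instead of creating a new one.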

Add the code below to your chatServer.go. The first part adds one extra case in the Switch, to handle private chat invitations.

// chatServer.go
func (server *WsServer) listenPubSubChannel() {
	...

		switch message.Action {
		...
		case JoinRoomPrivateAction:
			server.handleUserJoinPrivate(message)
		}
}

func (server *WsServer) handleUserJoinPrivate(message Message) {
	// Find client for given user, if found add the user to the room.
	targetClient := server.findClientByID(message.Message)
	if targetClient != nil {
		targetClient.joinRoom(message.Target.GetName(), message.Sender)
	}
}

// Add the findUserByID method used by client.go
func (server *WsServer) findUserByID(ID string) models.User {
	var foundUser models.User
	for _, client := range server.users {
		if client.GetId() == ID {
			foundUser = client
			break
		}
	}

	return foundUser
}

Result

To test the new set-up, you can start multiple instances of your application on different ports. Make sure your JavaScript WebSocket actually connects to the correct server. You can change the connection string as follows:

serverUrl: "ws://" + location.host + "/ws",

Then:

go run ./ --addr=:8080
go run ./ --addr=:8090
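
The commands above rely on the application reading its listen address from an addr flag. In case your main.go does not do this yet, here is a minimal sketch using the standard flag package. The helper name parseAddr and the default value are assumptions for this example.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// parseAddr reads the listen address from command-line style arguments.
// The flag name matches the commands above; the default is an assumption.
func parseAddr(args []string) (string, error) {
	fs := flag.NewFlagSet("chat", flag.ContinueOnError)
	addr := fs.String("addr", ":8080", "HTTP listen address")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	return *addr, nil
}

func main() {
	addr, err := parseAddr(os.Args[1:])
	if err != nil {
		os.Exit(2)
	}
	fmt.Println("listening on", addr)
}
```

The flag package treats one and two leading dashes the same, so both -addr=:8090 and --addr=:8090 work.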

Done! You finished your Pub/Sub chat application in Go. Stay tuned for the last part of this series, where we will let users log in before they can participate in a chat.

If you want your users to automatically reconnect after a short outage of some sort, check this out.

Feel free to leave a comment when you have suggestions or questions!

The final source code of this part can be found here:
https://github.com/jeroendk/go-vuejs-chat/tree/v3.0