FlatBuffers with Phoenix Channels and Presence

This is a write-up of my exploration of getting Elixir Phoenix (version 1.3) to use FlatBuffers for websocket serialization. I will build a simple chat application that uses Phoenix Channels for chat messaging and Phoenix Presence to keep track of who is connected. I will also show how to implement FlatBuffers on the “client” side in JavaScript.

But why?

I will not argue for why one should do this. The merits of any message format can be debated, but you can read about Google’s reasoning for using FlatBuffers here. I merely present this information so that future developers who want to try this can (hopefully) save some time and effort.

Btw…

I am new to Elixir and have only been reading and coding in it for about 2 months, so please feel free to correct me if anything seems wrong or less than idiomatic. Thanks!

The basics.

I started by reading Binary data over Phoenix sockets, which details how to use MessagePack to send binary data over Phoenix sockets. I also found elixir-flatbuffers (thanks Björn!), which uses a Port to talk to the native (C++) implementation of FlatBuffers. Lastly, you will of course need Phoenix Channels and (optionally) Phoenix Presence in your project.

Setup.

A lot of this is already detailed in the Binary data over Phoenix sockets post (like how to set up Channels), but I will repeat it for the sake of consolidation.

First, add elixir-flatbuffers to your mix dependencies.

#file mix.exs

defp deps do
  [
    # ... other dependencies
    {:flatbuffer_port, [github: "Reimerei/elixir-flatbuffers"]},
  ]
end

Run mix deps.get to download it. This will give you flatbuffer_port.ex and fb_port.cpp, which will be used in flat_buffers_serializer.ex in the next section.

You will also need to add the flatbuffers JavaScript package to your package.json for the client side of things.

#file package.json

"dependencies": {
 # ... other dependencies
 "flatbuffers": "1.7.0"
},

Then run npm install in your assets folder to download that.

Also for the client side, you will need flatc, the FlatBuffers compiler, which generates the JavaScript class needed to encode and decode our FlatBuffers schema. You can grab a Windows .exe here or check out the source and build it yourself.

The schema.

The FlatBuffers schema below satisfies the basic requirements for simple messaging and presence updates. It is probably sub-optimal; however, I wanted to keep it as simple as possible (because this is not a post about FlatBuffers).

//file chat.fbs

namespace Chat;

table Meta {
  online_at:uint;
  phx_ref:string;
}

table Metas {
  user:string;
  metas:[Meta];
}

table Payload {
  body:string;
  sender:string;
  timestamp:uint;
  status:string;
  joins:[Metas];
  leaves:[Metas];
  state:[Metas];
}

table Message {
  ref:string;
  join_ref:string;
  topic:string;
  event:string;
  payload:Payload;
  status:string;
}

root_type Message;

file_identifier "CHAT";
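One detail of the schema worth checking is the `uint` type used for `timestamp` and `online_at`. In FlatBuffers, `uint` is a 32-bit unsigned integer, so a Unix timestamp in seconds fits, while a millisecond timestamp does not. The sketch below illustrates that range check; `fitsUint32` is a hypothetical helper introduced here for illustration and is not part of the project.

```javascript
// Hypothetical helper (not part of the original project): checks whether a
// number can be stored in a FlatBuffers `uint` field (32-bit unsigned).
const UINT32_MAX = 0xFFFFFFFF // 4294967295

function fitsUint32(value) {
  return Number.isInteger(value) && value >= 0 && value <= UINT32_MAX
}

const seconds = 1507939718         // a Unix timestamp in seconds
const milliseconds = 1507939718214 // the same instant in milliseconds

console.log(fitsUint32(seconds))      // true
console.log(fitsUint32(milliseconds)) // false
```

If millisecond precision were needed, the fields would have to be declared as `ulong` instead.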

We also need to compile this schema for use in javascript.

Using the flatc executable, we can compile the schema by running flatc --js chat.fbs

This will output chat_generated.js, which is pretty large so I will just link to a gist.

The serializer.

Let’s create flat_buffers_serializer.ex. It is another long one, so here’s the gist. The important parts are decode! and pack_data, which are mostly just wrappers around the functions in flatbuffer_port.ex. Also notice the :binary in {:socket_push, :binary, data} in encode_and_pack!. This tells the websocket that the payloads are binary, not text.

There is also some special handling for the messages relating to presence.

def fastlane!(%Broadcast{event: "presence_diff"} = broadcast) do
  # Convert the user-keyed maps into lists before encoding
  payload = %{
    joins: format_presence(broadcast.payload.joins),
    leaves: format_presence(broadcast.payload.leaves)
  }
  broadcast
  |> Map.put(:payload, payload)
  |> encode_and_pack!
end

...

def encode!(%Message{event: "presence_state"} = msg) do
  %Message{msg | payload: %{state: format_presence(msg.payload)}}
  |> encode_and_pack!
end

...

defp encode_and_pack!(%{} = msg) do
  {:ok, data} = pack_data(Poison.encode!(msg))
  {:socket_push, :binary, data}
end

defp format_presence(payload) do
  for {key, %{metas: metas}} <- payload, into: [] do
    %{user: key, metas: metas}
  end
end

This is because Presence message payloads are formatted like this:

payload: %{
  joins: %{
    "user123" => %{ 
      metas: [ %{online_at: 1507939718214, phx_ref: "wAM7sYbo0Nw="} ] 
    }
  }, 
  leaves: %{}
}

But FlatBuffers schemas have no map type, so a payload keyed by user ID cannot be modeled directly. We have to convert it to something like this before encoding:

payload: %{
  joins: [
    %{	
      user: "user123",
      metas: [
        %{ online_at: 1507939718214, phx_ref: "wAM7sYbo0Nw="}
      ]
    }
  ],
  leaves: []
}
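On the client, the decoded message carries the list form shown above. If you want to feed it to code that expects the original user-keyed shape (for example the Presence helpers in phoenix.js), the conversion has to be reversed. The following is a minimal sketch of that inverse step; `presenceListToMap` is a hypothetical name, and the input is assumed to be plain objects already read out of the FlatBuffer.

```javascript
// Hypothetical helper: turns [{user, metas}, ...] back into
// {user: {metas: [...]}}, the shape Phoenix Presence payloads normally have.
function presenceListToMap(list) {
  const map = {}
  for (const entry of list) {
    map[entry.user] = { metas: entry.metas }
  }
  return map
}

// Example input mirroring the converted payload from the serializer.
const joins = [
  { user: "user123", metas: [{ online_at: 1507939718, phx_ref: "wAM7sYbo0Nw=" }] }
]

console.log(JSON.stringify(presenceListToMap(joins)))
```

An empty list maps to an empty object, which matches the `leaves: %{}` case in the original payload.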

Finally, we need to include this serializer in user_socket.ex as well.

#file project_web/channels/user_socket.ex

...
  ## Transports
  transport :websocket, Phoenix.Transports.WebSocket,
    serializer: [{Project.Transports.FlatBuffersSerializer, "~> 2.0.0"}]
...

The channel.

Create the channel with the mix task mix phx.gen.channel Room (the task was named phoenix.gen.channel before Phoenix 1.3). This will create project_web/channels/room_channel.ex.

Modify that file with the changes below:

#file project_web/channels/room_channel.ex

defmodule ProjectWeb.RoomChannel do
  use ProjectWeb, :channel

  alias ProjectWeb.Presence

  def join("room:lobby", _payload, socket) do
    send self(), :after_join
    {:ok, socket}
  end

  def handle_in("message:new", message, socket) do
    broadcast! socket, "message:new", %{
      sender: socket.assigns.username,
      body: message["body"],
      timestamp: :os.system_time(:seconds)
    }
    {:noreply, socket}
  end

  def handle_info(:after_join, socket) do
    Presence.track(socket, socket.assigns.username, %{
      online_at: :os.system_time(:seconds)
    })
    push socket, "presence_state", Presence.list(socket)
    {:noreply, socket}
  end
end

Also add the following (above the transport) to user_socket.ex:

#file project_web/channels/user_socket.ex

...
  ## Channels 
  channel "room:*", ProjectWeb.RoomChannel
...

That’s pretty much it for the server side!

The chat client.

Before we can get to the client itself, we need to add a few things to app.js.

//file assets/js/app.js

import "phoenix_html"

import "flatbuffers" //this is the flatbuffers JavaScript library that we added to package.json earlier

import "./schemas/chat_generated" //this is the chat_generated.js file created earlier from flatc

import ChatClient from "./chat_client" //this is the client we are about to create
ChatClient.init()

Let’s modify our default page template to add our chatroom:

<%# file lib/project_web/templates/page/index.html.eex %>

<div class="col-md-8">
  <h2>Messages</h2>
  <ul id="MessageList" class="list-unstyled" style="height: 300px; border: 1px solid #ccc; overflow-y: auto; padding: 20px"></ul>
  <input type="text" id="NewMessage" placeholder="Type and press enter..." class="form-control">
</div>
<div class="col-md-4">
  <h2>Online</h2>
  <ul id="UserList" class="list-unstyled">
    <li>Loading room users...</li>
  </ul>
</div>

Now create chat_client.js. Again, this is a bit long, so here’s a gist. The important parts are all in socket creation:

let socket = new Socket("/socket", {
  params: {
    username: username, 
    room_id: window.room_id
  },
  encode: encode,
  decode: decode
})
socket.connect()
socket.conn.binaryType = 'arraybuffer'

When creating the new socket, we need to pass in a custom encode and decode callback so that we can use the FlatBuffer library and generated schema. Also, we want to set the socket connection’s binaryType to 'arraybuffer' so that it knows to send and receive the data as a raw binary data buffer.
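To make the shape of these two callbacks concrete, here is a minimal sketch. Each one takes a value and a callback and hands its result to that callback. Note the assumption: JSON text encoded as UTF-8 bytes is used as a stand-in for the FlatBuffers builder and the generated Chat.Message accessors, so that the example runs on its own. The message fields are the ones from the schema's Message table.

```javascript
// Stand-in codec: JSON text as UTF-8 bytes. A real implementation would use
// the FlatBuffers builder and the generated Chat.Message class here instead.
const textEncoder = new TextEncoder()
const textDecoder = new TextDecoder()

// encode receives the outgoing message object and passes the wire
// representation (here an ArrayBuffer) to the callback.
function encode(message, callback) {
  const bytes = textEncoder.encode(JSON.stringify(message))
  // Copy out only the bytes that belong to this message.
  callback(bytes.buffer.slice(bytes.byteOffset, bytes.byteOffset + bytes.byteLength))
}

// decode receives the raw frame (an ArrayBuffer once binaryType is
// 'arraybuffer') and passes a message object to the callback.
function decode(rawPayload, callback) {
  const bytes = new Uint8Array(rawPayload)
  callback(JSON.parse(textDecoder.decode(bytes)))
}

// Round trip: what goes in through encode comes back out of decode.
const outgoing = {
  topic: "room:lobby",
  event: "message:new",
  payload: { body: "hi" },
  ref: "1",
  join_ref: "1"
}
encode(outgoing, (wire) => decode(wire, (incoming) => console.log(incoming.event)))
```

Keeping the callbacks this small makes it easy to swap the stand-in codec for the real FlatBuffers calls without touching the socket setup.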

When decoding, we need to wrap the ArrayBuffer in a Uint8Array to read it.

let decode = function(rawPayload, callback) {
  let bytes = new Uint8Array(rawPayload)
  let buf = new flatbuffers.ByteBuffer(bytes)
...
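As a small illustration of working with the wrapped bytes: a FlatBuffer that was finished with a file identifier stores it in bytes 4 to 7, right after the 4-byte root offset. The sketch below checks an incoming ArrayBuffer for the "CHAT" identifier declared in the schema before attempting to decode it. `hasFileIdentifier` is a hypothetical helper written for this post, and it assumes the sender actually wrote the identifier into the buffer.

```javascript
// Hypothetical helper: checks the 4-byte file identifier of a FlatBuffer.
function hasFileIdentifier(arrayBuffer, identifier) {
  const bytes = new Uint8Array(arrayBuffer) // a view over the buffer, no copy
  if (bytes.length < 8 || identifier.length !== 4) return false
  for (let i = 0; i < 4; i++) {
    // The identifier starts at byte 4, after the root table offset.
    if (bytes[4 + i] !== identifier.charCodeAt(i)) return false
  }
  return true
}

// Hand-built 8-byte header: root offset (little-endian 8) followed by "CHAT".
const header = new Uint8Array([8, 0, 0, 0, 0x43, 0x48, 0x41, 0x54]).buffer

console.log(hasFileIdentifier(header, "CHAT")) // true
console.log(hasFileIdentifier(header, "NOPE")) // false
```

A check like this can be useful for rejecting frames that are not chat messages before handing them to the generated accessors.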

That’s it! Now you can send messages over Phoenix Channels using FlatBuffers!

Optimizations.

This code is probably not that efficient, although I haven’t run actual benchmarks. That said, there are some obvious areas for improvement:

  • Don’t load/unload the schema every time a message is being encoded/decoded.
  • Modify the elixir-flatbuffers code to avoid needing to encode/decode from JSON.
  • Optimize the schema.
