Real-time data pushed directly to the browser is a powerful tool when designing an application. PerformanceBridge provides client side real-time access to warehouse data changes via HarbingerJS. Applications can also generate and consume real-time messages unrelated to the warehouse for their own use. A specific use case is demonstrated in part 9 of the Rails tutorial.

Application/Platform Messenger (APM), is the platform service that allows clients to subscribe to and receive real-time updates. Architecturally, it functions as a clearinghouse that receives messages from the platform message broker and distributes them to clients subscribed to messages for a given topic.

Setup

HarbingerJS is included in the Rails Extensions gem. The gem provides both the HarbingerJS library to the asset rails pipeline as well as a Rails helper method to handle passing server side information into the JavaScript library.

HarbingerJS is also available as a standalone npm package that can be included in a package.json file.

You can then import the APM client directly:

import amqpListener from "harbinger-js/dist/amqp-listener-dist";

Warning: You must perform the HarbingerJS setup to use this code!

Connecting

Applications connect to the APM service over a websocket using the APM client (the connection falls back to HTTP long polling should the user's browser not support websockets). The APM client abstracts the details of managing the connection (including handling reconnecting if it is lost), leaving three steps for the developer to implement to begin receiving data:

  1. Before connecting, define callbacks for:
    1. what to do if the connection attempt succeeds
    2. what to do if the connection attempt fails
    3. how to handle new messages sent from APM to the application
  2. Set up the APM client by:
    1. instantiating it
    2. pointing it to the platform's APM endpoint
    3. calling the connect function, passing in the previously defined callbacks
  3. After connecting, bind to the exchanges that deal with the real-time data you're interested in.

NOTE There are defaults for most of the callbacks mentioned in the above steps. See the sections below for more information.

After successfully binding, APM immediately starts to push new messages for your subscriptions out to your application in real-time.

Example

This is a minimal example implementing the steps required to connect to real-time data using APM. Usage is explored in detail in the following sections.

import amqpListener from "harbinger-js/dist/amqp-listener-dist"

var apmClient = new amqpListener()

// Default implementations exist for the callbacks;
// newMsg and joinOk should explicitly be implemented
// to handle messages and connect to specific exhanges
// as needed.
var joinCallbacks = {
  newMsg: msg => processMessage(msg),
  joinOk: function() {
    console.log("Connected to APM")
    // Once successfully connected, automatically bind
    // the exchanges with the data we're interested in.
    bindExchanges()
  },
  joinError: () => {},
  onClose: () => {},
}

async function bindExchanges() {
  // Replace exhange name and routing key as applicable.
  // Callbacks can be provided to do something based on binding results.
  await apmClient.bindExchange("web-application-messages", "my-performance-bridge-application.#", {
    ok: function() { console.log("Bound to web-application-messages")}
  })
  // Repeat for each exchange from which you want to receive data.
  await apmClient.bindExchange("audit", "rad_exams.#", {
    ok: function() { console.log("Bound to the audit exchange.") }
  })
}

// Messages include the exchange and routing key they were sent to
// and the payload itself as a JSON object.
function processMessage(msg) {
  const routing_key = msg.routing_key
  const exchange = msg.exchange
  const payload = msg.payload

  if (exchange === "web-application-messages") {
    console.log("Received new web application message", payload)
  } else if (exchange === "audit") {
    const tokens = routing_key.split(".")
    const table = tokens[0]
    const affected_id = tokens[2]

    if (table == "rad_exams") {
      // Do something with the updated exam.
    }
    // Ignore updates to other tables.
  }

  // Do something
}

// The harbingerjsApmUrl is set via harbinger-rails-extensions.
// If using in standalone JavaScript, it is up to the developer
// to provide the correct endpoint.
var apmUrl = harbingerjsApmUrl
apmClient.setup({ url: apmUrl })

// The client is not connected to APM until this
// is executed.
apmClient.connectToChannel(joinCallbacks)

API Deep Dive

The library exports a constructor function, amqpListener, to create an APM client object. Once instantiated, this object encapsulates internal state for the connection to APM itself as well as providing several functions that allow the client to interact with the service. These functions are:

Note The convention used here naming the functions is functionName/numberOfArguments.

With the exception of setup and matchRoutingKey, the remaining functions take objects that define one or more callbacks as arguments. There are default implementations for all of the callbacks, allowing you to only provide an implementation for a given event if you want to override the default.

NOTE Default implementations for error events log the error to the console, but all other defaults are no-ops.

setup/1

This function is used to set the service's endpoint in the client. After instantiating the client object, it is called with an object having the key url:

apmClient.setup({url: "example-platform-host.com/socket"})

NOTE If APM is imported via Harbinger Rails Extensions, the APM endpoint is available as harbingerjsApmUrl after HarbingerJS is setup.

This function must be called prior to connectToChannel/1, otherwise the client won't know the service endpoint and won't connect to anything.

connectToChannel/2

This function starts the actual connection from the application to APM; it takes an object with callbacks to handle various events or outcomes of the connection attempt.

The connection events and their default callbacks:

const DEFAULT_CONNECT_CALLBACKS = {
  // Handler for new messages from subscribed real-time data sources.
  newMsg: (_msg) => {},
  // Handler for new messages from APM itself.
  systemMsg: (_msg) => {},
  // Function called after a connection is successfully established.
  joinOk: (_response) => {},
  // Function called after a connection attempt fails.
  joinError: (response) => {
    console.error(`Unable to join APM. Received: ${response.reason}`)
    },
  // Function called when the connection to APM is closed.
  onClose: () => {},
}

To use the defaults, call the function without an argument, otherwise provide your own callbacks for any you want to override.

// Defaults only
amqpClient.connectToChannel()

// Overriding systemMsg and joinOk
let myCallbacks = {
  systemMsg: (msg) => {console.warn(msg)},
  joinOk: (_) => {console.log("Connected to APM!")}
}
amqpClient.connectToChannel(myCallbacks)

bindExchange/3 and unbindExchange/3

These are grouped together because they are functional inverses of one another; they also take the same arguments.

These are their events and default callbacks:

const DEFAULT_BIND_CALLBACKS = {
  // Function called if the bind request succeeds.
  ok: () => {},
  // Function called if the bind request fails.
  error: (error) => {
    console.error(`Unable to bind to exchange. Received: ${error.reason}`)
  },
}

WARNING If real-time data is an essential part of your application, it is expected that you will override all of the joinError/error callbacks. If there is a joinError, the client will continue attempting to reconnect using an exponential backoff whether you override the callback or not; these callbacks are the proper place to e.g. notify the user that they aren't receiving real-time data. For errors when binding or unbinding exchanges, the result will be not receiving data for the exchange or continuing to receive data after attempting to stop respectively. It is up to the developer to determine how to handle these errors.

The remaining arguments for these functions are an exchange and a routing key.

INFO An exchange is an AMQP messaging entity that receives messages and distributes them to queues. The AMQP message broker determines how the messages are distributed from the exchange based on which queues have bound themselves to a given exchange and what routing key they used when doing so. A routing key is a string that can be separated into segments that is used by the broker to forward messages to the correct destination.

To use an analogy, an exchange can be thought of as a post office, a queue as a mailbox, and a routing key as an address. Each message sent to the broker is sent to an exchange with a payload and a routing key and each queue connects to an exchange (or multiple exchanges) using a routing key. When the keys match, the exchange distributes the message to the matching queue. Note that this isn't 1:1 from the exchange to a queue; multiple queues can bind with equivalent routing keys (exactly the same or via wildcard segments) and the exchange forwards a copy of the message to every queue with matching routing keys.

Routing key segments are separated by . and can use the wildcards # and *. A * matches anything within a given segment: my-pb-app.*.exam_updates would match my-pb-app.site-1.exam_updates, my-pb-app.site-2.exam_updates, etc; A # matches anything that comes after it: # is a catch-all for any message sent to the exchange, my-pb-app.# matches anything that starts with the segment my-pb-app regardless of the rest of the key.

Tying this back to APM, internally APM creates and manages an AMQP queue per application instance. When you call bindExchange/3 in the client, you designate an exchange to which your queue will be bound and the routing key to tell the AMQP broker what kind of messages you want.

Further, a single queue can bind to multiple exchanges with any combination of routing keys (including binding to the same exchange multiple times with different keys) in order to get the messages you care about. Extending the analogy, this would be as if you could tell all of the post offices in a 50 mile radius about your mailbox by sending them your address and then any of them could send you messages directly if they're addressed to you.

See the RabbitMQ documentation for more information on AMQP messaging.

// Bind to exchange web-application-messages
// with routing key 'my-pb-application.#'
// and callbacks to log the result of the bind.

let bindCallbacks = {
  ok: () => {
    console.log("Bound to web-application-messages with key my-pb-application.#")
  },
  error: (error) => {
    console.error(`Unable to bind to exchange. Received: ${error.reason}`)
  },
}
amqpClient.bindExchange('web-application-messages', 'my-pb-application.#', bindCallbacks)

// Bind to the audit exchange and subscribe to all
// of the messages it receives.
amqpClient.bindExchange('audit', '#', {
  // ...callbacks
})

Conversely, unbindExchange/3 removes the link between a queue and an exchange and shuts off the subscription to messages from that exchange with that key. The operation to unbind from the exchange bound above would look like this:

// Unbind from exchange web-application-messages
// with the routing key provided in the bind call
// and callbacks to log the result of the unbind.

let unbindCallbacks = {
  ok: () => {
    console.log("Unbound from web-application-messages with key my-pb-application.#")
  },
  error: (error) => {
    console.error(`Unable to unbind to exchange. Received: ${error.reason}`)
  },
}
amqpClient.unbindExchange('web-application-messages', 'my-pb-application.#', unbindCallbacks)

// Unbind from the audit exchange and stop receiving
// its messages.
amqpClient.unbindExchange('audit', '#', {
  // ...callbacks
})

matchRoutingKey/2

This function takes two routing keys and returns whether they are equivalent by comparing the segments with wildcards accounted for. By using it in the newMsg callback, can help in handling individual messages received from the service based on their routing key.

WARNING Note that there is only one function to handle receiving messages (passed as part of the callbacks to connectToChannel). Each incoming message has a payload and a routing key, so if an application subscribes to different types of messages and intends to handle them in different ways, then the routing key inside of the newMsg callback is the best way to identify the message type and handle it as appropriate.

amqpClient.matchRoutingKey('#', 'my-pb-application.site-1.update_exam')
// true

amqpClient.matchRoutingKey('my-pb-application.#', 'my-pb-application.site-1.update_exam')
// true

amqpClient.matchRoutingKey('my-pb-application.*.update_exam', 'my-pb-application.site-1.update_exam')
// true

amqpClient.matchRoutingKey('some-other-pb-application.#', 'my-pb-application.site-1.update_exam')
// false

Best Practices

You should avoid repeatedly connecting to and disconnecting from APM. Manually instantiating the client object and setting up connections multiple times is an anti-pattern. A potential unwanted result is opening multiple connections for a single user/session that are then kept around without being used. Depending on how exactly the connections are made, it would even be possible to receive and act on the same real-time messages multiple times across connections. The client is designed to manage and maintain a steady connection and works best when allowed to do so.

React

When using APM in a React application, it's crucial that the client connection object is set up and maintained in a long-lived part of the application. The best location to instantiate the client depends on your use case and your application architecture.

If using Redux and connecting to APM for the duration of the application's usage, placing it in its own Redux middleware is a good option. This puts it in a position in your application architecture where it can be managed via actions and trigger new actions as a result of real-time messages. It also avoids the trap of connecting in a component that may unmount and disrupt the application behavior.

If not using Redux, setting up the connection in the application's root component is a good approach because it should remain mounted for the duration of the application's lifecycle.