In this section you will implement a distinguishing feature of Bridge: displaying real-time data, by adding and removing exams from the worklist (no refreshing or polling). The notes area will be converted into a real-time communication tool like a chat or comment section in a social networking application. This code uses the JavaScript library HarbingerJS (included in harbinger-rails-extensions, further documentation.

Platform Architecture

The communication hub of Bridge is an message bus using the AMQP protocol. Various platform components use this bus for queueing and communication. These communications include an audit stream of all clinical information and configuration changes. Applications can subscribe to these messages through a service and JavaScript library provided in the harbinger-rails-extensions gem, based on cometd.

HarbingerJS provides an interface for automatically creating an AMQP queue tied to a clients web browser and binding that queue to the messaging exchanges of Bridge. JavaScript callbacks can be attached to specific message types to drive the functionality of an application in real time.

Prerequisites

From notes to live comments

Currently, the application allows a logged in user to add clinical notes to an exam. The goal of this section is to display notes someone else enters appear to all other concurrently logged in clients. The plan to accomplish this:

Model, controller, and view updates

Start with model changes to send a message using harbinger-rails-extensions any time a note is created. Add a method to app/models/note.rb:

def send_amqp_message
  Harbinger::Rails::Extensions::Messaging.send_application_message("railsdevelopertutorial","#{self.employee_id}.#{self.rad_exam_id}",self.attributes.to_json)
end

This makes an instance method for a Note that uses the send_application_message method provided by harbinger-rails-extensions. send_application_message takes three arguments:

  1. The application name (the canonical / approved short name of the application)
  2. A routing key
  3. The payload or body of the message. This can be any string.

The send_application_message function combines the routing key (argument 2) and the application name as the first token in the routing key used to send the message. For this application, the routing key is the employee_id of the note creator and the rad_exam_id of the associated exam as additional tokens. The payload of the message is a JSON representation of the notes attributes.

Best Practice - Routing keys are used to filter and define which messages an application receives. For that reason it is important to make them as specific as possible and contain tokens that could be used as filters in a routing key matcher upon binding. If this application was going to send other types of messages, you should add a token for the type of message, say a token with the value of note at the beginning of our routing key.

send_amqp_message needs to be called upon successful note creation. Edit app/controllers/notes_controller.rb:

  def create
    # Get the logged in employee
    employee = Java::HarbingerSdkData::Employee.withUserName(session[:username],@entity_manager)
    # Create our ActiveRecord Object
    note = Note.new({ employee_id: employee.getId,
                      rad_exam_id: params[:rad_exam_id],
                      note: params[:note],
                      created: Time.now()
                    })
    # Save it to the database
    if note.save
      # Send this note as a message over the Bridge messaging bus
      note.send_amqp_message()
      # Render our notes/note partial with some local variables
      render :partial => "notes/note", locals: {note: note, employee: employee}
    else
      # Render our error status if it fails to save
      render :status => 500, :text => "Failed to save the note"
    end
  end

  def get
    employee = Java::HarbingerSdkData::Employee.withUserName(session[:username],@entity_manager)
    note = Note.find(params[:id])
    render partial: "notes/note", locals: {note: note, employee: employee}
  end

The new get a method/action takes an id as an HTTP parameter and returns the markup for a note. The call to note.send_amqp_message() was added after the note is saved, so now each successful new note creation will also send a message over the messaging bus.

Tip - Rather than creating an instance method called in the controller, you could also use an ActiveRecord creation callback. There are benefits and drawbacks to this method, so be aware of the option.

The final server side update is to add a route for the new method/action. Edit config/routes.rb to add:

  get 'notes/get' => "notes#get"

Test out our new method browsing to /notes/get?id=[sometestid] (once logged in) and reviewing the returned markup.

Next, update the views: add some id attributes to the table rows to improve access those DOM elements and create a place to give the user feedback of events that are happening. Edit app/views/worklist/index.html:

<tr class="data-row" id="data-row-<%= exam.id %>">
<!--- ... -->
<tr class="note-row" id="note-row-<%= exam.id %>">
<!--...the bottom of index.html-->
<div id="notifier"></div>

Add some additional CSS to app/assets/stylesheets/application.css:

#notifier {
  position: fixed;
  right: 20px;
  bottom: 15px;
  width: 200px;
  z-index: 1000;
}

JavaScript

To listen for these messages, add the required HarbingerJS libraries to app/assets/javascripts/application.js:

//= require harbinger-js/cometd/cometd.js
//= require harbinger-js/cometd/jquery.cometd.js
//= require harbinger-js/amqp-listener.js

Since this functionality is unrelated to the other worklist event handlers, create a new JavaScript file to include in app/views/worklist/index.html similar to the worklist.js file. . This will begin with with a utility function to drive notifications and then establish a connection, through the harbingerjs.amqp library. Create app/assets/javascripts/pages/realtime.js:

//To ensure all libraries are loaded before this fires we put it in a document ready callback
$(document).ready(function() {

    /* Utility functions */
    /*********************/

    // Here we have a simple notification system. It adds a bootstrap alert div to the
    // page with a given css class and message and then removes it after 3 seconds.
    var notifier_count = 1;

    function notifier(klass,message) {
    $("#notifier").append('<div class="alert ' + klass + '" id="notification-' + notifier_count + '"><p>' + message + '</p></div>');
    var selector = "#notification-" + notifier_count;
    setTimeout(function() { $(selector).remove(); }, 3000);
    notifier_count++;
    }

    /* Setting up real time connection */
    /***********************************/

    //Add a disconnect callback
    $.harbingerjs.amqp.addCallback('disconnect',function(message) {
    notifier("alert-danger","Oh no! We lost the real time data connection.");
    });

    //Add a connect callback
    $.harbingerjs.amqp.addCallback('connect',function(message) {
    notifier("alert-success","Connected to real time data.");
    });

    //Setup our real time connection library with the url set in
    //app/views/layouts/application.html.erb using harbingerjs_setup()
    //The setup command starts the connection and fires off the
    //callbacks we've defined above.
    $.harbingerjs.amqp.setup({url: harbingerjsCometdURL});

});

Include this file in the page by editing the top of app/views/worklist/index.html:

<% content_for(:javascript_includes, javascript_include_tag("pages/realtime.js")) %>

Loading up the worklist in a browser should now display a notification in the lower right corner stating that the browser has successfully "connected to the real-time data". When the page connects to the amqp-cometd bridge, it validates the SSO token. If the user is not logged in through the SSO then the page will be unable to connect. After validation of the SSO token, the library establishes a temporary queue on the messaging bus. This queue is specific to a particular client and is not shared by any other browser. It keeps an open connection to the browser, used to send messages whenever they enter the queue.

The next step is binding the queue to an exchange to get the desired messages, in thise case note messages. The note messages are sent over the web-application-messages exchange. Bind the queue to the web-application-messages exchange and specify the routing key matcher to filter for only the note messages. Add a binding call to reatltime.js:

    //Bind to the web-application-messages exchange for all messages
    //related to railsdevelopertutorial
    $.harbingerjs.amqp.bind("web-application-messages","railsdevelopertutorial.#",
                            function(bindMessage) { notifier("alert-success","Bound to application stream"); }, //An optional onBind callback
                            function(unBoundMessage) { notifier("alert-danger","No longer bound to application stream"); }); //An optional onUnbound callback

In the call to $.harbingerjs.amqp.bind the first argument specifies the exchange "web-application-messages" and the second argument is the routing key matcher "railsdevelopertutorial.#". The last two arguments are optional callbacks for when the queue is bound and unbound, respectively.

The page is now subscribed to note messages, but nothing is happening when they get recieved. Take action by adding a listener in realtime.js:

    // This is how we update the badge when a message comes in
    function updateBadge(exam_id) {
    var number_of_notes = $("#note-row-" + exam_id + " .note").length
    var badge = $("#data-row-" + exam_id + " .badge")
    badge.text(number_of_notes);
    if (number_of_notes > 0) { badge.addClass("highlight") }
    }


    $.harbingerjs.amqp.addListener(function(rk,payload,exchange) {
    $.ajax($.harbingerjs.core.url("/notes/get"),
           {data: {id: payload.id},
        method: 'get',
        success: function(response) {
            notifier("alert-success","Added note from employee id" + payload.employee_id);
            $("#note-row-" + payload.rad_exam_id + " .notes").append(response);
            updateBadge(payload.rad_exam_id);
        },
        error: function() {
            if (typeof console !== 'undefined') { console.log("Error adding a note from stream",arguments) }
            notifier("alert-error","Failed to add a note from employee id" + payload.employee_id);
        }});
    },"railsdevelopertutorial.#","web-application-messages");//Only listen to the railsdevelopertutorial messages

$.harbingerjs.amqp.addListener takes 3 arguments, the last two optional. The first argument is simply the callback function to run when a message comes arrives. That callback function is given arguments of the message routing key, payload, and source exchange. The second argument to $.harbingerjs.amqp.addListener is a routing key matcher. Note that both the bind and listener have routing key matchers: the bind matcher tells the messaging bus which messages to put in the queue, but you can bind multiple times with multiple matchers. In order to segregate the logic that handles the different types of messages you could receive, listeners have their own matcher to determine if they should run or not. The final argument to $.harbingerjs.amqp.addListener is a matcher on the exchange that the message came from. It is possible that two messages have a similar routing key format, but are sourced from different exchanges: this would cause problems when determining which callback function should handle the message (this argument is rarely needed).

In the callback above, notice the use of the message to make an Ajax call to get the markup for the note and then update the page accordingly, using the id attributes added earlier. Currently, the code to creating a note adds it twice: once from the form submission handler and once from the received message.

This problem can be solved several ways, but this will be a simple approach: check to see if a note is from the logged in user. First, get the employee_id of the logged in user. Edit the WorklistController to add a line in the index method/action:

  @employee_id = Java::HarbingerSdkData::Employee.withUserName(session[:username],@entity_manager).getId

Edit the view app/views/worklist/index.html:

<%= javascript_tag("var logged_in_employee_id = #{@employee_id};") %>

This is not the most elegant solution, but it works and minimizes the number of required Ajax calls. Add an if statement to the AMQP listener callback:

    $.harbingerjs.amqp.addListener(function(rk,payload,exchange) {
    //Check if the logged in employee is the same as the one associated with the note
    //If it's the same person do nothing, if not update the badge, add the note
    //to the page, and create a notification.
    if (logged_in_employee_id != payload.employee_id) {
        $.ajax($.harbingerjs.core.url("/notes/get"),
           {data: {id: payload.id},
            method: 'get',
            success: function(response) {
            notifier("alert-success","Added note from employee id" + payload.employee_id);
            $("#note-row-" + payload.rad_exam_id + " .notes").append(response);
            updateBadge(payload.rad_exam_id);
            },
            error: function() {
            if (typeof console !== 'undefined') { console.log("Error adding a note from stream",arguments) }
            notifier("alert-error","Failed to add a note from employee id" + payload.employee_id);
            }});
    }
    },"railsdevelopertutorial.#","web-application-messages");//Only listen to the railsdevelopertutorial messages

Best Practice - The more an application interacts with real-time messaging to provide page updates for actions external to a users direct interaction, the more challenges like this you will face with traditional JavaScript event handling. There are JavaScript frameworks better suited to this type of functionality, like AngularJS and ember.js. If an application is sufficiently complex, these are better solutions.

Notes are now added to the worklist when any user creates them. If two users are adding notes on the same exam, the interface appears like a comment/chat application, without the complexity and management needed to implement infrastructure for message passing between clients and servers.

Real-time worklist

To transform the current, static worklist into a dynamic, real-time worklist, the plan to accomplish this:

This can be done using the audit exchange. The audit exchange sends every operation that happens to the clinical data repository. Every message moving across the audit exchange has a routing key in the format table_name.operation.primary_key, where table_name is the SQL table name of the affected table, operation is the type of SQL operation (insert, update, or delete), and primary_key is the id of the row affected by the operation. The payload of the message is a JSON data structure of the information in the routing key and the row fields twice, pre- and post-operation. Here's an example:

routing key: rad_exams.update.1234 payload:

{"table_name":"rad_exams",
 "operation":"update",
 "affected_row_id":1234,
 "data_manager_message_log_id":5678,
 "pre_and_post":{"pre": {"accession":"acc123",
             "current_report_id":1,
             "current_status_id":1,
             "exam_priority_id":1,
             "external_system_id":9,
             "first_final_report_id":1,
             "first_prelim_report_id":null,
             "first_report_id":1,
             "id":1234,
             "last_final_report_id":1,
             "last_prelim_report_id":null,
             "order_id":1,
             "patient_mrn_id":2,
             "procedure_id":5,
             "rad_exam_department_id":null,
             "resource_id":3,
             "site_class_id":8,
             "site_id":1,
             "site_sublocation_id":null,
             "updated_at":"2015-10-13 14:26:30Z",
             "visit_id":1
            },
         "post": {"accession":"acc123",
              "current_report_id":2,
              "current_status_id":2,
              "exam_priority_id":1,
              "external_system_id":9,
              "first_final_report_id":1,
              "first_prelim_report_id":null,
              "first_report_id":1,
              "id":1234,
              "last_final_report_id":2,
              "last_prelim_report_id":null,
              "order_id":1,
              "patient_mrn_id":2,
              "procedure_id":5,
              "rad_exam_department_id":null,
              "resource_id":3,
              "site_class_id":8,
              "site_id":1,
              "site_sublocation_id":null,
              "updated_at":"2015-10-13 14:30:00Z",
              "visit_id":1
             }}}

For this application, you need to capture exam changes. First, bind to the audit exchange with a routing key matcher of rad_exams.#. This yields all the changes to rad_exams. Next, a listener to handle the different scenarios described above and finally accompanying server side handling for extra information and markup to update the page.

Server Side

The updates from the audit exchange won't contain a universal_event_type, so a list of the external_system_statuses is needed to link the rad_exams.current_status_id value to the complete status. Add a line to query for those in the index action/method and use the same pattern to get that information in JavaScript that was used earlier to find the logged in user employee_id. Add this to the WorklistController index action:

@status_ids = Java::HarbingerSdkData::ExternalSystemStatus.createQuery(@entity_manager).select(".id").where({".universalEventType.eventType" => "complete"}).list.to_a

Add this to app/views/worklist/index.html:

<%= javascript_tag("var status_ids = #{@status_ids.to_json};") %>

There also need to be a way to update a given .data-row. Rather than build a data structure and logic to update each column of our .data-row, you can instead return the markup for a .data-row. Add a new action to the WorklistController:

def update_exam
  exam = Java::HarbingerSdkData::RadExam.withId(params[:id].to_i,@entity_manager)
  render partial: "worklist/data_row", locals: {exam: exam, notes: Note.for_exam(exam)}
end

You also need a way of adding both a .data-row and .note-row to the table when an exam that in complete status should be added worklist. Add another WorklistController method/action to handle this:

def get_exam
  exam = Java::HarbingerSdkData::RadExam.withId(params[:id].to_i,@entity_manager)
  render partial: "worklist/exam_rows", locals: {exam: exam, notes: Note.for_exam(exam)}
end

Add the new routes we need for these actions/methods:

get 'worklist/get_exam' => "worklist#get_exam"
get 'worklist/update_exam' => "worklist#update_exam"

Note - Restart the rails server anytime you change the routes.

These actions call view partials that have not yet been coded. Divide the current app/views/worklist/index.html into three pieces:

app/views/worklist/index.html:

<% content_for(:javascript_includes, javascript_include_tag("pages/worklist.js")) %>
<% content_for(:javascript_includes, javascript_include_tag("pages/realtime.js")) %>
<%= javascript_tag("var status_ids = #{@status_ids.to_json};") %>
<%= javascript_tag("var logged_in_employee_id = #{@employee_id};") %>
<div class="container-fluid">
  <div class="row">
    <div class="col-xs-12">
      <h1>Ready to read worklist</h1>
      <table class="table table-bordered">
    <thead>
      <tr>
        <th>Accession #</th>
        <th>Patient MRN</th>
        <th>Patient Name</th>
        <th>Completed At</th>
        <th>Age</th>
        <th>Notes</th>
        <% authorized(["radiologist","ai-staff"]) do %>
        <th></th>
        <% end %>
      </tr>
    </thead>
    <tbody>
      <% @exams.each do |exam| %>
      <% notes = Note.for_exam(exam) %>
      <%= render partial: "worklist/exam_rows", locals: {exam: exam, notes: notes} %>
      <% end %>
    </tbody>
      </table>
    </div>
  </div>
</div>

<div id="notifier"></div>

app/views/worklist/_data_row.html.erb:

<td><%= exam.accession %></td>
<td><%= exam.patientMrn.mrn %></td>
<td><%= exam.patientMrn.patient.name %></td>
<td><%= formatd(exam.radExamTime.endExam) %></td>
<td><%= age(exam.radExamTime.endExam) %></td>
<td><span class="badge <%= 'highlight' if notes.size > 0 %>"><%= notes.size %></span></td>
<% authorized(["radiologist","ai-staff"]) do %>
<td class="launch-group">
  <div class="data">
    <div class="integration-definition"><%= exam.imageViewer %></div>
    <div class="exam-object"><%= exam.integrationJson %></div>
  </div>
  <button class="btn btn-xs btn-primary <%= 'disabled' if exam.imageViewer.blank? %>">View Images</button>
</td>
<% end %>

app/views/worlist/_exam_rows.html.erb:

<tr class="data-row" id="data-row-<%= exam.id %>">
  <%= render partial: "worklist/data_row", locals: {exam: exam, notes: notes} %>
</tr>
<tr class="note-row" id="note-row-<%= exam.id %>">
  <td colspan="<%= authorized?(["radiologist","ai-staff"]) ? 7 : 6 %>">
    <div class="notes">
      <% notes.each do |note| %>
      <%= render partial: "notes/note", locals: {note: note, employee: note.employee(@entity_manager)} %>
      <% end %>
    </div>
    <%= form_tag({controller: :notes, action: :create}, {method: :post, class: "form"}) do %>
    <input type="hidden" name="rad_exam_id" value="<%= exam.id %>" />
    <div class="form-group">
      <label>Clinical Note</label>
      <textarea name="note" class="form-control"></textarea>
    </div>
    <input type="submit" class="btn btn-primary" value="Add" />
    <% end %>
  </td>
</tr>

JavaScript

With the controller actions/method and views in place, write the listener and binding calls. Start with the listener, edit app/assets/javascripts/pages/realtime.js:

    //We create a listener callback to handle audit messages related
    //to the rad exams table. This callback updates our worklist depending
    //on what type of operation was performed (insert/update/delete) and the
    //current status of the exam. We have a all "complete" status ids stashed
    //in a variable called "status_ids"
    $.harbingerjs.amqp.addListener(function(rk,payload,exchange) {
    var data_row = $("#data-row-" + payload.affected_row_id);
    if (payload.operation == 'update') { var exam = payload.pre_and_post.post }
    else { var exam = payload.pre_and_post.pre }

    //If the exam is in our worklist
    if (data_row.length > 0) {
        //If the new status of the exam is not "complete" then remove it
        //from the worklist.
        if (status_ids.indexOf(exam.current_status_id) == -1) {
        data_row.next().remove();
        data_row.remove();
        notifier('alert-danger','Removed accession: ' + exam.accession);
        }
        //Otherwise update the information in the data row with the result
        //of a server call (ajax) for the latest markup for the data row
        else {
        //Run update logic
        $.ajax($.harbingerjs.core.url("/worklist/update_exam"),
               {method: "get",
            data: {id: payload.affected_row_id},
            success: function(response) {
                $("#data-row-" + payload.affected_row_id).html(response);
                notifier('alert-success','Updated accession: ' + exam.accession);
                // It's possible, though highly unlikely that a note can be added at
                // between the time of the server side query that populates this update
                // and the time it gets updated on the page. So we update the badge just
                // in case.
                updateBadge(payload.affected_row_id);
            },
            error: function() {
                if (typeof console !== 'undefined') { console.log("Error updating an exam from stream",arguments) }
                notifier('alert-danger','Failed to updated accession: ' + exam.accession);
            }});

        }
    }
    //If there is no entry in our worklist and the status is "complete"
    //the then add it to our worklist by calling out to the server
    //for the table rows (data and notes) we need to add.
    else if (status_ids.indexOf(exam.current_status_id) !== -1) {
        //Run add logic
        $.ajax($.harbingerjs.core.url("/worklist/get_exam"),
           {method: "get",
            data: {id: payload.affected_row_id},
            success: function(response) {
            $("table tbody").append(response);
            notifier('alert-success','Added accession: ' + exam.accession);
            },
            error: function() {
            if (typeof console !== 'undefined') { console.log("Error adding an exam from stream",arguments) }
            notifier('alert-danger','Failed to add accession: ' + exam.accession);
            }});
    }
    },"rad_exams.#","audit"); //Only run this callback with the given matcher and exchange

Best Practice - The order of things matters here: the listener is added first, before the bind, because you want to ensure there is code to handle messages immediately after the bind completes.

Add the binding call to the same file to start the stream of rad_exam messages:

    //Bind to the audit queue with a routing key matcher of "rad_exams.#"
    //This will give us all changes made to the rad_exams table
    $.harbingerjs.amqp.bind("audit","rad_exams.#",
                            function(bindMessage) { notifier("alert-success","Bound to real time stream"); }, //An optional onBind callback
                            function(unBoundMessage) { notifier("alert-danger","No longer bound to real time stream"); }); //An optional onUnbound callback

Best Practice - Theoretically, you could bind to the audit exchange with a routing key matcher of # and the listener would still only ever run on rad_exam messages. This would never pass validation or work in a production environment: there is an extraordinary volume of traffic on the audit exchange, enough to slow or even crash a client web browser.

Summary

Between launching an image viewer with context, real-time note passing, and real-time clinical data updates, this application has developed into a much more useful, realistic worklist.