How do you prototype applications with streaming sensor data?

In this tutorial, we're going to do a rough implementation of Joan DiMicco's conversation clock.

Joan DiMicco's Conversation Clock. Slicker than what we'll build, but a good baseline.

I'm showing you a microphone-based app because it's a sensor stream we all have access to (and an underutilized one, but that's for a different post) - a microphone is a sensor just like any other, streaming values over time, and you can easily swap out this thinking for heart rate, GSR, EEG, etc.

This tutorial covers how to use a pub/sub system for building streaming, sensor-driven applications. We pool data from multiple users in this tutorial, just to show how easy it is, but the possibilities here are endless. If you come up with cool applications or ideas, or just want to chat about this stuff, email me at ffff@berkeley.edu.

Application specs

Given a bunch of people, each with their own microphone, the Conversation Clock visualizes who is talking over time.

Conversation clock features:

  • Figure out who's speaking

  • Display a shared visualization to the group of who is speaking over time

Publish and subscribe

The key ideas here are publish and subscribe. Sensors publish data; everything else subscribes to published data.

When building a sensor app, this means you only need to push code to your device once: that code manages the connection to the sensor and publishes its data. This is a good thing, because constantly pushing code to devices is tedious.
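In pseudocode, the device-side piece can be as small as this (connectToSensor and publish are hypothetical stand-ins for your sensor driver and pubsub client):

# hypothetical sketch: the only code that ever lives on the device
connectToSensor (sensor) ->
    sensor.onReading (reading) ->
        publish type: 'sensorReading', value: reading, time: Date.now()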

Now, what if we want to process sensor data before we send it off to the UI? Well, we can make a whole directed graph with this publish/subscribe pattern:

Architecture for our conversation clock application

You can imagine sensor data coming from the top of this diagram and falling "through" the network. At any step on the route, we can send data to a UI, to a processing node, or both.

In our application, sensor data (from microphones) streams in from our users. A processing node consumes the many streams of sensor data and produces a single stream representing who is currently speaking. Finally, our UI subscribes to that produced ("condensed") stream, and that's just about it.

S4: Distributed Stream Computing Platform

This is a somewhat complicated example from an academic paper on the stream processing system S4, but the takeaway is simple: we can make a network of nodes that publish and subscribe to streams of data.

NOTE: Many pubsub solutions are out there. You can use FOSS like socket.io, or roll your own with something like RabbitMQ. For this tutorial, I'm using a free SaaS called Pusher so we don't need to host or set up anything.

Structuring the application

What are the discrete elements of our application?

  1. A sensor stream (client-side) that gives a stream of amplitudes from the user's microphone

  2. A stream processor (server-side) that takes a stream of amplitudes from multiple people and figures out who is talking.

  3. A visualization (client-side) that subscribes to the "who is talking" stream from (2) and visualizes it.

There are still a lot of questions at this point - given a stream of amplitudes from multiple microphones, how do we figure out who is talking? How do we visualize who is talking? Let's put these questions aside for now and look at the client side abstractly:

$               = require 'jquery'
audioContext    = require('audio-context')
$body           = $(document.body)
$body.html      '<h2>share your microphone!</h2>'

# request microphone access
# when we get access to the mic, we execute the callback on a stream of amplitudes from the microphone 
getAccessToMicrophone audioContext, (amplitudesStream) => 
    # show join convo view
    # the cb is executed when user selects a color + hits join
    showJoinConvoView (color) => 
        publish(color, amplitudesStream)
        loudestMicStream = subscribeToLoudestMicStream()
        showConvoViz(loudestMicStream)

You can follow along with the source here. Also, if you're unfamiliar with callbacks, check out this article on the subject.

What's happening here? First, we make an HTML5 audio context and ask the user to share their microphone with us using getAccessToMicrophone(). If the user accepts, this method produces a stream of amplitudes from the user's microphone and executes our callback on it.

When the stream is made, we show the user a joinConvoView, where they can pick a color to represent them in the visualization. Once they pick their color, we publish their stream of data along with their color, we subscribeToLoudestMicStream (the stream of results from the processing node), and we use that stream to showConvoViz. Again, details on all these steps later. The main UI flow is: (1) get microphone data from the user, (2) ask them to pick a color and start sending data, (3) send their data, subscribe to the processing node's stream of results, and visualize that results stream.

Getting access to the microphone

Our first step is getting data from the user's microphone. For that I used the npm module get-user-media:

getUserMedia = require 'get-user-media'

# takes an AudioContext and a callback; calls cb with a stream of amplitudes
getAccessToMicrophone = (audioctx, cb) ->
    # getUserMedia shim - ask for audio only
    getUserMedia {audio: true, video: false}, (err, stream) ->
        # handle any error
        return err if err
        # get stream of amplitudes from microphone
        amplitudeStream = microphoneAmplitudesStream stream, audioctx
        cb amplitudeStream

We ask for the audio (but not video), and give a callback to be executed when we have a stream of the data from the microphone.

Getting a stream of amplitudes from the microphone

For this, I used this excellent tutorial on getting the amplitude of the microphone input in HTML5. You can explore app/modules/MicrophoneManager/index.coffee in the convo-clock source for details on the implementation.

High-level overview: we turn the microphone stream into a stream of the microphone's amplitude.


Kefir = require 'kefir'

amplitudeStream = (scriptNode, analyzerNode) ->
    # buffer the analyzer fills with frequency data on each audio tick
    array = new Uint8Array analyzerNode.frequencyBinCount
    return Kefir.stream (emitter) ->
        scriptNode.onaudioprocess = () ->
            emitter.emit getAverageVolume(analyzerNode, array)

microphoneAmplitudesStream = (stream, audioctx) ->
    analyzer = microphoneAnalyzer stream, audioctx
    scriptNode = analyzerScriptNode analyzer, audioctx
    return amplitudeStream scriptNode, analyzer

microphoneAmplitudesStream takes a stream from the user's microphone, and our audio context, and returns a stream of the amplitudes.

Wait - stream?

What's Kefir.stream?

A stream, in this context, is a data structure that represents a value over time. Where a normal variable holds one value at a time (and that value varies), a stream represents the whole list of values over time. Put another way: a variable represents a single value that changes over time; a stream accumulates all the values as the program runs.

Why streams? Well, it can be hard to reason about variables that change frequently. If you have a variable, you never know exactly what its value will be, but sometimes you have to act like you do: you have to do specific operations on the value to make stuff happen. In sensor apps, values come in at arbitrary times, stop unexpectedly, give garbage values, etc. That's just the reality of working with Bluetooth, serial ports, and the like. In an imperative style with mutable variables, this adds up to a lot of if statements, try/catch blocks, and so on.

With a stream, we can make general operations on all values (such as filtering them or transforming them), and deliver specific values when we need to.
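For example, with Kefir (introduced properly below) you could clean up a raw amplitude stream declaratively. This is just an illustration - `amplitudes` stands in for a stream like the one microphoneAmplitudesStream returns, and the threshold is made up:

# illustrative only: general operations applied to every value in the stream
cleaned = amplitudes
    .filter (a) -> a >= 0          # drop garbage readings
    .map (a) -> Math.round a       # transform each value
    .throttle 500                  # at most one value per 500ms

# ...and deliver specific values when we need them
cleaned.onValue (a) -> console.log 'amplitude:', a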

If this is confusing to you, don't worry. Our usage of a stream here will become clear in a minute.

Here, I used the awesome library Kefir, which provides a very performant stream implementation that works in both Node and the browser.

For more on streams, see James Coglan's excellent talk on why this is so useful. (In fact, streams are great for regular point-and-click interfaces too; think: condensing interface actions from a stream of keypresses, clicks, etc.!) For a slightly different, but related, kind of stream, check out substack's stream handbook.
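As a quick taste of that point-and-click idea, here's a throwaway sketch (nothing from the convo-clock app, just Kefir):

# streams aren't just for sensors: UI events are values over time too
clicks     = Kefir.fromEvents document, 'click'
keypresses = Kefir.fromEvents document, 'keypress'

# condense them into a single stream of "the user did something"
activity = Kefir.merge [clicks, keypresses]
activity.onValue (e) -> console.log 'interface action:', e.type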

Sharing the data

Now that we have a stream of amplitudes and the user has selected a color to represent them, it's time to publish our user's data:

$           = require 'jquery'
postJson    = require 'post-json-nicely'

# data sharing
# samples myAmplitudeStream and posts data. returns nothing.
publish = (myColor, myAmplitudeStream) ->
    postReading = (json) -> postJson $, 'http://collection-server-url/', json
    post = (a) -> postReading 
        type: 'microphoneAmplitude'
        amplitude: a
        color: myColor
    myAmplitudeStream.throttle(500).onValue((amplitude) -> post(amplitude))

We publish data to a collection-server endpoint. I wrote the collection-server to reduce the friction of publishing: by providing a simple POST endpoint for JSON data, we can get data from any net-connected environment under the sun. The collection-server requires that your data has a type field, as we use that string to subscribe to the published data elsewhere.

A collection server could do a number of things - it could log certain data (or everything), it could serve as a first-pass processing hub (e.g. doing FFTs on EEG data), etc. For now, it just takes the data we POST and publishes them.
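If you'd rather roll your own, the core of a collection server fits in a handful of lines. Here's a rough sketch (not the actual collection-server code) using express and the server-side Pusher library; the endpoint, channel name, and credentials are placeholders:

express    = require 'express'
bodyParser = require 'body-parser'
Pusher     = require 'pusher'

pusher = new Pusher({
    appId: 'YOUR_PUSHER_APP_ID'
    key: 'YOUR_PUSHER_KEY'
    secret: 'YOUR_PUSHER_SECRET'
})

app = express()
app.use bodyParser.json()

# accept any JSON POST and republish it, keyed by its `type` field
app.post '/', (req, res) ->
    data = req.body
    pusher.trigger 'everything', data.type, data
    res.sendStatus 200

app.listen 3000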

Figuring out who is talking

Each of our users is streaming us their microphone data, and we need to figure out who is talking.

First, let's subscribe to the stream of mic data. This is bouncing off our collection server, but we don't need to worry about that! Again, I'm using the Pusher pubsub service.

# pusher config
PusherClient = require 'pusher'   # assuming the server-side pusher npm package
pusherClient = new PusherClient({
    appId: config.PUSHER_APP_ID
    key: config.PUSHER_KEY
    secret: config.PUSHER_SECRET
    encrypted: config.IS_PUSHER_ENCRYPTED
})
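The subscription itself isn't shown in the config snippet above, but it mirrors what the client will do later. A rough sketch, assuming the collection server republishes each reading on an 'everything' channel with the reading's type field as the event name:

Kefir = require 'kefir'
Pusher = require 'pusher-js'

# build a Kefir stream of raw microphone readings from the pubsub
subscribeToMicAmplitudes = () ->
    channel = new Pusher(config.PUSHER_KEY, encrypted: true).subscribe('everything')
    Kefir.stream (emitter) ->
        channel.bind 'microphoneAmplitude', emitter.emit

# feed every reading into the buffer we define next
subscribeToMicAmplitudes().onValue (reading) -> addToBuffer reading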

Notice how we used the type field from our POSTed data to subscribe. This is a convenience provided by the collection-server.

The simplest way to figure out who is talking is to figure out whose microphone amplitude is the loudest. (I'm sure there are smarter ways, but simplicity is nice.)

Every time a microphone amplitude reading comes in, let's add it to a buffer, and keep it there for 300 ms:

buffer = {}
delay  = (t, cb) -> setTimeout cb, t

addToBuffer = (data) ->
    unixtime = Date.now()
    buffer[unixtime] = data
    # keep items in buffer for 300 ms
    delay 300, () -> delete buffer[unixtime]

Now, every 300ms, we'll just figure out what the loudest amplitude in the buffer is, and send that reading off to everyone:

_ = require 'underscore'   # or lodash

findLoudest = (buffer) -> return _.max buffer, (reading) -> reading.amplitude
repeatedly  = (t, fn) -> setInterval fn, t
repeatedly 300, () ->
    # nothing to report if no one is sending data
    return if _.isEmpty buffer
    # get the loudest reading
    loudest = findLoudest buffer
    # set its type for the pubsub
    loudest.type = 'loudestMicrophoneReading'
    # post it back through the collection server, as on the client
    postReading loudest

Notice here how we've changed the type of the reading to something other than 'microphoneAmplitude'. This lets us subscribe to it back on the client side.

Visualizing the conversation

Back on the client-side, let's subscribe to the stream of data we just produced:

Kefir = require 'kefir'
Pusher = require 'pusher-js'

# subscribes to loudest-mic readings and returns a stream of them
subscribeToLoudestMicStream = () ->
    # a faucet for subscribing to events
    faucet = () -> new Pusher('YOUR_PUSHER_API_KEY', encrypted:true).subscribe('everything')
    # subscribes to loudestMicrophoneReading event & calls emitter.emit on each value it gets
    loudestReadingEvents = (emitter) -> faucet().bind('loudestMicrophoneReading', emitter.emit)
    # subscribe to 'loudestMicrophoneReading' from the pubsub
    Kefir.stream loudestReadingEvents 

Again, notice how we subscribed to the string we put in the type field on the processing server - 'loudestMicrophoneReading'. And we return a Kefir stream of values from the processing server.

Now, let's define showConvoViz:

$ = require 'jquery'

showConvoViz = (loudestMicStream) ->

    # a colored bar whose width reflects the reading's amplitude
    reading = (amplitude, color) ->
        return $('<div class="reading"></div>')
            .css('background-color', color)
            .css('width', amplitude + 'px')
            .css('height', '3px')

    $body       = $(document.body)
    addReading  = (r) -> $body.prepend reading(r.amplitude, r.color)

    # clear the body html to begin
    $body.html ''

    # add a reading whenever a loudest mic value comes in
    loudestMicStream.onValue (r) -> addReading r

This is about as simple as we can get - you can spruce it up! A couple things we might do to make this better:

  • Figure out when no one is talking, and display nothing (see the sketch after this list).

  • Make the visualization a bit cooler, more like DiMicco's (at the top of this post).

  • Let multiple people's microphones overlap one another.
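For instance, the first improvement is basically a one-line filter inside showConvoViz, before we render anything (the threshold here is a made-up number you'd tune for your microphones):

# treat very quiet "loudest" readings as silence and render nothing for them
SILENCE_THRESHOLD = 5

loudestMicStream
    .filter (r) -> r.amplitude > SILENCE_THRESHOLD
    .onValue (r) -> addReading r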

Conclusion

That's it for this tutorial! If you haven't already, clone the repo and set it up for yourself!

From here, you can make all sorts of sensor apps. A few points to keep in mind:

  • Whatever talks to your sensor, keep it separate from everything else. You'll have to deal with reconnects, low battery warnings, poor signal quality, errors you never saw in testing, etc.... You don't want that complexity to leak into the rest of your app. So, whatever routine talks to your sensor, it should only talk to your sensor. Publish that over the pubsub, and do the rest of your logic elsewhere!

  • Log everything when you're debugging! Hook something up to your PubSub that does nothing but store everything that comes through (a minimal logging node is sketched after this list). I recommend CouchDB as you can use straight JSON with it, no schema, but your mileage may vary. It's important to record stuff so you can open it up, run models, train and test classifiers, and even do playbacks to see if your streaming analyses work. Which brings me to my next point...

  • The best way to test your UI is by doing playbacks/replays of stored data. If you can play back or inject sensor data you recorded previously, you can get a real sense for how your UI looks in different conditions. Over a development cycle, this will save you a lot of time. Compare to hooking up all your devices, and hoping to see something informative as you test. It's good to do now and again, but not every time you make a change to your UI!

  • Sensor applications are distributed systems. All of your clients are like separate processing nodes, and establishing a coherent timeline of events across users is hard. I don't touch this issue in this tutorial, but it's worth noting. If you're aggregating data from multiple devices, you'll want to be careful when making assumptions about when data will arrive at any given place. Soon, I hope to write more about this problem, and how our group dealt with it in our synchronized brainwave recordings project.
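On that logging point, a minimal logging node might look something like this. It's only a sketch - I'm assuming the nano CouchDB client, a local CouchDB, and the same 'everything' channel as before:

Pusher = require 'pusher-js'
nano   = require('nano') 'http://localhost:5984'
db     = nano.db.use 'sensor-log'

# store every raw microphone reading that comes through the pubsub
channel = new Pusher('YOUR_PUSHER_API_KEY', encrypted: true).subscribe('everything')
channel.bind 'microphoneAmplitude', (reading) ->
    reading.loggedAt = Date.now()
    db.insert reading, (err) -> console.error err if err

Replaying is then just a matter of reading those documents back in timestamp order and POSTing them to the collection server as if they were live.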

If you have questions, application ideas, or want to chat about engineering and designing applications with sensors, email me at ffff@berkeley.edu.