Last time we looked at how to get data from an IoT device and start pushing it up to Azure; now it's time for the next step: processing the data as it comes in.
I mentioned in the solution design that the processing of the data would happen with Azure Functions so let’s have a look at how that works.
Processing Data with Functions
Azure Functions has built-in support for processing data out of IoT Hub, which makes it really easy to integrate. The only drawback is that it monitors the built-in event hub provided by IoT Hub, so if you have multiple data structures being submitted (like I do) your Function will become complex. Instead I'm going to use the Event Hub binding.
Designing Functions
When designing functions, or serverless in general, you want to keep them as small as possible; our goal isn't to create a serverless monolith! This means that part of the design requires you to think about what role your functions will play. For me, they will be responsible for converting the JSON payloads that are sent from the IoT device into a structure that is stored in Table Storage. If we think about the APIs I described in the data downloader post there is one endpoint, livedata, that provides me with the bulk of the data needed for capture.
After a bit of inspection of the real API I noticed that this data could be represented in 3 buckets:
- Panel data
- Point-in-time summary data
- Miscellaneous data
I made the decision to store each of these as separate tables in Table Storage (to understand more, check out the post on data design). Since they all come from the same originating structure I could do it all in a single function, but instead I split it into 3 functions, keeping each as lightweight as possible.
Writing Our Function
The Functions are implemented in F# (here’s how to set that up) and I’m using the FSharp.Azure.Storage NuGet package to make working with the Table Storage SDK more F# friendly.
Note: If you're going to use that NuGet package in F# Azure Functions you'll need to be really careful about the versions you depend on. Since Functions internally uses Table Storage there's the potential to bring in conflicting versions, which results in errors. I solved this with very explicit pinning in my paket.dependencies file.
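That pinning looks something like the sketch below. The exact version numbers here are illustrative only; match them to whatever your Functions runtime is actually compatible with.

```
source https://api.nuget.org/v3/index.json

// Versions are illustrative -- pin to ones compatible with your Functions host
nuget FSharp.Azure.Storage 5.0.0
nuget WindowsAzure.Storage 9.3.3
nuget FSharp.Data 3.3.3
```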
We’ll start by defining the Record Type that will be stored in Table Storage:
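A minimal sketch of such a type, using the attributes from FSharp.Azure.Storage (the field names beyond the keys are my assumptions based on the description below), looks like this:

```fsharp
open FSharp.Azure.Storage.Table

// Sketch of the stored record -- field names other than the keys are assumptions
type PanelData =
    { [<PartitionKey>] Panel: string
      [<RowKey>] MessageId: string
      CorrelationId: string
      Volts: float
      Watts: float
      Current: float }
```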
On the Record Type I've added attributes to indicate which members will be the Partition and Row keys in Table Storage, which makes it nicer to work against the object model if I need to. This type represents the data for a single group of panels in my solar setup and gives me a view of the inbound values across Volts, Watts and Current.
To define the Function itself we create a member in the module:
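Here's a sketch of the shape of that member. The binding names come from the description below; the Function name and the connection setting names are assumptions.

```fsharp
open System.Threading.Tasks
open Microsoft.Azure.WebJobs
open Microsoft.Azure.EventHubs
open Microsoft.WindowsAzure.Storage.Table
open Microsoft.Extensions.Logging

[<FunctionName("PanelTrigger")>]
let trigger
    ([<EventHubTrigger("live-data", ConsumerGroup = "Panel", Connection = "EventHubConnection")>] eventData: EventData)
    ([<Table("PanelData", Connection = "AzureWebJobsStorage")>] table: CloudTable)
    (logger: ILogger) : Task =
    let work = async {
        // unpacking and storage are covered in the sections below
        logger.LogInformation "Received a live-data message"
    }
    Async.StartAsTask work :> Task
```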
We attribute the member (which I've called trigger) with FunctionName so the Functions host knows how to find it and what name to give it. Unfortunately, you'll need to explicitly state the types; F# won't be able to infer them from usage of the complex types in the binding (at least, not in my experience).
This Function has 3 inputs. The first is the Event Hub binding, which binds to an Event Hub named live-data using the Consumer Group Panel (see the Solution Design section Handling a Message Multiple Times for why I use Consumer Groups). We use the EventData type for the input rather than string so we can access the metadata of the message, not just the body (which is all you get when the type is string). Next up is the output binding to Table Storage, bound as a CloudTable, which provides me with interop with FSharp.Azure.Storage. Lastly is the ILogger so I can log messages from the Function.
Unpacking the Message
It’s time to start working with the message, and for that I need to extract the body (and strongly type it with a Type Provider) and get some metadata:
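A sketch of that step, assuming FSharp.Data's JsonProvider and a made-up payload shape (the real livedata structure, and the exact property key names the downloader uses, will differ):

```fsharp
open System.Text
open FSharp.Data

// Hypothetical sample payload -- the real shape comes from the inverter's livedata endpoint
type LiveData = JsonProvider<"""{ "points": [ { "name": "p1_volts", "value": 240.5 } ] }""">

// The body is a UTF8-encoded byte array wrapped in an ArraySegment
let json = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count)
let data = LiveData.Parse json

// Metadata the downloader attached to the message (key names are assumptions)
let correlationId = string eventData.Properties.["correlationId"]
let messageId = string eventData.Properties.["messageId"]
```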
The EventData object gives us access to the body of the message via its Body property, an ArraySegment of bytes. This is a UTF8-encoded byte array, so we have to decode it to get back the string of JSON (or whatever your transport structure was). Then, because we have access to the whole message, not just the body, we can read the additional properties that were put into the message by the downloader, the CorrelationId and MessageId.
Because the data points come in as an array of key/value pairs I created a function to find a specific point's value:
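A minimal version of such a lookup, written against the hypothetical LiveData type from the sketch above, might be:

```fsharp
// Find the value of a named point in the key/value pairs from the payload
let findPoint (points: LiveData.Point[]) name =
    points
    |> Array.find (fun point -> point.Name = name)
    |> fun point -> point.Value
```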
And then use partial application to bind the parsed data to it:
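Under the same assumptions, that's as simple as:

```fsharp
// Partially apply the parsed points so lookups only need the point name
let getValue = findPoint data.Points
```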
Writing to Storage
Because I need to write 2 panel groups to storage I created a function in the Function to do that:
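Here's a sketch of that helper. The point names fed to getValue are assumptions, and inTableAsCloudTable is the adapter function defined in a moment:

```fsharp
open FSharp.Azure.Storage.Table

// Build the record for a panel group and insert it into Table Storage
let storePanel (panel: string) =
    { Panel = panel
      MessageId = messageId
      CorrelationId = correlationId
      Volts = getValue (sprintf "p%s_volts" panel) |> float
      Watts = getValue (sprintf "p%s_watts" panel) |> float
      Current = getValue (sprintf "p%s_current" panel) |> float }
    |> Insert
    |> inTableAsCloudTable table
```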
This creates the record using the panel number passed in (let! _ = storePanel "1" is how it's called) before handing it over to the Insert function from my external library. But the FSharp.Azure.Storage library is designed to work with the client from the SDK and convert that into a CloudTable itself, so it's not 100% optimised for use in Azure Functions. This is an easy fix though; here's a function to handle that:
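The binding hands us a CloudTable, which already knows its client and name, so a small adapter can unwrap those for the library's inTableAsync. The adapter's name here is my own invention:

```fsharp
open FSharp.Azure.Storage.Table
open Microsoft.WindowsAzure.Storage.Table

// FSharp.Azure.Storage expects a CloudTableClient and a table name,
// both of which the bound CloudTable can provide
let inTableAsCloudTable (table: CloudTable) operation =
    inTableAsync table.ServiceClient table.Name operation
```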
Finally, because we're using F#'s async workflows and the Azure Functions host only understands Task&lt;T&gt; (C#'s async), we need to convert back with Async.StartAsTask:
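Put together at the end of the trigger body, that conversion looks something like this sketch:

```fsharp
open System.Threading.Tasks

// the Functions host wants a Task back, not an F# Async
let work = async {
    let! _ = storePanel "1"
    let! _ = storePanel "2"
    logger.LogInformation "Stored panel data"
}
Async.StartAsTask work :> Task
```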
And with that, for each message we receive, 2 entries are written to Table Storage.
Conclusion
I won't go over each of the functions in the project as they all follow this same pattern; instead, you can find them on GitHub.
While it might feel a little like micromanagement of the codebase to have a whole bunch of functions with fewer than 50 lines in them, it makes them a lot simpler to maintain and edit as you iterate on the development. It also scales very nicely, which I've seen a few times when I've accidentally disabled the functions for 24 hours and had a huge backlog in Event Hub to process; it makes quick work of it!
I hope this gives you a bit of insight into how you can create your own Azure Functions stack for processing data, whether it's from an IoT device or some other input stream.