Using Azure Stream Analytics with IoT Devices

Azure Stream Analytics is Microsoft’s PaaS (platform-as-a-service) event-processing engine that allows you to analyze and process large volumes of streaming data from multiple incoming sources. You can configure different input sources, including IoT devices, sensors, or business applications, for data ingestion. You can also configure outputs so that the processed data is delivered to destinations where it can drive actions or further analytics.

Overview

Azure Stream Analytics works on the concept of jobs. A job consists of one or more inputs, a query, and an output. Stream Analytics ingests incoming data from the configured inputs, and a query is then applied to filter, sort, and aggregate that data, or join it with reference data, over a window of time. The transformed data is then sent to the configured output for storage or further downstream processing.

In this article, we will be using an example where I have a physical IoT Device (an MXChip dev kit) sending temperature data for processing.

Resource Creation

You can create an instance of Stream Analytics in the Azure Portal. Log in to the portal at https://portal.azure.com. Once authenticated, click Create a resource to start the creation process.

Figure 1: Azure Services

Enter Stream Analytics Job in the search box and press Enter or click the magnifying glass.

Figure 2: Creating Stream Analytics Job Resource

This will return a list of results that match your entry. Select Stream Analytics Job to create a new instance of Stream Analytics. A description page will appear describing what Stream Analytics is, along with available pricing plans and documentation resource links. Click Create to begin the creation process.

Figure 3: Stream Analytics Job Overview

You need to provide some details when creating a Stream Analytics job in Azure. First, you will need to specify a name for your Stream Analytics instance. This name only needs to be unique to your subscription.

There is also a dropdown for selecting the Azure subscription to create the Stream Analytics instance under. It is auto-populated with your current subscription, but if you have multiple subscriptions you can select a different one from the list.

You also need to specify a resource group that the Stream Analytics instance is hosted under. You can either select from a list of existing resource groups or click Create New to create a new resource group immediately and use that for your Stream Analytics instance.

Location specifies the Azure region where the Stream Analytics instance will be created. For best performance, choose the region that hosts your inputs and outputs (or the one closest to them).
Hosting environment specifies where the Stream Analytics job will be deployed. Cloud means the job is hosted in Azure, while Edge means it runs on an on-premises IoT Edge gateway device.

Streaming Units determine the number of computational resources the Stream Analytics job will use when processing a query. Since jobs perform all processing in memory, increasing this value increases the amount of CPU and memory available to process queries, which is useful for scaling up and providing low-latency stream processing.

Figure 4: Create Stream Analytics Job Details

Once created, your Stream Analytics job is stopped by default. In order to configure your job for processing, you need to create three things:

  • Inputs
  • Queries
  • Outputs

Let’s review each of these in detail.

Inputs

The first things to configure are the inputs. Inputs define a connection to an existing data source, and Stream Analytics accepts incoming data from these inputs. You can choose from three different input sources: Azure Event Hubs, Azure IoT Hub, or Azure Blob Storage.

You can navigate to the Inputs page by clicking Inputs under the Job Topology section (on the left). This page will display a list of configured inputs while also allowing you to create a new input.

Figure 5: Stream Analytics Inputs

There are also two types of inputs: stream and reference. Stream inputs define an unbounded sequence of data events over time. This is the stream of data coming from an input source, and a Stream Analytics job requires at least one stream input to be defined.

Reference inputs define a static set of data or data that slowly changes. This input type is typically used for data lookups or data correlation and is optional depending on your needs. An example would be having a lookup table that you would join with data from your stream input. Currently Azure Blob Storage and Azure SQL Database are supported as input sources for reference data.

For our example, we will be connecting a stream input of type IoT Hub to an existing Azure IoT Hub instance. You are prompted for an input alias for this input source, along with the subscription and the IoT Hub you are connecting to. There are also fields to specify the IoT Hub endpoint to use, as well as the event serialization format and encoding.

Figure 6: Adding IoT Hub Input

Clicking Save will create the input and run a connection test to verify that Stream Analytics can connect to the input properly.

Outputs

Outputs must also be configured in order for the Stream Analytics job to function properly. Stream Analytics needs to send the transformed/processed data to a destination, and configuring one or more outputs satisfies this requirement.
There are a number of different outputs you can choose from, listed here:

  • Event Hub
  • SQL Database
  • Blob Storage/Data Lake Storage Gen2
  • Table Storage
  • Service Bus Topic
  • Service Bus Queue
  • Cosmos DB
  • Power BI
  • Data Lake Storage Gen1
  • Azure Function

Additional information on each of these output types is available in the Azure Stream Analytics documentation.

Figure 7: Stream Analytics Outputs

For our example I will be configuring an output to Cosmos DB. Adding an output of this type displays a blade for entering the details of the output being created. You need to enter an alias for the output, along with a choice of entering the Cosmos DB settings manually or selecting them from your Azure subscription. We are selecting an existing Cosmos DB resource that was created previously. Note that you must have already created a container in Cosmos DB before specifying the Container Name and Document Id values.

Figure 8: Adding CosmosDB Output

Queries

Once you have one or more inputs and outputs configured, you can create a query to process the incoming data. Queries are the mechanism Stream Analytics uses to transform the incoming data in real time. They are written in the Stream Analytics Query Language, which is similar to SQL.

Structurally, the query has a SELECT statement that selects data from one or more configured inputs and an INTO clause that specifies the output to emit the transformed data to. There are also FROM, JOIN, and WHERE clauses that help specify and filter the incoming data and perform processing against it.
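
For example, a minimal pass-through query might look like the following sketch, where the input and output alias names (iothub-input and cosmosdb-output) are placeholders for whatever aliases you configured:

    SELECT
        *
    INTO
        [cosmosdb-output]
    FROM
        [iothub-input]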

In our example, when you click the Query section under Job Topology, a page is displayed showing the current query for this Stream Analytics job. You can edit the query in this window. Notice that we are using the SELECT * wildcard to select all fields from the input source and save them to the specified output.

Figure 9: Stream Analytics Queries

You can see preview data displayed from our input source (IoT Hub device):

Figure 10: Input Preview Data

The query editor also lets you perform discrete field selection and filtering. So, for example, if we wanted to select only certain fields, we could modify our query to look like this:

Figure 11: Discrete Field Selection in Queries
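
A sketch of such a query might look like the following, assuming the MXChip telemetry contains fields such as deviceId, temperature, and humidity (these field names and the aliases are illustrative rather than taken from the screenshot; EventEnqueuedUtcTime is a timestamp Stream Analytics attaches to events arriving from IoT Hub):

    SELECT
        deviceId,
        temperature,
        humidity,
        EventEnqueuedUtcTime AS readingTime
    INTO
        [cosmosdb-output]
    FROM
        [iothub-input]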

Additionally, if we want to filter the incoming data, we have that ability as well. For example, if we wanted to exclude any incoming readings where the temperature did not exceed 100, we could specify a filter like this:

Figure 12: Query Filtering
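
Under the same assumptions as above, the filtered version of the query would simply add a WHERE clause:

    SELECT
        deviceId,
        temperature,
        humidity
    INTO
        [cosmosdb-output]
    FROM
        [iothub-input]
    WHERE
        temperature > 100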

Once we have the inputs, outputs, and query configured, we are ready to start our Stream Analytics job. Looking at the Overview page, we can see at a glance the configured inputs, outputs, and query.

When we start the Stream Analytics job and wait a few minutes, we should begin to see data coming in and being processed. The Monitoring and Resource Utilization graphs will show data coming through as it is processed.

Figure 13: Stream Analytics Graphs

We can also see our data appearing in our Cosmos DB database, as shown here:

Figure 14: CosmosDB Data Records

Conclusion

Azure Stream Analytics provides the ability to process complex events in real time and deliver real-time analytics on incoming data streams. We also walked through an example of how to create and configure Azure Stream Analytics to accept incoming IoT device telemetry, process it, and save it into a Cosmos DB database. I hope this article has been helpful; see the Azure Stream Analytics documentation for more information.

Cheers!

Using IoT on a Beer Kegerator

Having been born and raised in the great state of Wisconsin, I can say beer has been a part of most of my adult life. Couple that with my love of technology, and I always wondered how I could leverage some cool tech with a beer theme. With the proliferation of inexpensive hardware and the Internet of Things (IoT), it has become easy (and cheap!) to build solutions that can monitor (among other things) beer-related activities. This article describes and details the steps I took to create a solution for monitoring beer consumption on a beer kegerator.

The first thing I needed to do before building anything was to understand and design what I wanted to build. Since I wanted to monitor beer consumption from a kegerator, I needed to draw out the major parts of my solution. Once I knew those, I could begin to build and test the different parts of the system. The drawing below shows the major parts of my solution:

Figure: Solution overview (beer tap → inline flow sensor → IoT device → Azure cloud → data storage and analysis)

As you can see, when someone taps a beer from the kegerator, an inline flow meter sensor sends information to an IoT device, which then processes the information and sends it to the cloud, where it is stored for data analysis.

Now that I have an idea of my overall architecture, I can begin to think about what hardware and software I need to create my solution.


Hardware

For hardware, I chose to use a Raspberry Pi as my IoT device. The Pi is a low-power, inexpensive device that met my needs for this project (built-in Ethernet networking, multiple GPIO pins, easy to install apps). Note that I also considered using the ESP8266 chip for this project. This little chip is great for simple IoT projects as it’s really cheap, has built-in wireless networking (with a full TCP/IP stack!), and provides multiple GPIO pins. The main drawback for this project is that the chip only provides 3.3V and I needed 5V for the flow sensor, so it was easier to use the Pi. The other drawback is that I can’t install Windows 10 IoT Core on the ESP8266, so using a Pi simplified my design.

The other piece of hardware I needed was a flow sensor to measure the flow of beer through the line when it’s being tapped. Initially I chose a really cheap sensor designed for coffee makers but found out that these won’t work for measuring beer flow (see the Testing section), so I went with a more expensive sensor: the Swissflow SF-800, which is about $60 USD. This flow sensor sends digital pulses when liquid flows through it, which allows me to measure how much beer is being dispensed. The sensor requires +5Vdc to power it properly, which is another reason to use a Raspberry Pi (which provides +5Vdc).


Software

The software selections I made were driven (in part) by my hardware choices, but also by what apps I wanted to provide. I wanted an app that runs on the Raspberry Pi, processes the incoming pulse data from the SF-800 sensor, and then sends that data to Azure. I also wanted this app to have a user interface that displays how much beer is left in the current keg, along with the ability for an administrator to “reset” the app (when the keg is empty and is changed out for a full one).

Windows 10 IoT Core provides the operating system for the Raspberry Pi, and this also allows me to easily deploy and manage any apps I want running on the device. See Microsoft’s documentation for how to install Windows 10 IoT Core on the Raspberry Pi.


The app I am creating for this solution is a Universal Windows Platform (UWP) app designed to run on IoT devices with Windows 10 IoT Core. This app will process the incoming digital pulses from the SF-800 and send the readings to Azure IoT Hub.


The following code snippet shows how I receive the incoming digital pulses from the SF-800 flow sensor. I have the sensor connected to GPIO pin 5 on the Raspberry Pi, so when the value on that pin changes it triggers an event in my app to signal that a pulse was sent by the SF-800.

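A minimal sketch of this approach, using the Windows.Devices.Gpio API available to UWP apps on Windows 10 IoT Core (the member names and the pull-up drive mode here are my own assumptions, not the exact code from my app), looks roughly like this:

    using System.Threading;
    using Windows.Devices.Gpio;

    // Fields and methods that live inside the app's page/service class (sketch)
    private GpioPin _flowPin;
    private int _pulseCount;   // pulses received since the last send to IoT Hub

    private void InitFlowSensor()
    {
        // Get the default GPIO controller (returns null on devices without GPIO)
        var gpio = GpioController.GetDefault();

        // The SF-800 signal wire is connected to GPIO pin 5
        _flowPin = gpio.OpenPin(5);
        _flowPin.SetDriveMode(GpioPinDriveMode.InputPullUp);
        _flowPin.ValueChanged += FlowPin_ValueChanged;
    }

    private void FlowPin_ValueChanged(GpioPin sender, GpioPinValueChangedEventArgs args)
    {
        // Count only one edge per pulse so each pulse is counted once
        if (args.Edge == GpioPinEdge.FallingEdge)
        {
            Interlocked.Increment(ref _pulseCount);
        }
    }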

I also have a timer on another thread that ticks every 0.5 seconds and checks whether any pulses have been received from the SF-800 flow sensor. If there have been, it sends the count off to Azure IoT Hub for storage.

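A sketch of that timer, using ThreadPoolTimer together with the Microsoft.Azure.Devices.Client SDK to push the pulse count to IoT Hub every half second (the connection string, device id, and JSON field names are placeholders I’ve assumed, not the original values):

    using System;
    using System.Text;
    using System.Threading;
    using Microsoft.Azure.Devices.Client;
    using Windows.System.Threading;

    private DeviceClient _deviceClient;
    private ThreadPoolTimer _sendTimer;

    private void StartTelemetry()
    {
        // The device connection string comes from the device registration in Azure IoT Hub
        _deviceClient = DeviceClient.CreateFromConnectionString(
            "<your device connection string>", TransportType.Mqtt);

        // Every 0.5 seconds, check for pulses counted by the GPIO handler above
        _sendTimer = ThreadPoolTimer.CreatePeriodicTimer(async timer =>
        {
            // Read and reset the counter atomically
            int pulses = Interlocked.Exchange(ref _pulseCount, 0);
            if (pulses == 0)
            {
                return;   // nothing flowed, nothing to send
            }

            string json = $"{{\"deviceId\":\"kegerator\",\"pulses\":{pulses}}}";
            using (var message = new Message(Encoding.UTF8.GetBytes(json)))
            {
                await _deviceClient.SendEventAsync(message);
            }
        }, TimeSpan.FromMilliseconds(500));
    }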

The software in the Azure cloud that I will be leveraging is Azure IoT Hub, Stream Analytics, and Azure SQL Database. Azure IoT Hub provides the mechanism to receive incoming telemetry data from my IoT device and route it for processing and storage. I have Azure IoT Hub route my data to Stream Analytics, which processes it and saves it in an Azure SQL database. Once the data is in the database, I am free to consume it in a number of ways, such as with Power BI or any custom app that can read from SQL.


As incoming telemetry data is received from the Raspberry Pi, Azure IoT Hub accepts that data and Stream Analytics processes it and saves it in an Azure SQL database. This is done through the Stream Analytics interface by setting up an input (Azure IoT Hub) and an output (Azure SQL database) and configuring a query to do any processing needed at that time, as sketched below.
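
A sketch of what that query could look like for this solution, assuming the telemetry fields used in the device code above (deviceId and pulses are my placeholder names, and EventEnqueuedUtcTime is the arrival timestamp Stream Analytics attaches to IoT Hub events):

    SELECT
        deviceId,
        pulses,
        EventEnqueuedUtcTime AS readingTime
    INTO
        [sql-output]
    FROM
        [iothub-input]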


Testing

Once I had created the software components and connected the hardware, it was time to test the functionality of my solution. I first tested the solution by connecting my Raspberry Pi (with my UWP app installed) to a breadboard with the SF-800 flow sensor attached. I also added a couple of LEDs to indicate a heartbeat pulse (green) and flow sensor pulses (red).


I configured Azure IoT Hub and started my Stream Analytics job so that incoming data from my IoT device would be received and processed properly. Testing this way involved blowing air through the SF-800 (I used my breath – GENTLY!), making sure the air flow was in the proper direction (going the wrong way can damage the sensor).

Once I knew this was working, I wanted to validate the accuracy of the SF-800’s digital pulses. To do this, I got some plastic tubing of the same size used in the kegerator, along with a funnel. I then measured out 1 cup of water and poured it through the flow sensor while everything was running.


Deployment

Now that I had tested my solution, it was ready for deployment! This included placing the flow sensor inline with the actual kegerator tubing on the line I wanted to monitor. I still kept the breadboard, as this was not a fully productized solution (meaning I didn’t create the wiring on a PCB).


I also encountered an issue that I failed to catch until after I deployed my solution for the first time. I was originally using a cheap flow sensor designed for coffee makers, and when I put it on the beer line I noticed that it made the beer foam as it passed through the sensor. This was something I didn’t test for prior to deployment, so it forced me to rethink my design (and which sensor to use). I eventually found the SF-800 sensor, which worked much better when I deployed it with my solution.

In conclusion, now that this solution is connected to the kegerator, I can monitor how much beer is left in the current keg! I can also enhance the solution by leveraging an Azure WebJob to send an email notification when the keg is getting low. How great is that? No more tapping a beer just to find out that there isn’t any left!

Cheers!
