Azure Stream Analytics is Microsoft’s PaaS (platform-as-a-service) event-processing engine for analyzing and processing large volumes of streaming data from multiple sources. You can configure input sources such as IoT devices, sensors or business applications for data ingestion, and configure outputs that deliver the processed data to destinations where it can trigger actions or feed further analytics.
Azure Stream Analytics works on the concept of jobs. A job consists of one or more inputs, a query and one or more outputs. Stream Analytics ingests incoming data from the configured inputs and applies the query to filter, sort, aggregate or join the data with reference data over a period of time. The transformed data is then sent to the configured output for storage or further downstream processing.
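To make the input, query, output flow concrete, here is a minimal Python sketch of what a job does conceptually. It is not how Stream Analytics is implemented. The function name `run_job`, the field names (`deviceId`, `ts`, `temperature`) and the 30-second window are assumptions chosen purely for illustration.

```python
from collections import defaultdict
from statistics import mean


def run_job(events, window_seconds):
    """Toy model of a job: average temperature per device per time window."""
    windows = defaultdict(list)
    for event in events:  # input: a sequence of timestamped events
        # "query": assign each event to a fixed, non-overlapping time window
        window_start = event["ts"] - event["ts"] % window_seconds
        windows[(event["deviceId"], window_start)].append(event["temperature"])
    # output: one aggregated record per device and window
    return [
        {"deviceId": device, "windowStart": start, "avgTemperature": mean(temps)}
        for (device, start), temps in sorted(windows.items())
    ]


# Hypothetical telemetry; "ts" is seconds since the start of the stream.
events = [
    {"deviceId": "mxchip-01", "ts": 0, "temperature": 20.0},
    {"deviceId": "mxchip-01", "ts": 10, "temperature": 22.0},
    {"deviceId": "mxchip-01", "ts": 35, "temperature": 30.0},
]
results = run_job(events, window_seconds=30)
```

Here the first two readings fall into the window starting at 0 and the third into the window starting at 30, so `results` holds two aggregated records.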
In this article, we will use an example in which I have a physical IoT device (an MXChip dev kit) sending temperature data for processing.
You can create an instance of Stream Analytics in the Azure Portal. Log in to your portal at https://portal.azure.com. Once authenticated, click Create a Resource to start the creation process.
Enter Stream Analytics Job in the search box and press Enter or click the magnifying glass.
This will return a list of results that match your entry. Select Stream Analytics Job to create a new instance of Stream Analytics. A description page will appear describing what Stream Analytics is, along with available pricing plans and documentation resource links. Click Create to begin the creation process.
You need to provide some details when creating a Stream Analytics job in Azure. First, you will need to specify a name for your Stream Analytics instance. This name only needs to be unique to your subscription.
There is also a dropdown for selecting the Azure subscription under which the Stream Analytics instance is created. It is auto-populated with your current subscription, but if you have multiple subscriptions you can select one from the list.
You also need to specify a resource group that the Stream Analytics instance is hosted under. You can either select from a list of existing resource groups or click Create New to create a new resource group immediately and use that for your Stream Analytics instance.
Location specifies the Azure region in which the Stream Analytics instance will be created. Choose the region nearest to your physical location for best performance.
Hosting environment specifies where the Stream Analytics job will be deployed. Cloud specifies that the job will be hosted in the cloud (that is, in Azure), while Edge specifies an on-premises IoT Edge gateway device.
Streaming Units determine the amount of compute resources allocated to the Stream Analytics job for processing a query. Since jobs perform all processing in memory, increasing this value increases the CPU and memory available to the job. This can be useful for scaling up and keeping stream-processing latency low.
Once created, your Stream Analytics job is stopped by default. Before the job can process data, you need to configure three things: inputs, outputs and a query.
Let’s review each of these in detail.
Inputs are the first thing to configure. An input defines a connection to an existing data source, and Stream Analytics accepts incoming data from it. You can choose from three different input sources: Azure Event Hubs, Azure IoT Hub, or Azure Blob Storage.
You can navigate to the Inputs page by clicking Inputs under the Job Topology section (on the left). This page will display a list of configured inputs while also allowing you to create a new input.
There are also two types of inputs: Stream and Reference. A stream input is an unbounded sequence of data events over time. This is the stream of data coming from an input source, and Stream Analytics requires at least one stream input to be defined.
Reference inputs define a static set of data or data that slowly changes. This input type is typically used for data lookups or data correlation and is optional depending on your needs. An example would be having a lookup table that you would join with data from your stream input. Currently Azure Blob Storage and Azure SQL Database are supported as input sources for reference data.
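As a rough illustration of the lookup idea, the Python sketch below enriches stream events with a small reference table. The function name `enrich`, the `location` column and the sample rows are hypothetical. In a real job the join would be expressed in the query rather than in Python.

```python
def enrich(stream_events, reference_rows):
    """Join each stream event with its matching reference row, if any."""
    # Index the slowly changing reference data by the join key.
    lookup = {row["deviceId"]: row for row in reference_rows}
    enriched = []
    for event in stream_events:
        ref = lookup.get(event["deviceId"])
        if ref is None:
            continue  # inner-join behaviour: drop events with no match
        enriched.append({**event, "location": ref["location"]})
    return enriched


# Hypothetical reference data, e.g. loaded from a file in Blob Storage.
reference_rows = [{"deviceId": "mxchip-01", "location": "Lab A"}]
stream_events = [
    {"deviceId": "mxchip-01", "temperature": 72.4},
    {"deviceId": "unknown-99", "temperature": 65.0},
]
enriched = enrich(stream_events, reference_rows)
```

Only the event from `mxchip-01` survives the join, and it gains a `location` field taken from the reference table.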
For our example, we will be connecting a stream input of type IoT Hub to an existing Azure IoT Hub instance. You are prompted for an alias name for this input source, along with the subscription and the IoT Hub to use. There are also settings for the IoT Hub routing endpoint to use, as well as the event serialization format and encoding.
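The serialization format and encoding chosen here must match what the device actually sends. As a sketch, assuming JSON as the serialization format and UTF-8 as the encoding, a telemetry message body could be produced and read back like this. The field names `deviceId` and `temperature` are illustrative and not taken from the MXChip firmware.

```python
import json


def encode_telemetry(device_id, temperature):
    """Serialize one reading as a JSON document encoded as UTF-8 bytes."""
    payload = {"deviceId": device_id, "temperature": temperature}
    return json.dumps(payload).encode("utf-8")


def decode_telemetry(body):
    """Reverse of encode_telemetry: UTF-8 bytes back to a dictionary."""
    return json.loads(body.decode("utf-8"))


body = encode_telemetry("mxchip-01", 72.4)
reading = decode_telemetry(body)
```

If the input were configured with a different format or encoding than the one the device uses, events would fail to deserialize, which is why these two settings are worth checking first when no data shows up.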
Clicking Save will create the input and run a connection test to verify that Stream Analytics can connect to the input properly.
Outputs must also be configured for the Stream Analytics job to function, since the job needs a destination for the transformed data. Configuring one or more outputs satisfies this requirement.
There are a number of different outputs to choose from:
- Event Hub
- SQL Database
- Blob Storage/Data Lake Storage Gen2
- Table Storage
- Service Bus Topic
- Service Bus Queue
- Cosmos DB
- Power BI
- Data Lake Storage Gen1
- Azure Function
Additional information on each of these output types is available in the Azure Stream Analytics documentation.
For our example, I will be configuring an output to Cosmos DB. Adding an output of this type displays a blade for entering the details of the output being created. You need to enter an alias name for the output and then either enter the Cosmos DB settings manually or select a Cosmos DB resource from your Azure subscription. We are selecting an existing Cosmos DB resource that was created previously. Please note that the container must already exist in Cosmos DB before you specify the Container Name and Document Id values.
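As I understand the Document Id setting, it names the field in each output event that is used as the key for insert-or-update operations. The Python sketch below models that behaviour with a plain dictionary standing in for the container. The function name `upsert_documents` and the choice of `deviceId` as the key are assumptions for illustration only, and no Cosmos DB SDK is involved.

```python
def upsert_documents(container, events, document_id_field):
    """Insert each event, or replace the stored document with the same id."""
    for event in events:
        doc_id = str(event[document_id_field])
        container[doc_id] = {**event, "id": doc_id}
    return container


container = {}  # stands in for a Cosmos DB container
upsert_documents(
    container,
    [
        {"deviceId": "mxchip-01", "temperature": 71.5},
        {"deviceId": "mxchip-01", "temperature": 73.0},
    ],
    document_id_field="deviceId",
)
```

With `deviceId` as the key, the second reading replaces the first, so the container ends up holding only the latest reading per device. Choosing a field that is unique per event would keep every reading instead.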
Once you have one or more inputs and outputs configured, you can create a query to process the incoming data. Queries are the mechanism Stream Analytics uses to transform the incoming data in real time. They are written in the Stream Analytics Query Language, whose statements are similar to SQL.
Structurally, a query has a SELECT clause that specifies the fields to return, a FROM clause that names one or more configured inputs, and an INTO clause that specifies the output to emit the transformed data to. JOIN and WHERE clauses help combine and filter the incoming data.
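To show how these clauses fit together, here is a small Python helper that assembles query text from its parts. The helper `build_query` and the aliases `iothub-input` and `cosmos-output` are hypothetical. Use whatever alias names you gave your own input and output.

```python
def build_query(fields, input_alias, output_alias, where=None):
    """Assemble query text in the order SELECT, INTO, FROM, WHERE."""
    lines = [
        "SELECT " + ", ".join(fields),
        f"INTO [{output_alias}]",  # where the results are sent
        f"FROM [{input_alias}]",   # where the events come from
    ]
    if where:
        lines.append(f"WHERE {where}")  # optional filter condition
    return "\n".join(lines)


# A pass-through query that copies every field from input to output.
pass_through = build_query(["*"], "iothub-input", "cosmos-output")
print(pass_through)
```

The aliases are wrapped in square brackets because alias names containing characters such as hyphens need to be delimited in the query text.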
In our example, clicking Query under Job Topology displays a page showing the current query for this Stream Analytics job. You can edit the query directly in this window. Notice that we are using the SELECT * wildcard to select all fields from the input source and save them to the specified output.
The editor also shows a preview of the data arriving from our input source (the IoT Hub device).
The query editor also supports discrete field selection and filtering. For example, to return only certain fields, we could list those fields in the SELECT clause in place of the wildcard.
Additionally, we can filter the incoming data. For example, to exclude any incoming temperature readings that do not exceed 100, we could add a WHERE clause that keeps only readings above 100.
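A sketch of what such a query could look like is held in the `QUERY` string below, followed by a plain-Python equivalent that shows its effect on sample events. The field names `deviceId` and `temperature` and the aliases `iothub-input` and `cosmos-output` are assumptions; substitute the names used by your own device and job.

```python
# Hypothetical query text combining field selection and filtering.
QUERY = """\
SELECT deviceId, temperature
INTO [cosmos-output]
FROM [iothub-input]
WHERE temperature > 100
"""


def apply_query(events):
    """Plain-Python equivalent of QUERY: two fields, readings above 100 only."""
    return [
        {"deviceId": e["deviceId"], "temperature": e["temperature"]}
        for e in events
        if e["temperature"] > 100
    ]


sample = [
    {"deviceId": "mxchip-01", "temperature": 99.5, "humidity": 40},
    {"deviceId": "mxchip-01", "temperature": 100, "humidity": 41},
    {"deviceId": "mxchip-01", "temperature": 101.2, "humidity": 42},
]
selected = apply_query(sample)
```

Only the 101.2 reading passes the filter, since a reading of exactly 100 does not exceed 100, and the `humidity` field is dropped because it is not listed in the SELECT clause.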
Once we have the inputs, outputs and query configured, we are ready to start our Stream Analytics job. The Overview page shows at a glance the configured inputs and outputs and the current query.
When we start the Stream Analytics job and wait a few minutes, we should begin to see data coming in and being processed. The Monitoring and Resource Utilization graphs should show data flowing through as it is processed.
We can also see our data appearing in our Cosmos DB database.
In conclusion, Azure Stream Analytics can process complex events in real time, providing real-time analytics over incoming data sources. We also walked through an example of how to create and configure a Stream Analytics job that accepts incoming IoT device telemetry, processes it and saves it into a Cosmos DB database. I hope this article has been helpful; see the Azure Stream Analytics documentation for more information.