Ingest DCR-based custom logs in Microsoft Sentinel with Logstash

The "Swiss army knife" is back! And better than ever!

Koos Goossens
12 min read · Dec 9, 2022
While creating this composition in Photoshop, I couldn't help constantly thinking about 'Pickle Rick' 🙈

UPDATE

I’ve recently worked on the source code for the Microsoft Sentinel / Log Analytics output plugin (together with a fellow MVP) and we’ve implemented Managed Identity support. This eliminates the use of secrets altogether and also makes any key rotation mechanism obsolete.

I’d recommend no longer using application IDs and secrets, and making use of Managed Identities instead.

Read more about this update, and how to use it here.

It's back!

More than 2.5 years ago, I wrote about Logstash for the first time. There I explained how you can leverage this versatile open-source tool, in combination with Microsoft Sentinel, to ingest custom logs.

Fun fact: Microsoft Sentinel was still called Azure Sentinel back then. And before that Microsoft named it "Security Insights", a name which still echoes deep inside the trenches of the Azure cloud. That's why we still refer to the latter when deploying Sentinel via ARM templates, for example.

In the last few months, quite a lot of new features regarding log ingestion for Sentinel were released. One of them is a completely reworked ingestion pipeline for custom logs, providing lots of new capabilities, like on-the-fly transformation and filtering, and the use of a new 'basic' logs tier. About a week ago these features became generally available.

If you want to learn more about 'basic' (and 'archive') logs, and how the new log ingestion pipeline makes use of new Azure resources like Data Collection Endpoints and Data Collection Rules, please take a closer look at my earlier article on the subject.

Ingesting custom logs through Data Collection Endpoints (DCE) and Data Collection Rules (DCR) has already been added to the Azure Monitor Agent (AMA), albeit still in public preview. But there was one particular solution I was really looking forward to seeing updated: a new version of the Logstash output plugin Microsoft released in the past.

Since I helped a global enterprise implement this solution at scale, I kept reaching out to several contacts within Microsoft to ask whether this was still on their roadmap. Because there was no mention of it anywhere, for a short while I thought that Microsoft wasn't interested in updating this plugin at all.

So it remained silent… (insert crickets 🦗) Until, one day, I was handed a confidential .ZIP file with the question whether I’d be willing to try something out! Now that this feature is no longer under NDA, I can finally share my thoughts about the updated plugin for Logstash!

Why is this such a big deal?

Well, at least to me it is. 😉 Larger companies tend to have quite a few reasons to favor Logstash over the "built-in" solutions Microsoft offers. The solution might require a little more effort, but in my opinion the tradeoff is definitely worth it.

Logstash pros over standard log collectors:

  • Logstash can easily run inside a container on your favorite container platform (such as Azure Kubernetes Service) instead of having to deploy (and manage!) virtual machines. In my previous article about Logstash I went into more detail about creating a Logstash container, and a minimal container sketch also follows below the schematic. This makes it easy to create a highly available, more resilient log collector.
  • With multiple pipelines inside Logstash, it's easier to define multiple listeners (inputs) to forward multiple sources to different tables (outputs) in Microsoft Sentinel.
  • Logstash can filter logs on the fly before they are sent out towards Microsoft Sentinel. This is huge! Especially for network logs.
  • With Logstash's wide variety of input plugins, the types of sources you can collect logs from are essentially endless. I've described PostgreSQL data ingestion in the past.
Schematic overview of Logstash with multiple 'pipelines' each with their own inputs, filters and outputs
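As a quick illustration of that first point, here's a minimal, hedged sketch of what a containerized Logstash collector could look like with plain Docker (for brevity, not AKS). It assumes the official Elastic image and the Sentinel output plugin we install later in this article; the image tag, container name and local ./pipeline folder are just examples, not a hardened setup.

# Sketch: bake the Sentinel output plugin into the official Logstash image
cat <<'EOF' > Dockerfile
FROM docker.elastic.co/logstash/logstash:8.14.1
RUN bin/logstash-plugin install microsoft-sentinel-logstash-output-plugin
EOF

docker build -t logstash-sentinel:8.14.1 .

# Expose the syslog listener and mount your pipeline directory read-only
docker run -d --name logstash-sentinel \
  -p 514:514 -p 514:514/udp \
  -v "$(pwd)/pipeline:/usr/share/logstash/pipeline:ro" \
  logstash-sentinel:8.14.1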

But by still using the older output plugin, these customers were missing out on new features such as 'basic' logs and transformations (which are arguably a waaaay better user experience than creating Grok patterns in Logstash).

No more unsafe distribution of keys

Another thing that the new data ingestion via DCE/DCR does much better is authentication!

Companies with multiple physical locations will probably want to upload some logs from network components to Sentinel. These can be firewalls, proxies and other important public-facing devices, like application delivery controllers such as Citrix ADC. Those physical locations might not always be managed by the same team(s), so in practice they'll end up distributing the workspace ID and key to one another to configure these inside their syslog forwarders (previously running the Microsoft Monitoring Agent (MMA) or Logstash).

With this new approach, a separate dedicated identity can be used to ingest data into each and every table. This means the sender can only send data to your workspace, and is no longer able to read data as well. The workspace key can be kept in a safe place again, and no longer needs to be shared around.

Installation

Let's use a VM this time and deploy a standard Ubuntu image in Azure.

You can either download Logstash and install it manually, or use a package manager instead. Elastic points out that if you choose the latter, you need to run Logstash as a background service, or you are required to provide a configuration path when running it interactively. I'll show both examples at the end of this article.

Elastic's instructions unfortunately make you add the public signing key with apt-key, which is soon to be deprecated. So please use the following commands instead:

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/elastic.gpg >/dev/null

echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-8.x.list >/dev/null

sudo apt-get update && sudo apt-get install logstash=1:8.14.1-1

sudo apt-mark hold logstash

Note that I'm specifically using Logstash version 8.14. This is because Microsoft's output plugin currently does not support newer versions! Please read more about the latest supported version in Microsoft's documentation. Therefore I'd also advise putting the Logstash package on hold for automatic updates, as shown in the shell commands above.

This will install the Logstash binaries in /usr/share/logstash/bin/ and the configuration files in /etc/logstash/ where your pipelines are expected to reside inside /etc/logstash/conf.d/ with a .conf extension.
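If you want to double-check the result before continuing, two optional sanity checks based on the paths and hold command above:

# Should print an 8.14.x version string
/usr/share/logstash/bin/logstash --version

# 'logstash' should be listed as held back from upgrades
apt-mark showhold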

You OBVIOUSLY want to make your Linux login experience a little prettier by adding a little bit of ASCII art 😎

Install the Microsoft Sentinel output plugin

Next, we need to install the plugin for Logstash:

sudo /usr/share/logstash/bin/logstash-plugin install microsoft-sentinel-logstash-output-plugin

And let's create a temporary folder for our sample log file, which we'll be needing later:

sudo mkdir /tmp/logstash
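If you want to confirm the plugin registered correctly, you can optionally list the installed plugins (the grep filter is just for convenience):

sudo /usr/share/logstash/bin/logstash-plugin list | grep -i sentinel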

Prepare a sample log file

With the new DCR-based custom tables approach, you need to create the table and define its schema before you can ingest data into it.

That is why we need to create a sample of our logs in .json format before we can prepare our resources in Azure. Luckily, Microsoft has provided an option inside the output plugin to first save one or more samples, before using it to push the data into Log Analytics.

Create your first Logstash pipeline:

sudo nano /etc/logstash/conf.d/syslog-to-dcr-based-sentinel-sample.conf

And input the following:

input {
  syslog {
    port => 514
  }
}

output {
  microsoft-sentinel-logstash-output-plugin {
    create_sample_file => true
    sample_file_path => "/tmp/logstash/"
  }
}

As you can see, I'll be using the syslog input to listen on port 514. Once Logstash is running and receives syslog messages, it will convert and save them into the temporary folder provided.

Run Logstash interactively by running:

sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/syslog-to-dcr-based-sentinel-sample.conf

After a couple of seconds Logstash will be running and you'll notice that it's waiting for syslog traffic to arrive:

[INFO ] 2022-12-09 13:43:07.819 [Ruby-0-Thread-18: :1] syslog - Starting syslog tcp listener {:address=>"0.0.0.0:514"}

We can use logger to send a test message as if it was a syslog message. We can either run this from the same machine (in a separate session) or from another machine in the same network.

logger -p local4.warn --rfc3164 --tcp -t CEF: "0|NETWORK|YOURFAVORITEFIREWALL|DnsProxyLog|Datacenter|proxy|10.73.4.7|48454|113.1.15.87|443|123|OUT|eu-v20.events.data.microsoft.com|TCP|ALLOW" -P 514 -d -n 127.0.0.1

BEWARE! If you're going to send test messages, these will be used to determine the schema of the custom table in our next step(s)! Future messages with a different format will not be stored correctly afterwards! For testing purposes this is obviously fine, but if you want to set this up for actual syslog messages, make sure that your syslog appliance sends messages to Logstash while create_sample_file => true is set.

Logstash might not immediately notify you that it has written the sample log file. But you can CTRL-C out of Logstash, and eventually the event should pop up:

[INFO ] 2022-12-09 13:43:18.932 [[main]-pipeline-manager] microsoftsentineloutput - Sample file was written in path: /tmp/logstash/sampleFile1670593398.json

Copy the contents of the sample file and save it somewhere, because we'll be needing it later during our next steps.

Prepare Azure resources

Before we can continue further on our Logstash VM, we need to switch over to the Azure portal. There we need to perform several steps:

  • Create a Data Collection Endpoint (DCE).
  • Create a DCR-based custom table in the Log Analytics workspace.
  • Create an app registration and generate a secret for it.
  • Grant the app registration appropriate permissions.

Create a Data Collection Endpoint (DCE)

Creating the DCE is very straightforward. Inside the Azure portal, go to Monitor → select Data Collection Endpoints under Settings → click on Create and provide a valid name and Azure region.

After the DCE is created, copy and note down the "Logs ingestion" URI:

We'll be needing that URI later…
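If you prefer scripting over clicking, the same DCE can presumably be created with the Azure CLI. Treat this as a hedged sketch: it assumes the monitor-control-service CLI extension, and the endpoint name and resource group are placeholders:

# Sketch, assuming the 'monitor-control-service' Azure CLI extension
az extension add --name monitor-control-service

az monitor data-collection endpoint create \
  --name "dce-logstash" \
  --resource-group "<RESOURCEGROUP>" \
  --location "westeurope" \
  --public-network-access "Enabled"

# The "Logs ingestion" URI we need later should be returned as logsIngestion.endpoint
az monitor data-collection endpoint show \
  --name "dce-logstash" \
  --resource-group "<RESOURCEGROUP>" \
  --query "logsIngestion.endpoint" -o tsv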

Create a DCR-based custom logs table

Next, we need to head over to our Log Analytics workspace and select Tables under Settings and click on Create:

Make sure to click on "DCR-based"

A wizard will pop up and we need to provide a valid name for our custom table. Here we can also select the DCE created earlier. Click on Create a new data collection rule.

Do not create the DCR outside of this wizard! Otherwise it will not be attached to this custom table (stream)

Define schema and transformation query

Next, you'll need to upload the sample .json we generated and saved earlier. A preview will be shown and we can enter the Transformation editor.

Depending on your sample there might not already be a 'TimeGenerated' column, which is mandatory; hence the error message seen here. However, we'll be able to resolve this within the transformation editor.

The transformation editor is one of the main attractions here. You'll be able to parse and optionally filter your logs upon ingesting them into Sentinel!

If you've used my logger command above to try this out, you can use the following KQL transformation query to parse everything neatly into columns:

source
| extend TimeGenerated = ls_timestamp
| parse message
    with MessageId: int
    "|" DeviceType: string
    "|" DeviceName: string
    "|" EventType: string
    "|" Location: string
    "|" NetworkType: string
    "|" SrcIp: string
    "|" SrcPort: int
    "|" DstIp: string
    "|" DstPort: int
    "|" MessageSize: int
    "|" Direction: string
    "|" DstAddress: string
    "|" Protocol: string
    "|" Result: string
| project-away message

As you can see, I've used the contents of the ls_timestamp column (which was already of the datetime datatype) and extended it into a new column called TimeGenerated. This also resolves the error message seen on the previous screen.

Finish the wizard. We're almost done in the Azure portal now.

Create an identity to authenticate with the DCR

Open up Azure Active Directory and select App registrations under Manage. Create a new application and make sure to configure a secret as well.

Note down the clientId, tenantId, and secret. We'll also be needing these later.
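For those who'd rather do this from the CLI, a hedged sketch of the same steps follows. The display name is just an example, exact output fields can differ per CLI version, and the secret value is shown only once, so store it safely:

# Register the application and note the appId it returns
az ad app create --display-name "logstash-dcr-ingestion" --query appId -o tsv

# Create the matching service principal (needed for the role assignment later)
az ad sp create --id "<APPID>"

# Generate a client secret for the app registration
az ad app credential reset --id "<APPID>"

# And your tenant id
az account show --query tenantId -o tsv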

Retrieve DCR immutable id

In the Azure portal, go into Monitor → Data Collection Rules under Settings and select the DCR you've just created.

Wait… How can the DCR appear to be completely empty?! No data sources or resources are here to be found?
As a wise man once said: "your eyes can deceive you, don’t trust them"

There are some settings hidden from plain sight which we need in order to compile the final Logstash configuration at the end. One of them is the immutableId of the DCR we created as part of our DCR-based custom table in the previous wizard.

You can find this in the Overview blade of the DCR. Click on JSON View in the top-right corner. The JSON definition of the DCR will show you the immutableId property:

{
  "properties": {
    "immutableId": "dcr-4f8fca38487647ed8dfedef124634bf3"
  }
}

Note this down as well for later use.

We also need to double-check the "stream name" value. This refers to the name of the custom table we created earlier. The easiest way is to open up Export template under Automation. Scroll down and you'll find the stream name right above the transformation query:

{
  "resources": [
    {
      "type": "Microsoft.Insights/dataCollectionRules",
      "properties": {
        "dataFlows": [
          {
            "streams": [
              "Custom-logstash_syslog_CL"
            ],
            "destinations": [
              "06aabcb106b244b9a0bfc06768924553"
            ],
            "transformKql": "source\n| extend TimeGenerated = ls_timestamp\n| parse message\n with MessageId: int\n \"|\" DeviceType: string\n \"|\" DeviceName: string\n \"|\" EventType: string\n \"|\" Location: string\n \"|\" NetworkType: string\n \"|\" SrcIp: string\n \"|\" SrcPort: int\n \"|\" DstIp: string\n \"|\" DstPort: int\n \"|\" MessageSize: int\n \"|\" Direction: string\n \"|\" DstAddress: string\n \"|\" Protocol: string\n \"|\" Result: string\n| project-away message\n\n",
            "outputStream": "Custom-logstash_syslog_CL"
          }
        ]
      }
    }
  ]
}

You've probably already guessed it; write that one down as well.
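Both values can also be read with the Azure CLI instead of digging through the portal. A hedged sketch, again assuming the monitor-control-service extension; resource group and DCR name are placeholders:

# The immutableId of the DCR
az monitor data-collection rule show \
  --name "<DCRNAME>" \
  --resource-group "<RESOURCEGROUP>" \
  --query "immutableId" -o tsv

# The stream name of the first data flow
az monitor data-collection rule show \
  --name "<DCRNAME>" \
  --resource-group "<RESOURCEGROUP>" \
  --query "dataFlows[0].streams[0]" -o tsv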

Authorize the application to the DCR

While we're in the DCR, go into the Access Control (IAM) blade and click Add role assignment.

We need to grant our recently created application the built-in Monitoring Metrics Publisher role. Otherwise our log ingestion requests on the Data Collection Rule will be disallowed.
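This role assignment can also be scripted with the Azure CLI, using the DCR's full resource ID as scope. Subscription, resource group and DCR name are placeholders:

az role assignment create \
  --assignee "<APPID>" \
  --role "Monitoring Metrics Publisher" \
  --scope "/subscriptions/<SUBSCRIPTIONID>/resourceGroups/<RESOURCEGROUP>/providers/Microsoft.Insights/dataCollectionRules/<DCRNAME>"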

Create Logstash ingestion pipeline

Now that we have everything we need, we can create our actual Logstash pipeline. 🎉

Create a new Logstash config file:

sudo nano /etc/logstash/conf.d/syslog-to-dcr-based-sentinel.conf

And provide the following code (example):

input {
  syslog {
    port => 514
  }
}

output {
  microsoft-sentinel-logstash-output-plugin {
    client_app_Id => "<APPID>"
    client_app_secret => "<APPSECRET>"
    tenant_id => "<TENANTID>"
    data_collection_endpoint => "https://<DCEURI>.westeurope-1.ingest.monitor.azure.com"
    dcr_immutable_id => "dcr-<IMMUTABLEID>"
    dcr_stream_name => "Custom-<TABLENAME>"
  }
}

Here everything comes together nicely. If everything went well, you've already gathered all the information needed.

Love it when a plan comes together!
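If you'd like to verify the app registration, DCR and stream name independently of Logstash first, you can optionally push a single test record straight to the Logs Ingestion API with curl. This is a hedged sketch: the api-version and the field names (ls_timestamp and message, matching the sample schema used above) are assumptions based on this particular setup, and jq is only used to extract the token. If everything lines up, the second call should return an empty success response.

# Get a token for the app registration (client credentials flow)
TOKEN=$(curl -s -X POST "https://login.microsoftonline.com/<TENANTID>/oauth2/v2.0/token" \
  -d "client_id=<APPID>" \
  -d "client_secret=<APPSECRET>" \
  -d "scope=https://monitor.azure.com/.default" \
  -d "grant_type=client_credentials" | jq -r '.access_token')

# Post one record to the DCE, addressing the DCR and stream directly
curl -s -X POST \
  "https://<DCEURI>.westeurope-1.ingest.monitor.azure.com/dataCollectionRules/dcr-<IMMUTABLEID>/streams/Custom-<TABLENAME>?api-version=2021-11-01-preview" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '[{"ls_timestamp":"2022-12-09T13:43:00Z","message":"curl test message"}]'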

The final result

With the final configuration in place, we can now finally run Logstash and check if we get some logs into Sentinel.

Run Logstash manually:

sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/syslog-to-dcr-based-sentinel.conf

Or start the Logstash service. This will automatically run all pipelines in /etc/logstash/conf.d/*.conf, so do remember to remove the temporary "sample" pipeline created earlier, because you don't want Logstash to try and open multiple listeners on the same port.
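For example, using the sample pipeline file name from earlier in this article:

sudo rm /etc/logstash/conf.d/syslog-to-dcr-based-sentinel-sample.conf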

sudo systemctl start logstash

Check service status:

systemctl status logstash

● logstash.service - logstash
     Loaded: loaded (/lib/systemd/system/logstash.service; disabled; vendor preset: enabled)
     Active: active (running) since Fri 2022-12-09 21:26:57 UTC; 3min 20s ago
   Main PID: 45704 (java)
      Tasks: 49 (limit: 4699)
     Memory: 717.1M
        CPU: 40.891s
     CGroup: /system.slice/logstash.service
             └─45704 /usr/share/logstash/jdk/bin/java
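If the service doesn't come up as expected, its logs are the first place to look (a hedged sketch; the log file path is the default for package installs):

# Follow the service output
sudo journalctl -u logstash -f

# Or tail Logstash's own plain-text log
sudo tail -f /var/log/logstash/logstash-plain.log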

After firing a couple of different logger commands again, we can check if everything shows up nicely in Sentinel.

Data neatly parsed by the transformation rule

And it works! Everything is nicely parsed thanks to that transformation query configured inside the DCR.
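You can run that same check from the command line as well. A hedged sketch, assuming the log-analytics CLI extension and the table name logstash_syslog_CL from the DCR export above; the workspace is referenced by its customer ID (GUID):

az extension add --name log-analytics

az monitor log-analytics query \
  --workspace "<WORKSPACE-CUSTOMER-ID>" \
  --analytics-query "logstash_syslog_CL | sort by TimeGenerated desc | take 10"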

Conclusion

I was already a big fan of the new data ingestion capabilities, because the transformation rules are a real game changer if you ask me.

It's great to see Microsoft providing and supporting an output plug-in for an open-source solution they didn't create themselves. To me, Logstash has proven to be a very versatile tool in the past, and it appears that Microsoft also recognizes its potential and its use cases.

I’m sure a lot of customers will be happy with this.

I hope this was informative. If you have any questions, never hesitate to reach out to me!

— Koos

