Ingest DCR-based custom logs in Microsoft Sentinel with Logstash
The "Swiss army knife" is back! And better than ever!
UPDATE
I’ve recently worked on the source code for the Microsoft Sentinel / Log Analytics output plugin (together with a fellow MVP) and we’ve implemented Managed Identity support. This eliminates the use of secrets altogether and also makes this key rotation mechanism obsolete.
I’d recommend no longer using application IDs and secrets, and making use of Managed Identities instead.
It's back!
More than 2.5 years ago, I wrote about Logstash for the first time. Back then I explained how you can leverage this versatile open-source tool, in combination with Microsoft Sentinel, to ingest custom logs.
Fun fact: Microsoft Sentinel was still called Azure Sentinel back then. And before that, Microsoft named it "Security Insights", a name which still echoes deep inside the trenches of the Azure cloud. That's why we still refer to the latter when deploying Sentinel via ARM templates, for example.
In the last few months quite a lot of new features regarding log ingestion for Sentinel were released. One of them is a completely reworked ingestion pipeline for custom logs, providing lots of new abilities such as on-the-fly transformation and filtering, and the use of a new 'basic' logs tier. About a week ago these features became generally available.
If you want to learn more about 'basic' (and 'archive') logs, and how the new log ingestion pipeline makes use of new Azure resources like Data Collection Endpoints and Data Collection Rules, please take a closer look at my earlier article about the subject.
Ingesting custom logs through Data Collection Endpoints (DCE) and Data Collection Rules (DCR) has already been added to the Azure Monitor Agent (AMA), albeit still in public preview. But there was one particular solution I was really looking forward to seeing updated: the Logstash output plugin Microsoft released in the past.
Since I helped a global enterprise implement this solution at scale, I was constantly reaching out to several contacts within Microsoft to ask them if this was still on their roadmap. Because there was no mention of it anywhere, for a short while I thought that Microsoft wasn't interested in updating this plugin at all.
So it remained silent… (insert crickets 🦗) Until, one day, I was handed a confidential .ZIP file with the question whether I’d be willing to try something out! And now that this feature is no longer under NDA, I can finally share my thoughts about the updated plugin for Logstash!
Why is this such a big deal?
Well, at least to me it is. 😉 This is because larger companies tend to have quite a few reasons to favor Logstash over the other "built-in" solutions Microsoft offers. The solution might require a little bit more effort, but in my opinion this tradeoff is definitely worth it.
Logstash pros over standard log collectors:
- Logstash can easily run inside a container on your favorite container platform (such as Azure Kubernetes Service) instead of having to deploy (and manage!) Virtual Machines. In my previous article about Logstash I went into more detail about creating a Logstash container; see the sketch after this list for a quick example. This makes it easy to create a highly available, more resilient log collector.
- With multiple pipelines inside Logstash, it's easier to define multiple listeners (inputs) to forward multiple sources to different tables (outputs) in Microsoft Sentinel.
- Logstash can filter logs on the fly before they are sent out to the Microsoft output plugin pushing them into Sentinel. This is huge! Especially for network logs.
- With Logstash's wide variety of input plugins, the types of sources you can collect logs from are essentially endless. I've described PostgreSQL data ingestion in the past.
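To illustrate the first point, here's a minimal sketch of running Logstash from Elastic's official container image with a local pipeline directory mounted in. The container name, mount path and version tag are purely illustrative:
# Run Logstash in a container with a locally mounted pipeline directory.
# Image tag and mount paths are examples; adjust to your own setup.
docker run -d --name logstash \
  -p 514:514 -p 514:514/udp \
  -v "$(pwd)/pipeline/:/usr/share/logstash/pipeline/" \
  docker.elastic.co/logstash/logstash:8.14.1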
But by still using the older output plugin, these customers were missing out on new features such as 'basic' logs and transformations (which are arguably a waaaay better user experience than creating Grok patterns in Logstash).
No more unsafe distribution of keys
Another thing that the new data ingestion via DCE/DCR does much better is authentication!
Companies with multiple physical locations will probably want to upload some logs from network components to Sentinel. These can be firewalls, proxies and other important public-facing devices, like application delivery controllers such as Citrix ADC. Those physical locations might not always be managed by the same team(s), so in practice they'll end up distributing the workspace ID and key to one another to configure these inside their syslog forwarders (previously running the Microsoft Monitoring Agent (MMA) or Logstash).
With this new approach a separate, dedicated identity can be used to ingest data into each and every table. This means the sender can only send data to your workspace, and can no longer read data as well. The workspace key can be kept in a safe place again, and no longer needs to be handed around.
Installation
Let's use a VM this time and deploy a standard Ubuntu image in Azure.
You can either download Logstash and install it manually, or use a package manager instead. Elastic points out that if you choose the latter, you need to run Logstash as a background service, or you are required to provide a configuration path when running interactively. I'll show both examples at the end of this article.
Elastic's instructions unfortunately make you add the public signing key to apt-key, which is soon to be deprecated. So please use the following commands instead:
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/elastic.gpg >/dev/null
echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-8.x.list >/dev/null
sudo apt-get update && sudo apt-get install logstash=1:8.14.1-1
sudo apt-mark hold logstash
Note that I'm specifically using Logstash version 8.14. This is because Microsoft's output plugin currently does not support newer versions! Please read more about the latest supported version in Microsoft's documentation. Therefore I'd also advise putting the Logstash package on hold for automatic updates, as shown in the shell commands above.
This will install the Logstash binaries in /usr/share/logstash/bin/ and the configuration files in /etc/logstash/, where your pipelines are expected to reside inside /etc/logstash/conf.d/ with a .conf extension.
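A couple of optional sanity checks after the installation (assuming the paths above):
/usr/share/logstash/bin/logstash --version   # should report 8.14.1
apt-mark showhold                            # should list "logstash"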
Install the Microsoft Sentinel output plugin
Next, we need to install the plugin for Logstash:
sudo /usr/share/logstash/bin/logstash-plugin install microsoft-sentinel-logstash-output-plugin
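You can verify that the plugin was picked up by listing the installed plugins (the grep filter is just a convenience):
sudo /usr/share/logstash/bin/logstash-plugin list | grep -i sentinel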
And let's create a temporary folder for our sample log file, which we'll be needing later:
sudo mkdir /tmp/logstash
Prepare a sample log file
With the new DCR-based log tables approach, you need to create the table and determine its schema before you ingest data into it.
That is why we need to create a sample of our logs in .json format before we can prepare our resources in Azure. Luckily, Microsoft has provided an option inside the output plugin to first save one or more samples, before using it to push the data into Log Analytics.
Create your first Logstash pipeline:
sudo nano /etc/logstash/conf.d/syslog-to-dcr-based-sentinel-sample.conf
And input the following:
input {
  syslog {
    port => 514
  }
}

output {
  microsoft-sentinel-logstash-output-plugin {
    create_sample_file => true
    sample_file_path => "/tmp/logstash/"
  }
}
As you can see, I'll be using syslog to listen on port 514. Once Logstash is running and receives syslog messages, it will convert and save them into the temp folder provided.
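If you want to double-check the pipeline syntax before starting it, Logstash can validate a configuration without running it:
sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/syslog-to-dcr-based-sentinel-sample.conf --config.test_and_exit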
Run Logstash interactively by running:
sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/syslog-to-dcr-based-sentinel-sample.conf
After a couple of seconds Logstash will be running and you'll notice that it's waiting for syslog traffic to arrive:
[INFO ] 2022-12-09 13:43:07.819 [Ruby-0-Thread-18: :1] syslog - Starting syslog tcp listener {:address=>"0.0.0.0:514"}
We can use logger to send a test message as if it was a syslog message. We can either run this from the same machine (in a separate session) or from another machine in the same network.
logger -p local4.warn --rfc3164 --tcp -t CEF: "0|NETWORK|YOURFAVORITEFIREWALL|DnsProxyLog|Datacenter|proxy|10.73.4.7|48454|113.1.15.87|443|123|OUT|eu-v20.events.data.microsoft.com|TCP|ALLOW" -P 514 -n 127.0.0.1
BEWARE! If you're going to send test messages, these will be used to determine the schema of the custom table in our next step(s)! Future messages with a different format will not be stored correctly afterwards! For testing purposes this is obviously fine, but if you want to set this up for actual syslog messages, make sure that your syslog appliance is already sending messages to Logstash while create_sample_file => true is set.
Logstash might not immediately notify you of the fact that it has written the sample log file. But you can CTRL-C out of Logstash and eventually the event should pop up:
[INFO ] 2022-12-09 13:43:18.932 [[main]-pipeline-manager] microsoftsentineloutput - Sample file was written in path: /tmp/logstash/sampleFile1670593398.json
Copy the contents of the sample file and save it somewhere, because we'll be needing it later during our next steps.
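The sample lands in the folder configured earlier; the exact file name will differ per run, so a quick listing and cat is the easiest way to grab the contents:
ls /tmp/logstash/
cat /tmp/logstash/sampleFile*.json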
Prepare Azure resources
Before we can continue on our Logstash VM, we need to head over to the Azure portal. There we need to perform several steps:
- Create a Data Collection Endpoint (DCE).
- Create a DCR-based custom table in the Log Analytics workspace.
- Create an app registration and generate a secret for it.
- Grant the app registration appropriate permissions.
Create a Data Collection Endpoint (DCE)
Creating the DCE is very straightforward. Inside the Azure portal go to Monitor → select Data Collection Endpoints under Settings → click on Create and provide a valid name and Azure region.
After the DCE is created, copy and note down the "Logs ingestion" URI:
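If you prefer the command line over the portal, the same DCE can presumably be created with the Azure CLI as well (this requires the monitor-control-service extension; the names and resource group below are placeholders):
# Create a Data Collection Endpoint via the Azure CLI (illustrative names)
az extension add --name monitor-control-service
az monitor data-collection endpoint create \
  --name "dce-logstash" \
  --resource-group "rg-sentinel" \
  --location "westeurope" \
  --public-network-access "Enabled"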
Create a DCR-based custom logs table
Next, we need to head over to our Log Analytics workspace, select Tables under Settings, and click on Create:
A wizard will pop up and we need to provide a valid name for our custom table. Here we can also select the DCE created earlier. Click on Create a new data collection rule.
Define schema and transformation query
Next, you'll need to upload the sample .json file we generated and saved earlier. A preview will be shown and we can enter the Transformation editor.
The transformation editor is one of the main attractions here. You'll be able to parse and optionally filter your logs upon ingesting them into Sentinel!
If you've used my logger command above to try this out, you can use the following KQL transformation query to parse everything neatly into columns:
source
| extend TimeGenerated = ls_timestamp
| parse message
    with MessageId: int
    "|" DeviceType: string
    "|" DeviceName: string
    "|" EventType: string
    "|" Location: string
    "|" NetworkType: string
    "|" SrcIp: string
    "|" SrcPort: int
    "|" DstIp: string
    "|" DstPort: int
    "|" MessageSize: int
    "|" Direction: string
    "|" DstAddress: string
    "|" Protocol: string
    "|" Result: string
| project-away message
As you can see, I've taken the contents of the ls_timestamp column (which was already of a datetime datatype) and extended it into a new column called TimeGenerated. This will also resolve the error message seen on the previous screen.
Finish the wizard. We're almost done in the Azure portal now.
Create an identity to authenticate with the DCR
Open up Azure Active Directory and select App registrations under Manage. Create a new application and make sure to configure a secret as well.
Note down the clientId, tenantId, and secret. We'll also be needing these later.
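For reference, a hedged sketch of the same step with the Azure CLI; the display name is a placeholder and the exact output fields may differ per CLI version:
# Create the app registration and note the returned appId (clientId)
az ad app create --display-name "logstash-dcr-ingest" --query "appId" -o tsv
# Create the matching service principal so the app can be assigned roles
az ad sp create --id "<APPID>"
# Generate a client secret; note the returned password value
az ad app credential reset --id "<APPID>" --append --query "password" -o tsv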
Retrieve DCR immutable id
In the Azure portal go into Monitor → Data Collection Rules under Settings and select the DCR you've just created.
There are some settings hidden from plain sight which we need in order to compile the final Logstash configuration at the end. One of them is the immutableId of the DCR we've created as part of our DCR-based custom table in the previous wizard.
You can find this in the Overview blade of the DCR. Click on JSON View in the top-right corner. The JSON definition of the DCR will show you the immutableId property:
{
  "properties": {
    "immutableId": "dcr-4f8fca38487647ed8dfedef124634bf3"
  }
}
Note this down as well for later use.
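Alternatively, the immutableId can most likely be read straight from the Azure CLI as well (again via the monitor-control-service extension; the DCR name, resource group and query path are assumptions on my part):
az monitor data-collection rule show \
  --name "<DCRNAME>" \
  --resource-group "rg-sentinel" \
  --query "immutableId" -o tsv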
We also need to double-check the "stream name" value. This will refer to the name of the custom table we created earlier. The easiest way is to open up Export template under Automation. Scroll down and you'll find the stream name right above the transformation query:
{
  "resources": [
    {
      "type": "Microsoft.Insights/dataCollectionRules",
      "properties": {
        "dataFlows": [
          {
            "streams": [
              "Custom-logstash_syslog_CL"
            ],
            "destinations": [
              "06aabcb106b244b9a0bfc06768924553"
            ],
            "transformKql": "source\n| extend TimeGenerated = ls_timestamp\n| parse message\n with MessageId: int\n \"|\" DeviceType: string\n \"|\" DeviceName: string\n \"|\" EventType: string\n \"|\" Location: string\n \"|\" NetworkType: string\n \"|\" SrcIp: string\n \"|\" SrcPort: int\n \"|\" DstIp: string\n \"|\" DstPort: int\n \"|\" MessageSize: int\n \"|\" Direction: string\n \"|\" DstAddress: string\n \"|\" Protocol: string\n \"|\" Result: string\n| project-away message\n\n",
            "outputStream": "Custom-logstash_syslog_CL"
          }
        ]
      }
    }
  ]
}
You've probably already guessed it; write that one down as well.
Authorize the application to the DCR
While we're in the DCR, go into the Access Control (IAM) blade and click Add role assignment.
We need to grant our recently created application the built-in Monitoring Metrics Publisher role. Otherwise our log ingestion requests on the Data Collection Rule will be denied.
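The role assignment can also be scripted; here's a sketch with the Azure CLI, where the application's client ID, subscription, resource group and DCR name are placeholders you'll need to fill in:
# Grant the app registration the Monitoring Metrics Publisher role on the DCR
az role assignment create \
  --assignee "<APPID>" \
  --role "Monitoring Metrics Publisher" \
  --scope "/subscriptions/<SUBSCRIPTIONID>/resourceGroups/<RESOURCEGROUP>/providers/Microsoft.Insights/dataCollectionRules/<DCRNAME>"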
Create Logstash ingestion pipeline
Now that we have everything we need, we can create our actual Logstash pipeline. 🎉
Create a new Logstash config file:
sudo nano /etc/logstash/conf.d/syslog-to-dcr-based-sentinel.conf
And provide the following code (example):
input {
  syslog {
    port => 514
  }
}

output {
  microsoft-sentinel-logstash-output-plugin {
    client_app_Id => "<APPID>"
    client_app_secret => "<APPSECRET>"
    tenant_id => "<TENANTID>"
    data_collection_endpoint => "https://<DCEURI>.westeurope-1.ingest.monitor.azure.com"
    dcr_immutable_id => "dcr-<IMMUTABLEID>"
    dcr_stream_name => "Custom-<TABLENAME>"
  }
}
Here everything comes together nicely. If all went well, you've already gathered all the information needed.
The final result
With the final configuration in place, we can now finally run Logstash and check if we get some logs into Sentinel.
Run Logstash manually:
sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/syslog-to-dcr-based-sentinel.conf
Or run it as a service instead. The Logstash service will automatically run all pipelines in /etc/logstash/conf.d/*.conf, so do remember to remove the temporary "sample" pipeline created earlier, because you don't want Logstash trying to open multiple listeners on the same port.
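For example (assuming the sample pipeline file name used earlier in this article):
# Remove the temporary sample pipeline so the service doesn't open a second listener on port 514
sudo rm /etc/logstash/conf.d/syslog-to-dcr-based-sentinel-sample.conf
With that out of the way, start the Logstash service: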
sudo systemctl start logstash
Check service status:
systemctl status logstash
● logstash.service - logstash
Loaded: loaded (/lib/systemd/system/logstash.service; disabled; vendor preset: enabled)
Active: active (running) since Fri 2022-12-09 21:26:57 UTC; 3min 20s ago
Main PID: 45704 (java)
Tasks: 49 (limit: 4699)
Memory: 717.1M
CPU: 40.891s
CGroup: /system.slice/logstash.service
└─45704 /usr/share/logstash/jdk/bin/java
After firing a couple of different logger commands again, we can check if everything shows up nicely in Sentinel.
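If you'd rather check from the command line than from the portal, something along these lines should work as well; the workspace GUID is a placeholder and the table name assumes the custom table created earlier (note the _CL suffix):
# Query the custom table for the most recent test events
az monitor log-analytics query \
  --workspace "<WORKSPACEGUID>" \
  --analytics-query "logstash_syslog_CL | take 10" \
  -o table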
And it works! Everything is nicely parsed thanks to that transformation query configured inside the DCR.
Conclusion
I was already a big fan of the new data ingestion capabilities, because the transformation rules are a real game changer if you ask me.
It's great to see Microsoft providing and supporting an output plugin for an open-source solution they didn't create themselves. To me, Logstash has proven to be a very versatile tool in the past, and it appears that Microsoft also recognizes its potential and its use cases.
I’m sure a lot of customers will be happy with this.
I hope this was informative. If you have any questions, never hesitate to reach out to me!
— Koos