Writing a Data Provider for MSTICPy

Query Provider query browser

What is a MSTICPy Data Provider?

  • Authentication to the service (usually from configuration values but can also support specifying parameters such as usernames and passwords at run time).
  • Querying data from the service — queries can be either ad hoc queries as strings or templated queries allowing substitutable parameters for common items such as time range, account and host names, etc.
  • Returning the data as a pandas DataFrame. The driver is responsible for converting data types where needed. This matters most for datetime data, which services usually return as strings, because most MSTICPy functionality expects datetimes as timezone-aware pandas Timestamps.
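The datetime conversion described in the last point can be sketched as follows. This is a minimal illustration, not MSTICPy code: it assumes the service returns ISO 8601 strings, and the column name TimeGenerated, the sample rows and the helper to_dataframe are all hypothetical.

```python
import pandas as pd

# Hypothetical raw result: the service returns timestamps as strings.
raw_rows = [
    {"TimeGenerated": "2023-01-15T10:30:00Z", "Computer": "host1"},
    {"TimeGenerated": "2023-01-15T11:45:00Z", "Computer": "host2"},
]


def to_dataframe(rows, datetime_cols=("TimeGenerated",)):
    """Build a DataFrame and convert the named columns to UTC Timestamps."""
    data = pd.DataFrame(rows)
    for col in datetime_cols:
        if col in data.columns:
            # utc=True produces timezone-aware values rather than naive ones.
            data[col] = pd.to_datetime(data[col], utc=True)
    return data


result = to_dataframe(raw_rows)
print(result.dtypes)
```

Doing the conversion inside the driver means callers never have to check whether a time column is still a string.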

Implementing a data provider

  1. Write the driver
  2. Customize the driver (optional)
  3. Register the driver
  4. Add queries
  5. Add a settings definition
  6. Create documentation
  7. Create unit tests

1. Write the driver class

  • __init__
  • connect
  • query
  • query_with_results (optional)

__init__

connect

query

  • query - string of query text
  • query_source - populated only when the query is a MSTICPy template query read from a query yaml file (see Creating new queries). It is an instance of QuerySource, a representation of the yaml query with extracted parameters and metadata available as explicit attributes.
  • kwargs - any other keyword arguments passed when running the query that are not consumed as query parameters, etc.
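The methods listed in this section can be sketched together as one class. This is a standalone illustration under stated assumptions: a real driver derives from the MSTICPy driver base class, which is omitted here so the sketch runs without MSTICPy installed. The class name ExampleDriver, the connection_str parameter, the simulated response and the status dictionary are all hypothetical.

```python
from typing import Any, Optional, Tuple

import pandas as pd


class ExampleDriver:
    """Standalone sketch; a real driver derives from the MSTICPy base class."""

    def __init__(self, **kwargs):
        """Store settings only; no network calls are made here."""
        self._connected = False
        self._client = None
        self._debug = kwargs.get("debug", False)

    def connect(self, connection_str: Optional[str] = None, **kwargs):
        """Authenticate to the service (simulated in this sketch)."""
        # A real driver would create an authenticated client object here.
        self._client = {"endpoint": connection_str or "https://example.invalid"}
        self._connected = True

    def query(self, query: str, query_source: Any = None, **kwargs) -> pd.DataFrame:
        """Run the query text and return the results as a DataFrame."""
        if not self._connected:
            raise ConnectionError("Call connect() before running queries.")
        # Simulated response: echo the query back as a single row.
        rows = [{"query": query, "templated": query_source is not None}]
        return pd.DataFrame(rows)

    def query_with_results(self, query: str, **kwargs) -> Tuple[pd.DataFrame, Any]:
        """Return the DataFrame together with a status object."""
        data = self.query(query, **kwargs)
        return data, {"status": "ok", "rows": len(data)}


driver = ExampleDriver()
driver.connect("https://example.invalid")
results = driver.query("SecurityEvent | take 1")
print(results)
```

Keeping authentication out of __init__ lets a provider object be created from configuration and connected later.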

query_with_results

2. Customize the driver

Exposing attributes via the QueryProvider

  • implement the attribute in the driver (this can be a method or other type)
  • set self.public_attribs (of your driver class) to a Python dictionary of {name: value} where name is the public name of the attribute (i.e. what you will see as an attribute of QueryProvider) and value is the value of the attribute that you want to expose (e.g. a method or attribute of the driver). This is shown in the example below.
def __init__(self, **kwargs):
    """Initialize new instance."""
    ...
    self.public_attribs = {
        "client": self.service,
        "saved_searches": self._saved_searches,
        "fired_alerts": self._fired_alerts,
    }
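To make the effect of public_attribs concrete, here is a minimal sketch of how a provider object could surface the dictionary entries as its own attributes. How QueryProvider does this internally is not shown here, so the setattr loop is an assumption, and SketchDriver and SketchProvider are hypothetical stand-ins rather than MSTICPy classes.

```python
class SketchDriver:
    """Hypothetical driver that exposes two attributes."""

    def __init__(self):
        self.service = object()  # stand-in for a service client
        self.public_attribs = {
            "client": self.service,
            "saved_searches": self._saved_searches,
        }

    def _saved_searches(self):
        """Return a fixed list in place of a real service call."""
        return ["search_a", "search_b"]


class SketchProvider:
    """Hypothetical stand-in for QueryProvider."""

    def __init__(self, driver):
        self._driver = driver
        # Copy each public attribute of the driver onto the provider.
        for name, value in driver.public_attribs.items():
            setattr(self, name, value)


provider = SketchProvider(SketchDriver())
print(provider.saved_searches())  # ['search_a', 'search_b']
```

Because the dictionary stores bound methods, calling provider.saved_searches() still runs against the driver instance.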

Custom parameter formatting

# Parameter Formatting methods
@staticmethod
def _format_datetime(date_time: datetime) -> str:
    """Return datetime-formatted string."""
    return f'"{date_time.isoformat(sep=" ")}"'

@staticmethod
def _format_list(param_list: Iterable[Any]) -> str:
    """Return formatted list parameter."""
    fmt_list = [f'"{item}"' for item in param_list]
    return ",".join(fmt_list)

def __init__(self, **kwargs):
    """Initialize new instance."""
    ...
    self.formatters = {
        Formatters.DATETIME: self._format_datetime,
        Formatters.LIST: self._format_list,
    }
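To see what these two formatters produce, the same logic can be run as plain functions. This is a standalone sketch: the function names drop the leading underscore, and the sample values and the Computer column are hypothetical.

```python
from datetime import datetime
from typing import Any, Iterable


def format_datetime(date_time: datetime) -> str:
    """Return the datetime as a double-quoted string with a space separator."""
    return f'"{date_time.isoformat(sep=" ")}"'


def format_list(param_list: Iterable[Any]) -> str:
    """Return the items double-quoted and comma-separated."""
    return ",".join(f'"{item}"' for item in param_list)


start = format_datetime(datetime(2023, 1, 15, 10, 30))
hosts = format_list(["host1", "host2"])

print(start)  # "2023-01-15 10:30:00"
print(hosts)  # "host1","host2"
print(f"| where Computer in ({hosts})")
```

The quoting belongs in the formatter because each query language has its own literal syntax for dates and lists.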

Customizing the query parameter substitution

sources:
  list_files:
    description: Lists all file events by filename
    metadata:
    args:
      query: '
        {table}
        | where Timestamp >= datetime({start})
        | where Timestamp <= datetime({end})
        | where FileName has "{file_name}"
        {add_query_items}'
  • query — the raw query string from the yaml file
  • param_dict — a dictionary of parameter name, parameter value
def __init__(self, **kwargs):
    """Initialize new instance."""
    ...
    self.formatters = {
        Formatters.PARAM_HANDLER: self._custom_param_handler,
        Formatters.DATETIME: self._format_datetime,
        Formatters.LIST: self._format_list,
    }
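A handler registered this way might look like the following sketch. It takes the two arguments described above (query and param_dict) and is assumed to return the final query string; that return contract, the simple text replacement and all sample values are assumptions for illustration, not the library's implementation.

```python
from typing import Any, Dict


def custom_param_handler(query: str, param_dict: Dict[str, Any]) -> str:
    """Replace each {name} placeholder in the query with its value."""
    for name, value in param_dict.items():
        query = query.replace("{" + name + "}", str(value))
    return query


# Shortened version of the list_files template shown earlier.
template = (
    "{table}\n"
    "| where Timestamp >= datetime({start})\n"
    '| where FileName has "{file_name}"\n'
    "{add_query_items}"
)
params = {
    "table": "DeviceFileEvents",  # example value
    "start": "2023-01-15 10:30:00",
    "file_name": "evil.exe",
    "add_query_items": "| take 10",
}

final_query = custom_param_handler(template, params)
print(final_query)
```

A custom handler is the place for service-specific rules, such as escaping or rewriting values before they reach the query text.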

3. Register the driver

Add the provider as a DataEnvironment

@export
class DataEnvironment(Enum):
    """
    Enumeration of data environments.

    Used to identify which queries are relevant for which
    data sources.
    """

    Unknown = 0
    AzureSentinel = 1  # alias of LogAnalytics
    LogAnalytics = 1
    MSSentinel = 1
    Kusto = 2
    ...
    ResourceGraph = 9
    Sumologic = 10
    M365D = 11
    Cybereason = 12
    Elastic = 14
    NewProvider = 15  # << Your provider entry
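Several names in this enumeration share the value 1, so it helps to see how Python treats them. The sketch below uses a cut-down copy of the class without the @export decorator and relies only on standard Enum behaviour: names that repeat a value become aliases of the first name defined with it. When adding your own entry, pick a value that no other member uses, unless an alias is what you want.

```python
from enum import Enum


class DataEnvironment(Enum):
    """Cut-down copy of the enumeration, for illustration only."""

    Unknown = 0
    AzureSentinel = 1
    LogAnalytics = 1
    MSSentinel = 1
    Kusto = 2
    NewProvider = 15


# All three names refer to the same member.
print(DataEnvironment.MSSentinel is DataEnvironment.AzureSentinel)  # True
# Lookup by value returns the first name defined with that value.
print(DataEnvironment(1).name)  # AzureSentinel
# Lookup by name works for the new entry.
print(DataEnvironment["NewProvider"].value)  # 15
# Iteration skips aliases.
names = [env.name for env in DataEnvironment]
print(names)  # ['Unknown', 'AzureSentinel', 'Kusto', 'NewProvider']
```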

Add an entry to the driver dynamic load table

_ENVIRONMENT_DRIVERS = {
    DataEnvironment.LogAnalytics: ("kql_driver", "KqlDriver"),
    ...
    DataEnvironment.Elastic: ("elastic_driver", "ElasticDriver"),
    DataEnvironment.NewProvider: ("new_driver", "NewDriverClass"),
}
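A table of (module name, class name) pairs like this lends itself to loading drivers on demand, so a driver's dependencies are only imported when that driver is used. The sketch below shows the general technique with importlib. It is not the MSTICPy loader: the table keys and the function load_driver_class are hypothetical, and standard-library modules stand in for driver modules so the example runs anywhere.

```python
import importlib

# Stand-in table: each key maps to (module name, class name).
_DRIVERS = {
    "decimal": ("decimal", "Decimal"),
    "fraction": ("fractions", "Fraction"),
}


def load_driver_class(key: str) -> type:
    """Import the module for the key on demand and return the named class."""
    mod_name, cls_name = _DRIVERS[key]
    module = importlib.import_module(mod_name)
    return getattr(module, cls_name)


driver_cls = load_driver_class("fraction")
print(driver_cls.__name__)  # Fraction
print(driver_cls(1, 2))  # 1/2
```

With this pattern, a typo in either string of the tuple only surfaces when the driver is first requested, which is one reason to cover the new entry in a unit test.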

4. Add queries

Query parameter names

5. Add settings definition

DataProviders:
  ...
  MicrosoftDefender:
    Args:
      ClientId: str(format=uuid)
      TenantId: str(format=uuid)
      # [SuppressMessage("Microsoft.Security", ...
      ClientSecret: *cred_key

6. Add provider documentation

7. Create driver unit tests

Conclusion


This is the account of the Microsoft Threat Intelligence Center (MSTIC).
