Introducing LogFeeder - A log collection system
Yonny Tonui, Information Security Engineer
Mar 2, 2018
Introduction
The collection and processing of logs is essential to good security. One of the primary functions of a security team is to keep organizations safe by eliminating blind spots in infrastructure. Breach investigations without logs result in a lot of guesswork. Worse, the activities of an attacker can easily remain undiscovered without adequate logging. To ensure we have a robust log storage and visualization platform, we use Elasticsearch, Logstash and Kibana (ELK). These tools form part of the toolset that we use in our Security Information and Event Management (SIEM) solution. ElastAlert is the primary means by which alerts are generated from the event data stored in Elasticsearch. It is the best thing since sliced bread; check out these blog posts to learn more.
A majority of the log collection software available today focuses on collecting logs from syslog, which works well for systems that we control directly. Unfortunately, these solutions fall short when it comes to infrastructure we depend on heavily but do not control, such as Duo or Cisco Umbrella. We felt there was a need for a system that would enable us to collect logs from the different SaaS (Software-as-a-Service) systems that we subscribe to. We needed these logs in our SIEM so that we could have visibility into various actions, create alerts for threat detection, and comply with the relevant regulations that govern log storage and auditing.
The basic problems we faced were:
- We needed to ingest logs generated by very different systems
- These systems each had different log output formats and authentication schemes
- We needed to curate, normalize and standardize certain fields in the logs in a form that would make them easy to query (e.g. username@domain, domain\username, etc.)
- Service providers didn't keep logs as far back as we wanted
Introducing LogFeeder
Unable to find a log collection and ingestion tool that fit our needs, we developed LogFeeder. This service enables us to collect logs from different systems and ingest them into our SIEM for alerting. The architecture is very modular and supports the ingestion of new SaaS log sources through new plugins. Writing a new plugin is as simple as extending the BaseFeeder class and implementing calls to the log retrieval API exposed by the service provider. LogFeeder currently uses Elasticsearch as a datastore and Amazon SQS for queueing, but the architecture is modular enough that any other datastore or queue could be integrated. LogFeeder also tokenizes the event log fields in a way that makes querying easy.
The Class Architecture
The service is written in Python. The class architecture takes advantage of standard Python inheritance to model input and output plugins. The classes are detailed below:
- BaseFeeder - the base class; sets up common logic such as command-line argument parsing, tracking the number of processed records, timestamps, etc.
- S3Feeder - enables reading from Amazon S3 buckets
- SqsOutput - enables writing data out to an Amazon SQS queue
- EsOutput - enables writing data out to Elasticsearch
Sample Scenarios
We have open-sourced the feeders that we use for Cisco Umbrella, Duo and OneLogin. In the following sections, we walk through practical scenarios that show how you can leverage the existing feeders, and we demonstrate how to extend the platform to support feeders for systems your organization already uses.
OneLogin User Authentication Logs
OneLogin provides single sign-on (SSO) solutions to organizations. They allow the extraction of various types of logs, but the logs we are concerned with here are user authentication logs. These logs are invaluable to an analyst responding to a stolen-credentials scenario, whether it stems from phishing or any other type of attack. The example below shows how the input one gets from OneLogin is transformed by LogFeeder.
Input:

{
  "id": 999999999, "created_at": "2018-12-19T02:02:39.276Z", "account_id": 55555,
  "user_id": 88888888, "event_type_id": 13, "notes": "password", "ipaddr": "11.111.11.111",
  "actor_user_id": 7777777, "assuming_acting_user_id": null, "app_name": null,
  "group_name": null, "actor_user_name": "John Doe", "user_name": "jdoe",
  "policy_name": null, "otp_device_name": null, "operation_name": null,
  "directory_sync_run_id": null, "directory_id": null, "resolution": null,
  "client_id": null, "resource_type_id": null, "error_description": null,
  "proxy_ip": "127.0.0.1"
}

Output:

{
  "@version": "1", "@timestamp": "2018-12-19T02:02:39.276Z", "type": "logfeeder",
  "event_time": "2018-12-19T02:02:39.276Z",
  "onelogin_data": {
    "id": 999999999, "created_at": "2018-12-19T02:02:39.276Z", "account_id": 55555,
    "user_id": 88888888, "event_type_id": 13, "notes": "password", "ipaddr": "11.111.11.111",
    "actor_user_id": 7777777, "assuming_acting_user_id": null, "app_name": null,
    "group_name": null, "actor_user_name": "John Doe", "user_name": "jdoe",
    "policy_name": null, "otp_device_name": null, "operation_name": null,
    "directory_sync_run_id": null, "directory_id": null, "resolution": null,
    "client_id": null, "resource_type_id": null, "error_description": null,
    "proxy_ip": "127.0.0.1"
  },
  "logfeeder_account": "example.org",
  "logfeeder_subapi": "auth",
  "logfeeder_type": "onelogin",
  "org_username": "jdoe"
}
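Once these events are in Elasticsearch, the normalized fields (logfeeder_type, logfeeder_subapi, org_username, event_time) make triage queries straightforward. Below is a minimal sketch using the elasticsearch-py client; the cluster address and index pattern are assumptions for illustration, not something LogFeeder dictates.

from elasticsearch import Elasticsearch

# Hypothetical cluster address and index pattern; substitute your own.
es = Elasticsearch(['https://elasticsearch.example.org:9200'])

# Find the last week of OneLogin authentication events for a suspect account.
# Assumes these fields are mapped as keywords (exact-match terms).
query = {
    'query': {
        'bool': {
            'filter': [
                {'term': {'logfeeder_type': 'onelogin'}},
                {'term': {'logfeeder_subapi': 'auth'}},
                {'term': {'org_username': 'jdoe'}},
                {'range': {'event_time': {'gte': 'now-7d'}}},
            ],
        },
    },
}

response = es.search(index='logfeeder-*', body=query)
for hit in response['hits']['hits']:
    source = hit['_source']
    print(source['event_time'], source['onelogin_data']['ipaddr'])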
Duo 2FA Logs
Duo provides two-factor authentication solutions to organizations. Attackers are getting craftier and more innovative by the day, and even with two-factor authentication, some users are unfortunately convinced to give attackers access. In these cases, two-factor authentication logs become important for scoping the extent of a compromise. They can also be used to detect anomalies (or limit post-exploitation) with regard to login location, time of day and other indicator of compromise (IOC) parameters. Other log types of interest are those related to changes in user privileges. Remember that context matters in most cases: if a change is expected and all the change procedures have been followed, then everything is fine. If the change is unexpected, and a potential breach is being investigated at the same time, then it may well be malicious. Duo also has a nifty feature that allows users to report pushes that they did not initiate as fraud. These logs and the corresponding alerts are very helpful in identifying compromised users. The example below shows how Duo logs are transformed by LogFeeder.
Input:

{
  "username": "jadmin",
  "description": {
    "phone": "+1(234)-56789", "email": "jdoe@example.org", "role": "Admins",
    "name": "jdoe", "hardtoken": null
  },
  "timestamp": 1512020011, "object": "jdoe", "host": "za.duo.com",
  "eventtype": "administrator", "action": "admin_create"
}

Output:

{
  "@version": "1", "@timestamp": "2018-12-19T02:02:39.276Z", "type": "logfeeder",
  "event_time": "2018-12-19T02:02:39.276Z",
  "duo_data": {
    "username": "jadmin",
    "description": {
      "phone": "+1(234)-56789", "email": "jdoe@example.org", "role": "Admins",
      "name": "jdoe", "hardtoken": null
    },
    "timestamp": 1512020011, "object": "jdoe", "host": "za.duo.com",
    "eventtype": "administrator", "action": "admin_create"
  },
  "logfeeder_account": "example.org",
  "logfeeder_subapi": "auth",
  "logfeeder_type": "duo",
  "org_username": "jdoe"
}
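As an illustration of the kind of review query this enables, the sketch below looks for recent Duo administrator-creation events using the fields from the sample output above. Again, the cluster address and index pattern are assumptions, not part of LogFeeder.

from elasticsearch import Elasticsearch

es = Elasticsearch(['https://elasticsearch.example.org:9200'])  # hypothetical cluster

# Surface any Duo admin_create events from the last 24 hours for review.
query = {
    'query': {
        'bool': {
            'filter': [
                {'term': {'logfeeder_type': 'duo'}},
                {'term': {'duo_data.eventtype': 'administrator'}},
                {'term': {'duo_data.action': 'admin_create'}},
                {'range': {'event_time': {'gte': 'now-24h'}}},
            ],
        },
    },
}

for hit in es.search(index='logfeeder-*', body=query)['hits']['hits']:
    event = hit['_source']
    print(event['event_time'], event['duo_data']['username'], event['duo_data']['object'])

Whether such an event is benign or malicious still depends on the context discussed above; the query simply makes the events easy to find.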
Cisco Umbrella Logs
Nowadays employees access the organization's network from a myriad of places, and a security analyst needs visibility into domain resolutions wherever they happen. DNS logs greatly assist in mapping out affected users in a breach and in identifying the presence and origin of malware in a network. Cisco Umbrella makes this information available in comma-separated values (CSV) format. LogFeeder parses this information (as shown below) and makes it available in Elasticsearch, ready for querying whenever a situation strikes!
Input:

"2018-01-22 14:14:23", "COMPUTER_001", "COMPUTER_001", "112.11.21.31", "172.16.1.1", "Allowed", "1(A)", "NOERROR", "gimmeyourpassword.com", "Search Engines"

Output:

{
  "@version": "1", "@timestamp": "2018-01-22T14:14:23.276Z",
  "event_time": "2018-01-22T14:14:23.276Z",
  "opendns_data": {
    "domain": "gimmeyourpassword.com", "external_ip": "112.11.21.31",
    "response_code": "NOERROR", "internal_ip": "172.16.1.1", "query_type": "1(A)",
    "action": "Allowed", "identities": "COMPUTER_001", "categories": "Search Engines",
    "most_granular_identity": "COMPUTER_001"
  },
  "logfeeder_account": "example.org",
  "logfeeder_subapi": "opendns",
  "logfeeder_type": "opendns"
}
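To make the mapping above concrete, here is a minimal sketch of turning one Umbrella CSV row into an opendns_data-style dictionary. The column order is inferred only from the single sample row above (not from Cisco's documentation), and the helper name is hypothetical rather than LogFeeder's actual parser.

import csv

# Column order inferred from the sample row above; verify against your own
# Umbrella export before relying on it. The two identity columns carry the
# same value in the sample, so their relative order is an assumption.
FIELDS = [
    'timestamp', 'most_granular_identity', 'identities', 'external_ip',
    'internal_ip', 'action', 'query_type', 'response_code', 'domain', 'categories',
]

def parse_umbrella_row(line):
    """Hypothetical helper: turn one CSV line into an opendns_data-style dict."""
    values = next(csv.reader([line], skipinitialspace=True))
    record = dict(zip(FIELDS, values))
    event_time = record.pop('timestamp')  # e.g. "2018-01-22 14:14:23"
    return event_time, record

line = ('"2018-01-22 14:14:23", "COMPUTER_001", "COMPUTER_001", "112.11.21.31", '
        '"172.16.1.1", "Allowed", "1(A)", "NOERROR", "gimmeyourpassword.com", "Search Engines"')
event_time, opendns_data = parse_umbrella_row(line)
print(event_time, opendns_data['domain'], opendns_data['action'])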
Sample Feeder
Now we shall look at extending LogFeeder to import logs from a sample service, Foo. In this example we assume that the service provider has shared a Python library (foo_client) that allows for the extraction of timestamped logs. We also assume you want to write the data out to an Elasticsearch cluster (writing out to any other data store is as simple as extending BaseFeeder and overriding the send_records function), so FooFeeder will extend EsOutput to take advantage of its Elasticsearch output logic. The following methods need to be overridden in order to leverage the functionality offered by BaseFeeder:
- read_api_creds - reads Foo's API credentials from a file. It lets you define how the credentials are read and is a good place to initialize Foo's API client.
- convert_dt_to_api_timestring - formats timestamps in a format compatible with Foo's API date format.
- convert_api_timestring_to_iso8601 - standardizes timestamps when outputting the logs to Elasticsearch.
- _make_queries - where all the magic happens: the core function that is expected to return the log records read from the Foo API. Two arguments specify the start and end of the time range you would like to retrieve logs for.
The sample feeder for Foo’s service logs is shown below.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Reads events from the Foo API and sends them to Elasticsearch
"""
from __future__ import absolute_import

from datetime import datetime

import foo_client
from pytz import utc

from log_feeder.base_feeder import parse_base_feeder_options
from log_feeder.outputs import EsOutput  # adjust this import path to match your LogFeeder layout
from log_feeder.utils import datetime_to_unixtime
from log_feeder.utils import format_dt_as_iso8601
from log_feeder.utils import lock_and_load
from log_feeder.utils import read_yaml_file


class FooFeeder(EsOutput):
    """Reads Activity Data from the Foo API and feeds it to Elasticsearch"""

    APP_NAME = 'foo'
    TIMESTAMP_KEYNAME = 'timestamp'

    def __init__(self, **kwargs):
        super(FooFeeder, self).__init__(**kwargs)

    def read_api_creds(self, config_file):
        """Read Foo credentials necessary to access activity data from the Foo API

        Args:
            config_file: file containing the service's credentials
        Returns:
            Foo credentials object in a dictionary, or None on failure
        """
        yaml_content = read_yaml_file(
            config_file, ['api_key', 'secret_key', 'api_hostname'])
        foo_api = foo_client.FooApiClient(
            api_key=yaml_content['api_key'],
            secret_key=yaml_content['secret_key'],
            host=yaml_content['api_hostname'])
        return {'foo_api': foo_api}

    def _make_queries(self, start_time, end_time, api_info_dict, sub_api_name):
        """Makes queries to the Foo API for records in the specified time range

        Args:
            start_time (string): starting time (as a string recognized by the
                API) for log ingestion
            end_time (string): ending time (as a string recognized by the
                API) for log ingestion
            api_info_dict: a dictionary containing a Foo API access object
            sub_api_name (string): the name of the sub_api to be called (if
                the API has sub_apis)
        Yields:
            Dicts containing data from the Foo API
        """
        foo_api = api_info_dict['foo_api']
        # Call the Foo API
        query_results = foo_api.get_auth_logs(start_time, end_time)
        for qr in query_results:
            yield qr

    def convert_dt_to_api_timestring(self, datetime_obj):
        """Converts a datetime object to a time string accepted by the API

        Args:
            datetime_obj (datetime): A datetime object
        Returns:
            A string representing the datetime that is accepted by the API
        """
        return datetime_to_unixtime(datetime_obj)

    def convert_api_timestring_to_iso8601(self, api_timestring):
        """Converts a time string output by the API to an iso8601 time string

        Args:
            api_timestring (string): A time string output by the API
        Returns:
            A time string in the iso8601 format
        """
        return format_dt_as_iso8601(datetime.utcfromtimestamp(
            float(api_timestring)).replace(tzinfo=utc))


def parse_options():
    parser = parse_base_feeder_options()
    (options, args) = parser.parse_args()
    return options


def main():
    options = parse_options()
    lock_and_load(FooFeeder(options=options))


if __name__ == "__main__":
    main()
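For reference, read_api_creds above asks read_yaml_file for three keys, so the credentials file passed to the feeder could look like the following sketch (the values are placeholders):

# foo_feeder credentials file (placeholder values)
api_key: "EXAMPLE_API_KEY"
secret_key: "EXAMPLE_SECRET_KEY"
api_hostname: "api.foo.example.org"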
LogFeeder has recently been open-sourced and is available on GitHub. Happy Hacking, stay secure and we hope you find this tool useful!