Agari Developer Documentation


Reporting on Policy Enforcement

Introduction

After reading through the Agari API documentation you might be asking, "How do all of these pieces fit together?" This section can help answer that question by walking you through the process of writing a Python script that uses the Agari APIs, in this case one that reports on aggregate message enforcement data.

The goal of our script is to produce two spreadsheets: one containing a list of all messages that triggered an Agari Phishing Defense (APD) policy, and the other containing an ordered count of messages that triggered a policy, grouped by the enforcement action that was ultimately taken on each message.

Explanation of the Reports

The first report is simply a CSV file in which each row shows a different message that matched some policy, with a column for every field available from the list messages API endpoint. This can be used as a reference data set for later processing or review.

The second report is a more complex aggregate view. As an APD admin, I’m interested in understanding the effectiveness of my policies, and I also want data to inform any changes to existing policies, such as adjusting match criteria or enforcement actions. For example, I might have a policy that is not currently enforced, and I want to decide whether to start enforcing it. My hypothesis is that if a large number of messages matching this policy are ultimately enforced by some other means, such as other real-time policies or on-demand policies, then I can feel more confident enabling enforcement.

To test this, our second report will need to show the number of messages that matched each of my policies, along with the ultimate enforcement outcome. Each row of the report will show a different policy, and the columns will show the final enforcement outcome, for example “move, success”, “inbox, error”, “delete, pending”, and so on.

Getting Started

Before we look at how to create these specific reports, let's make sure that our system is set up with the right prerequisites. After that we'll start building up the boilerplate code we need to retrieve results from arbitrary API endpoints.

Prerequisites

The sample code in this article assumes that you have installed Python 3, installed the Python requests package, and set environment variables with your API credentials.

🚧

Obtain your client_id and client_secret

If you do not have these credentials please refer to the Quick Start guide. These samples assume that you have already generated an API client ID and client secret in Agari Phishing Defense.

On a Linux or macOS system you can set these environment variables for the duration of a terminal session with the export command.

export CLIENT_ID=YOUR_CLIENT_ID
export CLIENT_SECRET=YOUR_CLIENT_SECRET

On Windows you can use the set command.

set CLIENT_ID=YOUR_CLIENT_ID
set CLIENT_SECRET=YOUR_CLIENT_SECRET
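
Once the variables are set, a script can confirm that they are visible before it makes any API calls. The following is a minimal sketch, assuming the CLIENT_ID and CLIENT_SECRET names used in this article; the helper name load_credentials is hypothetical and is not part of any Agari library.

```python
import os

def load_credentials(env=None):
    """Return (client_id, client_secret), or raise if either is missing."""
    # Allow a plain dict to be passed in for testing; default to the real environment
    env = os.environ if env is None else env
    missing = [name for name in ("CLIENT_ID", "CLIENT_SECRET") if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env["CLIENT_ID"], env["CLIENT_SECRET"]
```

Failing early like this gives a clearer message than a rejected token request later in the script.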

Wrapping Up Authentication

We can start with a snippet for retrieving an OAuth token from the interactive API documentation, and iterate on this snippet to create reusable code that can be used for any other endpoint.

import requests
url = "https://api.agari.com/v1/ep/token"
headers = {
   'accept': "application/json",
   'content-type': "application/x-www-form-urlencoded"
   }
response = requests.request("POST", url, headers=headers)
print(response.text)

This snippet requests an OAuth access token, but the response is wrapped in JSON. We really just need the access token string, which we’ll use to create an authorization header on other API calls, and we may want to query this endpoint again in case our token expires. To create reusable token generation code, let's wrap the action in a function. While we're doing this, let's also look at what a function to retrieve a list of messages would look like.

#!/usr/bin/env python3
# list_messages.py
 
import requests
from os import environ
 
def generate_token(client_id, client_secret):
   url = "https://api.agari.com/v1/ep/token"
 
   payload = f"client_id={client_id}&client_secret={client_secret}"
   headers = {
       'accept': "application/json",
       'content-type': "application/x-www-form-urlencoded"
       }
 
   response = requests.request("POST", url, data=payload, headers=headers)
 
   return response.json()['access_token']
 
def list_messages(access_token, **kwargs):
   url = "https://api.agari.com/v1/ep/messages"
 
   querystring = kwargs
 
   headers = {'accept': 'application/json'}
   headers['authorization'] = f'Bearer {access_token}'
 
   response = requests.request("GET", url, headers=headers, params=querystring)
 
   return response.json()['messages']
 
client_id = environ.get('CLIENT_ID')
client_secret = environ.get('CLIENT_SECRET')
 
access_token = generate_token(client_id, client_secret)
 
messages = list_messages(access_token)
for message in messages:
   print(message)
print(len(messages))
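
The token expiry concern mentioned earlier could be handled with a small cache that calls the token function again once a token is considered stale. This is a sketch only: the TokenCache class is hypothetical, and the one-hour default lifetime is an assumption rather than a documented value, so check the token response for the real lifetime.

```python
import time

class TokenCache:
    """Cache a token and fetch a new one after an assumed lifetime."""

    def __init__(self, fetch_token, lifetime_seconds=3600, clock=time.monotonic):
        self._fetch_token = fetch_token    # e.g. lambda: generate_token(client_id, client_secret)
        self._lifetime = lifetime_seconds  # assumption: not taken from the API documentation
        self._clock = clock
        self._token = None
        self._expires_at = 0.0

    def get(self):
        now = self._clock()
        # Fetch on first use, or when the assumed lifetime has elapsed
        if self._token is None or now >= self._expires_at:
            self._token = self._fetch_token()
            self._expires_at = now + self._lifetime
        return self._token
```

With this in place, long-running code would call cache.get() before each request instead of holding on to a single access_token value.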

When you execute this file note how many messages are printed. A single API call to the list messages action only retrieves a limited number of results.

Pagination Strategy

We’re going to refactor our list_messages function to paginate through all of the messages for a given set of query parameters. Two things are going to change about our function: the return type, and the way we sort the results.

A byproduct of adding pagination to our function is going to be that the return type of the function changes from a list to a generator. Using a generator will mean that the results are iterable but not subscriptable, but it also means that our code will only call new pages from the API as needed.

When you query lists from the API, the order of the results is not always guaranteed. To reliably retrieve all the messages we want, we can add a date-ascending sort to our query. By adding ‘date ASC’ to the beginning of the sort parameter we’re forcing the results to be sorted the way that we need, even if another date sort already exists.

def list_messages(access_token, **kwargs):
   url = "https://api.agari.com/v1/ep/messages"

   querystring = kwargs
   # Prepend the date sort only once; recursive calls already carry it
   sort = querystring.get('sort')
   if not sort:
       querystring['sort'] = 'date ASC'
   elif not sort.startswith('date ASC'):
       querystring['sort'] = 'date ASC, ' + sort

   headers = {'accept': 'application/json'}
   headers['authorization'] = f'Bearer {access_token}'

   response = requests.request("GET", url, headers=headers, params=querystring)
   response_json = response.json()

   if not response_json['count']:
       # A bare return ends a generator; raising StopIteration here
       # becomes a RuntimeError on Python 3.7+ (PEP 479)
       return

   for message in response_json['messages']:
       yield message

   # Advance the offset by the number of results just received
   querystring['offset'] = querystring.get('offset', 0) + response_json['count']

   yield from list_messages(access_token, **querystring)

Replace the old list_messages with the new version of the function and run our code again. The results might take a while. At the end you should see a traceback. You can ignore the error for now, which for our purposes means that it worked.

Traceback (most recent call last):
  File "./list_messages.py", line 51, in <module>
    print(len(messages))
TypeError: object of type 'generator' has no len()
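
The traceback follows from the return type change: a generator has no length and cannot be indexed. A small sketch of two ways to work with such results, using a stand-in generator (numbers is hypothetical and only takes the place of list_messages here):

```python
from itertools import islice

def numbers():
    """Stand-in for list_messages: yields items one at a time."""
    for n in range(5):
        yield n

gen = numbers()
first_two = list(islice(gen, 2))   # take items without subscripting
remaining = sum(1 for _ in gen)    # count by consuming what is left

print(first_two)   # [0, 1]
print(remaining)   # 3
```

Note that counting consumes the generator, so collect the items into a list first if they are needed again.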


Creating Abstract Endpoint Functions

There are two abstract API endpoint types:

  • index
  • show

Index endpoints return a list with multiple results and show endpoints return a single result. Index endpoints are usually what you will use when you don’t know exactly what you’re looking for because they are searchable or filterable. Show endpoints often provide more detailed information about a given data type by including additional fields.

Let's generalize our list_messages function to work for any list endpoint type. We can do this by adding endpoint and collection_key arguments to the function so that we can then create helper functions for individual data types as needed.

def list_api(access_token, endpoint, collection_key, **kwargs):
   url = f"https://api.agari.com/v1/ep/{endpoint}"
 
   querystring = kwargs
 
   headers = {'accept': 'application/json'}
   headers['authorization'] = f'Bearer {access_token}'
 
   response = requests.request("GET", url, headers=headers, params=querystring)
   response_json = response.json()
 
   if not response_json['count']:
       return
 
   for message in response_json[collection_key]:
       yield message
 
   querystring['offset'] = querystring.get('offset', 0) + response_json['count']
   yield from list_api(access_token, endpoint, collection_key, **querystring)
 
def list_messages(access_token, **kwargs):
   if not kwargs.get('sort'):
       kwargs['sort'] = 'date ASC'
   yield from list_api(access_token, 'messages', 'messages', **kwargs)

def list_policy_events(access_token, **kwargs):
   yield from list_api(access_token, 'policy_events', 'alert_events', **kwargs)
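
The list_api function above recurses once per page, so every page adds another nested generator. For very large result sets this could run into Python's recursion limit. A loop-based variant avoids that. The sketch below is an alternative, not the article's method: paginate and fetch_page are hypothetical names, and fetch_page is assumed to return a dict with 'count' and 'items' keys.

```python
def paginate(fetch_page, **params):
    """Yield every item from an offset-paginated source using a loop."""
    params = dict(params)              # avoid mutating the caller's dict
    offset = params.get('offset', 0)
    while True:
        page = fetch_page(**{**params, 'offset': offset})
        if not page['count']:
            return                     # an empty page means we are done
        yield from page['items']
        offset += page['count']


# Usage with an in-memory stand-in for the API
DATA = list(range(7))

def fake_fetch(offset=0, limit=3):
    items = DATA[offset:offset + limit]
    return {'count': len(items), 'items': items}

print(list(paginate(fake_fetch, limit=3)))   # [0, 1, 2, 3, 4, 5, 6]
```

The offset arithmetic is the same as in list_api. Only the control flow changes, from recursion to a while loop.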

Similarly, we'll create an abstract framework for retrieving single results from the API. As you can see, the single endpoint function looks a lot like the early versions of our functions.

def get_api(id, access_token, endpoint, collection_key, **kwargs):
   url = f"https://api.agari.com/v1/ep/{endpoint}/{id}"
 
   querystring = kwargs
 
   headers = {'accept': 'application/json'}
   headers['authorization'] = f'Bearer {access_token}'
 
   response = requests.request("GET", url, headers=headers, params=querystring)
   response_json = response.json()
 
   return response_json[collection_key]
 
def get_message(id, access_token, **kwargs):
   return get_api(id, access_token, 'messages', 'message', **kwargs)
 
def get_policy_event(id, access_token, **kwargs):
   return get_api(id, access_token, 'policy_events', 'alert_event', **kwargs)

Building the Reports

Our end goal is to report on all the messages that matched on some policy, to summarize which policies were matched, and to group the results by final enforcement action on the message.

Retrieving All Messages

Getting a list of messages that match on any policy should now be straightforward. To do so we can use a search query containing the reserved keyword null.

search = "policy_ids is not null"
message_generator = list_messages(access_token, search=search)
messages = [message for message in message_generator]

Adding Matched Policy Names

Our report should be easy to cross-reference with the APD web UI, so we're going to populate the policy names on each message. Earlier we noted that the list endpoint might not return all the information that a single endpoint returns. For the messages endpoint, the difference that concerns our report is the format in which policy matches are returned and which types of policies are available. The list messages endpoint returns policy matches only as policy ID numbers, and the single message endpoint returns them only as policy names.

In addition, on-demand policies are returned in the list of messages but do not appear on single messages. On-demand policies are policies that you can quickly and easily create based on a message search after the message has been delivered. These differ from real-time policies in that real-time policies are defined beforehand and evaluated only when the message is first delivered.

Grouping Policy Match and Enforcement

We'll start with the list of all policy matching messages that we gathered earlier, and for each one we're going to try a few different methods of populating the matched_policies field. First, we’ll create a cache mapping of policy ID numbers to policy names to minimize the number of API calls we need to make. On cache misses we’ll either query the single message endpoint or query the list policy events with a filter.

🔧

Use the policy events API to adapt to a known limitation

We may need to query for policy events because of a known limitation in the API that prevents querying single outbound or internal messages.

from datetime import datetime
 
def policy_events_for_message(message, access_token):
   filter = f'collector_message_id.eq({message["id"]})'
   start_date = message['date']
   end_date = datetime.utcnow().isoformat()
 
   event_generator = list_policy_events(access_token, filter=filter, start_date=start_date, end_date=end_date)
 
   policy_events = [event for event in event_generator]
   return policy_events
 
def populate_matched_policies(messages, access_token):
   cache = {}
 
   def policy_ids_to_names(hashable_policy_ids):
       return cache[hashable_policy_ids]
 
   for message in messages:
       # Sort the policy IDs in case they are unordered
       # Create a tuple so they will be hashable
       hashable_policy_ids = tuple(sorted(message["policy_ids"]))
       try:
           matched_policies = policy_ids_to_names(hashable_policy_ids)
       except KeyError:
           # No match in cache so try getting the single message
           try:
               # Pass the message's own ID, not the built-in id function
               single_message = get_message(message["id"], access_token)
               matched_policies = single_message["matched_policies"]
           except Exception:
               # Something went wrong getting the message so try by policy_events
               policy_events = policy_events_for_message(message, access_token)
               matched_policies = [event['alert_definition_name'] for event in policy_events]
           cache[hashable_policy_ids] = matched_policies
 
       message["matched_policies"] = matched_policies
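
The cache key used above deserves a closer look: lists cannot be dictionary keys, and the same policies may arrive in a different order, so the IDs are sorted and converted to a tuple. A minimal sketch of that idea in isolation, where make_cached_lookup and fake_resolve are hypothetical names standing in for the API calls:

```python
def make_cached_lookup(resolve_names):
    """Wrap resolve_names so each distinct set of policy IDs is resolved once."""
    cache = {}

    def lookup(policy_ids):
        key = tuple(sorted(policy_ids))   # lists are unhashable; sorting ignores order
        if key not in cache:
            cache[key] = resolve_names(key)
        return cache[key]

    return lookup


calls = []

def fake_resolve(ids):
    # Hypothetical resolver standing in for the API calls
    calls.append(ids)
    return [f"policy-{i}" for i in ids]

lookup = make_cached_lookup(fake_resolve)
lookup([2, 1])
lookup([1, 2])        # same key after sorting, served from the cache
print(len(calls))     # 1
```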

Filtering On-Demand Policies

We discussed that On-Demand Policies are not returned for single message queries. In fact, the only place that On-Demand Policies appear in the APD API is in list messages results. The policy_events endpoint only contains real-time policy events. This has two effects on our report that are worth noting: we will not be able to map individual policy IDs to policy names with certainty, and for messages that show an enforcement_result of error we will not attempt to determine with certainty whether the message has ever been successfully enforced. The second issue may theoretically be solvable, but the complexity of this problem is beyond the scope of our article.

🔧

Scope the report based on the available API endpoints

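For these reasons we’re simply going to filter any messages that only matched On-Demand Policies out of our report. Because the matched_policies field only exists once populate_matched_policies has run, the filter is applied to the populated list.

messages = [message for message in messages if message['matched_policies']]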

Now that we have all of the messages we’re looking for, and we’ve added the policy matches as names, we can create a report that counts the number of messages per policy, grouped by final enforcement action.

from collections import Counter, defaultdict
 
def counts_report(messages):
   counter_rows = defaultdict(Counter)
   for message in messages:
       action = message.get('enforcement_action')
       result = message.get('enforcement_result')
       outcome = [(action, result)]
 
       for policy in message["matched_policies"]:
           counter_rows[policy].update(outcome)
 
       counter_rows['Total'].update(outcome)
 
   potential_outcomes = []
   actions = ['move', 'delete', 'inbox']
   results = ['success', 'error', 'pending']
   for action in actions:
       for result in results:
           potential_outcomes.append((action, result))
 
   headers = ['matched_policy'] + [', '.join(outcome) for outcome in potential_outcomes]
 
   report_rows = []
   for policy, counter in counter_rows.items():
       row = [policy]
       for outcome in potential_outcomes:
           row.append(counter.get(outcome, 0))
       report_rows.append(row)
 
   report_rows.sort(key=lambda row: sum(row[1:]), reverse=True)
 
   report_rows = [headers] + report_rows
   return report_rows
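
The counting step in counts_report relies on a defaultdict of Counter objects: one Counter per policy, keyed by the (action, result) pair. A short sketch of just that aggregation, using hypothetical sample data shaped like the messages above:

```python
from collections import Counter, defaultdict

# Hypothetical sample data; real messages carry many more fields
sample = [
    {'matched_policies': ['A'], 'enforcement_action': 'move', 'enforcement_result': 'success'},
    {'matched_policies': ['A', 'B'], 'enforcement_action': 'move', 'enforcement_result': 'success'},
    {'matched_policies': ['B'], 'enforcement_action': 'inbox', 'enforcement_result': 'error'},
]

rows = defaultdict(Counter)
for message in sample:
    outcome = (message['enforcement_action'], message['enforcement_result'])
    for policy in message['matched_policies']:
        rows[policy][outcome] += 1   # a message counts once per matched policy

print(rows['A'][('move', 'success')])     # 2
print(rows['B'][('inbox', 'error')])      # 1
print(rows['B'][('delete', 'pending')])   # 0, a Counter returns 0 for unseen keys
```

Because a message that matches several policies is counted under each of them, the per-policy rows can add up to more than the Total row.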

Conclusion

We've walked through how to authenticate to and query the API, how to handle result pagination, covered how to deal with a few edge cases, and processed the results into an aggregate report. Now let's put it all together into one script, add a few final touches, and write our reports to CSV.

Final Script

#!/usr/bin/env python3
# message_policy_events_report.py
import csv
import requests
 
from collections import Counter, defaultdict
from datetime import datetime
from os import environ
 
 
def main():
   client_id = environ.get('CLIENT_ID')
   client_secret = environ.get('CLIENT_SECRET')
 
   access_token = generate_token(client_id, client_secret)
 
   search = "policy_ids is not null"
 
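   message_generator = list_messages(access_token, search=search)
   messages = list(message_generator)

   populate_matched_policies(messages, access_token)

   # Drop messages that only matched On-Demand Policies; this must run
   # after matched_policies has been populated
   messages = [message for message in messages if message['matched_policies']]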
 
   write_counts_report(messages)
   write_messages_report(messages)
 
def generate_token(client_id, client_secret):
   url = "https://api.agari.com/v1/ep/token"
 
   payload = f"client_id={client_id}&client_secret={client_secret}"
   headers = {
       'accept': "application/json",
       'content-type': "application/x-www-form-urlencoded"
       }
 
   response = requests.request("POST", url, data=payload, headers=headers)
 
   return response.json()['access_token']
 
def list_api(access_token, endpoint, collection_key, **kwargs):
   url = f"https://api.agari.com/v1/ep/{endpoint}"
 
   querystring = kwargs
 
   headers = {'accept': 'application/json'}
   headers['authorization'] = f'Bearer {access_token}'
 
   response = requests.request("GET", url, headers=headers, params=querystring)
   response_json = response.json()
 
   if not response_json['count']:
       return
 
   for message in response_json[collection_key]:
       yield message
 
   querystring['offset'] = querystring.get('offset', 0) + response_json['count']
   yield from list_api(access_token, endpoint, collection_key, **querystring)
 
def list_messages(access_token, **kwargs):
   if not kwargs.get('sort'):
       kwargs['sort'] = 'date ASC'
   yield from list_api(access_token, 'messages', 'messages', **kwargs)

def list_policy_events(access_token, **kwargs):
   yield from list_api(access_token, 'policy_events', 'alert_events', **kwargs)
 
def get_api(id, access_token, endpoint, collection_key, **kwargs):
   url = f"https://api.agari.com/v1/ep/{endpoint}/{id}"
 
   querystring = kwargs
 
   headers = {'accept': 'application/json'}
   headers['authorization'] = f'Bearer {access_token}'
 
   response = requests.request("GET", url, headers=headers, params=querystring)
   response_json = response.json()
 
   return response_json[collection_key]
 
def get_message(id, access_token, **kwargs):
   return get_api(id, access_token, 'messages', 'message', **kwargs)
 
def get_policy_event(id, access_token, **kwargs):
   return get_api(id, access_token, 'policy_events', 'alert_event', **kwargs)
 
def policy_events_for_message(message, access_token):
   filter = f'collector_message_id.eq({message["id"]})'
   start_date = message['date']
   end_date = datetime.utcnow().isoformat()
 
   event_generator = list_policy_events(access_token, filter=filter, start_date=start_date, end_date=end_date)
 
   policy_events = [event for event in event_generator]
   return policy_events
 
def populate_matched_policies(messages, access_token):
   cache = {}
 
   def policy_ids_to_names(hashable_policy_ids):
       return cache[hashable_policy_ids]
 
   for message in messages:
       # Sort the policy IDs in case they are unordered
       # Create a tuple so they will be hashable
       hashable_policy_ids = tuple(sorted(message["policy_ids"]))
       try:
           matched_policies = policy_ids_to_names(hashable_policy_ids)
       except KeyError:
           # No match in cache so try getting the single message
           try:
               # Pass the message's own ID, not the built-in id function
               single_message = get_message(message["id"], access_token)
               matched_policies = single_message["matched_policies"]
           except Exception:
               # Something went wrong getting the message so try by policy_events
               policy_events = policy_events_for_message(message, access_token)
               matched_policies = [event['alert_definition_name'] for event in policy_events]
           cache[hashable_policy_ids] = matched_policies
 
       message["matched_policies"] = matched_policies
 
def write_counts_report(messages):
   counter_rows = defaultdict(Counter)
   for message in messages:
       action = message.get('enforcement_action')
       result = message.get('enforcement_result')
       outcome = [(action, result)]
 
       for policy in message["matched_policies"]:
           counter_rows[policy].update(outcome)
 
       counter_rows['Total'].update(outcome)
 
   potential_outcomes = []
   actions = ['move', 'delete', 'inbox']
   results = ['success', 'error', 'pending']
   for action in actions:
       for result in results:
           potential_outcomes.append((action, result))
 
   headers = ['matched_policy'] + [', '.join(outcome) for outcome in potential_outcomes]
 
   report_rows = []
   for policy, counter in counter_rows.items():
       row = [policy]
       for outcome in potential_outcomes:
           row.append(counter.get(outcome, 0))
       report_rows.append(row)
 
   report_rows.sort(key=lambda row: sum(row[1:]), reverse=True)
 
   report_rows = [headers] + report_rows
 
   # newline='' lets the csv module control line endings
   with open('policy_result_counts.csv', 'w', newline='') as f:
       writer = csv.writer(f)
       for row in report_rows:
           writer.writerow(row)
 
def write_messages_report(messages):
   # Sort the union of keys so the column order is stable between runs
   headers = sorted(set().union(*[message.keys() for message in messages]))

   with open('policy_matched_messages.csv', 'w', newline='') as f:
       writer = csv.DictWriter(f, headers)
       writer.writeheader()
       for message in messages:
           writer.writerow(message)
 
if __name__ == "__main__":
   main()
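
The messages report depends on csv.DictWriter coping with rows that do not all share the same keys. A small sketch of that behavior, writing to an in-memory buffer instead of a file; the sample rows are hypothetical:

```python
import csv
import io

rows = [{'id': 1, 'subject': 'hello'}, {'id': 2, 'policy_ids': [7]}]

# The sorted union of keys gives every row the same columns in a stable order
fieldnames = sorted(set().union(*(row.keys() for row in rows)))

buffer = io.StringIO(newline='')
writer = csv.DictWriter(buffer, fieldnames, restval='')   # restval fills missing fields
writer.writeheader()
writer.writerows(rows)

print(buffer.getvalue())
```

Each row is written with an empty cell for any field it lacks, which is how the final script's messages report handles messages that have different fields.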

Updated 5 months ago

