Send an aggregated security report from AWS Security Hub

Problem statement

AWS Security Hub is an excellent service that collects and aggregates security findings from many AWS services (e.g. Amazon GuardDuty, Amazon Inspector, Patch Manager, AWS Config, Amazon Macie) and from various third-party tools. It works in multi-account environments and provides informative dashboards in the AWS Web Console, but some security standards require continuous monitoring of security controls over a three- to six-month period. How can we prove it? For example, by providing a history of received reports. Unfortunately, unlike Palo Alto Prisma or CrowdStrike Falcon, AWS Security Hub cannot send regular reports, so we had to develop a custom solution for that.

Solution design

An audit account is a standard part of a Control Tower implementation, and we usually delegate administration of the various AWS security tools to it. It then collects security findings from all member accounts. You can read more about it in these two posts (one link, another one link).
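As an illustration, the delegation itself can be a single Terraform resource applied in the organization management account; a minimal sketch with a placeholder account ID:

resource "aws_securityhub_organization_admin_account" "audit" {
  admin_account_id = "123456789012" # placeholder: the audit account ID
}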

Our requirement was to generate a single Excel file with a separate sheet for every member AWS account, with security findings sorted by severity for better visibility. A new file is generated every week, and the CISO receives an email with it.

There were a couple of problems:

– Some AWS accounts in the organization may have Security Hub intentionally disabled, so we cannot simply call “list-accounts”. We need a list of the accounts where Security Hub is enabled, and we create a temporary custom Security Hub insight for that.
– An SNS notification cannot carry a file attachment. SES is an option, but it would be overhead for us. That’s why we decided to upload the Excel file to S3 and create a pre-signed URL, which is then sent via SNS to the relevant person.

Here is the code of the Lambda function:

import boto3
import datetime
import os
import xlsxwriter
from presigned_url import *
from sns_publish import *

def lambda_handler(event, context):
    creating_xlsx(os.environ['SNS_TOPIC_ARN'], os.environ['INSIGHT_NAME'], os.environ['FINDINGS_S3'])
    
def creating_xlsx(sns_topic_arn, insight_name, findings_s3):
    s3 = boto3.client('s3')
    securityhub_client = boto3.client('securityhub')
    
    ### Creating a temporary insight to retrieve the Account ID list
    custom_insight = securityhub_client.create_insight(Name=insight_name, Filters={}, GroupByAttribute='AwsAccountId')
    insight_arn = custom_insight['InsightArn']

    aggregates = securityhub_client.get_insight_results(InsightArn=insight_arn)['InsightResults']['ResultValues']
    group_by_attribute_values = [account['GroupByAttributeValue'] for account in aggregates]

    #### List to store all the Security Findings
    all_findings = []

    ### Retrieve all the Security Findings that match the specified filters using pagination
    for account_id in group_by_attribute_values:
        paginator = securityhub_client.get_paginator('get_findings')
        response_iterator = paginator.paginate(
            Filters={
                'ComplianceStatus': [{'Value': 'FAILED', 'Comparison': 'EQUALS'}],
                'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}],
                'AwsAccountId': [{'Value': account_id, 'Comparison': 'EQUALS'}]
            },
            PaginationConfig={
                'PageSize': 100
            }
        )

        for response in response_iterator:
            all_findings.extend(response['Findings'])

    ### Sort the findings by normalized severity, highest first
    sorted_findings = sorted(all_findings, key=lambda x: x['Severity']['Normalized'], reverse=True)

    ### Create an XLSX file with the sorted Security Findings in the /tmp directory
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    filename = f'security_findings_{timestamp}.xlsx'
    xlsx_file = f'/tmp/{filename}'

    ### Create a workbook with one worksheet per account
    workbook = xlsxwriter.Workbook(xlsx_file)

    for account_id in group_by_attribute_values:

        worksheet = workbook.add_worksheet(account_id)

        ### Adding formats
        worksheet.set_default_row(30)   # Sets the default row height to 30 points
        worksheet.set_column(0, 0, 15)  # 1st column width. Used for Severity.
        worksheet.set_column(1, 1, 45)  # 2nd column width. Used for Title.
        worksheet.set_column(2, 2, 15)  # 3rd column width. Used for Account ID.
        worksheet.set_column(3, 3, 140) # 4th column width. Used for Description.
        worksheet.set_column(4, 4, 25)  # 5th column width. Used for Updated at.
        worksheet.set_column(5, 5, 20)  # 6th column width. Used for Resource Type.
        worksheet.set_column(6, 6, 145) # 7th column width. Used for Resource ID.
        worksheet.set_column(7, 7, 80)  # 8th column width. Used for Remediation.

        title_format = workbook.add_format({'bold': True, 'border': 1})
        rows_format = workbook.add_format({'text_wrap': True, 'border': 1})
        critical_format = workbook.add_format({'bg_color': '#7d2105', 'border': 1})
        high_format = workbook.add_format({'bg_color': '#ba2e0f', 'border': 1})
        medium_format = workbook.add_format({'bg_color': '#cc6021', 'border': 1})
        low_format = workbook.add_format({'bg_color': '#b49216', 'border': 1}) # Low and Informational

        worksheet.write_row(0, 0, ['Severity', 'Title', 'AWS Account ID', 'Description', 'Updated at', 'Resource Type', 'Resource ID', 'Remediation'], title_format)
        current_line = 1
        # Write each Security Finding row
        for finding in sorted_findings:
            # Only this account's findings belong on its worksheet
            if finding['AwsAccountId'] != account_id:
                continue

            severity = finding['Severity']['Label']
            if severity == 'CRITICAL':
                # Apply the critical_format to the "Severity" column cell
                worksheet.write(current_line, 0, severity, critical_format)
            elif severity == 'HIGH':
                # Apply the high_format to the "Severity" column cell
                worksheet.write(current_line, 0, severity, high_format)
            elif severity == 'MEDIUM':
                # Apply the medium_format to the "Severity" column cell
                worksheet.write(current_line, 0, severity, medium_format)
            else:
                worksheet.write(current_line, 0, severity, low_format)

            # Some findings have no remediation recommendation: skip only the URL cell, not the whole row
            try:
                worksheet.write_url(current_line, 7, finding['Remediation']['Recommendation']['Url'], string=finding['Remediation']['Recommendation']['Text'])
            except KeyError:
                pass
            worksheet.write_row(current_line, 1, [finding['Title'], finding['AwsAccountId'], finding['Description'], finding['UpdatedAt'], finding['Resources'][0]['Type'], finding['Resources'][0]['Id']], rows_format)

            current_line += 1

    # Closing workbook
    workbook.close()

    ### Remove the temporary insight, upload the report and send the pre-signed URL
    securityhub_client.delete_insight(InsightArn=insight_arn)

    s3.upload_file(xlsx_file, findings_s3, filename)

    publish_to_sns(sns_topic_arn, create_presigned_url(findings_s3, filename))
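The Lambda execution role must be allowed to call the Security Hub, S3 and SNS APIs used above. Here is a minimal Terraform sketch of such a policy; the role, bucket and topic names are placeholders for whatever your stack actually defines:

resource "aws_iam_role_policy" "exporter-permissions" {
  name = "security-hub-exporter"
  role = aws_iam_role.security-hub-export.id # placeholder: the function's execution role

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "securityhub:CreateInsight",
          "securityhub:GetInsightResults",
          "securityhub:GetFindings",
          "securityhub:DeleteInsight"
        ]
        Resource = "*"
      },
      {
        Effect   = "Allow"
        Action   = ["s3:PutObject", "s3:GetObject"]
        Resource = "arn:aws:s3:::YOUR-FINDINGS-BUCKET/*" # placeholder: the findings bucket
      },
      {
        Effect   = "Allow"
        Action   = "sns:Publish"
        Resource = aws_sns_topic.findings-report.arn # placeholder: the report topic
      }
    ]
  })
}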

presigned_url module:

import logging
import boto3
import os
from botocore.exceptions import ClientError

s3_endpoint = "https://s3." + os.environ['AWS_REGION'] + ".amazonaws.com"

def create_presigned_url(bucket_name, object_name, expiration=int(os.environ['PRESIGNED_URL_TTL'])):

    s3_client = boto3.client('s3', endpoint_url=s3_endpoint)
    try:
        url = s3_client.generate_presigned_url('get_object', Params={'Bucket': bucket_name, 'Key': object_name}, ExpiresIn=expiration)
    except ClientError as exception:
        logging.error(exception)
        return None
    
    return url
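Keep in mind that a pre-signed URL only works while the credentials that signed it are valid. A Lambda function signs with the temporary credentials of its execution role, so the effective lifetime of the link can be shorter than PRESIGNED_URL_TTL.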

sns_publish module:

import boto3

def publish_to_sns(sns_topic, presigned_url):
    client = boto3.client('sns')
    
    client.publish(
        TargetArn=sns_topic,
        Message='Good day,\n\nPlease find the Security Findings report via the URL below:\n\n' + presigned_url,
        Subject='Findings Report',
        MessageStructure='string'
    )
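For the CISO to actually receive the email, the SNS topic needs an email subscription. A minimal Terraform sketch, assuming a topic resource named findings-report and a placeholder address (the recipient must confirm the subscription):

resource "aws_sns_topic_subscription" "report-email" {
  topic_arn = aws_sns_topic.findings-report.arn # placeholder: the report topic
  protocol  = "email"
  endpoint  = "ciso@example.com" # placeholder address
}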

The xlsxwriter module can be added as a Lambda layer. Here is part of the Terraform code for it:

### Lambda archive preparation
resource "null_resource" "install-dependencies" {
  provisioner "local-exec" {
    command = <<EOF
           mkdir -p ${path.module}/lambda-preparation/python
           pip3 install XlsxWriter==3.1.0 -t ${path.module}/lambda-preparation/python/
           rm -R ${path.module}/lambda-preparation/python/*.dist-info
       EOF
  }
}

### Collect files for the Lambda layer
data "archive_file" "layer-code" {
  type        = "zip"
  source_dir  = "${path.module}/lambda-preparation/"
  output_path = "${path.module}/lambda-function/layer/xlsxwriter_layer.zip"

  depends_on = [
    null_resource.install-dependencies
  ]
}

### Collect files for the Lambda function
data "archive_file" "lambda-code" {
  type        = "zip"
  source_dir  = "${path.module}/lambda-function/code/"
  output_path = "${path.module}/security_hub_exporter.zip"
}
  
# Lambda layer: xlsxwriter
resource "aws_lambda_layer_version" "xlsxwriter-layer" {
  filename         = "${path.module}/lambda-function/layer/xlsxwriter_layer.zip"
  source_code_hash = filebase64sha256("${path.module}/lambda-function/layer/xlsxwriter_layer.zip")
  layer_name       = "xlsxwriter-layer"
  compatible_runtimes = ["python3.9"]
}
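The layer is then attached to the function itself. For completeness, here is a hedged sketch of the function resource that the snippets above refer to; the handler file name, role and environment values are assumptions rather than the original stack's definitions:

resource "aws_lambda_function" "security-hub-export" {
  function_name    = var.lambda_function_name
  filename         = data.archive_file.lambda-code.output_path
  source_code_hash = data.archive_file.lambda-code.output_base64sha256
  handler          = "lambda_function.lambda_handler" # assumption: the code lives in lambda_function.py
  runtime          = "python3.9"
  role             = aws_iam_role.security-hub-export.arn # placeholder: the execution role
  timeout          = 300
  layers           = [aws_lambda_layer_version.xlsxwriter-layer.arn]

  environment {
    variables = {
      SNS_TOPIC_ARN     = aws_sns_topic.findings-report.arn # placeholder: the report topic
      INSIGHT_NAME      = "securityhub-accounts-insight"    # any unique insight name
      FINDINGS_S3       = "YOUR-FINDINGS-BUCKET"            # placeholder: the findings bucket
      PRESIGNED_URL_TTL = "604800"                          # 7 days, the pre-signed URL maximum
    }
  }
}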

The Lambda is executed on a schedule (an AWS CloudWatch event rule). Here is part of the Terraform code for it:

variable "cron_expression" {
  type        = string
  default     = "cron(0 11 * * ? *)"
  description = "Cron value to trigger lambda"
}

### Target for CloudWatch Rule
resource "aws_cloudwatch_event_target" "lambda-target" {
  rule      = aws_cloudwatch_event_rule.scheduler.name
  target_id = "lambda"
  arn       = aws_lambda_function.security-hub-export.arn
}

### CloudWatch Rule
resource "aws_cloudwatch_event_rule" "scheduler" {
  name                = "${var.lambda_function_name}-CloudWatch-Rule"
  description         = "Cron to trigger Lambda"
  schedule_expression = var.cron_expression
}
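One easy-to-miss piece that the snippet above does not show: CloudWatch Events must be granted permission to invoke the function, otherwise the rule fires but nothing runs:

### Allow the CloudWatch rule to invoke the Lambda
resource "aws_lambda_permission" "allow-cloudwatch" {
  statement_id  = "AllowExecutionFromCloudWatch"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.security-hub-export.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.scheduler.arn
}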

Results and conclusion

The Excel report file looks like this:

– The “Severity” column is sorted (CRITICAL is at the top)
– “Title” – the name of the security finding
– “AWS Account ID” (every account is on a separate Excel sheet)
– “Description” – more details about the security finding
– “Updated At” – when the finding was last updated
– “Resource Type” and “Resource ID” – what we actually need to fix
– “Remediation” – the AWS recommendation on how to fix the issue.

So if you need to receive regular reports to maintain security compliance, you don't have to pay for additional tooling: you can use AWS native services and achieve the same result.