Deleting default VPCs in all regions for a Control Tower managed account

Updated: Aug 29

Problem statement


When you create an account using Account Factory, the Network Configuration settings let you choose whether to create a VPC for governed accounts. If the check-box for a Control Tower governed region is unchecked, no VPC is created and there is no default VPC in that region. However, there is no way to delete the default VPCs in regions where Control Tower is not enabled, see screenshot below:


AWS provides a solution that addresses this problem; however, it does much more than delete default VPCs, such as using AWS RAM to share VPC subnets with the newly created account or AWS Firewall Manager to apply security groups to VPCs in the account:

https://aws.amazon.com/blogs/mt/customizing-account-configuration-aws-control-tower-lifecycle-events/


To create a more lightweight solution, we introduce a Step Function with a single Lambda function. The Step Function is triggered by the Control Tower account creation event; the Lambda function assumes the “AWSControlTowerExecution” role in the newly created account and deletes the default VPCs in all regions. The reason for wrapping a single Lambda function in a Step Function, rather than using the Lambda function alone, is that you may later want to extend what runs on the Control Tower account creation event without adding new EventBridge rules.


Solution design

The solution diagram is:


The EventBridge event pattern that matches the CloudTrail event and invokes the Step Function looks like:

{
  "detail-type": ["AWS Service Event via CloudTrail"],
  "source": ["aws.controltower"],
  "detail": {
    "serviceEventDetails": {
      "createManagedAccountStatus": {
        "state": ["SUCCEEDED"]
      }
    },
    "eventName": ["CreateManagedAccount"]
  }
}
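To see which events this pattern selects, it can help to evaluate it locally against a sample event. The following is a minimal sketch, not EventBridge's actual matching engine: the `matches` helper is hypothetical and supports only exact-value lists and nested objects, and `SAMPLE_EVENT` is a trimmed, made-up event that contains only the fields used in this article.

```python
def matches(pattern, event):
    """Return True if `event` satisfies `pattern` (exact-value matching only)."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: the event must contain a nested object that matches.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            # Leaf pattern: a list of allowed values.
            return False
    return True


# The event pattern from this article.
PATTERN = {
    "detail-type": ["AWS Service Event via CloudTrail"],
    "source": ["aws.controltower"],
    "detail": {
        "serviceEventDetails": {
            "createManagedAccountStatus": {"state": ["SUCCEEDED"]}
        },
        "eventName": ["CreateManagedAccount"],
    },
}

# Hypothetical, trimmed lifecycle event (the account ID is a placeholder).
SAMPLE_EVENT = {
    "detail-type": "AWS Service Event via CloudTrail",
    "source": "aws.controltower",
    "detail": {
        "eventName": "CreateManagedAccount",
        "serviceEventDetails": {
            "createManagedAccountStatus": {
                "state": "SUCCEEDED",
                "account": {"accountId": "111122223333"},
            }
        },
    },
}

if __name__ == "__main__":
    print(matches(PATTERN, SAMPLE_EVENT))
```

Fields that the pattern does not mention, such as the account details, are ignored by the match, which is why the same event can still carry the account ID that the Lambda function needs.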

The complete CloudFormation template, including the Lambda function code (Python 3.8), can look like the following:


Parameters:
  CrossAccountRoleName:
    Type: String
    Default: AWSControlTowerExecution
    Description: The cross account role name

Resources:
  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: 'delete-default-vpc-role'
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: "lambda.amazonaws.com"
            Action: "sts:AssumeRole"
      Policies:
        - PolicyName: 'delete-default-vpc-policy'
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - ec2:DescribeRegions
                  - sts:AssumeRole
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: "*"
  
  StatesExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - states.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: StepFuncExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "lambda:InvokeFunction"
                Resource: "*"

  EventRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - events.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: StepFuncExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "states:StartExecution"
                Resource: "*"

  StateMachine:
    Type: "AWS::StepFunctions::StateMachine"
    Properties:
      DefinitionString: !Sub |-
        {
          "Comment": "Delete Default VPC",
          "StartAt": "DeleteVPC",
          "States": {
            "DeleteVPC": {
              "Type": "Task",
              "Resource": "${Lambda.Arn}",
              "Retry": [ {
                      "ErrorEquals": [ "ClientError" ],
                      "IntervalSeconds": 20,
                      "BackoffRate": 1.0,
                      "MaxAttempts": 500
                  } ],
              "End": true
            }
          }
        }

      RoleArn: !GetAtt StatesExecutionRole.Arn

  Lambda:
    Type: "AWS::Lambda::Function"
    Properties: 
      Environment:
        Variables:
          CROSS_ACCOUNT_ROLE_NAME: !Ref CrossAccountRoleName
      Code:
        ZipFile: |
          import boto3
          import traceback
          import time
          import os

          DEFAULT_ROLE_NAME = 'AWSControlTowerExecution'

          CROSS_ACCOUNT_ROLE_NAME = os.environ.get(
              'CROSS_ACCOUNT_ROLE_NAME',
              DEFAULT_ROLE_NAME)

          def delete_vpcs(client, vpc_id):
              try:
                  sg_ids = describe_sec_grp(client, vpc_id)
                  delete_sec_grp(client, sg_ids)

                  pcx_ids = describe_pcx(client, vpc_id)
                  delete_pcx(client, pcx_ids)

                  subnet_ids = describe_subnets(client, vpc_id)
                  delete_subnets(client, subnet_ids)

                  igw_ids = describe_igw(client, vpc_id)
                  detach_igw(client, vpc_id, igw_ids)
                  delete_igw(client, igw_ids)

                  delete = client.delete_vpc(
                      DryRun=False,
                      VpcId=vpc_id
                  )
              except:
                  print('Execution failed while deleting vpc. {}'.format(str(traceback.format_exc())))

          def describe_natgws(client, vpc_id):
              natgw_list = []
              try:
                  response = client.describe_nat_gateways(
                      Filters=[
                          {
                              'Name': 'vpc-id',
                              'Values': [
                                  vpc_id
                              ]
                          }
                      ]
                  )

                  for ids in response["NatGateways"]:
                      natgw_list.append(ids["NatGatewayId"])
                  return natgw_list
              except:
                  print('Execution failed while describing NAT gateways. {}'.format(str(traceback.format_exc())))

          def delete_natgws(client, natgw_ids):
              try:
                  ngw_list = natgw_ids
                  for id in ngw_list:
                      delete_response = client.delete_nat_gateway(
                          NatGatewayId=id
                      )
              except:
                  print('Execution failed while deleting natgw. {}'.format(str(traceback.format_exc())))

          def describe_pcx(client, vpc_id):
              pcx_list = []
              try:
                  accepter = client.describe_vpc_peering_connections(
                      Filters=[
                          {
                              'Name': 'accepter-vpc-info.vpc-id',
                              'Values': [
                                  vpc_id
                              ]
                          }
                      ]
                  )['VpcPeeringConnections']

                  requester = client.describe_vpc_peering_connections(
                      Filters=[
                          {
                              'Name': 'requester-vpc-info.vpc-id',
                              'Values': [
                                  vpc_id
                              ]
                          }
                      ]
                  )['VpcPeeringConnections']

                  for pcx in accepter:
                      pcx_list.append(pcx["VpcPeeringConnectionId"])
                      # print pcx["VpcPeeringConnectionId"]
                  for pcx in requester:
                      # print pcx["VpcPeeringConnectionId"]
                      pcx_list.append(pcx["VpcPeeringConnectionId"])
                  return pcx_list
              except:
                  print('Execution failed while describing peering connections. {}'.format(str(traceback.format_exc())))

          def delete_pcx(client, pcx_ids):
              try:
                  pcx_list = pcx_ids
                  for id in pcx_list:
                      delete_response = client.delete_vpc_peering_connection(
                          DryRun=False,
                          VpcPeeringConnectionId=id
                      )
              except:
                  print('Execution failed while deleting pcx. {}'.format(str(traceback.format_exc())))

          def describe_igw(client, vpcid):
              try:
                  igw_list = []
                  vpc_id = vpcid
                  describe_response = client.describe_internet_gateways(
                      DryRun=False,
                      Filters=[
                          {
                              'Name': 'attachment.vpc-id',
                              'Values': [
                                  vpc_id,
                              ]
                          }
                      ]
                  )
                  for ids in describe_response["InternetGateways"]:
                      igw_list.append(ids["InternetGatewayId"])
                  return igw_list
              except:
                  print('Execution failed while describing igw. {}'.format(str(traceback.format_exc())))

          def detach_igw(client, vpc, igw):
              try:
                  igw_list = igw
                  vpc_id = vpc
                  for id in igw_list:
                      response = client.detach_internet_gateway(
                          DryRun=False,
                          InternetGatewayId=id,
                          VpcId=vpc_id
                      )

              except:
                  print('Execution failed while detaching igw. {}'.format(str(traceback.format_exc())))

          def delete_igw(client, igwid):
              try:
                  igw_list = igwid
                  for id in igw_list:
                      delete_response = client.delete_internet_gateway(
                          DryRun=False,
                          InternetGatewayId=id
                      )
              except:
                  print('Execution failed while deleting igw. {}'.format(str(traceback.format_exc())))

          def describe_sec_grp(client, vpcid):
              try:
                  sg_list = []
                  vpc_id = vpcid
                  describe_response = client.describe_security_groups(
                      DryRun=False,
                      Filters=[
                          {
                              'Name': 'vpc-id',
                              'Values': [
                                  vpc_id,
                              ]
                          },
                      ]
                  )
                  for ids in describe_response["SecurityGroups"]:
                      sg_list.append(ids["GroupId"])
                  return sg_list
              except:
                  print('Execution failed while describing sec grp. {}'.format(str(traceback.format_exc())))

          def delete_sec_grp(client, sgid):
              try:
                  sg_list = sgid
                  for id in sg_list:
                      sg = client.describe_security_groups(
                          DryRun=False,
                          GroupIds=[id]
                      )['SecurityGroups']

                      for desc in sg:
                          if desc["Description"] != "default VPC security group":
                              delete_response = client.delete_security_group(
                                  DryRun=False,
                                  GroupId=id
                              )
              except:
                  print('Execution failed while deleting sec grp. {}'.format(str(traceback.format_exc())))

          def describe_subnets(client, vpcid):
              try:
                  subnet_list = []
                  vpc_id = vpcid
                  describe_response = client.describe_subnets(
                      DryRun=False,
                      Filters=[
                          {
                              'Name': 'vpc-id',
                              'Values': [
                                  vpc_id,
                              ]
                          },
                      ]
                  )
                  for ids in describe_response["Subnets"]:
                      subnet_list.append(ids["SubnetId"])
                  return subnet_list
              except:
                  print('Execution failed while describing subnets. {}'.format(str(traceback.format_exc())))

          def delete_subnets(client, subnet_ids):
              try:
                  subnet_id_list = subnet_ids
                  for id in subnet_id_list:
                      delete = client.delete_subnet(
                          DryRun=False,
                          SubnetId=id
                      )
              except:
                  print('Execution failed while deleting subnets. {}'.format(str(traceback.format_exc())))

          def lambda_handler(event, context):
              lambda_result = {
                  "DefaultVPCs": [
                  ]
              }
              account_id = event["detail"]["serviceEventDetails"]["createManagedAccountStatus"]["account"]["accountId"]
              print('Deleting DefaultVPCs...Account:{0}'.format(account_id))
              client = boto3.client('ec2')
              awsregions = client.describe_regions()['Regions']
              # Assume the cross-account role in the newly created account.
              sts = boto3.client('sts')
              role = sts.assume_role(
                  RoleArn="arn:aws:iam::{aid}:role/{name}".format(
                      aid=account_id,
                      name=CROSS_ACCOUNT_ROLE_NAME
                  ),
                  RoleSessionName='DeleteDefaultVPC'
              )

              for region in awsregions:
                  # Create the EC2 client inside the loop: it is bound to one region
                  # and uses the temporary credentials of the assumed role.
                  ec2 = boto3.client('ec2',
                      region_name=region["RegionName"],
                      aws_access_key_id=role['Credentials']['AccessKeyId'],
                      aws_secret_access_key=role['Credentials']['SecretAccessKey'],
                      aws_session_token=role['Credentials']['SessionToken']
                  )
                  defaultvpc = ec2.describe_vpcs(
                      Filters=[
                          {
                              'Name': 'isDefault',
                              'Values': [
                                  'true',
                              ]
                          }
                      ]
                  )['Vpcs']

                  for vpc in defaultvpc:
                      if vpc:
                          print('Removing default VPC in {}'.format(region["RegionName"]))
                          delete_vpcs(ec2, vpc["VpcId"])

                          lambda_result["DefaultVPCs"].append(
                              {
                                  "VpcId": vpc["VpcId"],
                                  "Region": region["RegionName"]
                              }
                          )
              for default in lambda_result["DefaultVPCs"]:
                  if default["Region"]:
                      print("DEFAULT_VPC_FOUND")
                      return lambda_result

              print("NO_DEFAULT_VPCS_FOUND")
              return 0


      Description: This Lambda deletes all default vpcs.
      FunctionName: delete-default-vpc-lambda
      Handler: index.lambda_handler
      MemorySize: 128
      Role: !GetAtt LambdaRole.Arn
      Runtime: python3.8
      Timeout: 120

  CloudwatchEventsRule:
    Type: AWS::Events::Rule
    Properties:
      Description: Triggers SFN on CT account creation
      EventPattern:
        source:
          - aws.controltower
        detail-type:
          - AWS Service Event via CloudTrail
        detail:
          eventName:
            - CreateManagedAccount
          serviceEventDetails:
            createManagedAccountStatus:
              state: ["SUCCEEDED"]
      State: "ENABLED"
      Targets:
        - Arn: !Ref StateMachine
          RoleArn: !GetAtt EventRole.Arn
          Id: InvokeStateMachine

The template has a parameter called “CrossAccountRoleName”: the name of the role that the Lambda function assumes in order to perform operations in the newly created account. The default value is “AWSControlTowerExecution”.
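How the role name turns into per-region EC2 clients can be sketched as follows. This is an illustrative outline rather than the template's exact code: the helper names `role_arn`, `regional_client_kwargs` and `regional_ec2_clients` are hypothetical, and the `aws` partition is hard-coded in the ARN, as in the function above. `boto3` is imported inside the last helper only so that the two pure helpers can be tried without AWS access.

```python
def role_arn(account_id, role_name="AWSControlTowerExecution"):
    """Build the ARN of the cross-account role in the target account."""
    return "arn:aws:iam::{aid}:role/{name}".format(aid=account_id, name=role_name)


def regional_client_kwargs(credentials, region_name):
    """Map the 'Credentials' dict returned by sts.assume_role to boto3 client arguments."""
    return {
        "region_name": region_name,
        "aws_access_key_id": credentials["AccessKeyId"],
        "aws_secret_access_key": credentials["SecretAccessKey"],
        "aws_session_token": credentials["SessionToken"],
    }


def regional_ec2_clients(account_id, region_names,
                         role_name="AWSControlTowerExecution",
                         session_name="DeleteDefaultVPC"):
    """Yield (region_name, ec2_client) pairs that act inside the target account."""
    import boto3  # imported lazily; only this helper needs AWS access

    sts = boto3.client("sts")
    credentials = sts.assume_role(
        RoleArn=role_arn(account_id, role_name),
        RoleSessionName=session_name,
    )["Credentials"]
    for name in region_names:
        # One client per region: an EC2 client only sees the VPCs of its own region.
        yield name, boto3.client("ec2", **regional_client_kwargs(credentials, name))
```

The role is assumed once and the resulting temporary credentials are reused for every region, so the number of STS calls does not grow with the number of regions.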

To make the CloudFormation template shorter, the Lambda function code can be placed in an S3 bucket.


Now, when a new account is created using Control Tower Account Factory, the Step Function is invoked and the default VPCs in all regions are deleted:


The solution can also be used in a generic Organization without Control Tower; the only prerequisite is a cross-account role in the new account. The EventBridge event that triggers the automation can be the move of an account to an OU, e.g.:

{
  "source": ["aws.organizations"],
  "detail": {
    "eventSource": ["organizations.amazonaws.com"],
    "eventName": ["MoveAccount"]
  }
}
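A MoveAccount event does not carry the Control Tower "serviceEventDetails" block that the Lambda function above reads the account ID from, so the handler would need a different lookup for it. The sketch below shows one possible way to support both triggers. The `account_id_from_event` helper is hypothetical, and the `requestParameters.accountId` path is an assumption based on how CloudTrail usually records the MoveAccount request; verify it against a real event before relying on it.

```python
def account_id_from_event(event):
    """Return the target account ID for either supported trigger event."""
    detail = event.get("detail", {})
    event_name = detail.get("eventName")

    if event_name == "CreateManagedAccount":
        # Control Tower lifecycle event: same path as in the Lambda function above.
        status = detail["serviceEventDetails"]["createManagedAccountStatus"]
        return status["account"]["accountId"]

    if event_name == "MoveAccount":
        # Assumption: CloudTrail stores the moved account under requestParameters.
        return detail["requestParameters"]["accountId"]

    raise ValueError("Unsupported event: {}".format(event_name))


if __name__ == "__main__":
    # Hypothetical, trimmed MoveAccount event (the account ID is a placeholder).
    move_event = {
        "source": "aws.organizations",
        "detail": {
            "eventSource": "organizations.amazonaws.com",
            "eventName": "MoveAccount",
            "requestParameters": {"accountId": "111122223333"},
        },
    }
    print(account_id_from_event(move_event))
```

Raising on unknown event names keeps a misconfigured rule from silently running the deletion against the wrong account.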

Conclusion

Control Tower Account Factory makes it easy to roll out new accounts; however, there is no way to create an account without default VPCs in regions where Control Tower is not enabled. AWS provides a solution that addresses this, but it performs many more actions than just default VPC deletion. The solution above is a lightweight implementation of default VPC deletion without any additional actions.