Deleting default VPCs in all regions for Control Tower managed accounts

Updated: Oct 20, 2022

Problem statement


When you create an account with Account Factory, the Network Configuration settings let you choose whether to create a VPC in each governed region. If the check-box for a Control Tower governed region is unchecked, no VPC is created and there is no default VPC in that region either. However, there is no way to delete the default VPCs in regions where Control Tower is not enabled, see screenshot below:


AWS provides a solution that addresses this problem, but it does much more than delete default VPCs: for example, it uses AWS RAM to share VPC subnets with the newly created account and AWS Firewall Manager to apply security groups to the VPCs in the account:

https://aws.amazon.com/blogs/mt/customizing-account-configuration-aws-control-tower-lifecycle-events/


To build a more lightweight solution, we introduce a Step Functions state machine with a single Lambda function. The state machine is triggered by the Control Tower account creation event; the Lambda function assumes the “AWSControlTowerExecution” role in the newly created account and deletes the default VPCs in all regions. A state machine with a single Lambda function is used instead of a bare Lambda function so that you can later extend what happens on the account creation event without adding new EventBridge rules.
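The core of that flow (assume the cross-account role, then look for the default VPC in each region) can be sketched as follows. This is a minimal illustration, not the code from the template: the helper names, the session name `delete-default-vpc`, and the `arn:aws` partition are assumptions, and the region list is expected to be supplied by the caller. boto3 is imported lazily so that the pure helpers can be exercised without AWS access.

```python
def role_arn(account_id, role_name="AWSControlTowerExecution"):
    """Build the ARN of the cross-account role (assumes the 'aws' partition)."""
    return "arn:aws:iam::{}:role/{}".format(account_id, role_name)


def assume_role_session(account_id, role_name="AWSControlTowerExecution"):
    """Return a boto3 session with temporary credentials for the new account."""
    import boto3  # imported here so the helpers below work without boto3

    creds = boto3.client("sts").assume_role(
        RoleArn=role_arn(account_id, role_name),
        RoleSessionName="delete-default-vpc",  # hypothetical session name
    )["Credentials"]
    return boto3.Session(
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )


def default_vpc_ids(ec2_client):
    """Return the IDs of the default VPCs visible to one regional EC2 client."""
    response = ec2_client.describe_vpcs(
        Filters=[{"Name": "is-default", "Values": ["true"]}]
    )
    return [vpc["VpcId"] for vpc in response["Vpcs"]]


def default_vpcs_by_region(session, regions):
    """Map each region that still has a default VPC to the VPC IDs found there."""
    found = {}
    for region in regions:
        ids = default_vpc_ids(session.client("ec2", region_name=region))
        if ids:
            found[region] = ids
    return found
```

In a Lambda function, `assume_role_session(account_id)` would provide the session, and the region names could come from `ec2:DescribeRegions`, which the Lambda role in the template below is allowed to call.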


Solution design

The solution diagram is:


The EventBridge event pattern that matches the Control Tower event (delivered via CloudTrail) and invokes the Step Function looks like:

{
  "detail-type": ["AWS Service Event via CloudTrail"],
  "source": ["aws.controltower"],
  "detail": {
    "serviceEventDetails": {
      "createManagedAccountStatus": {
        "state": ["SUCCEEDED"]
      }
    },
    "eventName": ["CreateManagedAccount"]
  }
}
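To make the pattern above concrete, the sketch below checks a sample event against it and pulls out the ID of the new account. The matcher is a deliberately simplified stand-in for EventBridge matching (nested objects and lists of exact allowed values only), and the sample event is trimmed to the relevant fields with a placeholder account ID and name. The location of the account ID under `createManagedAccountStatus.account.accountId` follows the documented shape of the `CreateManagedAccount` lifecycle event, but it is worth verifying against a real event in your environment.

```python
PATTERN = {
    "detail-type": ["AWS Service Event via CloudTrail"],
    "source": ["aws.controltower"],
    "detail": {
        "serviceEventDetails": {
            "createManagedAccountStatus": {"state": ["SUCCEEDED"]}
        },
        "eventName": ["CreateManagedAccount"],
    },
}

# Trimmed, illustrative event; the account values are placeholders.
SAMPLE_EVENT = {
    "detail-type": "AWS Service Event via CloudTrail",
    "source": "aws.controltower",
    "detail": {
        "eventName": "CreateManagedAccount",
        "serviceEventDetails": {
            "createManagedAccountStatus": {
                "account": {
                    "accountName": "example-account",
                    "accountId": "111122223333",
                },
                "state": "SUCCEEDED",
            }
        },
    },
}


def matches(pattern, event):
    """Return True if every field named in the pattern is matched by the event.

    Simplified: a dict in the pattern must match a nested dict in the event,
    and a list in the pattern holds the exact values that are allowed.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


def new_account_id(event):
    """Extract the ID of the newly created account from the lifecycle event."""
    status = event["detail"]["serviceEventDetails"]["createManagedAccountStatus"]
    return status["account"]["accountId"]
```

An event whose `state` is anything other than `SUCCEEDED`, or that lacks one of the fields named in the pattern, does not match and therefore does not start the state machine.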

The whole CloudFormation template, including the Lambda function code (python3.8), can look like the following:


Parameters:
  CrossAccountRoleName:
    Type: String
    Default: AWSControlTowerExecution
    Description: The cross account role name

Resources:
  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: 'delete-default-vpc-role'
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: "lambda.amazonaws.com"
            Action: "sts:AssumeRole"
      Policies:
        - PolicyName: 'delete-default-vpc-policy'
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - ec2:DescribeRegions
                  - sts:AssumeRole
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: "*"
  
  StatesExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - states.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: StepFuncExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "lambda:InvokeFunction"
                Resource: "*"

  EventRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - events.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: StepFuncExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "states:StartExecution"
                Resource: "*"

  StateMachine:
    Type: "AWS::StepFunctions::StateMachine"
    Properties:
      DefinitionString: !Sub |-
        {
          "Comment": "Delete Default VPC",
          "StartAt": "DeleteVPC",
          "States": {
            "DeleteVPC": {
              "Type": "Task",
              "Resource": "${Lambda.Arn}",
              "Retry": [ {
                      "ErrorEquals": [ "ClientError" ],
                      "IntervalSeconds": 20,
                      "BackoffRate": 1.0,
                      "MaxAttempts": 500
                  } ],
              "End": true
            }
          }
        }

      RoleArn: !GetAtt StatesExecutionRole.Arn

  Lambda:
    Type: "AWS::Lambda::Function"
    Properties: 
      Environment:
        Variables:
          CROSS_ACCOUNT_ROLE_NAME: !Ref CrossAccountRoleName
      Code:
        ZipFile: |
          import boto3
          import traceback
          import time
          import os

          DEFAULT_ROLE_NAME = 'AWSControlTowerExecution'

          CROSS_ACCOUNT_ROLE_NAME = os.environ.get(
              'CROSS_ACCOUNT_ROLE_NAME',
              DEFAULT_ROLE_NAME)

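          def delete_vpcs(client, vpc_id):
              # Remove the resources that depend on the VPC first (security groups,
              # peering connections, subnets, internet gateways), then the VPC itself.
              try: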
                  sg_ids = describe_sec_grp(client, vpc_id)
                  delete_sec_grp(client, sg_ids)

                  pcx_ids = describe_pcx(client, vpc_id)
                  delete_pcx(client, pcx_ids)

                  subnet_ids = describe_subnets(client, vpc_id)
                  delete_subnets(client, subnet_ids)

                  igw_ids = describe_igw(client, vpc_id)
                  detach_igw(client, vpc_id, igw_ids)
                  delete_igw(client, igw_ids)

                  delete = client.delete_vpc(
                      DryRun=False,
                      VpcId=vpc_id
                  )
              except:
                  print('Execution failed while deleting vpc. {}'.format(str(traceback.format_exc())))

          def describe_natgws(client, vpc_id):
              natgw_list = []
              try:
                  response = client.describe_nat_gateways(
                      Filters=[
                          {
                              'Name': 'vpc-id',
                              'Values': [
                                  vpc_id
                              ]
                          }
                      ]
                  )

                  for ids in response["NatGateways"]:
                      natgw_list.append(ids["NatGatewayId"])
                  return natgw_list
              except:
                  print('Execution failed while describing NAT gateways. {}'.format(str(traceback.format_exc())))

          def delete_natgws(client, natgw_ids):
              try:
                  ngw_list = natgw_ids
                  for id in ngw_list:
                      delete_response = client.delete_nat_gateway(
                          NatGatewayId=id
                      )
              except:
                  print('Execution failed while deleting natgw. {}'.format(str(traceback.format_exc())))

          def describe_pcx(client, vpc_id):
              pcx_list = []
              try:
                  accepter = client.describe_vpc_peering_connections(
                      Filters=[
                          {
                              'Name': 'accepter-vpc-info.vpc-id',
                              'Values': [
                                  vpc_id
                              ]
                          }
                      ]
                  )['VpcPeeringConnections']

                  requester = client.describe_vpc_peering_connections(
                      Filters=[
                          {
                              'Name': 'requester-vpc-info.vpc-id',
                              'Values': [
                                  vpc_id
                              ]
                          }
                      ]
                  )['VpcPeeringConnections']

                  for pcx in accepter:
                      pcx_list.append(pcx["VpcPeeringConnectionId"])
                  for pcx in requester:
                      pcx_list.append(pcx["VpcPeeringConnectionId"])
                  return pcx_list
              except:
                  print('Execution failed while describing peering connections. {}'.format(str(traceback.format_exc())))

          def delete_pcx(client, pcx_ids):
              try:
                  pcx_list = pcx_ids
                  for id in pcx_list:
                      delete_response = client.delete_vpc_peering_connection(
                          DryRun=False,
                          VpcPeeringConnectionId=id
                      )
              except:
                  print('Execution failed while deleting pcx. {}'.format(str(traceback.format_exc())))

          def describe_igw(client, vpcid):
              try:
                  igw_list = []
                  vpc_id = vpcid
                  describe_response = client.describe_internet_gateways(
                      DryRun=False,
                      Filters=[
                          {
                              'Name': 'attachment.vpc-id',
                              'Values': [
                                  vpc_id,
                              ]
                          }
                      ]
                  )
                  for ids in describe_response["InternetGateways"]:
                      igw_list.append(ids["InternetGatewayId"])
                  return igw_list
              except:
                  print('Execution failed while describing igw. {}'.format(str(traceback.format_exc())))

          def detach_igw(client, vpc, igw):
              try:
                  igw_list = igw
                  vpc_id = vpc
                  for id in igw_list:
                      response = client.detach_internet_gateway(
                          DryRun=False,
                          InternetGatewayId=id,
                          VpcId=vpc_id
                      )

              except:
                  print('Execution failed while detaching igw. {}'.format(str(traceback.format_exc())))

          def delete_igw(client, igwid):
              try:
                  igw_list = igwid
                  for id in igw_list:
                      delete_response = client.delete_internet_gateway(
                          DryRun=False,
                          InternetGatewayId=id
                      )
              except:
                  print('Execution failed while deleting igw. {}'.format(str(traceback.format_exc())))

          def describe_sec_grp(client, vpcid):
              try:
                  sg_list = []
                  vpc_id = vpcid
                  describe_response = client.describe_security_groups(
                      DryRun=False,
                      Filters=[
                          {
                              'Name': 'vpc-id',
                              'Values': [
                                  vpc_id,
                              ]
                          },
                      ]
                  )
                  for ids in describe_response["SecurityGroups"]:
                      sg_list.append(ids["GroupId"])
                  return sg_list
              except:
                  print('Execution failed while describing sec grp. {}'.format(str(traceback.format_exc())))

          def delete_sec_grp(client, sgid):
              try:
                  sg_list = sgid
                  for id in sg_list:
                      sg = client.describe_security_groups(
                          DryRun=False,
                          GroupIds=[id]
                      )['SecurityGroups']

                      for desc in sg:
                          if desc["Description"] != "default VPC security group":
                              delete_response = client.delete_security_group(
                                  DryRun=False,
                                  GroupId=id
                              )
              except:
                  print('Execution failed while deleting sec grp. {}'.format(str(traceback.format_exc())))

          def describe_subnets(client, vpcid):
              try:
                  subnet_list = []
                  vpc_id = vpcid
                  describe_response = client.describe_subnets(
                      DryRun=False,
                      Filters=[
                          {
                              'Name': 'vpc-id',
                              'Values': [
                                  vpc_id,
                              ]
                          },
                      ]
                  )