Deleting default VPCs in all regions for Control Tower managed account
Updated: Oct 20, 2022
Problem statement
When creating an account with Account Factory, the Network Configuration settings include an option that controls whether a VPC is created for governed accounts. If the check-box for a Control Tower governed region is unchecked, no VPC will be created and there won't be any default VPC in that region. However, there is no way to delete the default VPCs in regions where Control Tower is not enabled (see the screenshot below):

There is a solution from AWS that addresses this problem; however, it does a lot of other things besides default VPC deletion, such as using AWS RAM to share VPC subnets with the newly created account or AWS Firewall Manager to apply security groups to VPCs in the account:
To create a more lightweight solution, we will introduce a Step Function with a single Lambda function. It is triggered by the Control Tower account creation event, assumes the “AWSControlTowerExecution” role in the newly created account, and deletes the default VPCs in all regions. The reason for creating a Step Function with a single Lambda function, instead of just a Lambda function, is that in the future you may want to expand the functionality that is invoked on the Control Tower account creation event without adding new EventBridge rules.
Solution design
The solution diagram is:

The event pattern matching the CloudTrail event that invokes the Step Function looks like:
{
"detail-type": ["AWS Service Event via CloudTrail"],
"source": ["aws.controltower"],
"detail": {
"serviceEventDetails": {
"createManagedAccountStatus": {
"state": ["SUCCEEDED"]
}
},
"eventName": ["CreateManagedAccount"]
}
}
The whole CloudFormation template, including the Lambda function code (python3.8), can look like the following:
# Template parameters.
Parameters:
# Role name handed to the Lambda function through its CROSS_ACCOUNT_ROLE_NAME
# environment variable; defaults to AWSControlTowerExecution.
CrossAccountRoleName:
Type: String
Default: AWSControlTowerExecution
Description: The cross account role name
Resources:
# IAM role with the fixed name 'delete-default-vpc-role' that the Lambda
# service (lambda.amazonaws.com) is allowed to assume.
# Its inline policy allows ec2:DescribeRegions, sts:AssumeRole and the
# CloudWatch Logs write actions on all resources.
# NOTE(review): sts:AssumeRole on Resource "*" lets this role assume any role
# that trusts it; consider scoping it to the cross-account role ARN pattern.
LambdaRole:
Type: AWS::IAM::Role
Properties:
RoleName: 'delete-default-vpc-role'
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: "lambda.amazonaws.com"
Action: "sts:AssumeRole"
Policies:
- PolicyName: 'delete-default-vpc-policy'
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Action:
- ec2:DescribeRegions
- sts:AssumeRole
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: "*"
# Execution role for the state machine: assumable by states.amazonaws.com and
# allowed to call lambda:InvokeFunction on all resources.
# NOTE(review): Resource "*" could be narrowed to the ARN of the Lambda
# function that the state machine invokes.
StatesExecutionRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service:
- states.amazonaws.com
Action: "sts:AssumeRole"
Path: "/"
Policies:
- PolicyName: StepFuncExecutionPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- "lambda:InvokeFunction"
Resource: "*"
# Role assumable by events.amazonaws.com that allows states:StartExecution on
# all resources. Presumably attached to the EventBridge rule target that
# starts the state machine (the rule is not shown here; confirm).
# The inline policy reuses the name StepFuncExecutionPolicy, although it
# grants states:StartExecution rather than lambda:InvokeFunction.
EventRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service:
- events.amazonaws.com
Action: "sts:AssumeRole"
Path: "/"
Policies:
- PolicyName: StepFuncExecutionPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- "states:StartExecution"
Resource: "*"
# Step Functions state machine with a single Task state, DeleteVPC, which
# invokes the Lambda function (its ARN is substituted via !Sub) and then ends.
# The task retries errors named "ClientError" every 20 seconds with a backoff
# rate of 1.0, for up to 500 attempts. It runs under StatesExecutionRole.
# NOTE(review): the Lambda helper functions catch and print their own
# exceptions, so a ClientError may never reach this Retry clause; verify
# against the handler.
StateMachine:
Type: "AWS::StepFunctions::StateMachine"
Properties:
DefinitionString: !Sub |-
{
"Comment": "Delete Default VPC",
"StartAt": "DeleteVPC",
"States": {
"DeleteVPC": {
"Type": "Task",
"Resource": "${Lambda.Arn}",
"Retry": [ {
"ErrorEquals": [ "ClientError" ],
"IntervalSeconds": 20,
"BackoffRate": 1.0,
"MaxAttempts": 500
} ],
"End": true
}
}
}
RoleArn: !GetAtt StatesExecutionRole.Arn
Lambda:
Type: "AWS::Lambda::Function"
Properties:
Environment:
Variables:
CROSS_ACCOUNT_ROLE_NAME: !Ref CrossAccountRoleName
Code:
ZipFile: |
import boto3
import traceback
import time
import os
# Cross-account role name used when the environment does not provide one.
DEFAULT_ROLE_NAME = 'AWSControlTowerExecution'
# The CROSS_ACCOUNT_ROLE_NAME environment variable, when set, takes precedence.
CROSS_ACCOUNT_ROLE_NAME = os.getenv('CROSS_ACCOUNT_ROLE_NAME', DEFAULT_ROLE_NAME)
def delete_vpcs(client, vpc_id):
    """Delete a VPC after removing the resources attached to it.

    The steps run in this order: security groups, VPC peering connections,
    subnets, internet gateways (detached, then deleted) and finally the VPC
    itself. Any error raised here is printed with its traceback and
    swallowed, so the caller never sees an exception (best effort).

    Args:
        client: EC2 client object (presumably a boto3 EC2 client bound to
            one region; confirm against the caller).
        vpc_id: ID of the VPC to delete.
    """
    try:
        delete_sec_grp(client, describe_sec_grp(client, vpc_id))
        delete_pcx(client, describe_pcx(client, vpc_id))
        delete_subnets(client, describe_subnets(client, vpc_id))

        igw_ids = describe_igw(client, vpc_id)
        # A gateway has to be detached from the VPC before it is deleted.
        detach_igw(client, vpc_id, igw_ids)
        delete_igw(client, igw_ids)

        client.delete_vpc(
            DryRun=False,
            VpcId=vpc_id,
        )
    except Exception:
        # `except Exception` instead of a bare `except` so that SystemExit
        # and KeyboardInterrupt are not silently swallowed.
        print('Execution failed while deleting vpc. {}'.format(str(traceback.format_exc())))
def describe_natgws(client, vpc_id):
    """Return the IDs of the NAT gateways found in a VPC.

    Args:
        client: EC2 client object exposing ``describe_nat_gateways``.
        vpc_id: ID of the VPC used for the ``vpc-id`` filter.

    Returns:
        A list of NAT gateway IDs. The list is empty when nothing matches or
        when the lookup fails; a failure is printed with its traceback.
    """
    try:
        response = client.describe_nat_gateways(
            Filters=[
                {
                    'Name': 'vpc-id',
                    'Values': [vpc_id],
                },
            ]
        )
        return [natgw["NatGatewayId"] for natgw in response["NatGateways"]]
    except Exception:
        print('Execution failed while describing NAT gateways. {}'.format(str(traceback.format_exc())))
        # Always hand back a list so callers can iterate the result safely.
        return []
def delete_natgws(client, natgw_ids):
    """Delete every NAT gateway in ``natgw_ids``.

    Deletion stops at the first failure; the error is printed with its
    traceback and swallowed.

    Args:
        client: EC2 client object exposing ``delete_nat_gateway``.
        natgw_ids: Iterable of NAT gateway IDs. ``None`` is treated as empty.
    """
    try:
        for natgw_id in natgw_ids or []:
            client.delete_nat_gateway(
                NatGatewayId=natgw_id,
            )
    except Exception:
        print('Execution failed while deleting natgw. {}'.format(str(traceback.format_exc())))
def describe_pcx(client, vpc_id):
    """Return the IDs of the VPC peering connections that involve a VPC.

    The VPC is looked up first as the accepter and then as the requester;
    the results are concatenated in that order.

    Args:
        client: EC2 client object exposing
            ``describe_vpc_peering_connections``.
        vpc_id: ID of the VPC to look up.

    Returns:
        A list of peering connection IDs. The list is empty when nothing
        matches or when a lookup fails; a failure is printed with its
        traceback.
    """
    pcx_list = []
    try:
        for filter_name in ('accepter-vpc-info.vpc-id', 'requester-vpc-info.vpc-id'):
            connections = client.describe_vpc_peering_connections(
                Filters=[
                    {
                        'Name': filter_name,
                        'Values': [vpc_id],
                    },
                ]
            )['VpcPeeringConnections']
            pcx_list.extend(pcx["VpcPeeringConnectionId"] for pcx in connections)
        return pcx_list
    except Exception:
        print('Execution failed while describing peering connections. {}'.format(str(traceback.format_exc())))
        # Always hand back a list so callers can iterate the result safely.
        return []
def delete_pcx(client, pcx_ids):
    """Delete every VPC peering connection in ``pcx_ids``.

    Deletion stops at the first failure; the error is printed with its
    traceback and swallowed.

    Args:
        client: EC2 client object exposing ``delete_vpc_peering_connection``.
        pcx_ids: Iterable of peering connection IDs. ``None`` is treated as
            empty.
    """
    try:
        for pcx_id in pcx_ids or []:
            client.delete_vpc_peering_connection(
                DryRun=False,
                VpcPeeringConnectionId=pcx_id,
            )
    except Exception:
        print('Execution failed while deleting pcx. {}'.format(str(traceback.format_exc())))
def describe_igw(client, vpcid):
    """Return the IDs of the internet gateways attached to a VPC.

    Args:
        client: EC2 client object exposing ``describe_internet_gateways``.
        vpcid: ID of the VPC used for the ``attachment.vpc-id`` filter.

    Returns:
        A list of internet gateway IDs. The list is empty when nothing
        matches or when the lookup fails; a failure is printed with its
        traceback.
    """
    try:
        describe_response = client.describe_internet_gateways(
            DryRun=False,
            Filters=[
                {
                    'Name': 'attachment.vpc-id',
                    'Values': [vpcid],
                },
            ]
        )
        return [igw["InternetGatewayId"] for igw in describe_response["InternetGateways"]]
    except Exception:
        print('Execution failed while describing igw. {}'.format(str(traceback.format_exc())))
        # Always hand back a list so callers can iterate the result safely.
        return []
def detach_igw(client, vpc, igw):
    """Detach every internet gateway in ``igw`` from a VPC.

    Detaching stops at the first failure; the error is printed with its
    traceback and swallowed.

    Args:
        client: EC2 client object exposing ``detach_internet_gateway``.
        vpc: ID of the VPC the gateways are attached to.
        igw: Iterable of internet gateway IDs. ``None`` is treated as empty.
    """
    try:
        for igw_id in igw or []:
            client.detach_internet_gateway(
                DryRun=False,
                InternetGatewayId=igw_id,
                VpcId=vpc,
            )
    except Exception:
        print('Execution failed while detaching igw. {}'.format(str(traceback.format_exc())))
def delete_igw(client, igwid):
    """Delete every internet gateway in ``igwid``.

    Deletion stops at the first failure; the error is printed with its
    traceback and swallowed.

    Args:
        client: EC2 client object exposing ``delete_internet_gateway``.
        igwid: Iterable of internet gateway IDs. ``None`` is treated as
            empty.
    """
    try:
        for igw_id in igwid or []:
            client.delete_internet_gateway(
                DryRun=False,
                InternetGatewayId=igw_id,
            )
    except Exception:
        # The previous trailing `print(status_message)` was removed because
        # `status_message` is never defined and raised NameError.
        print('Execution failed while deleting igw. {}'.format(str(traceback.format_exc())))
def describe_sec_grp(client, vpcid):
    """Return the IDs of all security groups that belong to a VPC.

    Args:
        client: EC2 client object exposing ``describe_security_groups``.
        vpcid: ID of the VPC used for the ``vpc-id`` filter.

    Returns:
        A list of security group IDs. The list is empty when nothing matches
        or when the lookup fails; a failure is printed with its traceback.
    """
    try:
        describe_response = client.describe_security_groups(
            DryRun=False,
            Filters=[
                {
                    'Name': 'vpc-id',
                    'Values': [vpcid],
                },
            ]
        )
        return [group["GroupId"] for group in describe_response["SecurityGroups"]]
    except Exception:
        print('Execution failed while describing sec grp. {}'.format(str(traceback.format_exc())))
        # Always hand back a list so callers can iterate the result safely.
        return []
def delete_sec_grp(client, sgid):
    """Delete the security groups in ``sgid`` that are not a VPC default.

    Each ID is described again, and the group is deleted only when its
    description differs from "default VPC security group". Processing stops
    at the first failure; the error is printed with its traceback and
    swallowed.

    Args:
        client: EC2 client object exposing ``describe_security_groups`` and
            ``delete_security_group``.
        sgid: Iterable of security group IDs. ``None`` is treated as empty.
    """
    try:
        for group_id in sgid or []:
            groups = client.describe_security_groups(
                DryRun=False,
                GroupIds=[group_id],
            )['SecurityGroups']
            for group in groups:
                # Skip the VPC's default group, recognised by its description.
                # Presumably it cannot be deleted directly and goes away
                # with the VPC (confirm against the EC2 documentation).
                if group["Description"] != "default VPC security group":
                    client.delete_security_group(
                        DryRun=False,
                        GroupId=group_id,
                    )
    except Exception:
        print('Execution failed while deleting sec grp. {}'.format(str(traceback.format_exc())))
def describe_subnets(client, vpcid):
try:
subnet_list = []
vpc_id = vpcid
describe_response = client.describe_subnets(
DryRun=False,
Filters=[
{
'Name': 'vpc-id',
'Values': [
vpc_id,
]
},
]
)