# gtat-tech-career-kickstarte.../setup_utils/provision_aws.py
# (viewer metadata: 1090 lines, 36 KiB, Python)

#!/usr/bin/env python3
"""
AWS infrastructure provisioning for the Tech Career Kickstarter program.
Creates EC2 instances (1 UAT + N dev machines) with all required software,
SSH keys, and environment variables pre-configured via user-data scripts.
Usage:
# Provision infrastructure
python provision_aws.py --vpc-id vpc-xxx --subnet-id subnet-xxx --dev-count 5
# Custom username prefix
python provision_aws.py --vpc-id vpc-xxx --subnet-id subnet-xxx --dev-count 3 --username-prefix team
# Destroy all resources
python provision_aws.py --destroy
"""
import argparse
import base64
import json
import logging
import os
import re
import secrets
import stat
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Optional
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger("provision_aws")
UAT_DEPLOYMENT_DIR = "/srv/deployments"
UAT_TESTS_DIR_SUFFIX = "gtat-tech-career-kickstarter/solution/tests"
DEPLOY_GROUP = "ck-deploy"
DEPLOYER_USER = "deployer"
SCRIPT_DIR = Path(__file__).resolve().parent
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    """Parse and validate command-line options.

    --vpc-id and --subnet-id are mandatory unless --destroy is given,
    and --dev-count must be at least 1.
    """
    ap = argparse.ArgumentParser(
        description="Provision AWS infrastructure for Tech Career Kickstarter"
    )
    ap.add_argument("--vpc-id", type=str, help="VPC ID")
    ap.add_argument("--subnet-id", type=str, help="Subnet ID")
    ap.add_argument(
        "--ami-id",
        type=str,
        default="ami-086b9c66ec8e1126a",
        help="EC2 AMI ID (default: ami-086b9c66ec8e1126a, RHEL 9 free tier)",
    )
    ap.add_argument(
        "--dev-count", type=int, default=1, help="Number of dev instances (default: 1)"
    )
    ap.add_argument(
        "--uat-instance-type",
        type=str,
        default="c6i.4xlarge",
        help="EC2 instance type for the UAT instance (default: c6i.4xlarge)",
    )
    ap.add_argument(
        "--dev-instance-type",
        type=str,
        default="c6i.xlarge",
        help="EC2 instance type for dev instances (default: c6i.xlarge)",
    )
    ap.add_argument(
        "--username-prefix",
        type=str,
        default="dev",
        help="Prefix for dev instance usernames, e.g. 'dev' creates dev-1, dev-2 (default: dev)",
    )
    ap.add_argument(
        "--destroy", action="store_true", help="Destroy all tagged resources"
    )
    ap.add_argument(
        "--project-tag",
        type=str,
        default="career-kickstarter",
        help="Project tag value (default: career-kickstarter)",
    )
    ap.add_argument(
        "--uat-repo-url",
        type=str,
        default="https://github.com/optiver-external/gtat-tech-career-kickstarter.git",
        help="Git URL for UAT repo (solution + tests)",
    )
    ap.add_argument(
        "--template-repo-url",
        type=str,
        default="https://github.com/optiver-external/gtat-tech-career-kickstarter-challenge.git",
        help="Git URL for template repo (cloned on dev machines)",
    )
    ap.add_argument(
        "--region", type=str, default="eu-central-1", help="AWS region (default: eu-central-1)"
    )
    ap.add_argument(
        "--key-name",
        type=str,
        default="id_optivex",
        help="EC2 key pair name (default: id_optivex)",
    )
    args = ap.parse_args()
    # Destroy mode needs no placement arguments — everything is found by tag.
    if args.destroy:
        return args
    if not args.vpc_id:
        ap.error("--vpc-id is required when not using --destroy")
    if not args.subnet_id:
        ap.error("--subnet-id is required when not using --destroy")
    if args.dev_count < 1:
        ap.error("--dev-count must be >= 1")
    return args
# ---------------------------------------------------------------------------
# Tagging helper
# ---------------------------------------------------------------------------
def get_tag_specifications(
    resource_type: str, project_tag: str, name: str, role: str = ""
) -> list:
    """Build a boto3 TagSpecifications list for one resource.

    Always tags Project/Name/ManagedBy; adds a Role tag only when *role*
    is non-empty.
    """
    base_tags = [
        {"Key": "Project", "Value": project_tag},
        {"Key": "Name", "Value": name},
        {"Key": "ManagedBy", "Value": "provision_aws.py"},
    ]
    if role:
        base_tags += [{"Key": "Role", "Value": role}]
    return [{"ResourceType": resource_type, "Tags": base_tags}]
# ---------------------------------------------------------------------------
# Key pair management
# ---------------------------------------------------------------------------
def find_existing_key_pair(ec2_client, key_name: str) -> bool:
    """Return True if an EC2 key pair named *key_name* already exists.

    Only InvalidKeyPair.NotFound is treated as "does not exist"; any other
    ClientError (e.g. auth/permission failures) is re-raised so real API
    problems are not silently reported as a missing key pair. This mirrors
    the error-code handling in delete_key_pair().
    """
    try:
        ec2_client.describe_key_pairs(KeyNames=[key_name])
        return True
    except ClientError as e:
        if e.response["Error"]["Code"] == "InvalidKeyPair.NotFound":
            return False
        raise
def create_key_pair(ec2_client, key_name: str, project_tag: str) -> str:
    """Create a tagged RSA EC2 key pair and return its PEM private key material."""
    tag_spec = get_tag_specifications("key-pair", project_tag, key_name)
    resp = ec2_client.create_key_pair(
        KeyName=key_name,
        KeyType="rsa",
        TagSpecifications=tag_spec,
    )
    logger.info(f"Created key pair: {key_name}")
    return resp["KeyMaterial"]
def _restrict_file_permissions(path) -> None:
"""Restrict a file to owner-only access (cross-platform)."""
if os.name == "nt":
# Windows: remove inherited ACLs, grant only current user full control
import subprocess as _sp
username = os.environ.get("USERNAME", os.environ.get("USER", ""))
_sp.run(
["icacls", str(path), "/inheritance:r", "/grant:r", f"{username}:(R,W)"],
capture_output=True,
)
else:
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
def save_private_key(private_key_material: str, key_name: str) -> Path:
    """Write private key material to ./<key_name>.pem with owner-only access.

    The file is created with mode 0o600 from the start (via os.open), closing
    the window in which the previous write_text()-then-chmod sequence briefly
    left the private key readable by other local users on POSIX systems.
    """
    key_path = Path(f"./{key_name}.pem")
    # O_TRUNC handles reruns; the mode argument applies at creation (POSIX).
    fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        fh.write(private_key_material)
    # Still apply platform-specific hardening (Windows ACLs; re-assert POSIX mode).
    _restrict_file_permissions(key_path)
    logger.info(f"Saved private key to {key_path}")
    return key_path
def delete_key_pair(ec2_client, key_name: str) -> None:
    """Delete the named EC2 key pair; a missing key pair is logged and ignored."""
    try:
        # Probe first so a missing key produces a skip log, not an error.
        ec2_client.describe_key_pairs(KeyNames=[key_name])
        logger.info(f"Deleting key pair: {key_name}")
        ec2_client.delete_key_pair(KeyName=key_name)
    except ClientError as e:
        if "InvalidKeyPair.NotFound" not in str(e):
            raise
        logger.info(f"Key pair {key_name} not found, skipping.")
# ---------------------------------------------------------------------------
# Security group
# ---------------------------------------------------------------------------
def find_existing_security_groups(
    ec2_client, vpc_id: str, project_tag: str
) -> tuple[Optional[str], Optional[str]]:
    """Look up previously created UAT/Dev security groups in *vpc_id*.

    Matches project-tagged groups whose GroupName contains "uat-sg" or
    "dev-sg". Returns (uat_sg_id, dev_sg_id); either element is None when
    the corresponding group was not found.
    """
    uat_id: Optional[str] = None
    dev_id: Optional[str] = None
    resp = ec2_client.describe_security_groups(
        Filters=[
            {"Name": "tag:Project", "Values": [project_tag]},
            {"Name": "vpc-id", "Values": [vpc_id]},
        ]
    )
    for group in resp["SecurityGroups"]:
        group_name = group.get("GroupName", "")
        if "uat-sg" in group_name:
            uat_id = group["GroupId"]
            logger.info(f"Found existing UAT security group: {uat_id}")
        elif "dev-sg" in group_name:
            dev_id = group["GroupId"]
            logger.info(f"Found existing Dev security group: {dev_id}")
    return uat_id, dev_id
def create_security_groups(
    ec2_client, vpc_id: str, project_tag: str
) -> tuple[str, str]:
    """Create the UAT and Dev security groups and wire up their SSH ingress.

    UAT accepts SSH from anywhere plus from dev instances (deploy.sh);
    dev instances accept SSH from anywhere plus from UAT (result delivery).
    Dev instances cannot reach each other. Returns (uat_sg_id, dev_sg_id).
    """

    def _new_group(suffix: str, description: str) -> str:
        # One tagged security group in the target VPC.
        group_name = f"ck-{project_tag}-{suffix}"
        created = ec2_client.create_security_group(
            GroupName=group_name,
            Description=description,
            VpcId=vpc_id,
            TagSpecifications=get_tag_specifications(
                "security-group", project_tag, group_name
            ),
        )
        return created["GroupId"]

    def _ssh_from_world(description: str) -> dict:
        # Port-22 rule open to the internet.
        return {
            "IpProtocol": "tcp",
            "FromPort": 22,
            "ToPort": 22,
            "IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": description}],
        }

    def _ssh_from_group(group_id: str, description: str) -> dict:
        # Port-22 rule restricted to members of another security group.
        return {
            "IpProtocol": "tcp",
            "FromPort": 22,
            "ToPort": 22,
            "UserIdGroupPairs": [{"GroupId": group_id, "Description": description}],
        }

    uat_sg_id = _new_group("uat-sg", "Career Kickstarter - UAT instance")
    logger.info(f"Created UAT security group: {uat_sg_id}")
    dev_sg_id = _new_group("dev-sg", "Career Kickstarter - Dev instances (isolated)")
    logger.info(f"Created Dev security group: {dev_sg_id}")
    # UAT: SSH from the internet and from dev machines (deploy.sh uploads).
    ec2_client.authorize_security_group_ingress(
        GroupId=uat_sg_id,
        IpPermissions=[
            _ssh_from_world("SSH from internet"),
            _ssh_from_group(dev_sg_id, "SSH from dev instances"),
        ],
    )
    # Dev: SSH from the internet and from UAT (SCP of test results).
    ec2_client.authorize_security_group_ingress(
        GroupId=dev_sg_id,
        IpPermissions=[
            _ssh_from_world("SSH from internet"),
            _ssh_from_group(uat_sg_id, "SSH from UAT (results)"),
        ],
    )
    logger.info(f"Configured ingress rules for UAT ({uat_sg_id}) and Dev ({dev_sg_id})")
    return uat_sg_id, dev_sg_id
# ---------------------------------------------------------------------------
# Password generation
# ---------------------------------------------------------------------------
def generate_password() -> str:
    """Return a random URL-safe password (12 bytes of entropy → 16 characters)."""
    # secrets (not random): cryptographically strong token generation.
    entropy_bytes = 12
    return secrets.token_urlsafe(entropy_bytes)
# ---------------------------------------------------------------------------
# SSH public key derivation
# ---------------------------------------------------------------------------
def derive_public_key(private_key_material: str) -> str:
    """Derive the OpenSSH public key from PEM private key material.

    Writes the key to a locked-down temp file, shells out to
    `ssh-keygen -y`, then removes the temp file (with retries, since
    Windows antivirus can hold a brief lock on it).
    """
    handle, tmp_path = tempfile.mkstemp(suffix=".pem")
    try:
        os.write(handle, private_key_material.encode())
        os.close(handle)
        # ssh-keygen refuses group/world-readable private key files.
        _restrict_file_permissions(tmp_path)
        keygen = subprocess.run(
            ["ssh-keygen", "-y", "-f", tmp_path],
            capture_output=True,
            text=True,
            check=True,
        )
        return keygen.stdout.strip()
    finally:
        attempts = 0
        while True:
            try:
                os.unlink(tmp_path)
                break
            except PermissionError:
                attempts += 1
                if attempts >= 3:
                    logger.warning(f"Could not delete temp file: {tmp_path}")
                    break
                time.sleep(0.5)
# ---------------------------------------------------------------------------
# IAM roles and instance profiles
# ---------------------------------------------------------------------------
# Standard instance-profile trust relationship: lets EC2 instances assume
# the role via STS.
EC2_TRUST_POLICY = json.dumps(
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "ec2.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }
)
# Inline policy for the UAT role: read-only ec2:DescribeInstances on all
# resources. Presumably used on-instance (awscli is installed in the UAT
# user-data) to discover dev machine addresses — confirm against setup_uat.sh.
UAT_ROLE_POLICY = json.dumps(
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "ec2:DescribeInstances",
                "Resource": "*",
            }
        ],
    }
)
def _uat_iam_names(project_tag: str) -> tuple[str, str]:
"""Return (role_name, profile_name) for the UAT instance."""
prefix = f"ck-{project_tag}"
return f"{prefix}-uat-role", f"{prefix}-uat-profile"
def find_existing_uat_profile(
    iam_client, project_tag: str
) -> Optional[str]:
    """Return the UAT instance profile ARN if it already exists, else None."""
    profile_name = _uat_iam_names(project_tag)[1]
    try:
        found = iam_client.get_instance_profile(InstanceProfileName=profile_name)
    except ClientError as e:
        # NoSuchEntity simply means "not provisioned yet".
        if e.response["Error"]["Code"] != "NoSuchEntity":
            raise
        return None
    arn = found["InstanceProfile"]["Arn"]
    logger.info(f"Found existing UAT instance profile: {arn}")
    return arn
def create_uat_iam_resources(iam_client, project_tag: str) -> str:
    """Create the UAT IAM role and instance profile; return the profile ARN.

    Dev instances deliberately get no instance profile (least privilege —
    they need no AWS API access).
    """
    role_name, profile_name = _uat_iam_names(project_tag)
    common_tags = [
        {"Key": "Project", "Value": project_tag},
        {"Key": "ManagedBy", "Value": "provision_aws"},
    ]
    # Role trusted by EC2, carrying the inline DescribeInstances policy.
    iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=EC2_TRUST_POLICY,
        Description="Career Kickstarter - UAT instance role",
        Tags=common_tags,
    )
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName="ec2-describe-instances",
        PolicyDocument=UAT_ROLE_POLICY,
    )
    # Instance profile wrapping the role.
    iam_client.create_instance_profile(
        InstanceProfileName=profile_name, Tags=common_tags
    )
    iam_client.add_role_to_instance_profile(
        InstanceProfileName=profile_name, RoleName=role_name
    )
    profile = iam_client.get_instance_profile(InstanceProfileName=profile_name)
    arn = profile["InstanceProfile"]["Arn"]
    logger.info(f"Created UAT role ({role_name}) and profile ({arn})")
    # IAM is eventually consistent — give the new profile time to propagate
    # before RunInstances references it.
    logger.info("Waiting 10s for IAM instance profile to propagate...")
    time.sleep(10)
    return arn
def destroy_iam_resources(iam_client, project_tag: str) -> None:
    """Tear down the UAT IAM role and instance profile, tolerating absence."""
    role_name, profile_name = _uat_iam_names(project_tag)

    def _raise_unless_missing(e: ClientError) -> None:
        # NoSuchEntity means the resource is already gone — fine here.
        if e.response["Error"]["Code"] != "NoSuchEntity":
            raise e

    # Detach the role from the profile, then drop the profile.
    try:
        iam_client.remove_role_from_instance_profile(
            InstanceProfileName=profile_name, RoleName=role_name
        )
    except ClientError as e:
        _raise_unless_missing(e)
    try:
        iam_client.delete_instance_profile(InstanceProfileName=profile_name)
        logger.info(f"Deleted instance profile: {profile_name}")
    except ClientError as e:
        _raise_unless_missing(e)
    # Inline policies must be removed before the role can be deleted.
    try:
        inline = iam_client.list_role_policies(RoleName=role_name)
        for policy_name in inline.get("PolicyNames", []):
            iam_client.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
        iam_client.delete_role(RoleName=role_name)
        logger.info(f"Deleted IAM role: {role_name}")
    except ClientError as e:
        _raise_unless_missing(e)
# ---------------------------------------------------------------------------
# User-data generation
# ---------------------------------------------------------------------------
def generate_uat_user_data(
    team_deploy_keys: list[dict],
    results_key_b64: str,
    results_key_name: str,
    uat_repo_url: str,
    template_repo_url: str,
    restricted_deploy_script_content: str,
    project_tag: str,
    region: str,
) -> str:
    """Render the cloud-init user-data (bash) script for the UAT instance.

    The generated script installs tooling (git, protoc, awscli, uv + Python
    3.13), creates the shared deploy group/directory, installs
    restricted_deploy.sh, creates the restricted 'deployer' user whose
    authorized_keys entries force every team key through that script, and
    configures ec2-user with the results key plus CK_* env variables.

    Args:
        team_deploy_keys: [{"dev_name": ..., "public_key": ...}, ...] — one
            entry per dev machine; public halves of the per-team deploy keys.
        results_key_b64: base64-encoded PEM private key ec2-user uses to SCP
            test results back to dev machines.
        results_key_name: filename for that key under ~ec2-user/.ssh.
        uat_repo_url / template_repo_url / project_tag / region: exported as
            CK_* environment variables in ec2-user's .bashrc.
        restricted_deploy_script_content: body installed verbatim at
            /usr/local/bin/restricted_deploy.sh.

    Returns:
        The complete user-data script as a string.
    """
    # Build authorized_keys entries — one per team with forced command
    authorized_keys_lines = []
    team_dir_lines = []
    for entry in team_deploy_keys:
        dev_name = entry["dev_name"]
        pub_key = entry["public_key"]
        # The forced command pins each team's key to its own
        # restricted_deploy.sh invocation and disables PTY/forwarding, so
        # the deploy key cannot open an interactive shell on UAT.
        ak_line = (
            f'command="/usr/local/bin/restricted_deploy.sh {dev_name}",'
            f"no-port-forwarding,no-agent-forwarding,"
            f"no-X11-forwarding,no-pty "
            f"{pub_key}"
        )
        authorized_keys_lines.append(ak_line)
        # Per-team deployment dir; mode 2775 (setgid) keeps group ownership
        # on files created inside.
        team_dir_lines.append(
            f"mkdir -p {UAT_DEPLOYMENT_DIR}/{dev_name}\n"
            f"chown {DEPLOYER_USER}:{DEPLOY_GROUP} {UAT_DEPLOYMENT_DIR}/{dev_name}\n"
            f"chmod 2775 {UAT_DEPLOYMENT_DIR}/{dev_name}"
        )
    authorized_keys_block = "\n".join(authorized_keys_lines)
    team_dirs_block = "\n".join(team_dir_lines)
    return f"""#!/bin/bash
set -ex
exec > /var/log/user-data.log 2>&1
# Install system packages
dnf config-manager --set-enabled codeready-builder-for-rhel-9-rhui-rpms
dnf install -y git protobuf-compiler python3 pip openssh-clients awscli
# Install uv, then use uv to get Python 3.13
curl -LsSf https://astral.sh/uv/install.sh | HOME=/home/ec2-user sh
export PATH="/home/ec2-user/.local/bin:$PATH"
cp /home/ec2-user/.local/bin/uv /usr/local/bin/ 2>/dev/null || true
uv python install 3.13
# ---- Shared deployment group & directory ----
groupadd {DEPLOY_GROUP}
mkdir -p {UAT_DEPLOYMENT_DIR}
# ---- Install restricted deploy script ----
cat > /usr/local/bin/restricted_deploy.sh <<'RESTRICTEDEOF'
{restricted_deploy_script_content}
RESTRICTEDEOF
chmod 755 /usr/local/bin/restricted_deploy.sh
# ---- Restricted deployer user (dev machines SSH as this user) ----
useradd -m -s /bin/bash -G {DEPLOY_GROUP} {DEPLOYER_USER}
chown {DEPLOYER_USER}:{DEPLOY_GROUP} {UAT_DEPLOYMENT_DIR}
chmod 2775 {UAT_DEPLOYMENT_DIR}
# ---- Pre-create per-team directories ----
{team_dirs_block}
# ---- Authorized keys with per-team forced command restrictions ----
DEPLOYER_SSH="/home/{DEPLOYER_USER}/.ssh"
mkdir -p "$DEPLOYER_SSH"
cat > "$DEPLOYER_SSH/authorized_keys" <<'AKEOF'
{authorized_keys_block}
AKEOF
chmod 700 "$DEPLOYER_SSH"
chmod 600 "$DEPLOYER_SSH/authorized_keys"
chown -R {DEPLOYER_USER}:{DEPLOYER_USER} "$DEPLOYER_SSH"
# ---- ec2-user setup (program manager only) ----
usermod -aG {DEPLOY_GROUP} ec2-user
# Lock down ec2-user home so deployer/participants cannot read test cases
chmod 700 /home/ec2-user
# Results key: ec2-user uses this to SCP test results back to dev machines
mkdir -p /home/ec2-user/.ssh
echo '{results_key_b64}' | base64 -d > /home/ec2-user/.ssh/{results_key_name}
chmod 600 /home/ec2-user/.ssh/{results_key_name}
# SSH config for ec2-user: use results key by default
cat > /home/ec2-user/.ssh/config <<'SSHEOF'
Host *
IdentityFile ~/.ssh/{results_key_name}
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
SSHEOF
chmod 600 /home/ec2-user/.ssh/config
chown -R ec2-user:ec2-user /home/ec2-user/.ssh
# Environment variables
cat >> /home/ec2-user/.bashrc <<'ENVEOF'
export CK_DEPLOYMENT_DIR={UAT_DEPLOYMENT_DIR}
export CK_TESTS_DIR=/home/ec2-user/{UAT_TESTS_DIR_SUFFIX}
export CK_UAT_REPO_URL={uat_repo_url}
export CK_TEMPLATE_REPO_URL={template_repo_url}
export CK_PROJECT_TAG={project_tag}
export CK_AWS_REGION={region}
export PATH=$HOME/.local/bin:$PATH
ENVEOF
touch /tmp/user-data-complete
"""
def generate_dev_user_data(
    deploy_key_b64: str,
    deploy_key_name: str,
    results_key_b64: str,
    results_key_name: str,
    dev_name: str,
    dev_password: str,
    uat_private_ip: str,
    deploy_script_content: str,
) -> str:
    """Render the cloud-init user-data (bash) script for one dev instance.

    The generated script installs tooling (git, protoc, uv + Python 3.13),
    creates the team user with password SSH login enabled, installs the
    team's private deploy key, authorizes the results key's *public* half
    (so UAT can SCP results in, but other dev machines cannot), sets the
    hostname, exports CK_* env variables, and installs deploy.sh.

    Args:
        deploy_key_b64: base64-encoded PEM private deploy key for this team
            (used by deploy.sh to SCP artifacts to UAT).
        deploy_key_name: filename for the deploy key under ~/.ssh.
        results_key_b64: base64-encoded PEM private results key; only a
            public key derived from it is written to authorized_keys.
        results_key_name: results key name. NOTE(review): not interpolated
            into this template — apparently kept for signature symmetry with
            generate_uat_user_data; confirm it can stay unused.
        dev_name: team username, also used as the machine hostname.
        dev_password: initial password set via chpasswd.
        uat_private_ip: UAT's private IP, exported as CK_UAT_HOST.
        deploy_script_content: body installed verbatim at
            /usr/local/bin/deploy.sh.

    Returns:
        The complete user-data script as a string.
    """
    return f"""#!/bin/bash
set -ex
exec > /var/log/user-data.log 2>&1
DEV_NAME="{dev_name}"
DEV_PASSWORD="{dev_password}"
# Install system packages
dnf config-manager --set-enabled codeready-builder-for-rhel-9-rhui-rpms
dnf install -y git protobuf-compiler python3 pip openssh-clients
# Install uv + Python 3.13
curl -LsSf https://astral.sh/uv/install.sh | HOME=/home/ec2-user sh
export PATH="/home/ec2-user/.local/bin:$PATH"
cp /home/ec2-user/.local/bin/uv /usr/local/bin/ 2>/dev/null || true
uv python install 3.13
# Create team user account with password
useradd -m -s /bin/bash "$DEV_NAME"
echo "$DEV_NAME:$DEV_PASSWORD" | chpasswd
# Enable password-based SSH authentication
sed -i 's/^PasswordAuthentication no/PasswordAuthentication yes/' /etc/ssh/sshd_config
sed -i 's/^#PasswordAuthentication yes/PasswordAuthentication yes/' /etc/ssh/sshd_config
# Also check sshd_config.d/ drop-in files
find /etc/ssh/sshd_config.d/ -name '*.conf' -exec sed -i 's/^PasswordAuthentication no/PasswordAuthentication yes/' {{}} \\;
systemctl restart sshd
# ---- Deploy key: used by deploy.sh to SCP to UAT ----
DEV_HOME="/home/$DEV_NAME"
SSH_DIR="$DEV_HOME/.ssh"
mkdir -p "$SSH_DIR"
echo '{deploy_key_b64}' | base64 -d > "$SSH_DIR/{deploy_key_name}"
chmod 600 "$SSH_DIR/{deploy_key_name}"
# ---- Results key: authorize UAT to SCP test results back ----
# Only the results key public half goes into authorized_keys.
# The deploy key is NOT authorized here, so other dev machines cannot SSH in.
echo '{results_key_b64}' | base64 -d > /tmp/results_key_tmp
chmod 600 /tmp/results_key_tmp
ssh-keygen -y -f /tmp/results_key_tmp >> "$SSH_DIR/authorized_keys"
rm -f /tmp/results_key_tmp
cat > "$SSH_DIR/config" <<'SSHEOF'
Host *
IdentityFile ~/.ssh/{deploy_key_name}
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
SSHEOF
chmod 600 "$SSH_DIR/config" "$SSH_DIR/authorized_keys"
chown -R "$DEV_NAME:$DEV_NAME" "$SSH_DIR"
# Set hostname
hostnamectl set-hostname "$DEV_NAME"
# Environment variables (CK_UAT_USER=deployer: restricted user on UAT)
cat >> "$DEV_HOME/.bashrc" <<ENVEOF
export CK_UAT_USER={DEPLOYER_USER}
export CK_UAT_HOST={uat_private_ip}
export CK_UAT_DEPLOYMENT_DIR={UAT_DEPLOYMENT_DIR}
export CK_DEV_NAME=$DEV_NAME
export CK_DEPLOY_KEY_NAME={deploy_key_name}
export PATH=\\$HOME/.local/bin:/usr/local/bin:\\$PATH
ENVEOF
# Install deploy.sh
cat > /usr/local/bin/deploy.sh <<'DEPLOYSCRIPT'
{deploy_script_content}
DEPLOYSCRIPT
chmod +x /usr/local/bin/deploy.sh
touch /tmp/user-data-complete
"""
# ---------------------------------------------------------------------------
# Instance launching
# ---------------------------------------------------------------------------
def launch_instance(
    ec2_resource,
    ami_id: str,
    instance_type: str,
    subnet_id: str,
    sg_id: str,
    key_name: str,
    user_data: str,
    project_tag: str,
    name: str,
    role: str,
    iam_instance_profile: str = "",
):
    """Launch one tagged EC2 instance with a public IP and return it.

    An IAM instance profile is attached only when *iam_instance_profile* is
    non-empty; it may be given either as an ARN or as a bare profile name.
    """
    run_args = {
        "ImageId": ami_id,
        "InstanceType": instance_type,
        "KeyName": key_name,
        "MinCount": 1,
        "MaxCount": 1,
        "UserData": user_data,
        "TagSpecifications": get_tag_specifications(
            "instance", project_tag, name, role=role
        ),
        "NetworkInterfaces": [
            {
                "DeviceIndex": 0,
                "SubnetId": subnet_id,
                "Groups": [sg_id],
                "AssociatePublicIpAddress": True,
            }
        ],
    }
    if iam_instance_profile:
        # boto3 accepts either the ARN or the profile name, under different keys.
        profile_key = "Arn" if iam_instance_profile.startswith("arn:") else "Name"
        run_args["IamInstanceProfile"] = {profile_key: iam_instance_profile}
    # MinCount == MaxCount == 1, so exactly one instance comes back.
    return ec2_resource.create_instances(**run_args)[0]
# ---------------------------------------------------------------------------
# Destroy / teardown
# ---------------------------------------------------------------------------
def destroy_resources(
    ec2_client, ec2_resource, iam_client, project_tag: str
) -> None:
    """Tear down every resource tagged Project=<project_tag>.

    Order matters: instances first (they hold SG/ENI dependencies), then
    security groups, then key pairs (plus their local .pem files), and
    finally the IAM role/profile.
    """
    logger.info(f"Destroying all resources tagged with Project={project_tag}...")
    _terminate_tagged_instances(ec2_client, project_tag)
    _delete_tagged_security_groups(ec2_client, project_tag)
    # Key pairs: remove from AWS and clean up the local private key files.
    tagged = ec2_client.describe_key_pairs(
        Filters=[{"Name": "tag:Project", "Values": [project_tag]}]
    )
    for key_pair in tagged["KeyPairs"]:
        pair_name = key_pair["KeyName"]
        delete_key_pair(ec2_client, pair_name)
        pem_file = Path(f"./{pair_name}.pem")
        if pem_file.exists():
            pem_file.unlink()
            logger.info(f"Deleted local key file: {pem_file}")
    destroy_iam_resources(iam_client, project_tag)
    logger.info("All resources destroyed.")
def _terminate_tagged_instances(ec2_client, project_tag: str) -> None:
    """Terminate all project-tagged instances and wait until they are gone.

    The state filter now includes "stopping": an instance caught mid-stop
    would otherwise be missed by the query and leak past --destroy.
    """
    response = ec2_client.describe_instances(
        Filters=[
            {"Name": "tag:Project", "Values": [project_tag]},
            {
                "Name": "instance-state-name",
                "Values": ["running", "stopped", "pending", "stopping"],
            },
        ]
    )
    instance_ids = [
        instance["InstanceId"]
        for reservation in response["Reservations"]
        for instance in reservation["Instances"]
    ]
    if not instance_ids:
        logger.info("No instances found to terminate.")
        return
    logger.info(f"Terminating instances: {instance_ids}")
    ec2_client.terminate_instances(InstanceIds=instance_ids)
    # Block until AWS reports full termination (up to ~10 minutes),
    # so the subsequent security-group deletion doesn't hit ENI dependencies.
    waiter = ec2_client.get_waiter("instance_terminated")
    waiter.wait(
        InstanceIds=instance_ids,
        WaiterConfig={"Delay": 10, "MaxAttempts": 60},
    )
    logger.info("All instances terminated.")
def _delete_tagged_security_groups(ec2_client, project_tag: str) -> None:
    """Delete all project-tagged security groups.

    Pass 1 revokes every ingress/egress rule first — the UAT and Dev groups
    reference each other, so the cross-references must be stripped before
    either group can be deleted. Pass 2 deletes with retries while lingering
    dependencies (e.g. detaching ENIs) clear.
    """
    groups = ec2_client.describe_security_groups(
        Filters=[{"Name": "tag:Project", "Values": [project_tag]}]
    )["SecurityGroups"]
    for group in groups:
        group_id = group["GroupId"]
        if group.get("IpPermissions"):
            logger.info(f"Revoking ingress rules for SG: {group_id}")
            ec2_client.revoke_security_group_ingress(
                GroupId=group_id, IpPermissions=group["IpPermissions"]
            )
        if group.get("IpPermissionsEgress"):
            logger.info(f"Revoking egress rules for SG: {group_id}")
            ec2_client.revoke_security_group_egress(
                GroupId=group_id, IpPermissions=group["IpPermissionsEgress"]
            )
    for group in groups:
        group_id = group["GroupId"]
        logger.info(f"Deleting security group: {group_id}")
        for attempt in range(4):
            try:
                ec2_client.delete_security_group(GroupId=group_id)
                break
            except ClientError as e:
                # Retry only DependencyViolation, and only 3 times.
                if attempt >= 3 or "DependencyViolation" not in str(e):
                    raise
                logger.warning(
                    f"SG {group_id} still has dependencies, retrying in 15s... (attempt {attempt + 1}/4)"
                )
                time.sleep(15)
# ---------------------------------------------------------------------------
# Summary output
# ---------------------------------------------------------------------------
def print_summary(
    uat_instance,
    dev_instances: list,
    dev_names: list[str],
    dev_passwords: list[str],
    key_path: Path,
) -> None:
    """Print the operator-facing summary: UAT address, per-team credentials
    table, and the post-provision / destroy commands."""
    banner = "=" * 70
    print("\n" + banner)
    print("CAREER KICKSTARTER - INFRASTRUCTURE PROVISIONED")
    print(banner)
    print(f"\nSSH Key (operator): {key_path}")
    print(f"Deploy keys: per-team keys auto-installed on each dev machine")
    print(f"\nUAT Instance:")
    print(f" ID: {uat_instance.id}")
    print(f" Public IP: {uat_instance.public_ip_address}")
    print(f" Private IP: {uat_instance.private_ip_address}")
    print(
        f" SSH: ssh -i {key_path} ec2-user@{uat_instance.public_ip_address}"
    )
    if dev_instances:
        print(f"\nDev Instance Credentials (share with teams):")
        # Column widths track the longest name/password plus two padding chars.
        name_col = max(len(n) for n in dev_names) + 2
        pass_col = max(len(p) for p in dev_passwords) + 2
        print(
            f" {'Name':<{name_col}} {'Username':<{name_col}} "
            f"{'Password':<{pass_col}} {'SSH Command'}"
        )
        print(f" {'-' * name_col} {'-' * name_col} {'-' * pass_col} {'-' * 40}")
        for team, machine, secret in zip(dev_names, dev_instances, dev_passwords):
            login = f"ssh {team}@{machine.public_ip_address}"
            print(
                f" {team:<{name_col}} {team:<{name_col}} "
                f"{secret:<{pass_col}} {login}"
            )
    print(f"\nPost-provision: SSH into UAT and run setup_uat.sh to configure")
    print(f" dev machine hostname resolution for test result delivery:")
    print(
        f" ssh -i {key_path} ec2-user@{uat_instance.public_ip_address} "
        f"'bash setup_uat.sh'"
    )
    print(f"\nTo destroy: python provision_aws.py --destroy")
    print(banner)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Entry point: parse CLI args, then either destroy or provision.

    Provisioning order: key pairs -> security groups -> UAT IAM profile ->
    UAT instance -> dev instances -> EC2 status checks -> operator summary.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )
    args = parse_args()
    session = boto3.Session(region_name=args.region)
    ec2_client = session.client("ec2")
    ec2_resource = session.resource("ec2")
    iam_client = session.client("iam")
    # --- Destroy mode ---
    if args.destroy:
        destroy_resources(ec2_client, ec2_resource, iam_client, args.project_tag)
        return
    # --- Provision mode ---
    # Step 1: AMI
    ami_id = args.ami_id
    logger.info(f"Using AMI: {ami_id}")
    # Step 2: Generate dev names and passwords upfront (needed for key creation)
    dev_names = [f"{args.username_prefix}-{i}" for i in range(1, args.dev_count + 1)]
    dev_passwords = [generate_password() for _ in dev_names]
    # Names are interpolated into shell user-data (useradd/chpasswd), so
    # reject anything outside a conservative charset.
    for dev_name in dev_names:
        if not re.match(r"^[a-zA-Z0-9_-]+$", dev_name):
            logger.error(
                f"Invalid dev name '{dev_name}': "
                f"must be alphanumeric with hyphens/underscores only"
            )
            sys.exit(1)
    # Step 3: Create key pairs
    # 3a: Operator key pair (for SSH into instances as ec2-user)
    operator_key_name = args.key_name
    results_key_name = f"{args.key_name}_uat"
    for kn in (operator_key_name, results_key_name):
        if find_existing_key_pair(ec2_client, kn):
            logger.error(
                f"Key pair '{kn}' already exists. "
                f"Run with --destroy first or delete it manually."
            )
            sys.exit(1)
    operator_key_material = create_key_pair(
        ec2_client, operator_key_name, args.project_tag
    )
    operator_key_path = save_private_key(operator_key_material, operator_key_name)
    # 3b: Per-team deploy key pairs. The private half ships to the dev box
    # (base64 inside user-data); the public half lands in UAT's
    # authorized_keys with a forced command.
    team_deploy_keys = []  # for UAT: [{"dev_name": str, "public_key": str}, ...]
    team_key_materials = {}  # for dev machines: {dev_name: {"b64": str, "key_name": str}}
    for dev_name in dev_names:
        key_name = f"{args.key_name}_deploy_{dev_name}"
        if find_existing_key_pair(ec2_client, key_name):
            logger.error(
                f"Key pair '{key_name}' already exists. "
                f"Run with --destroy first or delete it manually."
            )
            sys.exit(1)
        key_material = create_key_pair(ec2_client, key_name, args.project_tag)
        save_private_key(key_material, key_name)
        key_b64 = base64.b64encode(key_material.encode()).decode()
        public_key = derive_public_key(key_material)
        team_deploy_keys.append({
            "dev_name": dev_name,
            "public_key": public_key,
        })
        team_key_materials[dev_name] = {
            "b64": key_b64,
            "key_name": key_name,
        }
    # 3c: Results key pair (UAT uses this to SCP results back to dev machines)
    results_key_material = create_key_pair(
        ec2_client, results_key_name, args.project_tag
    )
    save_private_key(results_key_material, results_key_name)
    results_key_b64 = base64.b64encode(results_key_material.encode()).decode()
    # Step 4: Create or reuse security groups (UAT + Dev, isolated)
    # NOTE(review): if only ONE of the two SGs already exists,
    # create_security_groups() will try to recreate both and the duplicate
    # GroupName will fail — confirm whether partial reuse is intended.
    uat_sg_id, dev_sg_id = find_existing_security_groups(
        ec2_client, args.vpc_id, args.project_tag
    )
    if uat_sg_id is None or dev_sg_id is None:
        uat_sg_id, dev_sg_id = create_security_groups(
            ec2_client, args.vpc_id, args.project_tag
        )
    # Step 5: Create or reuse UAT IAM role + instance profile
    # Dev instances get no IAM profile (least-privilege: they need no AWS API access)
    uat_profile_arn = find_existing_uat_profile(iam_client, args.project_tag)
    if uat_profile_arn is None:
        uat_profile_arn = create_uat_iam_resources(iam_client, args.project_tag)
    # Step 6: Launch UAT instance
    logger.info("Launching UAT instance...")
    uat_user_data = generate_uat_user_data(
        team_deploy_keys=team_deploy_keys,
        results_key_b64=results_key_b64,
        results_key_name=results_key_name,
        uat_repo_url=args.uat_repo_url,
        template_repo_url=args.template_repo_url,
        restricted_deploy_script_content=(SCRIPT_DIR / "restricted_deploy.sh").read_text(),
        project_tag=args.project_tag,
        region=args.region,
    )
    uat_instance = launch_instance(
        ec2_resource,
        ami_id=ami_id,
        instance_type=args.uat_instance_type,
        subnet_id=args.subnet_id,
        sg_id=uat_sg_id,
        key_name=operator_key_name,
        user_data=uat_user_data,
        project_tag=args.project_tag,
        name=f"ck-{args.project_tag}-uat",
        role="uat",
        iam_instance_profile=uat_profile_arn,
    )
    logger.info(f"UAT instance launched: {uat_instance.id}")
    # Wait for UAT to get its private IP (dev user-data needs CK_UAT_HOST)
    uat_instance.wait_until_running()
    uat_instance.reload()
    logger.info(
        f"UAT running - Public: {uat_instance.public_ip_address}, "
        f"Private: {uat_instance.private_ip_address}"
    )
    # Step 7: Launch dev instances
    dev_instances = []
    for dev_name, dev_password in zip(dev_names, dev_passwords):
        team_key_info = team_key_materials[dev_name]
        logger.info(f"Launching dev instance: {dev_name}...")
        dev_user_data = generate_dev_user_data(
            deploy_key_b64=team_key_info["b64"],
            deploy_key_name=team_key_info["key_name"],
            results_key_b64=results_key_b64,
            results_key_name=results_key_name,
            dev_name=dev_name,
            dev_password=dev_password,
            uat_private_ip=uat_instance.private_ip_address,
            deploy_script_content=(SCRIPT_DIR / "deploy.sh").read_text(),
        )
        dev_instance = launch_instance(
            ec2_resource,
            ami_id=ami_id,
            instance_type=args.dev_instance_type,
            subnet_id=args.subnet_id,
            sg_id=dev_sg_id,
            key_name=operator_key_name,
            user_data=dev_user_data,
            project_tag=args.project_tag,
            name=f"ck-{args.project_tag}-{dev_name}",
            role=dev_name,
        )
        dev_instances.append(dev_instance)
        logger.info(f"Dev instance {dev_name} launched: {dev_instance.id}")
    # Wait for all dev instances
    for dev_name, inst in zip(dev_names, dev_instances):
        inst.wait_until_running()
        inst.reload()
        logger.info(
            f"{dev_name} running - Public: {inst.public_ip_address}, "
            f"Private: {inst.private_ip_address}"
        )
    # Step 9: Wait for all instances to pass status checks (EC2-level, no SSH)
    all_instance_ids = [uat_instance.id] + [i.id for i in dev_instances]
    logger.info("Waiting for all instances to pass EC2 status checks...")
    waiter = ec2_client.get_waiter("instance_status_ok")
    waiter.wait(
        InstanceIds=all_instance_ids,
        WaiterConfig={"Delay": 15, "MaxAttempts": 40},
    )
    logger.info("All instances passed status checks.")
    # Step 10: Print summary
    print_summary(uat_instance, dev_instances, dev_names, dev_passwords, operator_key_path)


if __name__ == "__main__":
    main()