#!/usr/bin/env python3
"""
AWS infrastructure provisioning for the Tech Career Kickstarter program.

Creates EC2 instances (1 UAT + N dev machines) with all required software,
SSH keys, and environment variables pre-configured via user-data scripts.

Usage:
    # Provision infrastructure
    python provision_aws.py --vpc-id vpc-xxx --subnet-id subnet-xxx --dev-count 5

    # Custom username prefix
    python provision_aws.py --vpc-id vpc-xxx --subnet-id subnet-xxx --dev-count 3 --username-prefix team

    # Destroy all resources
    python provision_aws.py --destroy
"""
import argparse
import base64
import json
import logging
import os
import re
import secrets
import stat
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Optional

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger("provision_aws")

# Shared layout/identity constants used by both UAT and dev user-data scripts.
UAT_DEPLOYMENT_DIR = "/srv/deployments"
UAT_TESTS_DIR_SUFFIX = "gtat-tech-career-kickstarter/solution/tests"
DEPLOY_GROUP = "ck-deploy"
DEPLOYER_USER = "deployer"
SCRIPT_DIR = Path(__file__).resolve().parent


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def parse_args() -> argparse.Namespace:
    """Parse and validate command-line options for provision/destroy runs."""
    parser = argparse.ArgumentParser(
        description="Provision AWS infrastructure for Tech Career Kickstarter"
    )
    add = parser.add_argument

    add("--vpc-id", type=str, help="VPC ID")
    add("--subnet-id", type=str, help="Subnet ID")
    add(
        "--ami-id",
        type=str,
        default="ami-086b9c66ec8e1126a",
        help="EC2 AMI ID (default: ami-086b9c66ec8e1126a, RHEL 9 free tier)",
    )
    add(
        "--dev-count",
        type=int,
        default=1,
        help="Number of dev instances (default: 1)",
    )
    add(
        "--uat-instance-type",
        type=str,
        default="c6i.4xlarge",
        help="EC2 instance type for the UAT instance (default: c6i.4xlarge)",
    )
    add(
        "--dev-instance-type",
        type=str,
        default="c6i.xlarge",
        help="EC2 instance type for dev instances (default: c6i.xlarge)",
    )
    add(
        "--username-prefix",
        type=str,
        default="dev",
        help="Prefix for dev instance usernames, e.g. 'dev' creates dev-1, dev-2 (default: dev)",
    )
    add("--destroy", action="store_true", help="Destroy all tagged resources")
    add(
        "--project-tag",
        type=str,
        default="career-kickstarter",
        help="Project tag value (default: career-kickstarter)",
    )
    add(
        "--uat-repo-url",
        type=str,
        default="https://github.com/optiver-external/gtat-tech-career-kickstarter.git",
        help="Git URL for UAT repo (solution + tests)",
    )
    add(
        "--template-repo-url",
        type=str,
        default="https://github.com/optiver-external/gtat-tech-career-kickstarter-challenge.git",
        help="Git URL for template repo (cloned on dev machines)",
    )
    add(
        "--region",
        type=str,
        default="eu-central-1",
        help="AWS region (default: eu-central-1)",
    )
    add(
        "--key-name",
        type=str,
        default="id_optivex",
        help="EC2 key pair name (default: id_optivex)",
    )

    args = parser.parse_args()

    # Destroy mode needs no network placement; provisioning does.
    if not args.destroy:
        for flag, value in (("--vpc-id", args.vpc_id), ("--subnet-id", args.subnet_id)):
            if not value:
                parser.error(f"{flag} is required when not using --destroy")
        if args.dev_count < 1:
            parser.error("--dev-count must be >= 1")
    return args


# ---------------------------------------------------------------------------
# Tagging helper
# ---------------------------------------------------------------------------


def get_tag_specifications(
    resource_type: str, project_tag: str, name: str, role: str = ""
) -> list:
    """Build an EC2 TagSpecifications list for one resource.

    Every resource carries Project/Name/ManagedBy tags (the Project tag is
    what --destroy uses to find resources); Role is added only when given.
    """
    wanted = {
        "Project": project_tag,
        "Name": name,
        "ManagedBy": "provision_aws.py",
    }
    if role:
        wanted["Role"] = role
    # dict preserves insertion order, so the emitted tag order is unchanged.
    tags = [{"Key": key, "Value": value} for key, value in wanted.items()]
    return [{"ResourceType": resource_type, "Tags": tags}]
--------------------------------------------------------------------------- def find_existing_key_pair(ec2_client, key_name: str) -> bool: try: ec2_client.describe_key_pairs(KeyNames=[key_name]) return True except ClientError: return False def create_key_pair(ec2_client, key_name: str, project_tag: str) -> str: """Create an EC2 key pair and return the private key material.""" response = ec2_client.create_key_pair( KeyName=key_name, KeyType="rsa", TagSpecifications=get_tag_specifications("key-pair", project_tag, key_name), ) logger.info(f"Created key pair: {key_name}") return response["KeyMaterial"] def _restrict_file_permissions(path) -> None: """Restrict a file to owner-only access (cross-platform).""" if os.name == "nt": # Windows: remove inherited ACLs, grant only current user full control import subprocess as _sp username = os.environ.get("USERNAME", os.environ.get("USER", "")) _sp.run( ["icacls", str(path), "/inheritance:r", "/grant:r", f"{username}:(R,W)"], capture_output=True, ) else: os.chmod(path, stat.S_IRUSR | stat.S_IWUSR) def save_private_key(private_key_material: str, key_name: str) -> Path: key_path = Path(f"./{key_name}.pem") key_path.write_text(private_key_material) _restrict_file_permissions(key_path) logger.info(f"Saved private key to {key_path}") return key_path def delete_key_pair(ec2_client, key_name: str) -> None: try: ec2_client.describe_key_pairs(KeyNames=[key_name]) logger.info(f"Deleting key pair: {key_name}") ec2_client.delete_key_pair(KeyName=key_name) except ClientError as e: if "InvalidKeyPair.NotFound" in str(e): logger.info(f"Key pair {key_name} not found, skipping.") else: raise # --------------------------------------------------------------------------- # Security group # --------------------------------------------------------------------------- def find_existing_security_groups( ec2_client, vpc_id: str, project_tag: str ) -> tuple[Optional[str], Optional[str]]: """Find existing UAT and Dev security groups. 
Returns (uat_sg_id, dev_sg_id).""" response = ec2_client.describe_security_groups( Filters=[ {"Name": "tag:Project", "Values": [project_tag]}, {"Name": "vpc-id", "Values": [vpc_id]}, ] ) uat_sg_id = None dev_sg_id = None for sg in response["SecurityGroups"]: name = sg.get("GroupName", "") if "uat-sg" in name: uat_sg_id = sg["GroupId"] logger.info(f"Found existing UAT security group: {uat_sg_id}") elif "dev-sg" in name: dev_sg_id = sg["GroupId"] logger.info(f"Found existing Dev security group: {dev_sg_id}") return uat_sg_id, dev_sg_id def create_security_groups( ec2_client, vpc_id: str, project_tag: str ) -> tuple[str, str]: """Create UAT and Dev security groups with dev-instance isolation. Returns (uat_sg_id, dev_sg_id). """ # ---- UAT security group ---- uat_sg_name = f"ck-{project_tag}-uat-sg" response = ec2_client.create_security_group( GroupName=uat_sg_name, Description="Career Kickstarter - UAT instance", VpcId=vpc_id, TagSpecifications=get_tag_specifications( "security-group", project_tag, uat_sg_name ), ) uat_sg_id = response["GroupId"] logger.info(f"Created UAT security group: {uat_sg_id}") # ---- Dev security group ---- dev_sg_name = f"ck-{project_tag}-dev-sg" response = ec2_client.create_security_group( GroupName=dev_sg_name, Description="Career Kickstarter - Dev instances (isolated)", VpcId=vpc_id, TagSpecifications=get_tag_specifications( "security-group", project_tag, dev_sg_name ), ) dev_sg_id = response["GroupId"] logger.info(f"Created Dev security group: {dev_sg_id}") # ---- UAT SG ingress rules ---- ec2_client.authorize_security_group_ingress( GroupId=uat_sg_id, IpPermissions=[ # SSH from the internet { "IpProtocol": "tcp", "FromPort": 22, "ToPort": 22, "IpRanges": [ {"CidrIp": "0.0.0.0/0", "Description": "SSH from internet"} ], }, # SSH from dev instances (deploy.sh) { "IpProtocol": "tcp", "FromPort": 22, "ToPort": 22, "UserIdGroupPairs": [ {"GroupId": dev_sg_id, "Description": "SSH from dev instances"} ], }, ], ) # ---- Dev SG ingress rules ---- 
ec2_client.authorize_security_group_ingress( GroupId=dev_sg_id, IpPermissions=[ # SSH from the internet { "IpProtocol": "tcp", "FromPort": 22, "ToPort": 22, "IpRanges": [ {"CidrIp": "0.0.0.0/0", "Description": "SSH from internet"} ], }, # SSH from UAT (SCP test results back) { "IpProtocol": "tcp", "FromPort": 22, "ToPort": 22, "UserIdGroupPairs": [ {"GroupId": uat_sg_id, "Description": "SSH from UAT (results)"} ], }, ], ) logger.info(f"Configured ingress rules for UAT ({uat_sg_id}) and Dev ({dev_sg_id})") return uat_sg_id, dev_sg_id # --------------------------------------------------------------------------- # Password generation # --------------------------------------------------------------------------- def generate_password() -> str: """Generate a random URL-safe password.""" return secrets.token_urlsafe(12) # --------------------------------------------------------------------------- # SSH public key derivation # --------------------------------------------------------------------------- def derive_public_key(private_key_material: str) -> str: """Derive an SSH public key from PEM private key material.""" fd, tmp_path = tempfile.mkstemp(suffix=".pem") try: os.write(fd, private_key_material.encode()) os.close(fd) _restrict_file_permissions(tmp_path) result = subprocess.run( ["ssh-keygen", "-y", "-f", tmp_path], capture_output=True, text=True, check=True, ) return result.stdout.strip() finally: # Retry unlink on Windows where antivirus may hold a brief lock for attempt in range(3): try: os.unlink(tmp_path) break except PermissionError: if attempt < 2: time.sleep(0.5) else: logger.warning(f"Could not delete temp file: {tmp_path}") # --------------------------------------------------------------------------- # IAM roles and instance profiles # --------------------------------------------------------------------------- EC2_TRUST_POLICY = json.dumps( { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "ec2.amazonaws.com"}, 
"Action": "sts:AssumeRole", } ], } ) UAT_ROLE_POLICY = json.dumps( { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "ec2:DescribeInstances", "Resource": "*", } ], } ) def _uat_iam_names(project_tag: str) -> tuple[str, str]: """Return (role_name, profile_name) for the UAT instance.""" prefix = f"ck-{project_tag}" return f"{prefix}-uat-role", f"{prefix}-uat-profile" def find_existing_uat_profile( iam_client, project_tag: str ) -> Optional[str]: """Return the UAT instance profile ARN if it already exists.""" _, profile_name = _uat_iam_names(project_tag) try: resp = iam_client.get_instance_profile(InstanceProfileName=profile_name) arn = resp["InstanceProfile"]["Arn"] logger.info(f"Found existing UAT instance profile: {arn}") return arn except ClientError as e: if e.response["Error"]["Code"] != "NoSuchEntity": raise return None def create_uat_iam_resources(iam_client, project_tag: str) -> str: """Create IAM role and instance profile for the UAT instance. Dev instances intentionally get no IAM profile (least-privilege). Returns the UAT instance profile ARN. 
""" role_name, profile_name = _uat_iam_names(project_tag) tags = [ {"Key": "Project", "Value": project_tag}, {"Key": "ManagedBy", "Value": "provision_aws"}, ] iam_client.create_role( RoleName=role_name, AssumeRolePolicyDocument=EC2_TRUST_POLICY, Description="Career Kickstarter - UAT instance role", Tags=tags, ) iam_client.put_role_policy( RoleName=role_name, PolicyName="ec2-describe-instances", PolicyDocument=UAT_ROLE_POLICY, ) iam_client.create_instance_profile( InstanceProfileName=profile_name, Tags=tags ) iam_client.add_role_to_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) resp = iam_client.get_instance_profile(InstanceProfileName=profile_name) arn = resp["InstanceProfile"]["Arn"] logger.info(f"Created UAT role ({role_name}) and profile ({arn})") # IAM is eventually consistent — wait for profile to propagate logger.info("Waiting 10s for IAM instance profile to propagate...") time.sleep(10) return arn def destroy_iam_resources(iam_client, project_tag: str) -> None: """Delete the UAT IAM role and instance profile.""" role_name, profile_name = _uat_iam_names(project_tag) # Remove role from instance profile, then delete profile try: iam_client.remove_role_from_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) except ClientError as e: if e.response["Error"]["Code"] != "NoSuchEntity": raise try: iam_client.delete_instance_profile(InstanceProfileName=profile_name) logger.info(f"Deleted instance profile: {profile_name}") except ClientError as e: if e.response["Error"]["Code"] != "NoSuchEntity": raise # Delete inline policies, then delete role try: policies = iam_client.list_role_policies(RoleName=role_name) for policy_name in policies.get("PolicyNames", []): iam_client.delete_role_policy( RoleName=role_name, PolicyName=policy_name ) iam_client.delete_role(RoleName=role_name) logger.info(f"Deleted IAM role: {role_name}") except ClientError as e: if e.response["Error"]["Code"] != "NoSuchEntity": raise # 
# ---------------------------------------------------------------------------
# User-data generation
# ---------------------------------------------------------------------------


def generate_uat_user_data(
    team_deploy_keys: list[dict],
    results_key_b64: str,
    results_key_name: str,
    uat_repo_url: str,
    template_repo_url: str,
    restricted_deploy_script_content: str,
    project_tag: str,
    region: str,
) -> str:
    """Build the cloud-init user-data shell script for the UAT instance.

    Installs packages, creates the shared deploy group/directory, installs
    the forced-command restricted_deploy.sh, authorizes one deploy key per
    team (forced command + no-pty), and configures ec2-user with the results
    key and CK_* environment variables.

    Args:
        team_deploy_keys: list of {"dev_name": str, "public_key": str}.
        results_key_b64: base64-encoded PEM of the results key (private).
        results_key_name: filename for the results key under ~/.ssh.
        uat_repo_url / template_repo_url: exported as CK_* env vars.
        restricted_deploy_script_content: full text of restricted_deploy.sh.
        project_tag / region: exported as CK_* env vars.

    Returns:
        The complete user-data script as a string.
    """
    # One authorized_keys entry per team: the forced command pins the key to
    # that team's deployment directory and disables pty/forwarding.
    authorized_keys_lines = []
    team_dir_lines = []
    for entry in team_deploy_keys:
        dev_name = entry["dev_name"]
        pub_key = entry["public_key"]
        ak_line = (
            f'command="/usr/local/bin/restricted_deploy.sh {dev_name}",'
            f"no-port-forwarding,no-agent-forwarding,"
            f"no-X11-forwarding,no-pty "
            f"{pub_key}"
        )
        authorized_keys_lines.append(ak_line)
        team_dir_lines.append(
            f"mkdir -p {UAT_DEPLOYMENT_DIR}/{dev_name}\n"
            f"chown {DEPLOYER_USER}:{DEPLOY_GROUP} {UAT_DEPLOYMENT_DIR}/{dev_name}\n"
            f"chmod 2775 {UAT_DEPLOYMENT_DIR}/{dev_name}"
        )
    authorized_keys_block = "\n".join(authorized_keys_lines)
    team_dirs_block = "\n".join(team_dir_lines)

    return f"""#!/bin/bash
set -ex
exec > /var/log/user-data.log 2>&1

# Install system packages
dnf config-manager --set-enabled codeready-builder-for-rhel-9-rhui-rpms
dnf install -y git protobuf-compiler python3 pip openssh-clients awscli

# Install uv, then use uv to get Python 3.13
curl -LsSf https://astral.sh/uv/install.sh | HOME=/home/ec2-user sh
export PATH="/home/ec2-user/.local/bin:$PATH"
cp /home/ec2-user/.local/bin/uv /usr/local/bin/ 2>/dev/null || true
uv python install 3.13

# ---- Shared deployment group & directory ----
groupadd {DEPLOY_GROUP}
mkdir -p {UAT_DEPLOYMENT_DIR}

# ---- Install restricted deploy script ----
cat > /usr/local/bin/restricted_deploy.sh <<'RESTRICTEDEOF'
{restricted_deploy_script_content}
RESTRICTEDEOF
chmod 755 /usr/local/bin/restricted_deploy.sh

# ---- Restricted deployer user (dev machines SSH as this user) ----
useradd -m -s /bin/bash -G {DEPLOY_GROUP} {DEPLOYER_USER}
chown {DEPLOYER_USER}:{DEPLOY_GROUP} {UAT_DEPLOYMENT_DIR}
chmod 2775 {UAT_DEPLOYMENT_DIR}

# ---- Pre-create per-team directories ----
{team_dirs_block}

# ---- Authorized keys with per-team forced command restrictions ----
DEPLOYER_SSH="/home/{DEPLOYER_USER}/.ssh"
mkdir -p "$DEPLOYER_SSH"
cat > "$DEPLOYER_SSH/authorized_keys" <<'AKEOF'
{authorized_keys_block}
AKEOF
chmod 700 "$DEPLOYER_SSH"
chmod 600 "$DEPLOYER_SSH/authorized_keys"
chown -R {DEPLOYER_USER}:{DEPLOYER_USER} "$DEPLOYER_SSH"

# ---- ec2-user setup (program manager only) ----
usermod -aG {DEPLOY_GROUP} ec2-user
# Lock down ec2-user home so deployer/participants cannot read test cases
chmod 700 /home/ec2-user

# Results key: ec2-user uses this to SCP test results back to dev machines
mkdir -p /home/ec2-user/.ssh
echo '{results_key_b64}' | base64 -d > /home/ec2-user/.ssh/{results_key_name}
chmod 600 /home/ec2-user/.ssh/{results_key_name}

# SSH config for ec2-user: use results key by default
cat > /home/ec2-user/.ssh/config <<'SSHEOF'
Host *
    IdentityFile ~/.ssh/{results_key_name}
    StrictHostKeyChecking no
    UserKnownHostsFile /dev/null
SSHEOF
chmod 600 /home/ec2-user/.ssh/config
chown -R ec2-user:ec2-user /home/ec2-user/.ssh

# Environment variables
cat >> /home/ec2-user/.bashrc <<'ENVEOF'
export CK_DEPLOYMENT_DIR={UAT_DEPLOYMENT_DIR}
export CK_TESTS_DIR=/home/ec2-user/{UAT_TESTS_DIR_SUFFIX}
export CK_UAT_REPO_URL={uat_repo_url}
export CK_TEMPLATE_REPO_URL={template_repo_url}
export CK_PROJECT_TAG={project_tag}
export CK_AWS_REGION={region}
export PATH=$HOME/.local/bin:$PATH
ENVEOF

touch /tmp/user-data-complete
"""


def generate_dev_user_data(
    deploy_key_b64: str,
    deploy_key_name: str,
    results_key_b64: str,
    results_key_name: str,
    dev_name: str,
    dev_password: str,
    uat_private_ip: str,
    deploy_script_content: str,
) -> str:
    """Build the cloud-init user-data shell script for one dev instance.

    Creates the team user with password SSH login, installs the team's
    deploy key (used by deploy.sh to reach UAT), authorizes the results key
    so UAT can SCP results back, and installs /usr/local/bin/deploy.sh.

    Args:
        deploy_key_b64 / deploy_key_name: this team's private deploy key
            (base64 PEM) and its filename under ~/.ssh.
        results_key_b64 / results_key_name: results key material; only its
            derived public half is added to authorized_keys here.
        dev_name: team username and hostname for the machine.
        dev_password: initial login password for the team user.
        uat_private_ip: UAT's private IP, exported as CK_UAT_HOST.
        deploy_script_content: full text of deploy.sh.

    Returns:
        The complete user-data script as a string.
    """
    # NOTE(review): the env-var heredoc was truncated in the original source
    # ('cat >> "$DEV_HOME/.bashrc" < /usr/local/bin/deploy.sh'), and
    # uat_private_ip was accepted but never used. The block below is
    # reconstructed from the UAT template — confirm the exact CK_* variable
    # names against what deploy.sh reads.
    return f"""#!/bin/bash
set -ex
exec > /var/log/user-data.log 2>&1

DEV_NAME="{dev_name}"
DEV_PASSWORD="{dev_password}"

# Install system packages
dnf config-manager --set-enabled codeready-builder-for-rhel-9-rhui-rpms
dnf install -y git protobuf-compiler python3 pip openssh-clients

# Install uv + Python 3.13
curl -LsSf https://astral.sh/uv/install.sh | HOME=/home/ec2-user sh
export PATH="/home/ec2-user/.local/bin:$PATH"
cp /home/ec2-user/.local/bin/uv /usr/local/bin/ 2>/dev/null || true
uv python install 3.13

# Create team user account with password
useradd -m -s /bin/bash "$DEV_NAME"
echo "$DEV_NAME:$DEV_PASSWORD" | chpasswd

# Enable password-based SSH authentication
sed -i 's/^PasswordAuthentication no/PasswordAuthentication yes/' /etc/ssh/sshd_config
sed -i 's/^#PasswordAuthentication yes/PasswordAuthentication yes/' /etc/ssh/sshd_config
# Also check sshd_config.d/ drop-in files
find /etc/ssh/sshd_config.d/ -name '*.conf' -exec sed -i 's/^PasswordAuthentication no/PasswordAuthentication yes/' {{}} \\;
systemctl restart sshd

# ---- Deploy key: used by deploy.sh to SCP to UAT ----
DEV_HOME="/home/$DEV_NAME"
SSH_DIR="$DEV_HOME/.ssh"
mkdir -p "$SSH_DIR"
echo '{deploy_key_b64}' | base64 -d > "$SSH_DIR/{deploy_key_name}"
chmod 600 "$SSH_DIR/{deploy_key_name}"

# ---- Results key: authorize UAT to SCP test results back ----
# Only the results key public half goes into authorized_keys.
# The deploy key is NOT authorized here, so other dev machines cannot SSH in.
echo '{results_key_b64}' | base64 -d > /tmp/results_key_tmp
chmod 600 /tmp/results_key_tmp
ssh-keygen -y -f /tmp/results_key_tmp >> "$SSH_DIR/authorized_keys"
rm -f /tmp/results_key_tmp

cat > "$SSH_DIR/config" <<'SSHEOF'
Host *
    IdentityFile ~/.ssh/{deploy_key_name}
    StrictHostKeyChecking no
    UserKnownHostsFile /dev/null
SSHEOF
chmod 600 "$SSH_DIR/config" "$SSH_DIR/authorized_keys"
chown -R "$DEV_NAME:$DEV_NAME" "$SSH_DIR"

# Set hostname
hostnamectl set-hostname "$DEV_NAME"

# Environment variables (CK_UAT_USER=deployer: restricted user on UAT)
cat >> "$DEV_HOME/.bashrc" <<'ENVEOF'
export CK_UAT_HOST={uat_private_ip}
export CK_UAT_USER={DEPLOYER_USER}
export CK_DEV_NAME={dev_name}
export CK_DEPLOYMENT_DIR={UAT_DEPLOYMENT_DIR}
export PATH=$HOME/.local/bin:$PATH
ENVEOF

# ---- Install deploy script ----
cat > /usr/local/bin/deploy.sh <<'DEPLOYSCRIPT'
{deploy_script_content}
DEPLOYSCRIPT
chmod +x /usr/local/bin/deploy.sh

touch /tmp/user-data-complete
"""


# ---------------------------------------------------------------------------
# Instance launching
# ---------------------------------------------------------------------------


def launch_instance(
    ec2_resource,
    ami_id: str,
    instance_type: str,
    subnet_id: str,
    sg_id: str,
    key_name: str,
    user_data: str,
    project_tag: str,
    name: str,
    role: str,
    iam_instance_profile: str = "",
):
    """Launch a single EC2 instance and return it.

    Args:
        iam_instance_profile: optional; accepts either an ARN or a profile
            name (empty string means no profile is attached).
    """
    kwargs = dict(
        ImageId=ami_id,
        InstanceType=instance_type,
        KeyName=key_name,
        MinCount=1,
        MaxCount=1,
        UserData=user_data,
        TagSpecifications=get_tag_specifications(
            "instance", project_tag, name, role=role
        ),
        NetworkInterfaces=[
            {
                "DeviceIndex": 0,
                "SubnetId": subnet_id,
                "Groups": [sg_id],
                "AssociatePublicIpAddress": True,
            }
        ],
    )
    if iam_instance_profile:
        # Accept either an ARN or a profile name
        if iam_instance_profile.startswith("arn:"):
            kwargs["IamInstanceProfile"] = {"Arn": iam_instance_profile}
        else:
            kwargs["IamInstanceProfile"] = {"Name": iam_instance_profile}
    instances = ec2_resource.create_instances(**kwargs)
    return instances[0]


# ---------------------------------------------------------------------------
# Destroy / teardown
# ---------------------------------------------------------------------------


def destroy_resources(
    ec2_client, ec2_resource, iam_client, project_tag: str
) -> None:
    """Destroy all resources tagged with the given project tag.

    Order matters: instances first (they reference SGs and key pairs),
    then security groups, key pairs (plus local .pem files), and IAM.
    """
    logger.info(f"Destroying all resources tagged with Project={project_tag}...")

    # Step 1: Terminate tagged instances
    _terminate_tagged_instances(ec2_client, project_tag)

    # Step 2: Delete tagged security groups
    _delete_tagged_security_groups(ec2_client, project_tag)

    # Step 3: Delete all key pairs tagged with our project tag
    response = ec2_client.describe_key_pairs(
        Filters=[{"Name": "tag:Project", "Values": [project_tag]}]
    )
    for kp in response["KeyPairs"]:
        kn = kp["KeyName"]
        delete_key_pair(ec2_client, kn)
        local_key_path = Path(f"./{kn}.pem")
        if local_key_path.exists():
            local_key_path.unlink()
            logger.info(f"Deleted local key file: {local_key_path}")

    # Step 4: Delete IAM roles and instance profiles
    destroy_iam_resources(iam_client, project_tag)
    logger.info("All resources destroyed.")


def _terminate_tagged_instances(ec2_client, project_tag: str) -> None:
    """Terminate all non-terminated instances tagged with the project tag."""
    response = ec2_client.describe_instances(
        Filters=[
            {"Name": "tag:Project", "Values": [project_tag]},
            {
                "Name": "instance-state-name",
                "Values": ["running", "stopped", "pending"],
            },
        ]
    )
    instance_ids = [
        instance["InstanceId"]
        for reservation in response["Reservations"]
        for instance in reservation["Instances"]
    ]
    if not instance_ids:
        logger.info("No instances found to terminate.")
        return
    logger.info(f"Terminating instances: {instance_ids}")
    ec2_client.terminate_instances(InstanceIds=instance_ids)
    waiter = ec2_client.get_waiter("instance_terminated")
    waiter.wait(
        InstanceIds=instance_ids,
        WaiterConfig={"Delay": 10, "MaxAttempts": 60},
    )
    logger.info("All instances terminated.")


def _delete_tagged_security_groups(ec2_client, project_tag: str) -> None:
    """Delete project-tagged SGs, revoking cross-referencing rules first."""
    response = ec2_client.describe_security_groups(
        Filters=[{"Name": "tag:Project", "Values": [project_tag]}]
    )
    sgs = response["SecurityGroups"]

    # Pass 1: revoke all rules — the UAT/Dev SGs reference each other, and
    # an SG cannot be deleted while another SG's rules point at it.
    for sg in sgs:
        sg_id = sg["GroupId"]
        if sg.get("IpPermissions"):
            logger.info(f"Revoking ingress rules for SG: {sg_id}")
            ec2_client.revoke_security_group_ingress(
                GroupId=sg_id, IpPermissions=sg["IpPermissions"]
            )
        if sg.get("IpPermissionsEgress"):
            logger.info(f"Revoking egress rules for SG: {sg_id}")
            ec2_client.revoke_security_group_egress(
                GroupId=sg_id, IpPermissions=sg["IpPermissionsEgress"]
            )

    # Pass 2: delete, retrying while terminated instances release ENIs.
    for sg in sgs:
        sg_id = sg["GroupId"]
        logger.info(f"Deleting security group: {sg_id}")
        for attempt in range(4):
            try:
                ec2_client.delete_security_group(GroupId=sg_id)
                break
            except ClientError as e:
                if "DependencyViolation" in str(e) and attempt < 3:
                    logger.warning(
                        f"SG {sg_id} still has dependencies, retrying in 15s... (attempt {attempt + 1}/4)"
                    )
                    time.sleep(15)
                else:
                    raise


# ---------------------------------------------------------------------------
# Summary output
# ---------------------------------------------------------------------------


def print_summary(
    uat_instance,
    dev_instances: list,
    dev_names: list[str],
    dev_passwords: list[str],
    key_path: Path,
) -> None:
    """Print a human-readable summary of the provisioned infrastructure.

    Args:
        uat_instance: boto3 Instance (needs .id, .public_ip_address,
            .private_ip_address).
        dev_instances / dev_names / dev_passwords: parallel lists, one entry
            per dev machine.
        key_path: local path to the operator's private key.
    """
    print("\n" + "=" * 70)
    print("CAREER KICKSTARTER - INFRASTRUCTURE PROVISIONED")
    print("=" * 70)
    print(f"\nSSH Key (operator): {key_path}")
    print(f"Deploy keys: per-team keys auto-installed on each dev machine")
    print(f"\nUAT Instance:")
    print(f"  ID:         {uat_instance.id}")
    print(f"  Public IP:  {uat_instance.public_ip_address}")
    print(f"  Private IP: {uat_instance.private_ip_address}")
    print(
        f"  SSH:        ssh -i {key_path} ec2-user@{uat_instance.public_ip_address}"
    )

    if dev_instances:
        print(f"\nDev Instance Credentials (share with teams):")
        # Table header — column widths sized to the longest name/password
        name_w = max(len(n) for n in dev_names) + 2
        pass_w = max(len(p) for p in dev_passwords) + 2
        header = (
            f"  {'Name':<{name_w}} {'Username':<{name_w}} "
            f"{'Password':<{pass_w}} {'SSH Command'}"
        )
        sep = f"  {'-' * name_w} {'-' * name_w} {'-' * pass_w} {'-' * 40}"
        print(header)
        print(sep)
        for name, inst, password in zip(dev_names, dev_instances, dev_passwords):
            ssh_cmd = f"ssh {name}@{inst.public_ip_address}"
            print(
                f"  {name:<{name_w}} {name:<{name_w}} "
                f"{password:<{pass_w}} {ssh_cmd}"
            )

    print(f"\nPost-provision: SSH into UAT and run setup_uat.sh to configure")
    print(f"  dev machine hostname resolution for test result delivery:")
    print(
        f"  ssh -i {key_path} ec2-user@{uat_instance.public_ip_address} "
        f"'bash setup_uat.sh'"
    )
    print(f"\nTo destroy: python provision_aws.py --destroy")
    print("=" * 70)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


def main() -> None:
    """Entry point: provision (default) or destroy (--destroy) resources."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )
    args = parse_args()

    session = boto3.Session(region_name=args.region)
    ec2_client = session.client("ec2")
    ec2_resource = session.resource("ec2")
    iam_client = session.client("iam")

    # --- Destroy mode ---
    if args.destroy:
        destroy_resources(ec2_client, ec2_resource, iam_client, args.project_tag)
        return

    # --- Provision mode ---

    # Step 1: AMI
    ami_id = args.ami_id
    logger.info(f"Using AMI: {ami_id}")

    # Step 2: Generate dev names and passwords upfront (needed for key creation)
    dev_names = [f"{args.username_prefix}-{i}" for i in range(1, args.dev_count + 1)]
    dev_passwords = [generate_password() for _ in dev_names]
    for dev_name in dev_names:
        # Names are interpolated into shell user-data — restrict charset.
        if not re.match(r"^[a-zA-Z0-9_-]+$", dev_name):
            logger.error(
                f"Invalid dev name '{dev_name}': "
                f"must be alphanumeric with hyphens/underscores only"
            )
            sys.exit(1)

    # Step 3: Create key pairs
    # 3a: Operator key pair (for SSH into instances as ec2-user)
    operator_key_name = args.key_name
    results_key_name = f"{args.key_name}_uat"
    for kn in (operator_key_name, results_key_name):
        if find_existing_key_pair(ec2_client, kn):
            logger.error(
                f"Key pair '{kn}' already exists. "
                f"Run with --destroy first or delete it manually."
            )
            sys.exit(1)
    operator_key_material = create_key_pair(
        ec2_client, operator_key_name, args.project_tag
    )
    operator_key_path = save_private_key(operator_key_material, operator_key_name)

    # 3b: Per-team deploy key pairs
    team_deploy_keys = []  # for UAT: [{"dev_name": str, "public_key": str}, ...]
    team_key_materials = {}  # for dev machines: {dev_name: {"b64": str, "key_name": str}}
    for dev_name in dev_names:
        key_name = f"{args.key_name}_deploy_{dev_name}"
        if find_existing_key_pair(ec2_client, key_name):
            logger.error(
                f"Key pair '{key_name}' already exists. "
                f"Run with --destroy first or delete it manually."
            )
            sys.exit(1)
        key_material = create_key_pair(ec2_client, key_name, args.project_tag)
        save_private_key(key_material, key_name)
        key_b64 = base64.b64encode(key_material.encode()).decode()
        public_key = derive_public_key(key_material)
        team_deploy_keys.append({
            "dev_name": dev_name,
            "public_key": public_key,
        })
        team_key_materials[dev_name] = {
            "b64": key_b64,
            "key_name": key_name,
        }

    # 3c: Results key pair (UAT uses this to SCP results back to dev machines)
    results_key_material = create_key_pair(
        ec2_client, results_key_name, args.project_tag
    )
    save_private_key(results_key_material, results_key_name)
    results_key_b64 = base64.b64encode(results_key_material.encode()).decode()

    # Step 4: Create or reuse security groups (UAT + Dev, isolated)
    uat_sg_id, dev_sg_id = find_existing_security_groups(
        ec2_client, args.vpc_id, args.project_tag
    )
    if uat_sg_id is None or dev_sg_id is None:
        uat_sg_id, dev_sg_id = create_security_groups(
            ec2_client, args.vpc_id, args.project_tag
        )

    # Step 5: Create or reuse UAT IAM role + instance profile
    # Dev instances get no IAM profile (least-privilege: they need no AWS API access)
    uat_profile_arn = find_existing_uat_profile(iam_client, args.project_tag)
    if uat_profile_arn is None:
        uat_profile_arn = create_uat_iam_resources(iam_client, args.project_tag)

    # Step 6: Launch UAT instance
    logger.info("Launching UAT instance...")
    uat_user_data = generate_uat_user_data(
        team_deploy_keys=team_deploy_keys,
        results_key_b64=results_key_b64,
        results_key_name=results_key_name,
        uat_repo_url=args.uat_repo_url,
        template_repo_url=args.template_repo_url,
        restricted_deploy_script_content=(SCRIPT_DIR / "restricted_deploy.sh").read_text(),
        project_tag=args.project_tag,
        region=args.region,
    )
    uat_instance = launch_instance(
        ec2_resource,
        ami_id=ami_id,
        instance_type=args.uat_instance_type,
        subnet_id=args.subnet_id,
        sg_id=uat_sg_id,
        key_name=operator_key_name,
        user_data=uat_user_data,
        project_tag=args.project_tag,
        name=f"ck-{args.project_tag}-uat",
        role="uat",
        iam_instance_profile=uat_profile_arn,
    )
    logger.info(f"UAT instance launched: {uat_instance.id}")

    # Wait for UAT to get its private IP (dev user-data embeds it)
    uat_instance.wait_until_running()
    uat_instance.reload()
    logger.info(
        f"UAT running - Public: {uat_instance.public_ip_address}, "
        f"Private: {uat_instance.private_ip_address}"
    )

    # Step 7: Launch dev instances
    dev_instances = []
    for dev_name, dev_password in zip(dev_names, dev_passwords):
        team_key_info = team_key_materials[dev_name]
        logger.info(f"Launching dev instance: {dev_name}...")
        dev_user_data = generate_dev_user_data(
            deploy_key_b64=team_key_info["b64"],
            deploy_key_name=team_key_info["key_name"],
            results_key_b64=results_key_b64,
            results_key_name=results_key_name,
            dev_name=dev_name,
            dev_password=dev_password,
            uat_private_ip=uat_instance.private_ip_address,
            deploy_script_content=(SCRIPT_DIR / "deploy.sh").read_text(),
        )
        dev_instance = launch_instance(
            ec2_resource,
            ami_id=ami_id,
            instance_type=args.dev_instance_type,
            subnet_id=args.subnet_id,
            sg_id=dev_sg_id,
            key_name=operator_key_name,
            user_data=dev_user_data,
            project_tag=args.project_tag,
            name=f"ck-{args.project_tag}-{dev_name}",
            role=dev_name,
        )
        dev_instances.append(dev_instance)
        logger.info(f"Dev instance {dev_name} launched: {dev_instance.id}")

    # Step 8: Wait for all dev instances to be running
    for dev_name, inst in zip(dev_names, dev_instances):
        inst.wait_until_running()
        inst.reload()
        logger.info(
            f"{dev_name} running - Public: {inst.public_ip_address}, "
            f"Private: {inst.private_ip_address}"
        )

    # Step 9: Wait for all instances to pass status checks (EC2-level, no SSH)
    all_instance_ids = [uat_instance.id] + [i.id for i in dev_instances]
    logger.info("Waiting for all instances to pass EC2 status checks...")
    waiter = ec2_client.get_waiter("instance_status_ok")
    waiter.wait(
        InstanceIds=all_instance_ids,
        WaiterConfig={"Delay": 15, "MaxAttempts": 40},
    )
    logger.info("All instances passed status checks.")

    # Step 10: Print summary
    print_summary(uat_instance, dev_instances, dev_names, dev_passwords, operator_key_path)


if __name__ == "__main__":
    main()