8.2 - Code - A Simplified DRL-RM Implementation
This file provides a functional, albeit simplified, implementation of the "generalized agent" concept. It is designed to handle a variable number of units by encoding the game state into an entity list and using a decomposed action space.
This code serves as a foundational example of how to structure an agent that can operate in a more complex and dynamic environment than our previous, specialized bots.
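To make the two representations concrete, here is a small illustration (the numbers are invented for the example, not taken from a real game): every observed unit becomes one row of five features, and every action is a triple of integers. Note that the actor index refers to the agent's own units, while the target index refers to the combined observation list.
# One entity row: [type_code, health_pct, x_norm, y_norm, is_mine]
# e.g. a friendly Marine at 80% health near the map centre:
example_row = [1, 0.8, 0.52, 0.47, 1.0]
# One decomposed action: [actor_index, ability_index, target_index]
# e.g. "my third unit attacks the twelfth entity in the observation":
example_action = [2, 1, 11]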
The Code: drl_rm_bot.py
# drl_rm_bot.py
import numpy as np
import gymnasium as gym
from gymnasium.spaces import Box, MultiDiscrete
import queue
from sc2.bot_ai import BotAI
from sc2.race import Terran
from sc2.ids.unit_typeid import UnitTypeId
from sc2.units import Units
# This is a hypothetical wrapper class you would create to bridge SC2 and Gymnasium.
# Its implementation is not shown here but is assumed to exist.
from sc2_gym_env import SC2GymEnv
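# For this example we assume the bridge behaves as follows: on construction it
# launches the game with the given bot_class, hands the bot an action queue and an
# observation queue, and implements the standard gymnasium.Env reset()/step() API by
# pushing agent actions onto the action queue and blocking on the observation queue.
# Any wrapper honouring that contract would work; the details are an assumption here.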
# --- Environment Constants ---
MAX_UNITS = 50 # The maximum number of units to include in our observation
NUM_UNIT_FEATURES = 5 # The number of features describing each unit
# Upper bound for the observation space. It is generous enough to hold raw,
# unnormalized UnitTypeId values, even though the simple type_map below only
# produces small integers.
MAX_UNIT_TYPE_ID = 2000
class DRL_RM_Bot(BotAI):
"""
The BotAI for our simplified DRL-RM agent. It encodes the game state
into an entity list and decodes actions from the RL agent.
"""
def __init__(self, action_queue, obs_queue):
super().__init__()
self.action_queue = action_queue
self.obs_queue = obs_queue
self.race = Terran
def _encode_state(self) -> np.ndarray:
"""Encodes the current game state into a padded matrix of unit features."""
# Combine our units and visible enemy units, sorted by tag for consistency.
all_units = (self.units + self.enemy_units).sorted(lambda u: u.tag)
# Manually create a feature matrix.
encoded_units = np.zeros((len(all_units), NUM_UNIT_FEATURES), dtype=np.float32)
# A simple mapping of unit types to integers for the observation.
type_map = {UnitTypeId.MARINE: 1, UnitTypeId.SCV: 2, UnitTypeId.ZERGLING: 3}
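        # NOTE: This three-entry map is purely illustrative. A fuller agent would
        # cover every UnitTypeId it expects to encounter (or learn an embedding),
        # since all unknown types collapse to the same 0 value here.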
        # Map dimensions are constant for the whole game, so look them up once.
        map_width = self.game_info.map_size.width
        map_height = self.game_info.map_size.height
        for i, unit in enumerate(all_units):
            encoded_units[i] = [
                type_map.get(unit.type_id, 0),  # Use 0 for unknown types
                unit.health_percentage,
                unit.position.x / map_width if map_width > 0 else 0,
                unit.position.y / map_height if map_height > 0 else 0,
                1.0 if unit.is_mine else 0.0,  # Ownership feature
            ]
# Pad the observation with zeros up to MAX_UNITS.
obs = np.zeros((MAX_UNITS, NUM_UNIT_FEATURES), dtype=np.float32)
num_observed = min(len(encoded_units), MAX_UNITS)
if num_observed > 0:
obs[:num_observed] = encoded_units[:num_observed]
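        # Note: padded rows stay all-zero, so this simple scheme cannot distinguish
        # "no unit" from a real unit whose features happen to be zero. Adding an
        # explicit per-row validity flag (or an attention mask) is a common refinement.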
return obs
def _decode_and_execute_action(self, action: list[int]):
"""Decodes the agent's action and issues a command."""
actor_idx, ability_id, target_idx = action
my_units = self.units.sorted(lambda u: u.tag)
# The list of all units must be sorted identically to _encode_state.
all_units = (self.units + self.enemy_units).sorted(lambda u: u.tag)
# --- Safety Checks ---
# Ensure the selected actor and target indices are valid.
if not (0 <= actor_idx < len(my_units)): return
if not (0 <= target_idx < min(len(all_units), MAX_UNITS)): return
actor = my_units[actor_idx]
target = all_units[target_idx]
# Ability 0: Move, Ability 1: Attack.
# Direct unit commands are used as per python-sc2 v7.0.5+.
if ability_id == 0:
actor.move(target.position)
elif ability_id == 1:
actor.attack(target)
async def on_step(self, iteration: int):
# Issue commands on a regular interval to manage API overhead.
if iteration % 8 == 0:
# 1. DECODE AND EXECUTE ACTION from the agent.
try:
action = self.action_queue.get_nowait()
self._decode_and_execute_action(action)
except queue.Empty:
pass # No action from agent in this step.
# 2. ENCODE STATE AND SEND OBSERVATION to the agent.
observation = self._encode_state()
terminated = self.townhalls.amount == 0 # End if we lose all bases.
reward = 0.1 # A simple placeholder reward.
# The obs queue expects a Gymnasium-style tuple.
self.obs_queue.put((observation, reward, terminated, False, {}))
if terminated:
await self.client.leave()
class DRL_RM_Env(SC2GymEnv):
"""The Gymnasium Wrapper for the DRL-RM bot."""
def __init__(self):
super().__init__(bot_class=DRL_RM_Bot, map_name="AcropolisLE")
        # Observation Space: a 2D matrix of shape (MAX_UNITS, NUM_UNIT_FEATURES).
        # The 'high' bound is deliberately generous so raw, unnormalized unit type IDs would still fit.
self.observation_space = Box(
low=0, high=MAX_UNIT_TYPE_ID,
shape=(MAX_UNITS, NUM_UNIT_FEATURES),
dtype=np.float32
)
# Action Space: A vector of 3 integers for the decomposed action.
self.action_space = MultiDiscrete([
MAX_UNITS, # Index of the friendly unit to act as the "actor"
2, # Index of the ability to use (0=Move, 1=Attack)
MAX_UNITS # Index of the target unit from the observed list
])
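Assuming the SC2GymEnv bridge really does expose the standard Gymnasium reset()/step() interface, a minimal random-action rollout can be used to sanity-check the spaces defined above. The sketch below is illustrative only; it is not a training loop, and the file name random_rollout.py is just a suggestion.
# random_rollout.py
# A quick sanity check: drive DRL_RM_Env with random decomposed actions.
from drl_rm_bot import DRL_RM_Env

def random_rollout(max_steps: int = 200) -> float:
    env = DRL_RM_Env()
    obs, info = env.reset()
    total_reward = 0.0
    for _ in range(max_steps):
        # Sample [actor_index, ability_index, target_index] uniformly at random.
        action = env.action_space.sample()
        obs, reward, terminated, truncated, info = env.step(action)
        total_reward += reward
        if terminated or truncated:
            break
    env.close()
    return total_reward

if __name__ == "__main__":
    print(f"Random-policy episode reward: {random_rollout():.2f}")
Because the action space is MultiDiscrete([MAX_UNITS, 2, MAX_UNITS]), sample() already returns a valid [actor, ability, target] triple, and the safety checks in _decode_and_execute_action simply ignore indices that do not map to a live unit.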