3.3 - Code - The Worker Bot Environment
This file provides the complete implementation for our first reinforcement learning task. It contains two key classes: WorkerBot, the BotAI that executes within the game, and WorkerEnv, the gymnasium.Env that provides the interface to the stable-baselines3 agent. This code is a direct translation of the design specification from the previous section.
Implementation Overview

WorkerBot (The Game Actor):
- Initializes with communication queues.
- On a throttled loop (every eighth game step, i.e. iteration % 8 == 0), it blocks, waiting for an action from the agent.
- Executes the received action.
- Calculates the reward and the next observation.
- Puts the (obs, reward, terminated, ...) tuple back on the queue for the agent (the agent-side half of this exchange is sketched just after this overview).
- Manages the episode termination condition.

WorkerEnv (The Environment Interface):
- Inherits from our reusable SC2GymEnv.
- Formally defines the action_space and observation_space to match our design.
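For orientation, the sketch below shows roughly how the agent-facing side of this queue exchange could look inside SC2GymEnv.step(). The real method is defined with SC2GymEnv in the earlier section; the attribute names and exact flow here are assumptions for illustration, not the actual implementation.

# Illustrative sketch only - the real SC2GymEnv.step() lives in the reusable
# base class from the earlier section; attribute names are assumptions.
def step(self, action):
    # Hand the chosen action to the bot running inside the game process...
    self.action_queue.put(action)
    # ...then block until the bot replies with the outcome of that action.
    obs, reward, terminated, truncated, info = self.obs_queue.get()
    return obs, reward, terminated, truncated, info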
worker_bot.py
import numpy as np
from gymnasium.spaces import Box, Discrete
import multiprocessing as mp
from queue import Empty

from sc2.bot_ai import BotAI
from sc2.ids.unit_typeid import UnitTypeId

from sc2_gym_env import SC2GymEnv, ObservationQueueItem
# --- The BotAI Implementation (Runs in the Game Process) ---
class WorkerBot(BotAI):
"""
The BotAI actor that executes actions and generates observations.
Its only goal is to learn the correct policy for building workers.
"""
    def __init__(self, action_queue: mp.Queue, obs_queue: "mp.Queue[ObservationQueueItem]"):
        super().__init__()
        self.action_queue = action_queue
        self.obs_queue = obs_queue
    def _handle_action(self, action: int) -> float:
        """
        Executes the agent's action and calculates the sparse reward.
        Returns the reward for the action.
        """
        can_afford_scv = self.can_afford(UnitTypeId.SCV)
        has_idle_cc = self.townhalls.idle.exists

        if action == 1:  # Action: Build SCV
            if can_afford_scv and has_idle_cc:
                self.train(UnitTypeId.SCV)
                return 5.0  # Positive reward for a correct action
            else:
                return -5.0  # Negative reward for an impossible/wasted action

        return 0.0  # No reward for "Do Nothing" action
    async def on_step(self, iteration: int):
        """
        The main game loop, throttled to interact with the agent every 8 steps.
        """
        if iteration % 8 != 0:
            return

        try:
            # 1. GET ACTION - This is a blocking call, waiting for the RL agent
            action = self.action_queue.get(timeout=1)

            # 2. EXECUTE ACTION & GET SPARSE REWARD
            reward = self._handle_action(action)

            # 3. ADD DENSE REWARD for progress
            reward += self.workers.amount * 0.1
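            # For example, a successful "Build SCV" action while 12 workers
            # exist yields a total reward of 5.0 + 12 * 0.1 = 6.2.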
            # 4. GET OBSERVATION for the next state
            observation = np.array([
                self.minerals / 1000.0,
                self.workers.amount / 50.0,
                self.supply_left / 20.0
            ], dtype=np.float32)
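            # Example: 150 minerals, 12 workers and 3 supply left become
            # the normalized vector [0.15, 0.24, 0.15].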
            # 5. DEFINE TERMINATION CONDITION
            terminated = self.workers.amount >= 20

            # 6. SEND DATA - Put the results on the queue for the RL agent
            self.obs_queue.put((observation, reward, terminated, False, {}))

            if terminated:
                await self.client.leave()

        except Empty:
            # This can happen if the training process is killed.
            print("Action queue was empty. Assuming training has ended.")
            await self.client.leave()
            return
# --- The Gymnasium Environment (Runs in the Main Process) ---
class WorkerEnv(SC2GymEnv):
    """
    The Gymnasium Wrapper for the WorkerBot.
    This class defines the action and observation spaces that are visible
    to the stable-baselines3 agent.
    """
    def __init__(self):
        # Pass our custom BotAI class and a map name to the parent.
        super().__init__(bot_class=WorkerBot, map_name="AcropolisLE")

        # The agent can choose between two actions: 0 or 1.
        self.action_space = Discrete(2)

        # The observation is a 1D array of 3 normalized float values.
        self.observation_space = Box(
            low=0.0,
            high=np.inf,
            shape=(3,),
            dtype=np.float32
        )
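With both classes in place, the environment can be driven like any other gymnasium environment. The snippet below is a minimal sketch of a training run with stable-baselines3's PPO; the script name, hyperparameters, and timestep count are illustrative placeholders, not the project's actual training code.

train_worker.py (illustrative)
from stable_baselines3 import PPO
from worker_bot import WorkerEnv

if __name__ == "__main__":
    env = WorkerEnv()
    # A small MLP policy is plenty for a 3-value observation and 2 actions.
    model = PPO("MlpPolicy", env, verbose=1)
    model.learn(total_timesteps=10_000)
    model.save("worker_bot_ppo")
    env.close()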