Experiment script
Contents
Experiment script#
Bauwerk provides an experiment script that allows us to evaluate SB3-based agents on Bauwerk distributions. This script can be used as a starting point for your experiments.
Note
In order to run this script you will need to install the relevant dependencies using:
pip install bauwerk[exp]
Usage#
There are two ways of using this script: either running bauwerk-exp or python bauwerk/exp/core.py (the latter is only available in a cloned Bauwerk repo).
So for example we can run:
bauwerk-exp sb3_alg=SAC train_steps_per_task=16000 env_cfg.battery_size=10
To get a full list of available configuration options, run:
bauwerk-exp --help
Full script#
"""Utility functions to run experiments within Bauwerk."""
from typing import Optional, Dict
from dataclasses import dataclass
import dataclasses
import bauwerk
import bauwerk.benchmarks
import bauwerk.envs.wrappers
import bauwerk.eval
import bauwerk.utils.sb3
import bauwerk.utils.logging
import hydra
from hydra.core.config_store import ConfigStore
import stable_baselines3 as sb3
from loguru import logger
import wandb
import wandb.integration.sb3
import uuid
from omegaconf import OmegaConf, DictConfig
@dataclasses.dataclass
class Sb3Config:
    """Keyword arguments handed to the SB3 algorithm constructor.

    Every field of this dataclass is unpacked as a keyword argument when
    the model is instantiated.
    """

    # Name of the SB3 policy class to use.
    policy: str = "MultiInputPolicy"

    # Optional extra kwarg, currently disabled:
    # verbose: int = 2
@dataclass
class ExpConfig:
    """Experiment configuration.

    Groups all options of the experiment script: environment/training
    parameters, algorithm selection, observation/reward wrappers,
    evaluation frequencies and logging settings.

    Nested config objects (``env_cfg`` and ``sb3_alg_kwargs``) are created
    through ``default_factory`` so that every ``ExpConfig`` instance owns
    its own copy. This matters because ``__post_init__`` may mutate
    ``env_cfg``, and because Python 3.11+ rejects unhashable dataclass
    instances as plain field defaults.
    """

    # env training params
    train_steps_per_task: int = 8760  # 24 * 365
    task_len: int = 24 * 30  # total length of task
    env_mode: str = "single_env"  # or "benchmark"
    env_cfg: bauwerk.EnvConfig = dataclasses.field(
        default_factory=bauwerk.EnvConfig
    )

    # only apply if env_mode is benchmark
    benchmark: str = "BuildDistB"  # benchmark to run experiment on
    benchmark_env_kwargs: Optional[Dict] = None
    train_procedure: str = "consecutive"
    num_train_tasks: int = 10  # will sample

    # algorithm params
    sb3_alg: str = "SAC"
    # note: wandb logging may change
    # depending on algorithm
    # see https://github.com/DLR-RM/stable-baselines3/issues/263
    sb3_alg_kwargs: Sb3Config = dataclasses.field(default_factory=Sb3Config)

    # wrappers
    normalize_obs: bool = True  # whether to normalise obs
    add_param_obs: bool = False  # whether to add building params to obs
    # whether to add infeasible control penalty
    infeasible_control_penalty: bool = False
    penalty_factor: float = 1.0

    # evaluation
    eval_freq: int = 24 * 7  # evaluate model performance at this frequency
    save_dist_figures: bool = True
    # frequency of saving distribution evaluation plots
    dist_fig_freq: int = 24 * 7 * 4
    save_trajectory_figures: bool = True
    traj_fig_freq: int = 24 * 7 * 4
    traj_visible_h: float = 24 * 7

    # logging & experiment tracking
    log_level: str = "INFO"
    wandb_project: str = "bauwerk"
    wandb_mode: str = "online"

    def __post_init__(self):
        """Align ``env_cfg.episode_len`` with ``task_len`` in single-env mode.

        When ``env_mode`` is ``"single_env"`` and the two lengths differ,
        an info message is logged and ``env_cfg.episode_len`` is
        overwritten with ``task_len``. Other modes are left untouched.
        """
        if self.env_mode != "single_env":
            return

        if self.task_len != self.env_cfg.episode_len:
            logger.info(
                (
                    f"Note that task_len ({self.task_len}) is not the same as env"
                    f"_cfg.episode_len ({self.env_cfg.episode_len}). "
                    "This would lead to inconsistent evaluation."
                    f" Thus, overwritting env_cfg.episode_len with {self.task_len}."
                )
            )
            self.env_cfg.episode_len = self.task_len
# Register ExpConfig with Hydra's config store under the name "config",
# which is the config_name that the @hydra.main decorator on run() refers to.
cs = ConfigStore.instance()
cs.store(name="config", node=ExpConfig)
@hydra.main(config_path=None, config_name="config", version_base=None)
def run(cfg: DictConfig):
    """Run Bauwerk building distribution experiment.

    Builds the benchmark named by ``cfg.benchmark``, wraps a training
    environment, selects the tasks to train on, initialises wandb and then
    trains an SB3 model on each task in turn.

    Args:
        cfg: Hydra-provided configuration. It is converted into an
            ``ExpConfig`` object so that the checks in
            ``ExpConfig.__post_init__`` are executed.

    Raises:
        ValueError: If ``cfg.env_mode`` is not ``"single_env"`` and
            ``cfg.train_procedure`` is neither ``"consecutive"`` nor
            ``"separate_models"``.
    """
    # Enable running of cfg checks defined in ExpConfig.__post_init__.
    # (from https://github.com/facebookresearch/hydra/issues/981)
    cfg: ExpConfig = OmegaConf.to_object(cfg)

    bauwerk.utils.logging.setup(log_level=cfg.log_level)
    logger.info("Starting Bauwerk experiment.")

    # short random id shared by the tensorboard directory and the wandb run
    run_id = uuid.uuid4().hex[:6]

    build_dist: bauwerk.benchmarks.Benchmark = getattr(
        bauwerk.benchmarks, cfg.benchmark
    )(seed=1, env_kwargs=cfg.benchmark_env_kwargs, episode_len=cfg.task_len)

    # environment creation pipelines (wrappers etc.)
    def apply_train_eval_wrappers(env: bauwerk.HouseEnv, train: bool = False):
        """Apply the configured wrappers to ``env`` and return the result.

        Those wrappers that affect reward will only be applied to train env.
        """
        if train:
            if cfg.infeasible_control_penalty:
                env = bauwerk.envs.wrappers.InfeasControlPenalty(
                    env, penalty_factor=cfg.penalty_factor
                )
        if cfg.add_param_obs:
            env = bauwerk.envs.wrappers.TaskParamObs(
                env,
                task_param_names=["battery_size"],
                task_param_low=build_dist.cfg_dist.battery_size.low,
                task_param_high=build_dist.cfg_dist.battery_size.high,
                normalize=cfg.normalize_obs,
            )
        if cfg.normalize_obs:
            env = bauwerk.envs.wrappers.NormalizeObs(env)
        return env

    train_env = apply_train_eval_wrappers(build_dist.make_env(), train=True)

    def get_eval_env(task=None, same_as_train=False):
        """Create a new eval environment, optionally set to ``task``.

        With ``same_as_train=True`` the env also receives the train-only
        (reward-affecting) wrappers.
        """
        env = build_dist.make_env()
        if task is not None:
            env.set_task(task)
        env = apply_train_eval_wrappers(env, train=same_as_train)
        return env

    model_cls = getattr(sb3, cfg.sb3_alg)

    # configuration based on training type (single task vs multi-task)
    if cfg.env_mode == "single_env":
        logger.debug(str(cfg.env_cfg))
        tasks = [bauwerk.benchmarks.Task(cfg=cfg.env_cfg)]
    elif cfg.train_procedure in ("consecutive", "separate_models"):
        # Both procedures iterate over the same training tasks; they only
        # differ in whether a fresh model is created per task (see loop below).
        tasks = build_dist.train_tasks[: cfg.num_train_tasks]
    else:
        # Fail early instead of hitting an undefined `tasks` further down.
        raise ValueError(
            f"Unknown train_procedure '{cfg.train_procedure}'. "
            "Expected 'consecutive' or 'separate_models'."
        )

    # setting up wandb logging
    root_tensorboard_dir = "outputs/sb3/runs/"
    run_tensorboard_log = root_tensorboard_dir + f"{run_id}/"
    logger.info(f"Writing tensorboard logs to {run_tensorboard_log}")

    # Note: the patch below leads to separation of experiment data
    # wandb.tensorboard.patch(root_logdir=root_tensorboard_dir)
    wandb_run = wandb.init(
        project=cfg.wandb_project,
        config={"bauwerk": dataclasses.asdict(cfg)},
        sync_tensorboard=True,
        id=run_id,
        save_code=True,
        monitor_gym=True,
        mode=cfg.wandb_mode,
    )

    def create_model(task):
        """Create model and set up logging.

        ``task`` is only used to configure the eval environments of the
        callbacks; the model itself is always attached to ``train_env``.
        Returns a ``(model, callbacks)`` tuple.
        """
        model = model_cls(
            env=train_env,
            # tensorboard logs are necessary for full wandb logging
            tensorboard_log=run_tensorboard_log,
            **dataclasses.asdict(cfg.sb3_alg_kwargs),
        )

        callbacks = [
            bauwerk.utils.sb3.EvalCallback(
                eval_env=get_eval_env(task),
                eval_len=cfg.task_len,
                eval_freq=cfg.eval_freq,
            )
        ]
        if cfg.save_dist_figures:
            callbacks.append(
                bauwerk.utils.sb3.DistPerfPlotCallback(
                    eval_env=get_eval_env(task),
                    eval_len=cfg.task_len,
                    eval_freq=cfg.dist_fig_freq,
                )
            )
        if cfg.save_trajectory_figures:
            callbacks.append(
                bauwerk.utils.sb3.TrajectoryPlotCallback(
                    eval_env=get_eval_env(task, same_as_train=True),
                    eval_freq=cfg.traj_fig_freq,
                    visible_h=cfg.traj_visible_h,
                )
            )
        callbacks.append(
            wandb.integration.sb3.WandbCallback(
                verbose=2,
            )
        )
        return model, callbacks

    logger.info("Starting training loop.")

    # training per task
    for i, task in enumerate(tasks):
        logger.info(f"Training on task {i + 1} out of {len(tasks)} tasks.")
        logger.info(f"Current task: {task}")
        train_env.set_task(task)

        # A new model (and callbacks) is created for the first task, and for
        # every task when training separate models. Otherwise the model from
        # the previous task keeps training.
        # NOTE(review): in the "consecutive" case the callbacks keep the eval
        # envs created for the first task - confirm this is intended.
        if cfg.train_procedure == "separate_models" or i < 1:
            model, callbacks = create_model(
                task
            )  # task is only used for model eval callbacks

        model.learn(
            total_timesteps=cfg.train_steps_per_task,
            callback=callbacks,
            # note that the log interval is in episodes for off-policy algs
            # (see https://github.com/DLR-RM/stable-baselines3/blob/0532a5719c2bb46fd96b61a7e03dd8cb180c00fc/stable_baselines3/common/off_policy_algorithm.py#L604) # pylint: disable=line-too-long
            log_interval=1,
            # the last two configs prevent the log from being split up
            # between learn calls
            tb_log_name=f"run-{cfg.sb3_alg}-{wandb_run.id}",
            reset_num_timesteps=False,
            progress_bar=True,
        )

    wandb_run.finish()
    logger.info("Run completed.")
if __name__ == "__main__":
    # The cfg argument is supplied by the @hydra.main decorator on run(),
    # hence the call without arguments and the pylint exemption.
    run()  # pylint: disable=no-value-for-parameter