"""Wrappers for Bauwerk environments."""
from typing import Any, Dict, Tuple
import gym
import numpy as np
import copy
import bauwerk
class TaskParamObs(gym.ObservationWrapper):
    """Wrapper that adds task parameters to observation space."""

    def __init__(
        self,
        env: bauwerk.HouseEnv,
        task_param_names: list,
        task_param_low: np.array,
        task_param_high: np.array,
        normalize: bool = False,
    ):
        """Wrapper that adds task parameters to observation space.

        Args:
            env (bauwerk.HouseEnv): environment to wrap.
            task_param_names (list): list of names of task parameters. Each
                name should be an attribute of the environment's config.
            task_param_low (np.array): lower bound of task parameters.
            task_param_high (np.array): upper bound of the task parameters.
            normalize (bool, optional): whether to normalise the task
                parameters to ``(value - low) / (high - low)``. Defaults to
                False.
        """
        super().__init__(env)

        shape = (len(task_param_names),)  # shape of task param obs space
        task_param_low = np.array(task_param_low).reshape(shape)
        task_param_high = np.array(task_param_high).reshape(shape)

        self.task_param_names = task_param_names
        # Keep the raw bounds and the flag so that ``reset`` can apply the
        # same normalisation as the constructor.
        self.task_param_low = task_param_low
        self.task_param_high = task_param_high
        self.normalize = normalize

        # get task parameter values
        self.task_param_values = self._get_task_param_values()

        if normalize:
            # Normalised values are advertised with unit bounds.
            task_param_low = np.zeros(shape)
            task_param_high = np.ones(shape)

        # new obs space starts from old
        # note: copy is necessary because otherwise underlying obs space changed.
        new_spaces = copy.copy(env.observation_space.spaces)
        new_spaces["task_param"] = gym.spaces.Box(
            low=task_param_low,
            high=task_param_high,
            shape=shape,
            dtype=self.unwrapped.cfg.dtype,
        )
        self.observation_space = gym.spaces.Dict(new_spaces)

    def _get_task_param_values(self) -> np.ndarray:
        """Read the task parameters from the wrapped env's config.

        Returns:
            np.ndarray: one value per entry of ``task_param_names``, rescaled
                with the stored bounds when ``normalize`` is set.
        """
        values = np.array(
            [getattr(self.env.cfg, key) for key in self.task_param_names]
        )
        if self.normalize:
            # Divide by the parameter range (high - low).
            values = (values - self.task_param_low) / (
                self.task_param_high - self.task_param_low
            )
        return values

    def observation(self, obs):
        """Add the current task parameter values under ``"task_param"``.

        Note that ``obs`` is modified in place and then returned.
        """
        obs["task_param"] = self.task_param_values
        return obs

    def reset(self, *args, **kwargs):
        """Refresh the task parameter values, then reset the wrapped env.

        The values are re-read from the config (and re-normalised if
        requested) before the reset call is forwarded.
        """
        self.task_param_values = self._get_task_param_values()
        return super().reset(*args, **kwargs)
class NormalizeObs(gym.ObservationWrapper):
    """Rescale the observations of a Bauwerk environment."""

    def __init__(self, env: bauwerk.HouseEnv):
        """Rescale the observations of a Bauwerk environment.

        Every sub-space of shape ``(1,)`` is replaced by a ``Box`` with
        bounds -1 and 1; all other sub-spaces are passed through unchanged.

        Args:
            env (bauwerk.HouseEnv): environment to wrap.
        """
        super().__init__(env)

        normalized_spaces = {}
        for name, space in self.env.observation_space.items():
            if space.shape != (1,):
                # Spaces of any other shape keep their original definition.
                normalized_spaces[name] = space
                continue
            normalized_spaces[name] = gym.spaces.Box(
                low=-1, high=1, shape=(1,), dtype=self.unwrapped.cfg.dtype
            )
        self.observation_space = gym.spaces.Dict(normalized_spaces)

    def observation(self, obs: dict) -> dict:
        """Return a new dict with every entry rescaled by its original bounds.

        Each value becomes ``(value - low) / (high - low)``, where ``low`` and
        ``high`` come from the wrapped environment's observation space.

        NOTE(review): for in-bounds values this yields numbers in [0, 1],
        while the space declared in ``__init__`` spans [-1, 1]; the rescaling
        is also applied to entries whose space was left unchanged. Confirm
        that both are intended.
        """
        source_space = self.env.observation_space
        return {
            name: (reading - source_space[name].low)
            / (source_space[name].high - source_space[name].low)
            for name, reading in obs.items()
        }
class ClipReward(gym.RewardWrapper):
    """Restrict the environment's reward to a fixed interval."""

    def __init__(self, env: gym.Env, min_reward: float, max_reward: float):
        """Restrict the environment's reward to a fixed interval.

        Adapted from https://www.gymlibrary.dev/api/wrappers/#rewardwrapper.

        Note that in Bauwerk environments clipping the reward may
        lead to alternative optimal policies.
        Thus, use with care.

        Args:
            env (gym.Env): environment to apply wrapper to.
            min_reward (float): minimum reward value.
            max_reward (float): maximum reward value.
        """
        super().__init__(env)
        self.min_reward, self.max_reward = min_reward, max_reward
        # Advertise the clipped interval as the wrapper's reward range.
        self.reward_range = (self.min_reward, self.max_reward)

    def reward(self, reward: float) -> float:
        """Return ``reward`` limited to ``[min_reward, max_reward]``."""
        lower, upper = self.min_reward, self.max_reward
        return np.clip(reward, lower, upper)
class ClipActions(gym.ActionWrapper):
    """Narrow the action space that the environment advertises."""

    def __init__(self, env: gym.Env, low: Any, high: Any):
        """Narrow the action space that the environment advertises.

        The wrapper's action space becomes a ``Box`` with the given bounds
        and the shape of the wrapped environment's action space.

        Args:
            env (gym.Env): gym to clip actions for.
            low (Any): lower bound of clipped action space (passed to
                gym.spaces.Box). This must fit the shape of the env's
                action space.
            high (Any): upper bound of clipped action space (passed to
                gym.spaces.Box).
        """
        super().__init__(env)
        restricted_space = gym.spaces.Box(
            low=low,
            high=high,
            shape=env.action_space.shape,
            dtype=env.cfg.dtype,
        )
        self.action_space = restricted_space

    def action(self, act):
        """Forward ``act`` unchanged.

        NOTE(review): the action itself is not clipped here; only the
        declared action space is narrowed. Confirm this is intended.
        """
        return act
class InfeasControlPenalty(gym.Wrapper):
    """Penalise the reward whenever the agent attempts infeasible control."""

    def __init__(self, env: bauwerk.HouseEnv, penalty_factor: float = 1.0) -> None:
        """Penalise the reward whenever the agent attempts infeasible control.

        The penalty is computed based on the absolute difference between the
        (dis)charging power that the agent last tried to apply to the battery,
        and the power that was actually discharged after accounting for the
        physics of the system.

        Args:
            env (bauwerk.HouseEnv): environment to wrap.
            penalty_factor (float, optional): multiplicative factor that is
                applied to the power difference. Similar to a price on
                infeasible control. The scale should be adapted to the pricing
                scheme in your control problem, as this factor effectively
                determines the "price" of infeasible control. Defaults to 1.0.
        """
        self.penalty_factor = penalty_factor
        super().__init__(env)

    def step(self, action: object) -> Tuple[object, float, bool, Dict[str, Any]]:
        """Step the wrapped env and subtract the infeasibility penalty.

        The penalty is ``info["power_diff"] * penalty_factor``, where ``info``
        is the last element of the wrapped step result. The reward is
        returned as a plain ``float``; all other elements are unchanged.
        """
        obs, reward, *rest = super().step(action)
        info = rest[-1]  # the info dict is the final element of the result
        reward -= info["power_diff"] * self.penalty_factor
        return (obs, float(reward), *rest)