"""Bauwerk-based multi-task and meta RL benchmarks.
API designed to be similar to that of the Meta-World benchmark,
whilst keeping additional dependencies to a minimum.
https://github.com/rlworkgroup/metaworld
"""
import abc
import copy
import dataclasses
from collections import OrderedDict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union

import gym
import numpy as np
from loguru import logger

import bauwerk
import bauwerk.envs.solar_battery_house
ENV_NAME = "bauwerk/House-v0"


# NOTE: deliberate deviation from the Meta-World API, which models a task as a
# NamedTuple rather than as a dataclass.
@dataclasses.dataclass
class Task:
    """Everything needed to describe one single MDP of a Bauwerk env.

    Instances are meant to be handed to the ``set_task`` method.
    """

    # Config of the Bauwerk environment (named `data` in the Meta-World API).
    cfg: object
    # Name of the environment this task belongs to.
    env_name: str = ENV_NAME
@dataclasses.dataclass
class ParamDist:
    """Base class for a distribution over a single config parameter."""

    # Callable that values are drawn from.
    fn: Any
@dataclasses.dataclass
class ContParamDist(ParamDist):
    """Distribution over a single config parameter with two bounds."""

    low: float  # lower bound, passed to ``fn`` as ``low``
    high: float  # upper bound, passed to ``fn`` as ``high``

    def sample(self):
        """Draw one value by calling ``fn`` with the ``low``/``high`` bounds."""
        bounds = {"low": self.low, "high": self.high}
        return self.fn(**bounds)
def sample_cfg_dist(self) -> bauwerk.EnvConfig:
    """Draw one concrete environment config from this CfgDist.

    Every field whose value is a ``ParamDist`` is replaced by the result of
    its ``sample()`` call; all other field values are passed on unchanged.
    """
    sampled = {}
    for spec in dataclasses.fields(self):
        value = getattr(self, spec.name)
        if isinstance(value, ParamDist):
            value = value.sample()
        sampled[spec.name] = value
    return bauwerk.EnvConfig(**sampled)
def get_default_env_cfg(self) -> bauwerk.EnvConfig:
    """Build the default environment config of this CfgDist.

    Every field whose value is a ``ParamDist`` contributes its ``high``
    attribute (the upper bound); all other field values are passed on
    unchanged.
    """

    def _resolve(value):
        # Distributions are represented by their upper bound.
        return value.high if isinstance(value, ParamDist) else value

    return bauwerk.EnvConfig(
        **{
            spec.name: _resolve(getattr(self, spec.name))
            for spec in dataclasses.fields(self)
        }
    )
# Dataclass with the same fields as bauwerk.EnvConfig, except that every field
# is additionally allowed to hold a ParamDist instead of a concrete value.
# NOTE(review): the Field objects of bauwerk.EnvConfig are handed over as-is
# (not copied) -- confirm that sharing them between both dataclasses is intended.
CfgDist = dataclasses.make_dataclass(
    "CfgDist",
    fields=[
        (spec.name, Union[spec.type, ParamDist], spec)
        for spec in dataclasses.fields(bauwerk.EnvConfig)
    ],
    namespace={
        "sample": sample_cfg_dist,
        "get_default_env_cfg": get_default_env_cfg,
    },
)
class Benchmark(abc.ABC):
    """Abstract base class of a benchmark.

    Only one single instance should be used when evaluating an algorithm.
    """

    # Marked abstract so that every subclass is forced to implement its own
    # constructor; the `seed` argument was added as well.
    @abc.abstractmethod
    def __init__(self, seed=None):
        """Set up the benchmark (no-op here, subclasses do the actual work)."""

    @property
    def train_classes(self) -> OrderedDict:
        """Environment classes used for training."""
        return self._train_classes

    @property
    def test_classes(self) -> OrderedDict:
        """Environment classes used for testing."""
        return self._test_classes

    @property
    def train_tasks(self) -> List[Task]:
        """Training tasks of this benchmark."""
        return self._train_tasks

    @property
    def test_tasks(self) -> List[Task]:
        """Test tasks of this benchmark."""
        return self._test_tasks

    @abc.abstractmethod
    def make_env(self) -> gym.Env:
        """Create an environment instance on which all tasks can be set."""
class BuildDist(Benchmark):
    """Building distribution.

    Benchmark whose train and test tasks are environment configs sampled from
    a ``CfgDist`` when the benchmark is constructed.
    """

    def __init__(
        self,
        cfg_dist: CfgDist,
        seed: Optional[int] = None,
        num_train_tasks: int = 20,
        num_test_tasks: int = 10,
        episode_len: Optional[int] = None,
        dtype: Optional[Union[str, np.dtype]] = None,
        env_kwargs: Optional[Dict] = None,
    ):
        """Building distribution.

        Args:
            cfg_dist (CfgDist): distribution over bauwerk
                env configs. The passed object itself is never modified.
            seed (int, optional): Random seed.
                Defaults to None.
            num_train_tasks (int, optional): Number of training tasks.
                Defaults to 20.
            num_test_tasks (int, optional): Number of test tasks.
                Defaults to 10.
            episode_len: (int, optional): Length of episode in distribution
                environments. If not set, defaults to distribution configuration.
            dtype (Union[str, np.dtype], optional): data type to be returned and
                received by envs. Defaults to None, which leads to the general
                default of np.float32.
            env_kwargs (dict, optional): parameters to pass when creating
                environment. This should not be used when evaluating on
                pre-defined benchmark. Defaults to None.
        """
        super().__init__()

        # Apply the overrides to a shallow copy, so that a cfg_dist object
        # shared by the caller (e.g. between several benchmarks) stays intact.
        if episode_len is not None or dtype is not None:
            cfg_dist = copy.copy(cfg_dist)
            if episode_len is not None:
                cfg_dist.episode_len = episode_len
            if dtype is not None:
                cfg_dist.dtype = dtype
        self.cfg_dist = cfg_dist

        self.env_class = bauwerk.envs.HouseEnv

        if env_kwargs is None:
            self.env_kwargs = {}
        else:
            logger.warning(
                (
                    "Env kwargs in benchmark changed. "
                    "This may lead to inconsistent results."
                )
            )
            self.env_kwargs = env_kwargs

        # Both mappings have the same shape (env name -> env class), matching
        # the OrderedDict return type declared by the Benchmark properties.
        self._train_classes = OrderedDict([(ENV_NAME, self.env_class)])
        self._test_classes = OrderedDict([(ENV_NAME, self.env_class)])

        # Creating tasks; the test tasks are drawn with a different seed
        # (seed + 1) than the train tasks.
        self._train_tasks = self._create_tasks(
            seed=seed,
            num_tasks=num_train_tasks,
        )
        self._test_tasks = self._create_tasks(
            seed=(None if seed is None else seed + 1),
            num_tasks=num_test_tasks,
        )

    def _create_tasks(self, seed, num_tasks):
        """Sample tasks from the config distribution of this benchmark.

        Args:
            seed (int, optional): if not None, NumPy's global random state is
                seeded with this value while sampling and restored afterwards.
            num_tasks (int): number of tasks to sample.

        Returns:
            List[Task]: the sampled tasks.
        """
        seeded = seed is not None
        if seeded:
            old_np_state = np.random.get_state()
            np.random.seed(seed)
        try:
            return [
                Task(env_name=ENV_NAME, cfg=self.cfg_dist.sample())
                for _ in range(num_tasks)
            ]
        finally:
            # Restore the global random state even if sampling raised, so a
            # failure here cannot leave the process in a seeded state.
            if seeded:
                np.random.set_state(old_np_state)

    def make_env(self):
        """Create environment with max parameters.

        This enables shared obs and act space. Entries of ``env_kwargs`` are
        set as attributes on the default config before the env is created.
        """
        cfg = self.cfg_dist.get_default_env_cfg()
        for name, value in self.env_kwargs.items():
            setattr(cfg, name, value)
        env = gym.make(
            ENV_NAME,
            cfg=cfg,
        )
        env.unwrapped.force_task_setting = True
        return env
class BuildDistA(BuildDist):
    """Bauwerk building distribution A: identical houses, no variation."""

    def __init__(self, **kwargs):
        """Bauwerk building distribution A.

        Identical houses, no variation.

        Args:
            **kwargs: forwarded to ``BuildDist.__init__``; must not contain
                ``cfg_dist``, which is set by this class.
        """
        cfg_dist = CfgDist(
            battery_size=7.5,
            episode_len=24 * 30,
            grid_peak_threshold=2.0,
        )
        super().__init__(**kwargs, cfg_dist=cfg_dist)
class BuildDistB(BuildDist):
    """Bauwerk building distribution B: houses with varying battery size."""

    def __init__(self, **kwargs):
        """Bauwerk building distribution B.

        Houses with varying battery size (0.5kWh to 20kWh).

        Args:
            **kwargs: forwarded to ``BuildDist.__init__``; must not contain
                ``cfg_dist``, which is set by this class.
        """
        cfg_dist = CfgDist(
            battery_size=ContParamDist(
                low=0.5,
                high=20,
                fn=np.random.uniform,
            ),
            episode_len=24 * 30,
            grid_peak_threshold=2.0,
        )
        super().__init__(**kwargs, cfg_dist=cfg_dist)
class BuildDistC(BuildDist):
    """Bauwerk building distribution C: varying solar and battery sizes."""

    def __init__(self, **kwargs):
        """Bauwerk building distribution C.

        Houses with varying solar (multiplier: 0.5 to 5) and
        battery sizes (0.5 to 20kWh).

        Args:
            **kwargs: forwarded to ``BuildDist.__init__``; must not contain
                ``cfg_dist``, which is set by this class.
        """
        cfg_dist = CfgDist(
            battery_size=ContParamDist(
                low=0.5,
                high=20,
                fn=np.random.uniform,
            ),
            solar_scaling_factor=ContParamDist(
                low=0.5,
                high=5,
                fn=np.random.uniform,
            ),
            episode_len=24 * 30,
            grid_peak_threshold=2.0,
        )
        super().__init__(**kwargs, cfg_dist=cfg_dist)
class BuildDistD(BuildDist):
    """Bauwerk building distribution D: varying battery, load and solar sizes."""

    def __init__(self, **kwargs):
        """Bauwerk building distribution D.

        Houses with varying battery (0.5 to 20kWh),
        load (multiplier: 0.5 to 5) and solar sizes
        (multiplier: 0.5 to 5).

        This distribution is effectively like differently sized houses.

        Args:
            **kwargs: forwarded to ``BuildDist.__init__``; must not contain
                ``cfg_dist``, which is set by this class.
        """
        cfg_dist = CfgDist(
            battery_size=ContParamDist(
                low=0.5,
                high=20,
                fn=np.random.uniform,
            ),
            solar_scaling_factor=ContParamDist(
                low=0.5,
                high=5,
                fn=np.random.uniform,
            ),
            load_scaling_factor=ContParamDist(
                low=0.5,
                high=5,
                fn=np.random.uniform,
            ),
            episode_len=24 * 30,
            grid_peak_threshold=2.0,
        )
        super().__init__(**kwargs, cfg_dist=cfg_dist)
class BuildDistE(BuildDist):
    """Bauwerk building distribution E: distribution D plus irreducible noise."""

    def __init__(self, **kwargs):
        """Bauwerk building distribution E.

        Same as Bauwerk building distribution D,
        other than adding irreducible noise (load and solar noise
        magnitude are both set to 2.0).

        Args:
            **kwargs: forwarded to ``BuildDist.__init__``; must not contain
                ``cfg_dist``, which is set by this class.
        """
        cfg_dist = CfgDist(
            battery_size=ContParamDist(
                low=0.5,
                high=20,
                fn=np.random.uniform,
            ),
            solar_scaling_factor=ContParamDist(
                low=0.5,
                high=5,
                fn=np.random.uniform,
            ),
            load_scaling_factor=ContParamDist(
                low=0.5,
                high=5,
                fn=np.random.uniform,
            ),
            load_noise_magnitude=2.0,
            solar_noise_magnitude=2.0,
            episode_len=24 * 30,
            grid_peak_threshold=2.0,
        )
        super().__init__(**kwargs, cfg_dist=cfg_dist)