Introduction
In this tutorial, we show how to use PyTupli to set up an efficient offline reinforcement learning (RL) pipeline for a custom environment. This includes:
creating a benchmark and uploading it to an instance of a TupliStorage,
re-loading this benchmark from the storage,
recording RL tuples of (state, action, reward, terminal, timeout) for this benchmark and uploading them to the storage,
creating a dataset from the stored episodes, and
training an offline RL agent using d3rlpy.
The last step is optional. If you want to try it, install the d3rlpy library with pip install d3rlpy.
[ ]:
import io
import math
from typing import Optional
import numpy as np
import pandas as pd
from gymnasium import spaces
from gymnasium.envs.classic_control import utils, MountainCarEnv
from gymnasium.wrappers import TimeLimit
import d3rlpy
from d3rlpy.algos import DiscreteCQLConfig
from d3rlpy.dataset import MDPDataset
from pytupli.benchmark import TupliEnvWrapper
from pytupli.storage import TupliAPIClient, TupliStorage, FileStorage
from gymnasium import Env
from pytupli.schema import ArtifactMetadata, FilterEQ, EpisodeMetadataCallback
from pytupli.dataset import TupliDataset
PyTupli has two storage options: a local FileStorage, and the TupliAPIClient, which uses MongoDB as a backend. You can run this notebook with either storage type by adjusting the flag below. If you want to use the TupliAPIClient, follow the instructions in the README to start the application.
[ ]:
STORAGE_FLAG = 'api'  # 'api' or 'file'
Creating a Custom Environment
We will use the MountainCar example from gymnasium with a small modification: the cart is slowed down by wind in the horizontal direction. We load the wind data from a CSV file.
[ ]:
class CustomMountainCarEnv(MountainCarEnv):
    """
## Description
The Mountain Car MDP is a deterministic MDP that consists of a car placed stochastically
at the bottom of a sinusoidal valley, with the only possible actions being the accelerations
that can be applied to the car in either direction. The goal of the MDP is to strategically
accelerate the car to reach the goal state on top of the right hill. There are two versions
of the mountain car domain in gymnasium: one with discrete actions and one with continuous.
This version is the one with discrete actions.
This MDP first appeared in [Andrew Moore's PhD Thesis (1990)](https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-209.pdf)
```
@TECHREPORT{Moore90efficientmemory-based,
author = {Andrew William Moore},
title = {Efficient Memory-based Learning for Robot Control},
institution = {University of Cambridge},
year = {1990}
}
```
## Observation Space
The observation is a `ndarray` with shape `(2,)` where the elements correspond to the following:
| Num | Observation | Min | Max | Unit |
|-----|--------------------------------------|-------|------|--------------|
| 0 | position of the car along the x-axis | -1.2 | 0.6 | position (m) |
| 1 | velocity of the car | -0.07 | 0.07 | velocity (v) |
## Action Space
There are 3 discrete deterministic actions:
- 0: Accelerate to the left
- 1: Don't accelerate
- 2: Accelerate to the right
## Transition Dynamics:
Given an action, the mountain car follows the following transition dynamics:
*velocity<sub>t+1</sub> = velocity<sub>t</sub> + (action - 1) * force - cos(3 * position<sub>t</sub>) * gravity - wind<sub>t</sub> * cos(position<sub>t</sub>)*
*position<sub>t+1</sub> = position<sub>t</sub> + velocity<sub>t+1</sub>*
where force = 0.001, gravity = 0.0025, and wind<sub>t</sub> is the wind value loaded from the csv file for time step t. The collisions at either end are inelastic with the velocity set to 0
upon collision with the wall. The position is clipped to the range `[-1.2, 0.6]` and
velocity is clipped to the range `[-0.07, 0.07]`.
## Reward:
The goal is to reach the flag placed on top of the right hill as quickly as possible, as such the agent is
penalised with a reward of -1 for each timestep.
## Starting State
The position of the car is assigned a uniform random value in *[-0.6 , -0.4]*.
The starting velocity of the car is always assigned to 0.
## Episode End
The episode ends if either of the following happens:
1. Termination: The position of the car is greater than or equal to 0.5 (the goal position on top of the right hill)
2. Truncation: The length of the episode is 200.
## Arguments
Mountain Car has two parameters for `gymnasium.make` with `render_mode` and `goal_velocity`.
On reset, the `options` parameter allows the user to change the bounds used to determine the new random state.
```python
>>> import gymnasium as gym
>>> env = gym.make("MountainCar-v0", render_mode="rgb_array", goal_velocity=0.1) # default goal_velocity=0
>>> env
<TimeLimit<OrderEnforcing<PassiveEnvChecker<MountainCarEnv<MountainCar-v0>>>>>
>>> env.reset(seed=123, options={"x_init": np.pi/2, "y_init": 0.5}) # default x_init=np.pi, y_init=1.0
(array([-0.46352962, 0. ], dtype=float32), {})
```
## Version History
* v0: Initial versions release
    """

    metadata = {
        'render_modes': ['human', 'rgb_array'],
        'render_fps': 30,
    }

    def __init__(self, data_path: str, render_mode: Optional[str] = None, goal_velocity=0):
        self.min_position = -1.2
        self.max_position = 0.6
        self.max_speed = 0.07
        self.goal_position = 0.5
        self.goal_velocity = goal_velocity
        self.current_step = 0
        # wind data, indexed by time step and scaled by 0.01
        self.data = pd.read_csv(data_path, index_col=0, header=None) * 0.01
        self.force = 0.001
        self.gravity = 0.0025
        self.low = np.array([self.min_position, -self.max_speed], dtype=np.float32)
        self.high = np.array([self.max_position, self.max_speed], dtype=np.float32)
        self.render_mode = render_mode
        self.screen_width = 600
        self.screen_height = 400
        self.screen = None
        self.clock = None
        self.isopen = True
        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(self.low, self.high, dtype=np.float32)

    def step(self, action: int):
        assert self.action_space.contains(action), f'{action!r} ({type(action)}) invalid'
        position, velocity = self.state
        velocity += (
            (action - 1) * self.force
            + math.cos(3 * position) * (-self.gravity)
            # wind term: uses the wind value for the current time step
            - self.data.loc[self.current_step].to_numpy().flatten()[0] * math.cos(position)
        )
        velocity = np.clip(velocity, -self.max_speed, self.max_speed)
        position += velocity
        position = np.clip(position, self.min_position, self.max_position)
        if position == self.min_position and velocity < 0:
            velocity = 0
        terminated = bool(position >= self.goal_position and velocity >= self.goal_velocity)
        reward = -1.0
        self.current_step += 1
        self.state = (position, velocity)
        if self.render_mode == 'human':
            self.render()
        # truncation=False as the time limit is handled by the `TimeLimit` wrapper
        return np.array(self.state, dtype=np.float32), reward, terminated, False, {}

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
    ):
        super().reset(seed=seed)
        # Note that if you use custom reset bounds, it may lead to out-of-bound
        # state/observations.
        self.current_step = 0
        low, high = utils.maybe_parse_reset_bounds(options, -0.6, -0.4)
        self.state = np.array([self.np_random.uniform(low=low, high=high), 0])
        if self.render_mode == 'human':
            self.render()
        return np.array(self.state, dtype=np.float32), {}
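To see the effect of the wind term in isolation, the velocity update from step() can be written as a standalone function. This is a minimal sketch using only the standard library: the function name wind_velocity_update and the sample wind values are hypothetical and chosen for illustration, while the constants are the ones set in __init__ above.

```python
import math

FORCE = 0.001       # same value as self.force
GRAVITY = 0.0025    # same value as self.gravity
MAX_SPEED = 0.07    # same value as self.max_speed


def wind_velocity_update(position, velocity, action, wind):
    """Return the clipped velocity after one step (hypothetical helper)."""
    velocity += (
        (action - 1) * FORCE
        - math.cos(3 * position) * GRAVITY
        - wind * math.cos(position)  # wind term added in this tutorial
    )
    # clip to [-MAX_SPEED, MAX_SPEED], as np.clip does in step()
    return max(-MAX_SPEED, min(MAX_SPEED, velocity))


# Accelerating to the right from the valley, without and with head wind
calm = wind_velocity_update(-0.5, 0.0, action=2, wind=0.0)
windy = wind_velocity_update(-0.5, 0.0, action=2, wind=0.001)
print(calm, windy)
```

For positions where cos(position) is positive, a positive wind value reduces the velocity gained per step, which is why the wind makes the task harder.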
Serialize Environment for Upload
As a next step, we want to upload our environment to our storage using PyTupli. For this, we will detach the CSV file from the environment, upload it separately, and replace the data attribute in the environment with the id of the stored object. This allows us to re-use artifacts such as CSV files in multiple benchmarks. For example, consider a case where you only want to change one parameter within the environment, e.g., the maximum speed. You would have to create a new benchmark, but could re-use the CSV file! PyTupli automatically recognizes such duplicates.
To separate the CSV file, we have to subclass the TupliEnvWrapper class and override the _serialize() and _deserialize() methods. The TupliEnvWrapper is essentially a gymnasium wrapper that records RL tuples in the step() function, but it also provides additional functionality for interacting with the storage.
[ ]:
class MyTupliEnvWrapper(TupliEnvWrapper):
    def _serialize(self, env) -> tuple[Env, list]:
        related_data_sources = []
        ds = env.unwrapped.data
        metadata = ArtifactMetadata(name='test')
        data_kwargs = {'header': None}
        # convert the wind data to CSV bytes for upload
        try:
            content = ds.to_csv(encoding='utf-8', **data_kwargs)
            content = content.encode(encoding='utf-8')
        except Exception as e:
            raise ValueError(f'Failed to serialize data source: {e}') from e
        ds_storage_metadata = self.storage.store_artifact(artifact=content, metadata=metadata)
        related_data_sources.append(ds_storage_metadata.id)
        # replace the DataFrame with the id of the stored artifact
        env.unwrapped.data = ds_storage_metadata.id
        return env, related_data_sources

    @classmethod
    def _deserialize(cls, env: Env, storage: TupliStorage) -> Env:
        data_kwargs = {'header': None, 'index_col': 0}
        # load the artifact by its id and restore the DataFrame
        ds = storage.load_artifact(env.unwrapped.data)
        ds = ds.decode('utf-8')
        d = io.StringIO(ds)
        df = pd.read_csv(d, **data_kwargs)
        env.unwrapped.data = df
        return env
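The pattern used here (detach a payload, store it, keep only its id, and restore it on load) can be illustrated without PyTupli. The sketch below uses a toy in-memory store that derives the id from a hash of the content, so storing the same CSV twice yields a single artifact. The class InMemoryArtifactStore, the helper functions, and the hashing scheme are assumptions made for illustration; they do not describe how PyTupli detects duplicates internally.

```python
import csv
import hashlib
import io


class InMemoryArtifactStore:
    """Toy content-addressed store (hypothetical, not part of PyTupli)."""

    def __init__(self):
        self._artifacts = {}

    def store_artifact(self, content: bytes) -> str:
        # identical content always maps to the same id
        artifact_id = hashlib.sha256(content).hexdigest()
        self._artifacts.setdefault(artifact_id, content)
        return artifact_id

    def load_artifact(self, artifact_id: str) -> bytes:
        return self._artifacts[artifact_id]

    def __len__(self):
        return len(self._artifacts)


def rows_to_csv_bytes(rows):
    """Serialize (step, wind) rows to UTF-8 encoded CSV without a header."""
    buffer = io.StringIO()
    csv.writer(buffer).writerows(rows)
    return buffer.getvalue().encode('utf-8')


def csv_bytes_to_rows(content):
    """Inverse of rows_to_csv_bytes."""
    reader = csv.reader(io.StringIO(content.decode('utf-8')))
    return [[int(step), float(value)] for step, value in reader]


wind_rows = [[0, 0.01], [1, 0.02], [2, -0.005]]  # made-up sample values
store = InMemoryArtifactStore()
first_id = store.store_artifact(rows_to_csv_bytes(wind_rows))
second_id = store.store_artifact(rows_to_csv_bytes(wind_rows))  # duplicate upload
restored = csv_bytes_to_rows(store.load_artifact(first_id))
```

Two benchmarks that reference first_id share one stored copy of the wind data, which mirrors the re-use described above.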
[ ]:
# which storage to use
if STORAGE_FLAG == 'api':
    storage = TupliAPIClient()
elif STORAGE_FLAG == 'file':
    storage = FileStorage()
else:
    raise ValueError(f"Unknown storage flag: {STORAGE_FLAG}. Has to be 'api' or 'file'.")
[ ]:
# instantiate the environment
max_eps_length = 999
# adjust this path to the location of wind_data.csv on your machine
data_path = '/home/hannah/Documents/Code/pytupli/docs/source/tutorials/data/wind_data.csv'
env = TimeLimit(
    CustomMountainCarEnv(render_mode=None, data_path=data_path), max_episode_steps=max_eps_length
)
# Now we can create the benchmark
tupli_env = MyTupliEnvWrapper(env, storage=storage)
Uploading and Downloading Benchmarks
We will now upload the benchmark and download it again.
[ ]:
tupli_env.store(name='mountain-car-v0', description='Mountain Car v0 benchmark')
Let us list the uploaded benchmarks:
[ ]:
%system
!pytupli list_benchmarks
As a next step, we show how to download the benchmark. Note that this is only for demonstration purposes! When loading the benchmark, we can pass a callback that will later be used to add metadata to recorded episodes. We provide a simple example of such a callback.
[ ]:
class MyCallback(EpisodeMetadataCallback):
    def __init__(self):
        super().__init__()
        # we will compute the cumulative reward for an episode
        self.cum_reward = 0
        # Furthermore, we want to store the fact that the episode was not an expert episode
        self.is_expert = False

    def reset(self):
        # start a new episode with a cumulative reward of zero
        self.cum_reward = 0

    def __call__(self, tuple):
        self.cum_reward += tuple.reward
        return {"cum_eps_reward": [self.cum_reward], "is_expert": self.is_expert}
[ ]:
loaded_tupli_env = MyTupliEnvWrapper.load(storage=storage, benchmark_id=tupli_env.id, metadata_callback=MyCallback())
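To make the behaviour of such a callback concrete, here is a standalone sketch that mimics MyCallback without depending on PyTupli. The Step dataclass and the way the callback is invoked (once per recorded tuple, with reset() between episodes) are assumptions for illustration; how the wrapper actually drives the callback is defined by PyTupli.

```python
from dataclasses import dataclass


@dataclass
class Step:
    """Stand-in for a recorded RL tuple; only the reward is needed here."""
    reward: float


class CumulativeRewardCallback:
    """Plain-Python analogue of MyCallback (hypothetical)."""

    def __init__(self):
        self.cum_reward = 0.0
        self.is_expert = False

    def reset(self):
        # start a new episode with a cumulative reward of zero
        self.cum_reward = 0.0

    def __call__(self, step):
        self.cum_reward += step.reward
        return {'cum_eps_reward': [self.cum_reward], 'is_expert': self.is_expert}


callback = CumulativeRewardCallback()

# first episode: three steps with reward -1 each
for _ in range(3):
    metadata = callback(Step(reward=-1.0))

# second episode: the accumulator starts from zero again
callback.reset()
metadata_after_reset = callback(Step(reward=-1.0))
```

Because the callback returns the running total on every call, the metadata from the last step of an episode holds the full episode return.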
Recording Episodes for Offline RL Training
The TupliEnvWrapper allows us to record all interactions with the custom environment to the storage in the form of tuples (state, action, reward, terminal, timeout). These tuples can then be used to train an offline RL agent for this environment with any offline RL library. For simplicity, we will use a random policy to generate the data.
[ ]:
# For reproducibility when generating episodes
np.random.seed(42)
obs, info = loaded_tupli_env.reset(seed=42)
for step in range(2000):
    action = np.int64(np.random.randint(low=0, high=3))
    obs, reward, done, truncated, info = loaded_tupli_env.step(action)
    if done or truncated:
        # `step` counts all steps across episodes, not the length of this episode
        print(f'Episode finished at step {step + 1}')
        obs, info = loaded_tupli_env.reset()
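The recording idea itself is simple: the wrapper remembers the observation before each step and appends one tuple per call to step(). The following sketch shows this with a toy environment and a toy wrapper. ToyEnv and RecordingWrapper are hypothetical and keep everything in memory; the real TupliEnvWrapper additionally uploads episodes to the storage.

```python
class ToyEnv:
    """Moves right by `action` each step; terminates at position >= 3."""

    def reset(self):
        self.position = 0
        return self.position

    def step(self, action):
        self.position += action
        terminated = self.position >= 3
        # reward of -1 per step, no time limit in this toy example
        return self.position, -1.0, terminated, False


class RecordingWrapper:
    """Buffers (state, action, reward, terminal, timeout) tuples per episode."""

    def __init__(self, env):
        self.env = env
        self.episodes = []   # finished episodes
        self._current = []   # tuples of the running episode
        self._obs = None     # observation before the next step

    def reset(self):
        self._current = []
        self._obs = self.env.reset()
        return self._obs

    def step(self, action):
        next_obs, reward, terminated, truncated = self.env.step(action)
        self._current.append((self._obs, action, reward, terminated, truncated))
        self._obs = next_obs
        if terminated or truncated:
            self.episodes.append(self._current)
            self._current = []
        return next_obs, reward, terminated, truncated


recorder = RecordingWrapper(ToyEnv())
recorder.reset()
for action in [1, 1, 1, 0, 1, 1, 1]:
    _, _, terminated, truncated = recorder.step(action)
    if terminated or truncated:
        recorder.reset()
```

After this loop, recorder.episodes holds two finished episodes, each ending with a tuple whose terminal flag is set.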
Downloading Episodes for a Benchmark
Next, let us download all episodes that have been recorded for our benchmark. For this, we create a TupliDataset using a filter with the id of the benchmark.
[ ]:
# Create dataset
mdp_dataset = TupliDataset(storage=storage).with_benchmark_filter(
    FilterEQ(key='id', value=loaded_tupli_env.id)
)
mdp_dataset.load()
We can show the contents of the dataset using the preview() method:
[ ]:
mdp_dataset.preview()
Training an Offline RL Agent
Finally, we use d3rlpy to train an offline RL agent on this environment. Note that we do not train it to convergence; we only show how to get from a PyTupli dataset to actually doing offline RL! Our TupliDataset has a method for converting all episodes into numpy arrays for states, actions, rewards, terminals, and timeouts. This can be customized if other output formats are required. Using these arrays, we can then create an MDPDataset, which is the required input format for all d3rlpy algorithms.
[ ]:
obs, act, rew, terminal, truncated = mdp_dataset.convert_to_numpy()
# create d3rlpy dataset
d3rlpy_dataset = MDPDataset(
    observations=obs, actions=act, rewards=rew, terminals=terminal, timeouts=truncated
)
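The conversion can be pictured as concatenating all episodes into parallel columns, where the terminal and timeout flags mark the episode boundaries. The sketch below does this with plain Python lists and made-up sample tuples. The function flatten_episodes is hypothetical and does not reproduce the implementation of convert_to_numpy(), which returns numpy arrays.

```python
def flatten_episodes(episodes):
    """Concatenate per-episode tuples into parallel lists (illustrative only)."""
    columns = {
        'observations': [],
        'actions': [],
        'rewards': [],
        'terminals': [],
        'timeouts': [],
    }
    for episode in episodes:
        for obs, action, reward, terminal, timeout in episode:
            columns['observations'].append(obs)
            columns['actions'].append(action)
            columns['rewards'].append(reward)
            columns['terminals'].append(terminal)
            columns['timeouts'].append(timeout)
    return columns


# two made-up episodes: one ends by termination, one by timeout
episodes = [
    [((-0.5, 0.0), 2, -1.0, False, False), ((-0.49, 0.01), 2, -1.0, True, False)],
    [((-0.4, 0.0), 0, -1.0, False, True)],
]
flat = flatten_episodes(episodes)
```

All columns have the same length, and a step closes an episode whenever its terminal or timeout flag is set.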
Finally, let us show that training an agent using conservative Q-learning (CQL) works with this data:
[ ]:
# algorithm for offline training: CQL from d3rlpy
d3rlpy.seed(1) # for reproducibility
algo = DiscreteCQLConfig(batch_size=64, alpha=2.0, target_update_interval=1000).create(device='cpu')
# train
algo.fit(dataset=d3rlpy_dataset, n_steps=10000, n_steps_per_epoch=100)
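For intuition about what makes CQL conservative, the sketch below computes the penalty that discrete CQL adds on top of the usual temporal-difference loss, in its common log-sum-exp form: the log-sum-exp of the Q-values over all actions minus the Q-value of the action found in the dataset, weighted by alpha. The function cql_penalty and the sample Q-values are hypothetical; this is not d3rlpy's code.

```python
import math


def cql_penalty(q_values, dataset_action):
    """Return logsumexp(Q(s, .)) - Q(s, a_data) for a single state."""
    max_q = max(q_values)
    # numerically stable log-sum-exp
    logsumexp = max_q + math.log(sum(math.exp(q - max_q) for q in q_values))
    return logsumexp - q_values[dataset_action]


# All three actions look equally good: the penalty equals log(3)
uniform_penalty = cql_penalty([0.0, 0.0, 0.0], dataset_action=1)

# The dataset action already dominates: the penalty is close to zero
dominant_penalty = cql_penalty([0.0, 10.0, 0.0], dataset_action=1)

# An action that is not in the data has a high Q-value: the penalty is large
overestimated_penalty = cql_penalty([10.0, 0.0, 0.0], dataset_action=1)
```

Minimizing this term pushes down the Q-values of actions that do not appear in the data relative to those that do, which counteracts overestimation for unseen actions.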
Let us test the trained policy:
[ ]:
# activate rendering
loaded_tupli_env.unwrapped.render_mode = 'human'
# deactivate recording of episodes
loaded_tupli_env.deactivate_recording()
# run the environment
np.random.seed(seed=42)
obs, info = loaded_tupli_env.reset(seed=42)
for step in range(800):
    action = np.int64(algo.predict(np.expand_dims(obs, axis=0))[0])
    obs, reward, done, truncated, info = loaded_tupli_env.step(action)
    if done or truncated:
        # `step` counts all steps across episodes, not the length of this episode
        print(f'Episode finished at step {step + 1}')
        obs, info = loaded_tupli_env.reset()
# close the environment and the render window
loaded_tupli_env.close()
The trained policy manages to reach the flag even though it has only learned from random actions!
Deleting Benchmarks
To clean up our storage, we now delete the benchmark and all related artifacts. Episodes will automatically be deleted, too.
[ ]:
loaded_tupli_env.delete(delete_artifacts=True)