Introduction

In this tutorial, we will show how to use PyTupli to set up an efficient pipeline for offline reinforcement learning (RL) for a custom environment. This includes

  • creating a benchmark and uploading it to an instance of a TupliStorage,

  • re-loading this benchmark from the storage,

  • recording RL tuples of (state, action, reward, terminal, timeout) for this benchmark and uploading them to the storage,

  • creating a dataset from the stored episodes, and

  • training an offline RL agent using d3rlpy.

You can skip the last part. If you want to try it, first install the d3rlpy library using pip install d3rlpy. If you skip it, also remove the d3rlpy imports from the next cell.

[ ]:
import io
import math
from typing import Optional

import numpy as np
import pandas as pd

from gymnasium import Env, spaces
from gymnasium.envs.classic_control import utils, MountainCarEnv
from gymnasium.wrappers import TimeLimit

import d3rlpy
from d3rlpy.algos import DiscreteCQLConfig
from d3rlpy.dataset import MDPDataset

from pytupli.benchmark import TupliEnvWrapper
from pytupli.storage import TupliAPIClient, TupliStorage, FileStorage
from pytupli.schema import ArtifactMetadata, FilterEQ, EpisodeMetadataCallback
from pytupli.dataset import TupliDataset, NumpyTupleParser

PyTupli has two storage options: a local FileStorage and the TupliAPIClient, which uses MongoDB as a backend. You can run this notebook with either storage type by adjusting the flag below. If you want to use the TupliAPIClient, follow the instructions in the README to start the application.

[ ]:
STORAGE_FLAG = 'api'  # 'api' or 'file'

Creating a Custom Environment

We will use the MountainCar example from gymnasium with a small modification: the car is slowed down by wind in the horizontal direction. We load the wind data from a CSV file.

[ ]:
class CustomMountainCarEnv(MountainCarEnv):
    """
    ## Description

    The Mountain Car MDP is a deterministic MDP that consists of a car placed stochastically
    at the bottom of a sinusoidal valley, with the only possible actions being the accelerations
    that can be applied to the car in either direction. The goal of the MDP is to strategically
    accelerate the car to reach the goal state on top of the right hill. There are two versions
    of the mountain car domain in gymnasium: one with discrete actions and one with continuous.
    This version is the one with discrete actions.

    This MDP first appeared in [Andrew Moore's PhD Thesis (1990)](https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-209.pdf)

    ```
    @TECHREPORT{Moore90efficientmemory-based,
        author = {Andrew William Moore},
        title = {Efficient Memory-based Learning for Robot Control},
        institution = {University of Cambridge},
        year = {1990}
    }
    ```

    ## Observation Space

    The observation is a `ndarray` with shape `(2,)` where the elements correspond to the following:

    | Num | Observation                          | Min   | Max  | Unit         |
    |-----|--------------------------------------|-------|------|--------------|
    | 0   | position of the car along the x-axis | -1.2  | 0.6  | position (m) |
    | 1   | velocity of the car                  | -0.07 | 0.07 | velocity (v) |

    ## Action Space

    There are 3 discrete deterministic actions:

    - 0: Accelerate to the left
    - 1: Don't accelerate
    - 2: Accelerate to the right

    ## Transition Dynamics:

    Given an action, the mountain car follows the following transition dynamics:

    *velocity<sub>t+1</sub> = velocity<sub>t</sub> + (action - 1) * force - cos(3 * position<sub>t</sub>) * gravity - wind<sub>t</sub> * cos(position<sub>t</sub>)*

    *position<sub>t+1</sub> = position<sub>t</sub> + velocity<sub>t+1</sub>*

    where force = 0.001, gravity = 0.0025, and wind<sub>t</sub> is the wind value for time step t taken from
    the data file (scaled by 0.01 when the file is loaded). The collisions at either end are inelastic with
    the velocity set to 0 upon collision with the wall. The position is clipped to the range `[-1.2, 0.6]` and
    velocity is clipped to the range `[-0.07, 0.07]`.

    ## Reward:

    The goal is to reach the flag placed on top of the right hill as quickly as possible, as such the agent is
    penalised with a reward of -1 for each timestep.

    ## Starting State

    The position of the car is assigned a uniform random value in *[-0.6 , -0.4]*.
    The starting velocity of the car is always assigned to 0.

    ## Episode End

    The episode ends if either of the following happens:
    1. Termination: The position of the car is greater than or equal to 0.5 (the goal position on top of the right hill)
    2. Truncation: The episode reaches the step limit of the `TimeLimit` wrapper (999 in this tutorial).

    ## Arguments

    Mountain Car has two parameters for `gymnasium.make` with `render_mode` and `goal_velocity`.
    On reset, the `options` parameter allows the user to change the bounds used to determine the new random state.

    ```python
    >>> import gymnasium as gym
    >>> env = gym.make("MountainCar-v0", render_mode="rgb_array", goal_velocity=0.1)  # default goal_velocity=0
    >>> env
    <TimeLimit<OrderEnforcing<PassiveEnvChecker<MountainCarEnv<MountainCar-v0>>>>>
    >>> env.reset(seed=123, options={"x_init": np.pi/2, "y_init": 0.5})  # default x_init=np.pi, y_init=1.0
    (array([-0.46352962,  0.        ], dtype=float32), {})

    ```

    ## Version History

    * v0: Initial versions release
    """

    metadata = {
        'render_modes': ['human', 'rgb_array'],
        'render_fps': 30,
    }

    def __init__(self, data_path: str, render_mode: Optional[str] = None, goal_velocity=0):
        self.min_position = -1.2
        self.max_position = 0.6
        self.max_speed = 0.07
        self.goal_position = 0.5
        self.goal_velocity = goal_velocity
        self.current_step = 0
        self.data = pd.read_csv(data_path, index_col=0, header=None) * 0.01

        self.force = 0.001
        self.gravity = 0.0025

        self.low = np.array([self.min_position, -self.max_speed], dtype=np.float32)
        self.high = np.array([self.max_position, self.max_speed], dtype=np.float32)

        self.render_mode = render_mode

        self.screen_width = 600
        self.screen_height = 400
        self.screen = None
        self.clock = None
        self.isopen = True

        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(self.low, self.high, dtype=np.float32)

    def step(self, action: int):
        assert self.action_space.contains(action), f'{action!r} ({type(action)}) invalid'

        position, velocity = self.state
        # wind value for the current time step
        wind = self.data.loc[self.current_step].to_numpy().flatten()[0]
        velocity += (
            (action - 1) * self.force
            + math.cos(3 * position) * (-self.gravity)
            - wind * math.cos(position)
        )
        velocity = np.clip(velocity, -self.max_speed, self.max_speed)
        position += velocity
        position = np.clip(position, self.min_position, self.max_position)
        if position == self.min_position and velocity < 0:
            velocity = 0

        terminated = bool(position >= self.goal_position and velocity >= self.goal_velocity)
        reward = -1.0
        self.current_step += 1

        self.state = (position, velocity)
        if self.render_mode == 'human':
            self.render()
        # truncation=False as the time limit is handled by the `TimeLimit` wrapper
        return np.array(self.state, dtype=np.float32), reward, terminated, False, {}

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
    ):
        super().reset(seed=seed)
        # Note that if you use custom reset bounds, it may lead to out-of-bound
        # state/observations.
        self.current_step = 0
        low, high = utils.maybe_parse_reset_bounds(options, -0.6, -0.4)
        self.state = np.array([self.np_random.uniform(low=low, high=high), 0])

        if self.render_mode == 'human':
            self.render()
        return np.array(self.state, dtype=np.float32), {}
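
The update rule in step() can also be read in isolation. The following is a minimal sketch that restates it as a pure function. The name wind_step and its arguments are introduced here for illustration only, and wind stands for the already scaled value that the environment looks up for the current time step.

```python
import math

FORCE = 0.001
GRAVITY = 0.0025
MIN_POSITION, MAX_POSITION = -1.2, 0.6
MAX_SPEED = 0.07


def wind_step(position: float, velocity: float, action: int, wind: float):
    """One transition of the wind-affected mountain car (illustrative helper)."""
    velocity += (
        (action - 1) * FORCE
        - math.cos(3 * position) * GRAVITY
        - wind * math.cos(position)
    )
    # clip the velocity, move the car, and clip the position
    velocity = max(-MAX_SPEED, min(MAX_SPEED, velocity))
    position += velocity
    position = max(MIN_POSITION, min(MAX_POSITION, position))
    # inelastic collision with the left wall
    if position == MIN_POSITION and velocity < 0:
        velocity = 0.0
    return position, velocity


# same state and action, without and with a (hypothetical) wind value
calm = wind_step(-0.5, 0.0, 2, wind=0.0)
windy = wind_step(-0.5, 0.0, 2, wind=0.001)
print(calm, windy)
```

Since cos(position) is positive over the whole position range [-1.2, 0.6], a positive wind value always reduces the velocity in this formulation.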

Serialize Environment for Upload

As a next step, we want to upload our environment to our storage using PyTupli. For this, we will detach the CSV file from the environment, upload it separately, and replace the data attribute in the environment with the id of the stored object. This allows us to re-use artifacts such as CSV files in multiple benchmarks. For example, consider a case where you only want to change one parameter within the environment, e.g., the maximum speed. You would have to create a new benchmark, but could re-use the CSV file! PyTupli automatically recognizes such duplicates.
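
One common way to recognize duplicate artifacts is to derive the identifier from the content itself. The sketch below illustrates this idea with a hypothetical in-memory store. It is an assumption made for illustration and not necessarily how PyTupli detects duplicates; the class InMemoryArtifactStore and its methods are not part of PyTupli.

```python
import hashlib


class InMemoryArtifactStore:
    """Hypothetical store that uses the SHA-256 hash of the content as the artifact id."""

    def __init__(self):
        self._artifacts = {}

    def store_artifact(self, content: bytes) -> str:
        artifact_id = hashlib.sha256(content).hexdigest()
        # identical content maps to the same id, so it is stored only once
        self._artifacts.setdefault(artifact_id, content)
        return artifact_id

    def load_artifact(self, artifact_id: str) -> bytes:
        return self._artifacts[artifact_id]

    def __len__(self) -> int:
        return len(self._artifacts)


store = InMemoryArtifactStore()
wind_csv = b'0,1.5\n1,-0.3\n'  # hypothetical csv content
first_id = store.store_artifact(wind_csv)   # e.g., referenced by a first benchmark
second_id = store.store_artifact(wind_csv)  # e.g., referenced by a second benchmark
print(first_id == second_id, len(store))
```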

To separate the CSV file, we have to subclass TupliEnvWrapper and override the _serialize() and _deserialize() methods. The TupliEnvWrapper is essentially a gymnasium wrapper that records RL tuples in the step() function, but it also provides additional functionality for interacting with the storage.

[ ]:
class MyTupliEnvWrapper(TupliEnvWrapper):
    def _serialize(self, env) -> tuple[Env, list]:
        related_data_sources = []
        # detach the wind data (a pandas DataFrame) from the environment
        ds = env.unwrapped.data
        metadata = ArtifactMetadata(name='test')
        data_kwargs = {'header': None}
        try:
            content = ds.to_csv(encoding='utf-8', **data_kwargs)
            content = content.encode(encoding='utf-8')
        except Exception as e:
            raise ValueError(f'Failed to serialize data source: {e}') from e

        # upload the csv content as a separate artifact and keep only its id in the environment
        ds_storage_metadata = self.storage.store_artifact(artifact=content, metadata=metadata)
        related_data_sources.append(ds_storage_metadata.id)
        env.unwrapped.data = ds_storage_metadata.id
        return env, related_data_sources

    @classmethod
    def _deserialize(cls, env: Env, storage: TupliStorage) -> Env:
        data_kwargs = {'header': None, 'index_col': 0}
        # env.unwrapped.data holds the artifact id: download the csv and restore the DataFrame
        ds = storage.load_artifact(env.unwrapped.data)
        ds = ds.decode('utf-8')
        d = io.StringIO(ds)
        df = pd.read_csv(d, **data_kwargs)

        env.unwrapped.data = df
        return env
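
The conversion performed by _serialize() and _deserialize() can be checked without any storage. The following minimal sketch runs the same round trip (DataFrame to bytes and back) on a small, made-up wind table; the values are hypothetical and only serve as an illustration.

```python
import io

import numpy as np
import pandas as pd

# hypothetical wind data: one value per time step, indexed by the step number
wind = pd.DataFrame({1: [0.10, -0.20, 0.05]}, index=[0, 1, 2])

# serialize: DataFrame -> csv text without a header row -> bytes
content = wind.to_csv(header=False).encode('utf-8')

# deserialize: bytes -> csv text -> DataFrame with the first column as index
restored = pd.read_csv(io.StringIO(content.decode('utf-8')), header=None, index_col=0)

# the environment looks up the wind value of a step like this
print(restored.loc[1].to_numpy().flatten()[0])
print(np.allclose(wind.to_numpy(), restored.to_numpy()))
```

Because the index is written as the first csv column, reading with index_col=0 restores the step numbers that the environment uses for its lookup.
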
[ ]:
# which storage to use
if STORAGE_FLAG == 'api':
    storage = TupliAPIClient()
elif STORAGE_FLAG == 'file':
    storage = FileStorage()
else:
    raise ValueError(f"Unknown storage flag: {STORAGE_FLAG}. Has to be 'api' or 'file'.")
[ ]:
# instantiate the environment
max_eps_length = 999
# adjust this path to the location of wind_data.csv on your machine
data_path = '/home/hannah/Documents/Code/pytupli/docs/source/tutorials/data/wind_data.csv'
env = TimeLimit(
    CustomMountainCarEnv(render_mode=None, data_path=data_path), max_episode_steps=max_eps_length
)
# Now we can create the benchmark
tupli_env = MyTupliEnvWrapper(env, storage=storage)

Uploading and Downloading Benchmarks

We will now upload the benchmark and download it again.

[ ]:
tupli_env.store(name='mountain-car-v0', description='Mountain Car v0 benchmark')

Let us list the uploaded benchmarks:

[ ]:
%system
!pytupli list_benchmarks

As a next step, we show how to download the benchmark. Note that this is only for demonstration purposes, since the benchmark is still available in memory! When loading the benchmark, we can pass a callback that will later be used to add metadata to recorded episodes. We provide a simple example of such a callback.

[ ]:
class MyCallback(EpisodeMetadataCallback):
    def __init__(self):
        super().__init__()
        # we will compute the cumulative reward for an episode
        self.cum_reward = 0
        # Furthermore, we want to store the fact that the episode was not an expert episode
        self.is_expert = False

    def reset(self):
        # a new episode starts: reset the cumulative reward
        self.cum_reward = 0

    def __call__(self, tuple):
        # add the reward of the current RL tuple and return the episode metadata
        self.cum_reward += tuple.reward
        return {"cum_eps_reward": [self.cum_reward], "is_expert": self.is_expert}

[ ]:
loaded_tupli_env = MyTupliEnvWrapper.load(storage=storage, benchmark_id=tupli_env.id, metadata_callback=MyCallback())

Recording Episodes for Offline RL Training

The TupliEnvWrapper allows us to record all interactions with the custom environment to the storage in the form of tuples (state, action, reward, terminal, timeout). These tuples can then be used to train an offline RL agent for this environment with any offline RL library. For simplicity, we will use a random policy to generate the data.

[ ]:
# For reproducibility when generating episodes
np.random.seed(42)
obs, info = loaded_tupli_env.reset(seed=42)

episode_steps = 0  # number of steps in the current episode
for step in range(2000):
    action = np.int64(np.random.randint(low=0, high=3))
    obs, reward, done, truncated, info = loaded_tupli_env.step(action)
    episode_steps += 1
    if done or truncated:
        print(f'Episode finished after {episode_steps} timesteps')
        obs, info = loaded_tupli_env.reset()
        episode_steps = 0
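
Conceptually, recording tuples inside step() can be pictured as in the following sketch. It is not PyTupli's implementation: RLTuple, ToyEnv, and RecordingWrapper are hypothetical names, the tuples are kept in memory instead of being sent to a storage, and the choice to record the observation in which the action was taken is an assumption.

```python
import random
from dataclasses import dataclass, field


@dataclass
class RLTuple:
    """Hypothetical container for one recorded transition."""
    state: list
    action: int
    reward: float
    terminal: bool
    timeout: bool


class ToyEnv:
    """Tiny stand-in environment with the gymnasium reset()/step() return signature."""

    def reset(self, seed=None):
        self.t = 0
        return [0.0], {}

    def step(self, action):
        self.t += 1
        terminated = self.t >= 3  # every episode ends after three steps
        return [float(self.t)], -1.0, terminated, False, {}


@dataclass
class RecordingWrapper:
    """Hypothetical wrapper that stores one RLTuple per call to step()."""
    env: ToyEnv
    episodes: list = field(default_factory=list)

    def reset(self, seed=None):
        obs, info = self.env.reset(seed=seed)
        self._last_obs = obs
        self.episodes.append([])  # start a new episode buffer
        return obs, info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        # record the observation the action was taken in, not the resulting one
        self.episodes[-1].append(RLTuple(self._last_obs, action, reward, terminated, truncated))
        self._last_obs = obs
        return obs, reward, terminated, truncated, info


rng = random.Random(42)
toy_env = RecordingWrapper(ToyEnv())
obs, info = toy_env.reset(seed=42)
for _ in range(5):
    obs, reward, terminated, truncated, info = toy_env.step(rng.randrange(3))
    if terminated or truncated:
        obs, info = toy_env.reset()

print([len(episode) for episode in toy_env.episodes])
```

The interaction loop is the same as the one above: the calling code only uses reset() and step(), and the recording happens as a side effect.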

Downloading Episodes for a Benchmark

Next, let us download all episodes that have been recorded for our benchmark. For this, we create a TupliDataset using a filter with the id of the benchmark.

[ ]:
# Create dataset
mdp_dataset = TupliDataset(storage=storage).with_benchmark_filter(
    FilterEQ(key='id', value=loaded_tupli_env.id)
)
mdp_dataset.load()

We can show the contents of the dataset using the preview() method:

[ ]:
mdp_dataset.preview()

Training an Offline RL Agent

Finally, we use d3rlpy to train an offline RL agent on this environment. Note that we do not train it to convergence. We only show how to get from a PyTupli dataset to actually doing offline RL! Our TupliDataset has a method for converting all episodes into numpy arrays for states, actions, rewards, terminals, and timeouts. This can be customized if other output formats are required. Using these arrays, we can then create an MDPDataset, which is the required input format for all d3rlpy algorithms.

[ ]:
obs, act, rew, terminal, truncated = mdp_dataset.convert_to_tensors(parser=NumpyTupleParser)
# create d3rlpy dataset
d3rlpy_dataset = MDPDataset(
    observations=obs, actions=act, rewards=rew, terminals=terminal, timeouts=truncated
)
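
To make the array layout concrete, the following sketch flattens a list of episodes into the five arrays passed to MDPDataset. The helper episodes_to_arrays and the example values are hypothetical, and the layout (one row per transition, episodes concatenated in order) is an assumption for illustration, not the implementation of NumpyTupleParser.

```python
import numpy as np


def episodes_to_arrays(episodes):
    """Concatenate episodes of (state, action, reward, terminal, timeout) tuples into flat arrays."""
    tuples = [t for episode in episodes for t in episode]
    observations = np.array([t[0] for t in tuples], dtype=np.float32)
    actions = np.array([t[1] for t in tuples], dtype=np.int64)
    rewards = np.array([t[2] for t in tuples], dtype=np.float32)
    terminals = np.array([t[3] for t in tuples], dtype=np.float32)
    timeouts = np.array([t[4] for t in tuples], dtype=np.float32)
    return observations, actions, rewards, terminals, timeouts


# two hypothetical episodes: the first ends by termination, the second by timeout
episodes = [
    [([-0.5, 0.0], 2, -1.0, False, False), ([-0.49, 0.01], 2, -1.0, True, False)],
    [([-0.4, 0.0], 0, -1.0, False, False), ([-0.41, -0.01], 1, -1.0, False, True)],
]
obs, act, rew, terminal, truncated = episodes_to_arrays(episodes)
print(obs.shape, act.shape, terminal, truncated)
```

The terminal and timeout flags mark where one episode ends and the next begins, which is why both arrays are passed along with the observations, actions, and rewards.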

Finally, let us show that training an agent using conservative Q-learning (CQL) works with this data:

[ ]:
# algorithm for offline training: CQL from d3rlpy
d3rlpy.seed(1)  # for reproducibility
algo = DiscreteCQLConfig(batch_size=64, alpha=2.0, target_update_interval=1000).create(device='cpu')
# train
algo.fit(dataset=d3rlpy_dataset, n_steps=10000, n_steps_per_epoch=100)
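
In the usual formulation of discrete CQL, a conservative penalty weighted by alpha is added to the Q-learning loss. The penalty pushes down the Q-values of all actions while pushing up the Q-value of the action stored in the dataset. The sketch below computes this penalty with numpy for made-up Q-values. It illustrates the standard regularizer and is not d3rlpy's code; the function name cql_penalty is hypothetical.

```python
import numpy as np


def cql_penalty(q_values: np.ndarray, actions: np.ndarray) -> float:
    """Mean of logsumexp_a Q(s, a) - Q(s, a_data) over a batch."""
    # numerically stable log-sum-exp over the action dimension
    q_max = q_values.max(axis=1, keepdims=True)
    logsumexp = q_max[:, 0] + np.log(np.exp(q_values - q_max).sum(axis=1))
    # Q-value of the action that was actually taken in the dataset
    q_data = q_values[np.arange(len(actions)), actions]
    return float((logsumexp - q_data).mean())


# hypothetical Q-values for a batch of two states and three actions
q_values = np.array([[0.0, 0.0, 0.0], [1.0, 5.0, 1.0]])
actions = np.array([0, 1])  # actions stored in the dataset
alpha = 2.0  # weight of the penalty, as in the configuration above
print(alpha * cql_penalty(q_values, actions))
```

The penalty is never negative, and it becomes small when the dataset action has by far the largest Q-value in a state.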

Let us test the trained policy:

[ ]:
# activate rendering
loaded_tupli_env.unwrapped.render_mode = 'human'
# deactivate recording of episodes
loaded_tupli_env.deactivate_recording()
# run the environment
np.random.seed(seed=42)
obs, info = loaded_tupli_env.reset(seed=42)

episode_steps = 0  # number of steps in the current episode
for step in range(800):
    # greedy action of the trained policy for a batch that contains only the current observation
    action = np.int64(algo.predict(np.expand_dims(obs, axis=0))[0])
    obs, reward, done, truncated, info = loaded_tupli_env.step(action)
    episode_steps += 1
    if done or truncated:
        print(f'Episode finished after {episode_steps} timesteps')
        obs, info = loaded_tupli_env.reset()
        episode_steps = 0
# close the environment, which also closes the render window
loaded_tupli_env.close()

The trained policy manages to reach the flag even though it has only learned from random actions!

Deleting Benchmarks

To clean up our storage, we now delete the benchmark and all related artifacts. Episodes will automatically be deleted, too.

[ ]:
loaded_tupli_env.delete(delete_artifacts=True)