329 lines
12 KiB
Python
329 lines
12 KiB
Python
from contextlib import closing
|
|
from io import StringIO
|
|
from os import path
|
|
from typing import Optional
|
|
|
|
import numpy as np
|
|
|
|
import gymnasium as gym
|
|
from gymnasium import Env, spaces
|
|
from gymnasium.envs.toy_text.utils import categorical_sample
|
|
from gymnasium.error import DependencyNotInstalled
|
|
|
|
|
|
# Action identifiers: 0 = up, 1 = right, 2 = down, 3 = left.
# They are used as the per-state keys of the environment's transition table.
UP, RIGHT, DOWN, LEFT = range(4)
|
|
|
|
|
|
class CliffWalkingEnv(Env):
    """
    Cliff walking involves crossing a gridworld from start to goal while avoiding falling off a cliff.

    ## Description
    The game starts with the player at location [3, 0] of the 4x12 grid world with the
    goal located at [3, 11]. If the player reaches the goal the episode ends.

    A cliff runs along [3, 1..10]. If the player moves to a cliff location it
    returns to the start location.

    The player makes moves until they reach the goal.

    Adapted from Example 6.6 (page 132) from Reinforcement Learning: An Introduction
    by Sutton and Barto [<a href="#cliffwalk_ref">1</a>].

    With inspiration from:
    [https://github.com/dennybritz/reinforcement-learning/blob/master/lib/envs/cliff_walking.py](https://github.com/dennybritz/reinforcement-learning/blob/master/lib/envs/cliff_walking.py)

    ## Action Space
    The action shape is `(1,)` in the range `{0, 3}` indicating
    which direction to move the player.

    - 0: Move up
    - 1: Move right
    - 2: Move down
    - 3: Move left

    ## Observation Space
    There are 3 x 12 + 1 possible states. The player cannot be at the cliff, nor at
    the goal as the latter results in the end of the episode. What remains are all
    the positions of the first 3 rows plus the bottom-left cell.

    The observation is a value representing the player's current position as
    current_row * ncols + current_col (where both the row and col start at 0).

    For example, the starting position can be calculated as follows: 3 * 12 + 0 = 36.

    The observation is returned as an `int()`.

    ## Starting State
    The episode starts with the player in state `[36]` (location [3, 0]).

    ## Reward
    Each time step incurs -1 reward, unless the player stepped into the cliff,
    which incurs -100 reward.

    ## Episode End
    The episode terminates when the player enters state `[47]` (location [3, 11]).

    ## Information

    `step()` and `reset()` return a dict with the following keys:
    - "prob" - transition probability for the state.

    As cliff walking is not stochastic, the transition probability returned is always 1.0.

    ## Arguments

    ```python
    import gymnasium as gym
    gym.make('CliffWalking-v0')
    ```

    ## References
    <a id="cliffwalk_ref"></a>[1] R. Sutton and A. Barto, “Reinforcement Learning:
    An Introduction” 2020. [Online]. Available: [http://www.incompleteideas.net/book/RLbook2020.pdf](http://www.incompleteideas.net/book/RLbook2020.pdf)

    ## Version History
    - v0: Initial version release

    """

    metadata = {
        "render_modes": ["human", "rgb_array", "ansi"],
        "render_fps": 4,
    }

    def __init__(self, render_mode: Optional[str] = None):
        """Build the 4x12 grid, its transition table and the rendering state.

        Args:
            render_mode: One of ``metadata["render_modes"]`` or ``None``
                (no rendering).
        """
        self.shape = (4, 12)
        self.start_state_index = np.ravel_multi_index((3, 0), self.shape)

        self.nS = np.prod(self.shape)
        self.nA = 4

        # Cliff Location: the bottom row except its first and last cells.
        self._cliff = np.zeros(self.shape, dtype=bool)
        self._cliff[3, 1:-1] = True

        # Calculate transition probabilities and rewards.
        # P[state][action] is a list of (prob, next_state, reward, terminated).
        self.P = {}
        for s in range(self.nS):
            position = np.unravel_index(s, self.shape)
            self.P[s] = {
                UP: self._calculate_transition_prob(position, [-1, 0]),
                RIGHT: self._calculate_transition_prob(position, [0, 1]),
                DOWN: self._calculate_transition_prob(position, [1, 0]),
                LEFT: self._calculate_transition_prob(position, [0, -1]),
            }

        # Calculate initial state distribution
        # We always start in state (3, 0)
        self.initial_state_distrib = np.zeros(self.nS)
        self.initial_state_distrib[self.start_state_index] = 1.0

        self.observation_space = spaces.Discrete(self.nS)
        self.action_space = spaces.Discrete(self.nA)

        self.render_mode = render_mode

        # pygame utils (all populated lazily on the first GUI render)
        self.cell_size = (60, 60)
        self.window_size = (
            self.shape[1] * self.cell_size[1],
            self.shape[0] * self.cell_size[0],
        )
        self.window_surface = None
        self.clock = None
        self.elf_images = None
        self.start_img = None
        self.goal_img = None
        self.cliff_img = None
        self.mountain_bg_img = None
        self.near_cliff_img = None
        self.tree_img = None

    def _limit_coordinates(self, coord: np.ndarray) -> np.ndarray:
        """Prevent the agent from falling out of the grid world.

        Args:
            coord: ``(row, col)`` position; it is clamped in place.

        Returns:
            The same ``coord`` object, with both entries clamped to the grid.
        """
        for axis in (0, 1):
            coord[axis] = min(coord[axis], self.shape[axis] - 1)
            coord[axis] = max(coord[axis], 0)
        return coord

    def _calculate_transition_prob(self, current, delta):
        """Determine the outcome for an action. Transition Prob is always 1.0.

        Args:
            current: Current position on the grid as (row, col)
            delta: Change in position for transition

        Returns:
            A single-element list holding the tuple
            ``(1.0, new_state, reward, terminated)``
        """
        new_position = np.array(current) + np.array(delta)
        new_position = self._limit_coordinates(new_position).astype(int)
        new_cell = tuple(new_position)

        # Stepping onto the cliff sends the player back to the start with a
        # large penalty; the episode does not terminate.
        if self._cliff[new_cell]:
            return [(1.0, self.start_state_index, -100, False)]

        new_state = np.ravel_multi_index(new_cell, self.shape)
        terminal_state = (self.shape[0] - 1, self.shape[1] - 1)
        is_terminated = new_cell == terminal_state
        return [(1.0, new_state, -1, is_terminated)]

    def step(self, a):
        """Apply action ``a`` and advance the environment by one transition.

        Args:
            a: Action index used to look up ``self.P[self.s][a]``.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where
            ``truncated`` is always ``False`` and ``info`` is
            ``{"prob": transition_probability}``.
        """
        transitions = self.P[self.s][a]
        i = categorical_sample([t[0] for t in transitions], self.np_random)
        p, s, r, t = transitions[i]
        self.s = s
        self.lastaction = a

        if self.render_mode == "human":
            self.render()
        return (int(s), r, t, False, {"prob": p})

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        """Reset the environment to a state drawn from the initial distribution.

        Args:
            seed: Forwarded to ``Env.reset`` to seed ``self.np_random``.
            options: Accepted for API compatibility; not used here.

        Returns:
            ``(observation, info)`` with ``info == {"prob": 1}``.
        """
        super().reset(seed=seed)
        self.s = categorical_sample(self.initial_state_distrib, self.np_random)
        self.lastaction = None

        if self.render_mode == "human":
            self.render()
        return int(self.s), {"prob": 1}

    def render(self):
        """Render the current state according to ``self.render_mode``.

        Returns:
            ``None`` when no render mode is set (a warning is logged) or in
            "human" mode, a string for "ansi", and an RGB array for
            "rgb_array".
        """
        if self.render_mode is None:
            # `spec` can be None (presumably when the class is instantiated
            # directly rather than through `gym.make`); still emit the warning
            # instead of failing on an assert.
            env_id = self.spec.id if self.spec is not None else "CliffWalking-v0"
            gym.logger.warn(
                "You are calling render method without specifying any render mode. "
                "You can specify the render_mode at initialization, "
                f'e.g. gym.make("{env_id}", render_mode="rgb_array")'
            )
            return

        if self.render_mode == "ansi":
            return self._render_text()
        else:
            return self._render_gui(self.render_mode)

    def _render_gui(self, mode):
        """Draw the grid with pygame.

        Args:
            mode: "human" draws to a display window; any other value is
                treated as "rgb_array" and draws to an off-screen surface.

        Returns:
            ``None`` in "human" mode, otherwise the frame as an array with the
            first two surface axes swapped.

        Raises:
            DependencyNotInstalled: If pygame cannot be imported.
        """
        try:
            import pygame
        except ImportError as e:
            raise DependencyNotInstalled(
                "pygame is not installed, run `pip install gymnasium[toy-text]`"
            ) from e

        def load_scaled(relative_name):
            # Load an image stored next to this module and fit it to one cell.
            file_name = path.join(path.dirname(__file__), relative_name)
            return pygame.transform.scale(pygame.image.load(file_name), self.cell_size)

        if self.window_surface is None:
            pygame.init()

            if mode == "human":
                pygame.display.init()
                pygame.display.set_caption("CliffWalking")
                self.window_surface = pygame.display.set_mode(self.window_size)
            else:  # rgb_array
                self.window_surface = pygame.Surface(self.window_size)
        if self.clock is None:
            self.clock = pygame.time.Clock()
        if self.elf_images is None:
            # Ordered to match the action indices (up, right, down, left).
            self.elf_images = [
                load_scaled("img/elf_up.png"),
                load_scaled("img/elf_right.png"),
                load_scaled("img/elf_down.png"),
                load_scaled("img/elf_left.png"),
            ]
        if self.start_img is None:
            self.start_img = load_scaled("img/stool.png")
        if self.goal_img is None:
            self.goal_img = load_scaled("img/cookie.png")
        if self.mountain_bg_img is None:
            self.mountain_bg_img = [
                load_scaled("img/mountain_bg1.png"),
                load_scaled("img/mountain_bg2.png"),
            ]
        if self.near_cliff_img is None:
            self.near_cliff_img = [
                load_scaled("img/mountain_near-cliff1.png"),
                load_scaled("img/mountain_near-cliff2.png"),
            ]
        if self.cliff_img is None:
            self.cliff_img = load_scaled("img/mountain_cliff.png")

        for s in range(self.nS):
            row, col = np.unravel_index(s, self.shape)
            pos = (col * self.cell_size[0], row * self.cell_size[1])
            # 0/1 checkerboard parity selects between the two tile variants.
            check_board_mask = row % 2 ^ col % 2
            self.window_surface.blit(self.mountain_bg_img[check_board_mask], pos)

            if self._cliff[row, col]:
                self.window_surface.blit(self.cliff_img, pos)
            if row < self.shape[0] - 1 and self._cliff[row + 1, col]:
                self.window_surface.blit(self.near_cliff_img[check_board_mask], pos)
            if s == self.start_state_index:
                self.window_surface.blit(self.start_img, pos)
            if s == self.nS - 1:
                self.window_surface.blit(self.goal_img, pos)
            if s == self.s:
                # Draw the elf slightly above the cell origin.
                elf_pos = (pos[0], pos[1] - 0.1 * self.cell_size[1])
                # Before the first step there is no last action: face down.
                last_action = self.lastaction if self.lastaction is not None else 2
                self.window_surface.blit(self.elf_images[last_action], elf_pos)

        if mode == "human":
            pygame.event.pump()
            pygame.display.update()
            self.clock.tick(self.metadata["render_fps"])
        else:  # rgb_array
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(self.window_surface)), axes=(1, 0, 2)
            )

    def _render_text(self):
        """Return the grid as text.

        Cells are printed row by row: ``x`` is the player, ``T`` the terminal
        cell, ``C`` a cliff cell and ``o`` any other cell.
        """
        outfile = StringIO()
        # Derived from the grid shape, as in `_calculate_transition_prob`.
        terminal_position = (self.shape[0] - 1, self.shape[1] - 1)

        for s in range(self.nS):
            position = np.unravel_index(s, self.shape)
            if self.s == s:
                output = " x "
            # Print terminal state
            elif position == terminal_position:
                output = " T "
            elif self._cliff[position]:
                output = " C "
            else:
                output = " o "

            # Trim the padding at the row edges and end each row with a newline.
            if position[1] == 0:
                output = output.lstrip()
            if position[1] == self.shape[1] - 1:
                output = output.rstrip()
                output += "\n"

            outfile.write(output)
        outfile.write("\n")

        with closing(outfile):
            return outfile.getvalue()

    def close(self):
        """Shut down pygame if a render surface was ever created."""
        if self.window_surface is not None:
            import pygame

            pygame.display.quit()
            pygame.quit()
|
|
|
|
|
|
# Elf and stool from https://franuka.itch.io/rpg-snow-tileset
|
|
# All other assets by ____
|