summaryrefslogtreecommitdiff
path: root/examples/pybullet/gym/pybullet_envs/minitaur/agents/tools/batch_env.py
blob: d103d0133cd71da76d511d413d7fbf86d5364b8a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# Copyright 2017 The TensorFlow Agents Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Combine multiple environments to step them in batch."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np


class BatchEnv(object):
  """Combine multiple environments to step them in batch.

  The wrapped environments are expected to expose `observation_space`,
  `action_space`, `step()` and `reset()`. Attributes that this class does not
  define itself are looked up on the first wrapped environment.
  """

  def __init__(self, envs, blocking):
    """Combine multiple environments to step them in batch.

    To step environments in parallel, environments must support a
    `blocking=False` argument to their step and reset functions that makes them
    return callables instead to receive the result at a later time.

    Args:
      envs: List of environments; must contain at least one environment.
      blocking: Step environments one after another rather than in parallel.

    Raises:
      ValueError: Environments have different observation or action spaces.
    """
    self._envs = envs
    self._blocking = blocking
    # The first environment serves as the reference for both spaces.
    observ_space = self._envs[0].observation_space
    if not all(env.observation_space == observ_space for env in self._envs):
      raise ValueError('All environments must use the same observation space.')
    action_space = self._envs[0].action_space
    if not all(env.action_space == action_space for env in self._envs):
      raise ValueError('All environments must use the same action space.')

  def __len__(self):
    """Number of combined environments."""
    return len(self._envs)

  def __getitem__(self, index):
    """Access an underlying environment by index."""
    return self._envs[index]

  def __getattr__(self, name):
    """Forward unimplemented attributes to one of the original environments.

    Python only calls this hook after the regular attribute lookup failed.

    Args:
      name: Attribute that was accessed.

    Raises:
      AttributeError: The environment list itself is not available yet, or
        the first wrapped environment does not provide the attribute.

    Returns:
      Value behind the attribute name in the first wrapped environment.
    """
    # If `_envs` is missing (e.g. the instance was created without running
    # `__init__`, as copy and pickle do), reading `self._envs` below would
    # re-enter this method forever. Fail with a regular AttributeError instead.
    if name == '_envs':
      raise AttributeError(name)
    return getattr(self._envs[0], name)

  def step(self, action):
    """Forward a batch of actions to the wrapped environments.

    Args:
      action: Batched action to apply to the environment; the i-th element is
        passed to the i-th environment.

    Raises:
      ValueError: Invalid actions.

    Returns:
      Batch of observations, rewards, and done flags as stacked arrays,
      followed by a tuple holding the info object of every environment.
    """
    actions = action
    # Validate the whole batch first so that no environment is stepped when
    # any of the actions is rejected.
    for index, (env, env_action) in enumerate(zip(self._envs, actions)):
      if not env.action_space.contains(env_action):
        message = 'Invalid action at index {}: {}'
        raise ValueError(message.format(index, env_action))
    if self._blocking:
      transitions = [
          env.step(env_action)
          for env, env_action in zip(self._envs, actions)]
    else:
      # Start every step before collecting any result; each call returns a
      # callable that yields the transition once it is invoked.
      promises = [
          env.step(env_action, blocking=False)
          for env, env_action in zip(self._envs, actions)]
      transitions = [promise() for promise in promises]
    observs, rewards, dones, infos = zip(*transitions)
    observ = np.stack(observs)
    reward = np.stack(rewards)
    done = np.stack(dones)
    info = tuple(infos)
    return observ, reward, done, info

  def reset(self, indices=None):
    """Reset the environment and convert the resulting observation.

    Args:
      indices: The batch indices of environments to reset; defaults to all.

    Returns:
      Batch of observations of the environments that were reset, in the order
      given by `indices`.
    """
    if indices is None:
      indices = np.arange(len(self._envs))
    if self._blocking:
      observs = [self._envs[index].reset() for index in indices]
    else:
      # Trigger all resets first, then collect the observations.
      promises = [
          self._envs[index].reset(blocking=False) for index in indices]
      observs = [promise() for promise in promises]
    observ = np.stack(observs)
    return observ

  def close(self):
    """Close every wrapped environment that provides a `close` method."""
    for env in self._envs:
      if hasattr(env, 'close'):
        env.close()