diff options
Diffstat (limited to 'examples/pybullet/gym/pybullet_envs/minitaur/envs/minitaur.py')
-rw-r--r-- | examples/pybullet/gym/pybullet_envs/minitaur/envs/minitaur.py | 996 |
1 files changed, 996 insertions, 0 deletions
diff --git a/examples/pybullet/gym/pybullet_envs/minitaur/envs/minitaur.py b/examples/pybullet/gym/pybullet_envs/minitaur/envs/minitaur.py new file mode 100644 index 000000000..cd5749d22 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/minitaur/envs/minitaur.py @@ -0,0 +1,996 @@ +"""This file implements the functionalities of a minitaur using pybullet. + +""" +import collections +import copy +import math +import re + +import numpy as np +import motor + +INIT_POSITION = [0, 0, .2] +INIT_RACK_POSITION = [0, 0, 1] +INIT_ORIENTATION = [0, 0, 0, 1] +KNEE_CONSTRAINT_POINT_RIGHT = [0, 0.005, 0.2] +KNEE_CONSTRAINT_POINT_LEFT = [0, 0.01, 0.2] +OVERHEAT_SHUTDOWN_TORQUE = 2.45 +OVERHEAT_SHUTDOWN_TIME = 1.0 +LEG_POSITION = ["front_left", "back_left", "front_right", "back_right"] +MOTOR_NAMES = [ + "motor_front_leftL_joint", "motor_front_leftR_joint", + "motor_back_leftL_joint", "motor_back_leftR_joint", + "motor_front_rightL_joint", "motor_front_rightR_joint", + "motor_back_rightL_joint", "motor_back_rightR_joint" +] +_CHASSIS_NAME_PATTERN = re.compile(r"chassis\D*center") +_MOTOR_NAME_PATTERN = re.compile(r"motor\D*joint") +_KNEE_NAME_PATTERN = re.compile(r"knee\D*") +SENSOR_NOISE_STDDEV = (0.0, 0.0, 0.0, 0.0, 0.0) +TWO_PI = 2 * math.pi + + +def MapToMinusPiToPi(angles): + """Maps a list of angles to [-pi, pi]. + + Args: + angles: A list of angles in rad. + Returns: + A list of angle mapped to [-pi, pi]. + """ + mapped_angles = copy.deepcopy(angles) + for i in range(len(angles)): + mapped_angles[i] = math.fmod(angles[i], TWO_PI) + if mapped_angles[i] >= math.pi: + mapped_angles[i] -= TWO_PI + elif mapped_angles[i] < -math.pi: + mapped_angles[i] += TWO_PI + return mapped_angles + + +class Minitaur(object): + """The minitaur class that simulates a quadruped robot from Ghost Robotics. + + """ + + def __init__(self, + pybullet_client, + urdf_root="", + time_step=0.01, + action_repeat=1, + self_collision_enabled=False, + motor_velocity_limit=np.inf, + pd_control_enabled=False, + accurate_motor_model_enabled=False, + remove_default_joint_damping=False, + motor_kp=1.0, + motor_kd=0.02, + pd_latency=0.0, + control_latency=0.0, + observation_noise_stdev=SENSOR_NOISE_STDDEV, + torque_control_enabled=False, + motor_overheat_protection=False, + on_rack=False): + """Constructs a minitaur and reset it to the initial states. + + Args: + pybullet_client: The instance of BulletClient to manage different + simulations. + urdf_root: The path to the urdf folder. + time_step: The time step of the simulation. + action_repeat: The number of ApplyAction() for each control step. + self_collision_enabled: Whether to enable self collision. + motor_velocity_limit: The upper limit of the motor velocity. + pd_control_enabled: Whether to use PD control for the motors. + accurate_motor_model_enabled: Whether to use the accurate DC motor model. + remove_default_joint_damping: Whether to remove the default joint damping. + motor_kp: proportional gain for the accurate motor model. + motor_kd: derivative gain for the accurate motor model. + pd_latency: The latency of the observations (in seconds) used to calculate + PD control. On the real hardware, it is the latency between the + microcontroller and the motor controller. + control_latency: The latency of the observations (in second) used to + calculate action. On the real hardware, it is the latency from the motor + controller, the microcontroller to the host (Nvidia TX2). + observation_noise_stdev: The standard deviation of a Gaussian noise model + for the sensor. It should be an array for separate sensors in the + following order [motor_angle, motor_velocity, motor_torque, + base_roll_pitch_yaw, base_angular_velocity] + torque_control_enabled: Whether to use the torque control, if set to + False, pose control will be used. + motor_overheat_protection: Whether to shutdown the motor that has exerted + large torque (OVERHEAT_SHUTDOWN_TORQUE) for an extended amount of time + (OVERHEAT_SHUTDOWN_TIME). See ApplyAction() in minitaur.py for more + details. + on_rack: Whether to place the minitaur on rack. This is only used to debug + the walking gait. In this mode, the minitaur's base is hanged midair so + that its walking gait is clearer to visualize. + """ + self.num_motors = 8 + self.num_legs = int(self.num_motors / 2) + self._pybullet_client = pybullet_client + self._action_repeat = action_repeat + self._urdf_root = urdf_root + self._self_collision_enabled = self_collision_enabled + self._motor_velocity_limit = motor_velocity_limit + self._pd_control_enabled = pd_control_enabled + self._motor_direction = [-1, -1, -1, -1, 1, 1, 1, 1] + self._observed_motor_torques = np.zeros(self.num_motors) + self._applied_motor_torques = np.zeros(self.num_motors) + self._max_force = 3.5 + self._pd_latency = pd_latency + self._control_latency = control_latency + self._observation_noise_stdev = observation_noise_stdev + self._accurate_motor_model_enabled = accurate_motor_model_enabled + self._remove_default_joint_damping = remove_default_joint_damping + self._observation_history = collections.deque(maxlen=100) + self._control_observation = [] + self._chassis_link_ids = [-1] + self._leg_link_ids = [] + self._motor_link_ids = [] + self._foot_link_ids = [] + self._torque_control_enabled = torque_control_enabled + self._motor_overheat_protection = motor_overheat_protection + self._on_rack = on_rack + if self._accurate_motor_model_enabled: + self._kp = motor_kp + self._kd = motor_kd + self._motor_model = motor.MotorModel( + torque_control_enabled=self._torque_control_enabled, + kp=self._kp, + kd=self._kd) + elif self._pd_control_enabled: + self._kp = 8 + self._kd = 0.3 + else: + self._kp = 1 + self._kd = 1 + self.time_step = time_step + self._step_counter = 0 + # reset_time=-1.0 means skipping the reset motion. + # See Reset for more details. + self.Reset(reset_time=-1.0) + + def GetTimeSinceReset(self): + return self._step_counter * self.time_step + + def Step(self, action): + for _ in range(self._action_repeat): + self.ApplyAction(action) + self._pybullet_client.stepSimulation() + self.ReceiveObservation() + self._step_counter += 1 + + def Terminate(self): + pass + + def _RecordMassInfoFromURDF(self): + self._base_mass_urdf = [] + for chassis_id in self._chassis_link_ids: + self._base_mass_urdf.append( + self._pybullet_client.getDynamicsInfo(self.quadruped, chassis_id)[0]) + self._leg_masses_urdf = [] + for leg_id in self._leg_link_ids: + self._leg_masses_urdf.append( + self._pybullet_client.getDynamicsInfo(self.quadruped, leg_id)[0]) + for motor_id in self._motor_link_ids: + self._leg_masses_urdf.append( + self._pybullet_client.getDynamicsInfo(self.quadruped, motor_id)[0]) + + def _RecordInertiaInfoFromURDF(self): + """Record the inertia of each body from URDF file.""" + self._link_urdf = [] + num_bodies = self._pybullet_client.getNumJoints(self.quadruped) + for body_id in range(-1, num_bodies): # -1 is for the base link. + inertia = self._pybullet_client.getDynamicsInfo(self.quadruped, + body_id)[2] + self._link_urdf.append(inertia) + # We need to use id+1 to index self._link_urdf because it has the base + # (index = -1) at the first element. + self._base_inertia_urdf = [ + self._link_urdf[chassis_id + 1] for chassis_id in self._chassis_link_ids + ] + self._leg_inertia_urdf = [ + self._link_urdf[leg_id + 1] for leg_id in self._leg_link_ids + ] + self._leg_inertia_urdf.extend( + [self._link_urdf[motor_id + 1] for motor_id in self._motor_link_ids]) + + def _BuildJointNameToIdDict(self): + num_joints = self._pybullet_client.getNumJoints(self.quadruped) + self._joint_name_to_id = {} + for i in range(num_joints): + joint_info = self._pybullet_client.getJointInfo(self.quadruped, i) + self._joint_name_to_id[joint_info[1].decode("UTF-8")] = joint_info[0] + + def _BuildUrdfIds(self): + """Build the link Ids from its name in the URDF file.""" + num_joints = self._pybullet_client.getNumJoints(self.quadruped) + self._chassis_link_ids = [-1] + # the self._leg_link_ids include both the upper and lower links of the leg. + self._leg_link_ids = [] + self._motor_link_ids = [] + self._foot_link_ids = [] + for i in range(num_joints): + joint_info = self._pybullet_client.getJointInfo(self.quadruped, i) + joint_name = joint_info[1].decode("UTF-8") + joint_id = self._joint_name_to_id[joint_name] + if _CHASSIS_NAME_PATTERN.match(joint_name): + self._chassis_link_ids.append(joint_id) + elif _MOTOR_NAME_PATTERN.match(joint_name): + self._motor_link_ids.append(joint_id) + elif _KNEE_NAME_PATTERN.match(joint_name): + self._foot_link_ids.append(joint_id) + else: + self._leg_link_ids.append(joint_id) + self._leg_link_ids.extend(self._foot_link_ids) + self._chassis_link_ids.sort() + self._motor_link_ids.sort() + self._foot_link_ids.sort() + self._leg_link_ids.sort() + + def _RemoveDefaultJointDamping(self): + num_joints = self._pybullet_client.getNumJoints(self.quadruped) + for i in range(num_joints): + joint_info = self._pybullet_client.getJointInfo(self.quadruped, i) + self._pybullet_client.changeDynamics( + joint_info[0], -1, linearDamping=0, angularDamping=0) + + def _BuildMotorIdList(self): + self._motor_id_list = [ + self._joint_name_to_id[motor_name] for motor_name in MOTOR_NAMES + ] + + def IsObservationValid(self): + """Whether the observation is valid for the current time step. + + In simulation, observations are always valid. In real hardware, it may not + be valid from time to time when communication error happens between the + Nvidia TX2 and the microcontroller. + + Returns: + Whether the observation is valid for the current time step. + """ + return True + + def Reset(self, reload_urdf=True, default_motor_angles=None, reset_time=3.0): + """Reset the minitaur to its initial states. + + Args: + reload_urdf: Whether to reload the urdf file. If not, Reset() just place + the minitaur back to its starting position. + default_motor_angles: The default motor angles. If it is None, minitaur + will hold a default pose (motor angle math.pi / 2) for 100 steps. In + torque control mode, the phase of holding the default pose is skipped. + reset_time: The duration (in seconds) to hold the default motor angles. If + reset_time <= 0 or in torque control mode, the phase of holding the + default pose is skipped. + """ + if self._on_rack: + init_position = INIT_RACK_POSITION + else: + init_position = INIT_POSITION + if reload_urdf: + if self._self_collision_enabled: + self.quadruped = self._pybullet_client.loadURDF( + "%s/quadruped/minitaur.urdf" % self._urdf_root, + init_position, + useFixedBase=self._on_rack, + flags=self._pybullet_client.URDF_USE_SELF_COLLISION) + else: + self.quadruped = self._pybullet_client.loadURDF( + "%s/quadruped/minitaur.urdf" % self._urdf_root, + init_position, + useFixedBase=self._on_rack) + self._BuildJointNameToIdDict() + self._BuildUrdfIds() + if self._remove_default_joint_damping: + self._RemoveDefaultJointDamping() + self._BuildMotorIdList() + self._RecordMassInfoFromURDF() + self._RecordInertiaInfoFromURDF() + self.ResetPose(add_constraint=True) + else: + self._pybullet_client.resetBasePositionAndOrientation( + self.quadruped, init_position, INIT_ORIENTATION) + self._pybullet_client.resetBaseVelocity(self.quadruped, [0, 0, 0], + [0, 0, 0]) + self.ResetPose(add_constraint=False) + self._overheat_counter = np.zeros(self.num_motors) + self._motor_enabled_list = [True] * self.num_motors + self._step_counter = 0 + + # Perform reset motion within reset_duration if in position control mode. + # Nothing is performed if in torque control mode for now. + # TODO(jietan): Add reset motion when the torque control is fully supported. + self._observation_history.clear() + if not self._torque_control_enabled and reset_time > 0.0: + self.ReceiveObservation() + for _ in range(100): + self.ApplyAction([math.pi / 2] * self.num_motors) + self._pybullet_client.stepSimulation() + self.ReceiveObservation() + if default_motor_angles is not None: + num_steps_to_reset = int(reset_time / self.time_step) + for _ in range(num_steps_to_reset): + self.ApplyAction(default_motor_angles) + self._pybullet_client.stepSimulation() + self.ReceiveObservation() + self.ReceiveObservation() + + def _SetMotorTorqueById(self, motor_id, torque): + self._pybullet_client.setJointMotorControl2( + bodyIndex=self.quadruped, + jointIndex=motor_id, + controlMode=self._pybullet_client.TORQUE_CONTROL, + force=torque) + + def _SetDesiredMotorAngleById(self, motor_id, desired_angle): + self._pybullet_client.setJointMotorControl2( + bodyIndex=self.quadruped, + jointIndex=motor_id, + controlMode=self._pybullet_client.POSITION_CONTROL, + targetPosition=desired_angle, + positionGain=self._kp, + velocityGain=self._kd, + force=self._max_force) + + def _SetDesiredMotorAngleByName(self, motor_name, desired_angle): + self._SetDesiredMotorAngleById(self._joint_name_to_id[motor_name], + desired_angle) + + def ResetPose(self, add_constraint): + """Reset the pose of the minitaur. + + Args: + add_constraint: Whether to add a constraint at the joints of two feet. + """ + for i in range(self.num_legs): + self._ResetPoseForLeg(i, add_constraint) + + def _ResetPoseForLeg(self, leg_id, add_constraint): + """Reset the initial pose for the leg. + + Args: + leg_id: It should be 0, 1, 2, or 3, which represents the leg at + front_left, back_left, front_right and back_right. + add_constraint: Whether to add a constraint at the joints of two feet. + """ + knee_friction_force = 0 + half_pi = math.pi / 2.0 + knee_angle = -2.1834 + + leg_position = LEG_POSITION[leg_id] + self._pybullet_client.resetJointState( + self.quadruped, + self._joint_name_to_id["motor_" + leg_position + "L_joint"], + self._motor_direction[2 * leg_id] * half_pi, + targetVelocity=0) + self._pybullet_client.resetJointState( + self.quadruped, + self._joint_name_to_id["knee_" + leg_position + "L_link"], + self._motor_direction[2 * leg_id] * knee_angle, + targetVelocity=0) + self._pybullet_client.resetJointState( + self.quadruped, + self._joint_name_to_id["motor_" + leg_position + "R_joint"], + self._motor_direction[2 * leg_id + 1] * half_pi, + targetVelocity=0) + self._pybullet_client.resetJointState( + self.quadruped, + self._joint_name_to_id["knee_" + leg_position + "R_link"], + self._motor_direction[2 * leg_id + 1] * knee_angle, + targetVelocity=0) + if add_constraint: + self._pybullet_client.createConstraint( + self.quadruped, + self._joint_name_to_id["knee_" + leg_position + "R_link"], + self.quadruped, + self._joint_name_to_id["knee_" + leg_position + "L_link"], + self._pybullet_client.JOINT_POINT2POINT, [0, 0, 0], + KNEE_CONSTRAINT_POINT_RIGHT, KNEE_CONSTRAINT_POINT_LEFT) + + if self._accurate_motor_model_enabled or self._pd_control_enabled: + # Disable the default motor in pybullet. + self._pybullet_client.setJointMotorControl2( + bodyIndex=self.quadruped, + jointIndex=( + self._joint_name_to_id["motor_" + leg_position + "L_joint"]), + controlMode=self._pybullet_client.VELOCITY_CONTROL, + targetVelocity=0, + force=knee_friction_force) + self._pybullet_client.setJointMotorControl2( + bodyIndex=self.quadruped, + jointIndex=( + self._joint_name_to_id["motor_" + leg_position + "R_joint"]), + controlMode=self._pybullet_client.VELOCITY_CONTROL, + targetVelocity=0, + force=knee_friction_force) + + else: + self._SetDesiredMotorAngleByName( + "motor_" + leg_position + "L_joint", + self._motor_direction[2 * leg_id] * half_pi) + self._SetDesiredMotorAngleByName( + "motor_" + leg_position + "R_joint", + self._motor_direction[2 * leg_id + 1] * half_pi) + + self._pybullet_client.setJointMotorControl2( + bodyIndex=self.quadruped, + jointIndex=(self._joint_name_to_id["knee_" + leg_position + "L_link"]), + controlMode=self._pybullet_client.VELOCITY_CONTROL, + targetVelocity=0, + force=knee_friction_force) + self._pybullet_client.setJointMotorControl2( + bodyIndex=self.quadruped, + jointIndex=(self._joint_name_to_id["knee_" + leg_position + "R_link"]), + controlMode=self._pybullet_client.VELOCITY_CONTROL, + targetVelocity=0, + force=knee_friction_force) + + def GetBasePosition(self): + """Get the position of minitaur's base. + + Returns: + The position of minitaur's base. + """ + position, _ = ( + self._pybullet_client.getBasePositionAndOrientation(self.quadruped)) + return position + + def GetTrueBaseRollPitchYaw(self): + """Get minitaur's base orientation in euler angle in the world frame. + + Returns: + A tuple (roll, pitch, yaw) of the base in world frame. + """ + orientation = self.GetTrueBaseOrientation() + roll_pitch_yaw = self._pybullet_client.getEulerFromQuaternion(orientation) + return np.asarray(roll_pitch_yaw) + + def GetBaseRollPitchYaw(self): + """Get minitaur's base orientation in euler angle in the world frame. + + This function mimicks the noisy sensor reading and adds latency. + Returns: + A tuple (roll, pitch, yaw) of the base in world frame polluted by noise + and latency. + """ + delayed_orientation = np.array( + self._control_observation[3 * self.num_motors:3 * self.num_motors + 4]) + delayed_roll_pitch_yaw = self._pybullet_client.getEulerFromQuaternion( + delayed_orientation) + roll_pitch_yaw = self._AddSensorNoise( + np.array(delayed_roll_pitch_yaw), self._observation_noise_stdev[3]) + return roll_pitch_yaw + + def GetTrueMotorAngles(self): + """Gets the eight motor angles at the current moment, mapped to [-pi, pi]. + + Returns: + Motor angles, mapped to [-pi, pi]. + """ + motor_angles = [ + self._pybullet_client.getJointState(self.quadruped, motor_id)[0] + for motor_id in self._motor_id_list + ] + motor_angles = np.multiply(motor_angles, self._motor_direction) + return motor_angles + + def GetMotorAngles(self): + """Gets the eight motor angles. + + This function mimicks the noisy sensor reading and adds latency. The motor + angles that are delayed, noise polluted, and mapped to [-pi, pi]. + + Returns: + Motor angles polluted by noise and latency, mapped to [-pi, pi]. + """ + motor_angles = self._AddSensorNoise( + np.array(self._control_observation[0:self.num_motors]), + self._observation_noise_stdev[0]) + return MapToMinusPiToPi(motor_angles) + + def GetTrueMotorVelocities(self): + """Get the velocity of all eight motors. + + Returns: + Velocities of all eight motors. + """ + motor_velocities = [ + self._pybullet_client.getJointState(self.quadruped, motor_id)[1] + for motor_id in self._motor_id_list + ] + motor_velocities = np.multiply(motor_velocities, self._motor_direction) + return motor_velocities + + def GetMotorVelocities(self): + """Get the velocity of all eight motors. + + This function mimicks the noisy sensor reading and adds latency. + Returns: + Velocities of all eight motors polluted by noise and latency. + """ + return self._AddSensorNoise( + np.array( + self._control_observation[self.num_motors:2 * self.num_motors]), + self._observation_noise_stdev[1]) + + def GetTrueMotorTorques(self): + """Get the amount of torque the motors are exerting. + + Returns: + Motor torques of all eight motors. + """ + if self._accurate_motor_model_enabled or self._pd_control_enabled: + return self._observed_motor_torques + else: + motor_torques = [ + self._pybullet_client.getJointState(self.quadruped, motor_id)[3] + for motor_id in self._motor_id_list + ] + motor_torques = np.multiply(motor_torques, self._motor_direction) + return motor_torques + + def GetMotorTorques(self): + """Get the amount of torque the motors are exerting. + + This function mimicks the noisy sensor reading and adds latency. + Returns: + Motor torques of all eight motors polluted by noise and latency. + """ + return self._AddSensorNoise( + np.array( + self._control_observation[2 * self.num_motors:3 * self.num_motors]), + self._observation_noise_stdev[2]) + + def GetTrueBaseOrientation(self): + """Get the orientation of minitaur's base, represented as quaternion. + + Returns: + The orientation of minitaur's base. + """ + _, orientation = ( + self._pybullet_client.getBasePositionAndOrientation(self.quadruped)) + return orientation + + def GetBaseOrientation(self): + """Get the orientation of minitaur's base, represented as quaternion. + + This function mimicks the noisy sensor reading and adds latency. + Returns: + The orientation of minitaur's base polluted by noise and latency. + """ + return self._pybullet_client.getQuaternionFromEuler( + self.GetBaseRollPitchYaw()) + + def GetTrueBaseRollPitchYawRate(self): + """Get the rate of orientation change of the minitaur's base in euler angle. + + Returns: + rate of (roll, pitch, yaw) change of the minitaur's base. + """ + vel = self._pybullet_client.getBaseVelocity(self.quadruped) + return np.asarray([vel[1][0], vel[1][1], vel[1][2]]) + + def GetBaseRollPitchYawRate(self): + """Get the rate of orientation change of the minitaur's base in euler angle. + + This function mimicks the noisy sensor reading and adds latency. + Returns: + rate of (roll, pitch, yaw) change of the minitaur's base polluted by noise + and latency. + """ + return self._AddSensorNoise( + np.array(self._control_observation[3 * self.num_motors + 4: + 3 * self.num_motors + 7]), + self._observation_noise_stdev[4]) + + def GetActionDimension(self): + """Get the length of the action list. + + Returns: + The length of the action list. + """ + return self.num_motors + + def ApplyAction(self, motor_commands, motor_kps=None, motor_kds=None): + """Set the desired motor angles to the motors of the minitaur. + + The desired motor angles are clipped based on the maximum allowed velocity. + If the pd_control_enabled is True, a torque is calculated according to + the difference between current and desired joint angle, as well as the joint + velocity. This torque is exerted to the motor. For more information about + PD control, please refer to: https://en.wikipedia.org/wiki/PID_controller. + + Args: + motor_commands: The eight desired motor angles. + motor_kps: Proportional gains for the motor model. If not provided, it + uses the default kp of the minitaur for all the motors. + motor_kds: Derivative gains for the motor model. If not provided, it + uses the default kd of the minitaur for all the motors. + """ + if self._motor_velocity_limit < np.inf: + current_motor_angle = self.GetTrueMotorAngles() + motor_commands_max = ( + current_motor_angle + self.time_step * self._motor_velocity_limit) + motor_commands_min = ( + current_motor_angle - self.time_step * self._motor_velocity_limit) + motor_commands = np.clip(motor_commands, motor_commands_min, + motor_commands_max) + # Set the kp and kd for all the motors if not provided as an argument. + if motor_kps is None: + motor_kps = np.full(8, self._kp) + if motor_kds is None: + motor_kds = np.full(8, self._kd) + + if self._accurate_motor_model_enabled or self._pd_control_enabled: + q, qdot = self._GetPDObservation() + qdot_true = self.GetTrueMotorVelocities() + if self._accurate_motor_model_enabled: + actual_torque, observed_torque = self._motor_model.convert_to_torque( + motor_commands, q, qdot, qdot_true, motor_kps, motor_kds) + if self._motor_overheat_protection: + for i in range(self.num_motors): + if abs(actual_torque[i]) > OVERHEAT_SHUTDOWN_TORQUE: + self._overheat_counter[i] += 1 + else: + self._overheat_counter[i] = 0 + if (self._overheat_counter[i] > + OVERHEAT_SHUTDOWN_TIME / self.time_step): + self._motor_enabled_list[i] = False + + # The torque is already in the observation space because we use + # GetMotorAngles and GetMotorVelocities. + self._observed_motor_torques = observed_torque + + # Transform into the motor space when applying the torque. + self._applied_motor_torque = np.multiply(actual_torque, + self._motor_direction) + + for motor_id, motor_torque, motor_enabled in zip( + self._motor_id_list, self._applied_motor_torque, + self._motor_enabled_list): + if motor_enabled: + self._SetMotorTorqueById(motor_id, motor_torque) + else: + self._SetMotorTorqueById(motor_id, 0) + else: + torque_commands = -1 * motor_kps * ( + q - motor_commands) - motor_kds * qdot + + # The torque is already in the observation space because we use + # GetMotorAngles and GetMotorVelocities. + self._observed_motor_torques = torque_commands + + # Transform into the motor space when applying the torque. + self._applied_motor_torques = np.multiply(self._observed_motor_torques, + self._motor_direction) + + for motor_id, motor_torque in zip(self._motor_id_list, + self._applied_motor_torques): + self._SetMotorTorqueById(motor_id, motor_torque) + else: + motor_commands_with_direction = np.multiply(motor_commands, + self._motor_direction) + for motor_id, motor_command_with_direction in zip( + self._motor_id_list, motor_commands_with_direction): + self._SetDesiredMotorAngleById(motor_id, motor_command_with_direction) + + def ConvertFromLegModel(self, actions): + """Convert the actions that use leg model to the real motor actions. + + Args: + actions: The theta, phi of the leg model. + Returns: + The eight desired motor angles that can be used in ApplyActions(). + """ + motor_angle = copy.deepcopy(actions) + scale_for_singularity = 1 + offset_for_singularity = 1.5 + half_num_motors = int(self.num_motors / 2) + quater_pi = math.pi / 4 + for i in range(self.num_motors): + action_idx = int(i // 2) + forward_backward_component = (-scale_for_singularity * quater_pi * ( + actions[action_idx + half_num_motors] + offset_for_singularity)) + extension_component = (-1)**i * quater_pi * actions[action_idx] + if i >= half_num_motors: + extension_component = -extension_component + motor_angle[i] = ( + math.pi + forward_backward_component + extension_component) + return motor_angle + + def GetBaseMassesFromURDF(self): + """Get the mass of the base from the URDF file.""" + return self._base_mass_urdf + + def GetBaseInertiasFromURDF(self): + """Get the inertia of the base from the URDF file.""" + return self._base_inertia_urdf + + def GetLegMassesFromURDF(self): + """Get the mass of the legs from the URDF file.""" + return self._leg_masses_urdf + + def GetLegInertiasFromURDF(self): + """Get the inertia of the legs from the URDF file.""" + return self._leg_inertia_urdf + + def SetBaseMasses(self, base_mass): + """Set the mass of minitaur's base. + + Args: + base_mass: A list of masses of each body link in CHASIS_LINK_IDS. The + length of this list should be the same as the length of CHASIS_LINK_IDS. + Raises: + ValueError: It is raised when the length of base_mass is not the same as + the length of self._chassis_link_ids. + """ + if len(base_mass) != len(self._chassis_link_ids): + raise ValueError( + "The length of base_mass {} and self._chassis_link_ids {} are not " + "the same.".format(len(base_mass), len(self._chassis_link_ids))) + for chassis_id, chassis_mass in zip(self._chassis_link_ids, base_mass): + self._pybullet_client.changeDynamics( + self.quadruped, chassis_id, mass=chassis_mass) + + def SetLegMasses(self, leg_masses): + """Set the mass of the legs. + + A leg includes leg_link and motor. 4 legs contain 16 links (4 links each) + and 8 motors. First 16 numbers correspond to link masses, last 8 correspond + to motor masses (24 total). + + Args: + leg_masses: The leg and motor masses for all the leg links and motors. + + Raises: + ValueError: It is raised when the length of masses is not equal to number + of links + motors. + """ + if len(leg_masses) != len(self._leg_link_ids) + len(self._motor_link_ids): + raise ValueError("The number of values passed to SetLegMasses are " + "different than number of leg links and motors.") + for leg_id, leg_mass in zip(self._leg_link_ids, leg_masses): + self._pybullet_client.changeDynamics( + self.quadruped, leg_id, mass=leg_mass) + motor_masses = leg_masses[len(self._leg_link_ids):] + for link_id, motor_mass in zip(self._motor_link_ids, motor_masses): + self._pybullet_client.changeDynamics( + self.quadruped, link_id, mass=motor_mass) + + def SetBaseInertias(self, base_inertias): + """Set the inertias of minitaur's base. + + Args: + base_inertias: A list of inertias of each body link in CHASIS_LINK_IDS. + The length of this list should be the same as the length of + CHASIS_LINK_IDS. + Raises: + ValueError: It is raised when the length of base_inertias is not the same + as the length of self._chassis_link_ids and base_inertias contains + negative values. + """ + if len(base_inertias) != len(self._chassis_link_ids): + raise ValueError( + "The length of base_inertias {} and self._chassis_link_ids {} are " + "not the same.".format( + len(base_inertias), len(self._chassis_link_ids))) + for chassis_id, chassis_inertia in zip(self._chassis_link_ids, + base_inertias): + for inertia_value in chassis_inertia: + if (np.asarray(inertia_value) < 0).any(): + raise ValueError("Values in inertia matrix should be non-negative.") + self._pybullet_client.changeDynamics( + self.quadruped, chassis_id, localInertiaDiagonal=chassis_inertia) + + def SetLegInertias(self, leg_inertias): + """Set the inertias of the legs. + + A leg includes leg_link and motor. 4 legs contain 16 links (4 links each) + and 8 motors. First 16 numbers correspond to link inertia, last 8 correspond + to motor inertia (24 total). + + Args: + leg_inertias: The leg and motor inertias for all the leg links and motors. + + Raises: + ValueError: It is raised when the length of inertias is not equal to + the number of links + motors or leg_inertias contains negative values. + """ + + if len(leg_inertias) != len(self._leg_link_ids) + len(self._motor_link_ids): + raise ValueError("The number of values passed to SetLegMasses are " + "different than number of leg links and motors.") + for leg_id, leg_inertia in zip(self._leg_link_ids, leg_inertias): + for inertia_value in leg_inertias: + if (np.asarray(inertia_value) < 0).any(): + raise ValueError("Values in inertia matrix should be non-negative.") + self._pybullet_client.changeDynamics( + self.quadruped, leg_id, localInertiaDiagonal=leg_inertia) + + motor_inertias = leg_inertias[len(self._leg_link_ids):] + for link_id, motor_inertia in zip(self._motor_link_ids, motor_inertias): + for inertia_value in motor_inertias: + if (np.asarray(inertia_value) < 0).any(): + raise ValueError("Values in inertia matrix should be non-negative.") + self._pybullet_client.changeDynamics( + self.quadruped, link_id, localInertiaDiagonal=motor_inertia) + + def SetFootFriction(self, foot_friction): + """Set the lateral friction of the feet. + + Args: + foot_friction: The lateral friction coefficient of the foot. This value is + shared by all four feet. + """ + for link_id in self._foot_link_ids: + self._pybullet_client.changeDynamics( + self.quadruped, link_id, lateralFriction=foot_friction) + + # TODO(b/73748980): Add more API's to set other contact parameters. + def SetFootRestitution(self, foot_restitution): + """Set the coefficient of restitution at the feet. + + Args: + foot_restitution: The coefficient of restitution (bounciness) of the feet. + This value is shared by all four feet. + """ + for link_id in self._foot_link_ids: + self._pybullet_client.changeDynamics( + self.quadruped, link_id, restitution=foot_restitution) + + def SetJointFriction(self, joint_frictions): + for knee_joint_id, friction in zip(self._foot_link_ids, joint_frictions): + self._pybullet_client.setJointMotorControl2( + bodyIndex=self.quadruped, + jointIndex=knee_joint_id, + controlMode=self._pybullet_client.VELOCITY_CONTROL, + targetVelocity=0, + force=friction) + + def GetNumKneeJoints(self): + return len(self._foot_link_ids) + + def SetBatteryVoltage(self, voltage): + if self._accurate_motor_model_enabled: + self._motor_model.set_voltage(voltage) + + def SetMotorViscousDamping(self, viscous_damping): + if self._accurate_motor_model_enabled: + self._motor_model.set_viscous_damping(viscous_damping) + + def GetTrueObservation(self): + observation = [] + observation.extend(self.GetTrueMotorAngles()) + observation.extend(self.GetTrueMotorVelocities()) + observation.extend(self.GetTrueMotorTorques()) + observation.extend(self.GetTrueBaseOrientation()) + observation.extend(self.GetTrueBaseRollPitchYawRate()) + return observation + + def ReceiveObservation(self): + """Receive the observation from sensors. + + This function is called once per step. The observations are only updated + when this function is called. + """ + self._observation_history.appendleft(self.GetTrueObservation()) + self._control_observation = self._GetControlObservation() + + def _GetDelayedObservation(self, latency): + """Get observation that is delayed by the amount specified in latency. + + Args: + latency: The latency (in seconds) of the delayed observation. + Returns: + observation: The observation which was actually latency seconds ago. + """ + if latency <= 0 or len(self._observation_history) == 1: + observation = self._observation_history[0] + else: + n_steps_ago = int(latency / self.time_step) + if n_steps_ago + 1 >= len(self._observation_history): + return self._observation_history[-1] + remaining_latency = latency - n_steps_ago * self.time_step + blend_alpha = remaining_latency / self.time_step + observation = ( + (1.0 - blend_alpha) * np.array(self._observation_history[n_steps_ago]) + + blend_alpha * np.array(self._observation_history[n_steps_ago + 1])) + return observation + + def _GetPDObservation(self): + pd_delayed_observation = self._GetDelayedObservation(self._pd_latency) + q = pd_delayed_observation[0:self.num_motors] + qdot = pd_delayed_observation[self.num_motors:2 * self.num_motors] + return (np.array(q), np.array(qdot)) + + def _GetControlObservation(self): + control_delayed_observation = self._GetDelayedObservation( + self._control_latency) + return control_delayed_observation + + def _AddSensorNoise(self, sensor_values, noise_stdev): + if noise_stdev <= 0: + return sensor_values + observation = sensor_values + np.random.normal( + scale=noise_stdev, size=sensor_values.shape) + return observation + + def SetControlLatency(self, latency): + """Set the latency of the control loop. + + It measures the duration between sending an action from Nvidia TX2 and + receiving the observation from microcontroller. + + Args: + latency: The latency (in seconds) of the control loop. + """ + self._control_latency = latency + + def GetControlLatency(self): + """Get the control latency. + + Returns: + The latency (in seconds) between when the motor command is sent and when + the sensor measurements are reported back to the controller. + """ + return self._control_latency + + def SetMotorGains(self, kp, kd): + """Set the gains of all motors. + + These gains are PD gains for motor positional control. kp is the + proportional gain and kd is the derivative gain. + + Args: + kp: proportional gain of the motors. + kd: derivative gain of the motors. + """ + self._kp = kp + self._kd = kd + if self._accurate_motor_model_enabled: + self._motor_model.set_motor_gains(kp, kd) + + def GetMotorGains(self): + """Get the gains of the motor. + + Returns: + The proportional gain. + The derivative gain. + """ + return self._kp, self._kd + + def SetMotorStrengthRatio(self, ratio): + """Set the strength of all motors relative to the default value. + + Args: + ratio: The relative strength. A scalar range from 0.0 to 1.0. + """ + if self._accurate_motor_model_enabled: + self._motor_model.set_strength_ratios([ratio] * self.num_motors) + + def SetMotorStrengthRatios(self, ratios): + """Set the strength of each motor relative to the default value. + + Args: + ratios: The relative strength. A numpy array ranging from 0.0 to 1.0. + """ + if self._accurate_motor_model_enabled: + self._motor_model.set_strength_ratios(ratios) + + def SetTimeSteps(self, action_repeat, simulation_step): + """Set the time steps of the control and simulation. + + Args: + action_repeat: The number of simulation steps that the same action is + repeated. + simulation_step: The simulation time step. + """ + self.time_step = simulation_step + self._action_repeat = action_repeat + + @property + def chassis_link_ids(self): + return self._chassis_link_ids |