Source code for scripts.teleop_twist_keyboard

#!/usr/bin/env python
"""
.. module:: teleop_twist_keyboard
    :platform: Unix
    :synopsis: provides an interactive interface for user to control the robot with keyboard

.. moduleauthor:: Ali Yousefi <aliyousef98@outlook.com>

Subscribes to:
    /scan

Publishes to:
    /cmd_vel

    it can implement two behaviours on robot:

            1: moving without obstacle avoidance:
                user can move the robot using keys it publishes the desired movements
                to cmd_vel topic

            2: moveing with obstacle avoidance:
                subscribes scan topic and uses it to detect obstacles. user 
                can move the robot using keys and it also avoids the robot
                from colliding the obstacles

    Here is the instruction for controlling the robot with keyboard:
    
        =====  =====  ======
        Moving around     
        -------------------- 
        u	 i	 o  
        j	 k 	 l
        m	 ,	 . 
        =====  =====  ======

        =======  =======  =======
        Holonomic mode(strafing)    
        ------------------------- 
        U	   I	    O  
        J	   K 	    L
        M	   <	    > 
        =======  =======  =======

        t : up (+z)

        b : down (-z)

        anything else : stop

        q/z : increase/decrease max speeds by 10%

        w/x : increase/decrease only linear speed by 10%

        e/c : increase/decrease only angular speed by 10%

        CTRL-C to change robot behaviour
            
"""

from __future__ import print_function
import threading
import roslib; roslib.load_manifest('teleop_twist_keyboard')
import rospy
from geometry_msgs.msg import Twist
import sys, select, termios, tty
from sensor_msgs.msg import LaserScan
import os
import time


front_obs = False
right_obs = False
left_obs = False
mesg = """
Reading from the keyboard  and Publishing to Twist!
---------------------------
Moving around:
   u    i    o
   j    k    l
   m    ,    .

For Holonomic mode (strafing), hold down the shift key:
---------------------------
   U    I    O
   J    K    L
   M    <    >

t : up (+z)
b : down (-z)

anything else : stop

q/z : increase/decrease max speeds by 10%
w/x : increase/decrease only linear speed by 10%
e/c : increase/decrease only angular speed by 10%

CTRL-C to change robot behaviour
"""

# binding input keys for moving the robot
moveBindings = {
        'i':(1,0,0,0),
        'o':(1,0,0,-1),
        'j':(0,0,0,1),
        'l':(0,0,0,-1),
        'u':(1,0,0,1),
        ',':(-1,0,0,0),
        '.':(-1,0,0,1),
        'm':(-1,0,0,-1),
        'O':(1,-1,0,0),
        'I':(1,0,0,0),
        'J':(0,1,0,0),
        'L':(0,-1,0,0),
        'U':(1,1,0,0),
        '<':(-1,0,0,0),
        '>':(-1,-1,0,0),
        'M':(-1,1,0,0),
        't':(0,0,1,0),
        'b':(0,0,-1,0),
    }

speedBindings={
        'q':(1.1,1.1),
        'z':(.9,.9),
        'w':(1.1,1),
        'x':(.9,1),
        'e':(1,1.1),
        'c':(1,.9),
    }

[docs]def clbk_laser(msg): """ Callback function for subscring laser scan topic Args: msg(float): rangefinder sensor data """ global front_obs global right_obs global left_obs regions = { 'right': min(min(msg.ranges[0:143]), 10), 'fright': min(min(msg.ranges[144:287]), 10), 'front': min(min(msg.ranges[288:431]), 10), 'fleft': min(min(msg.ranges[432:575]), 10), 'left': min(min(msg.ranges[576:719]), 10), } if regions['front'] < 1: front_obs = True else: front_obs = False if regions['right'] < 1: right_obs = True # print("right obs detected") else: right_obs = False if regions['left'] < 1: left_obs = True # print("left obs detected") else: left_obs = False
[docs]class PublishThread(threading.Thread): """ A class which is used for synchronization and publishing robot's twist into cmd_vel topic **Attributes:** x, y, z, th, speed, turn : float Robot's Position Information done : bool Robot State **Methods:** """ def __init__(self, rate): super(PublishThread, self).__init__() self.publisher = rospy.Publisher('cmd_vel', Twist, queue_size = 1) self.x = 0.0 self.y = 0.0 self.z = 0.0 self.th = 0.0 self.speed = 0.0 self.turn = 0.0 self.condition = threading.Condition() self.done = False # Set timeout to None if rate is 0 (causes new_message to wait forever # for new data to publish) if rate != 0.0: self.timeout = 1.0 / rate else: self.timeout = None self.start()
[docs] def wait_for_subscribers(self): """ Waits for subscribers to connect to the topic """ i = 0 while not rospy.is_shutdown() and self.publisher.get_num_connections() == 0: if i == 4: print("Waiting for subscriber to connect to {}".format(self.publisher.name)) rospy.sleep(0.5) i += 1 i = i % 5 if rospy.is_shutdown(): raise Exception("Got shutdown request before subscribers connected")
[docs] def update(self, x, y, z, th, speed, turn): """ Updates robots position information """ self.condition.acquire() self.x = x self.y = y self.z = z self.th = th self.speed = speed self.turn = turn # Notify publish thread that we have a new message. self.condition.notify() self.condition.release()
[docs] def stop(self): """ Stops the robot """ self.done = True self.update(0, 0, 0, 0, 0, 0) self.join()
[docs] def run(self): """ Detects if user has chosen the obstacle avoidance option and moves the robot by publishing the twist to cmd_vel topic """ global front_obs global right_obs global left_obs twist = Twist() while not self.done: self.condition.acquire() if rospy.get_param('robot_state')=='2': # Copy state into twist message. twist.linear.x = self.x * self.speed twist.linear.y = self.y * self.speed twist.linear.z = self.z * self.speed twist.angular.x = 0 twist.angular.y = 0 twist.angular.z = self.th * self.turn elif rospy.get_param('robot_state')=='3': # avoids the robot from moving forward if there is an obstacle if front_obs == True and self.x == 1: twist.linear.x = 0 # otherwise it can move forward else: twist.linear.x = self.x * self.speed twist.linear.y = self.y * self.speed twist.linear.z = self.z * self.speed twist.angular.x = 0 twist.angular.y = 0 # avoids the robot from turning to right if there is an obstacle if right_obs == True and self.th == -1: twist.angular.z = 0 # avoids the robot from turning to left if there is an obstacle elif left_obs == True and self.th == 1: twist.angular.z = 0 # otherwise robot can turn in any direction else: twist.angular.z = self.th * self.turn self.condition.release() # Publish. self.publisher.publish(twist) # Publish stop message when thread exits. twist.linear.x = 0 twist.linear.y = 0 twist.linear.z = 0 twist.angular.x = 0 twist.angular.y = 0 twist.angular.z = 0 self.publisher.publish(twist)
[docs]def getKey(key_timeout): """ Gets commands from user Args: key_timeout(float): timeout for input key Returns: key(str): user input key """ settings = termios.tcgetattr(sys.stdin) tty.setraw(sys.stdin.fileno()) rlist, _, _ = select.select([sys.stdin], [], [], key_timeout) if rlist: key = sys.stdin.read(1) else: key = '' termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings) return key
[docs]def vels(speed, turn): """ Puts the current speed of robot in a message string Args: speed(float): robot's linear speed turn(float): robot's angular speed Returns: msg(str): a message string that shows robot's twist """ return "currently:\tspeed %s\tturn %s " % (speed,turn)
[docs]def teleop(): """ Provides the interface and publishes user command to cmd_vel topic """ settings = termios.tcgetattr(sys.stdin) speed = rospy.get_param("~speed", 0.5) turn = rospy.get_param("~turn", 1.0) repeat = rospy.get_param("~repeat_rate", 0.0) key_timeout = rospy.get_param("~key_timeout", 0.0) if key_timeout == 0.0: key_timeout = None pub_thread = PublishThread(repeat) x = 0 y = 0 z = 0 th = 0 status = 0 try: pub_thread.wait_for_subscribers() pub_thread.update(x, y, z, th, speed, turn) os.system('cls||clear') print("** TELEOP TWIST KEYBOARD NODE **\n") print(mesg) print(vels(speed,turn)) while(1): key = getKey(key_timeout) if key in moveBindings.keys(): x = moveBindings[key][0] y = moveBindings[key][1] z = moveBindings[key][2] th = moveBindings[key][3] elif key in speedBindings.keys(): speed = speed * speedBindings[key][0] turn = turn * speedBindings[key][1] print(vels(speed,turn)) if (status == 14): print(mesg) status = (status + 1) % 15 else: # Skip updating cmd_vel if key timeout and robot already # stopped. if key == '' and x == 0 and y == 0 and z == 0 and th == 0: continue x = 0 y = 0 z = 0 th = 0 if (key == '\x03'): rospy.set_param('robot_state', '0') os.system('cls||clear') print("** TELEOP TWIST KEYBOARD NODE **\n") print("choose robot behaviour in master node") time.sleep(5) os.system('cls||clear') print("** TELEOP TWIST KEYBOARD NODE **\n") print("waiting for master node response...\n") break pub_thread.update(x, y, z, th, speed, turn) except Exception as e: print(e) finally: pub_thread.stop() termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
def main(): rospy.init_node('teleop_twist_keyboard') rospy.set_param('robot_state', '0') sub = rospy.Subscriber('/scan', LaserScan, clbk_laser) os.system('cls||clear') print("** TELEOP TWIST KEYBOARD NODE **\n") print("waiting for master node response...\n") rate = rospy.Rate(20) while not rospy.is_shutdown(): if rospy.get_param('robot_state')=='2' or rospy.get_param('robot_state')=='3': teleop() else: rate.sleep() continue rate.sleep() if __name__ == '__main__': main()