Source code for scripts.finite_state_machine

#!/usr/bin/env python
"""
.. module:: finite_state_machine
    :platform: Unix
    :synopsis: the finite_state_machine python script in ros-moveit-opencv-ontology package

.. moduleauthor:: Ali Yousefi <aliyousef98@outlook.com>

Subscribes to:
    /image_id

Uses Service:
    /state/set_battery_level

    /state/get_pose

    /state/set_base_movement_state

    /room_info

    /move_arm

Uses Action:
    /move_base

Uses helper script:
    /utilities/ontological_robot_control/topological_map.py

Defines the states and transitions for the finite state machine of this rospackage. In the initial state
robot builds the semantic map of environment using image id of the markers detected by robot camera. This node
uses ``topological_map.py`` helper script to update the ontology while the process is running, and retreives
the target room based on last visit times and robot battey state. In the next state it moves to the target room
and if battery level gets lower than threshold, it goes to charger, and charges the battery.
"""
import rospy
import smach
import smach_ros
import math
import actionlib
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from assignment2.msg import Point
from assignment2.srv import SetBatteryLevel, GetPose, SetBaseMovementState, RoomInformation
from ontological_robot_control import architecture_name_mapper as anm
from ontological_robot_control.topological_map import TopologicalMap
from std_msgs.msg import Int32
from std_srvs.srv import SetBool

# Import mutex to manage synchronization among ROS-based threads (i.e., node loop and subscribers)
from threading import Lock

# A tag for identifying logs producer.
LOG_TAG = anm.NODE_FINITE_STATE_MACHINE
LOOP_TIME = 5

pub = None
tm = None
mutex = None
rooms_id = []
rooms_name = []
rooms_center = []
rooms_connections = []
rooms_doors = []

[docs]def get_room_info(room_id): """ Server client for ``marker_server``, gets information for each room using ``room_info`` service Args: room_id(int) Returns: resp(RoomInformationResponse) """ rospy.wait_for_service('room_info') try: srv = rospy.ServiceProxy('room_info', RoomInformation) resp = srv(room_id) return resp except rospy.ServiceException as e: print("Service call failed: %s"%e)
[docs]def marker_id_callback(data): """ Callback function for ``/image_id`` topic subscriber. Eveytime an image id is detected, checks if image id is valuable and not already available, then saves the corresponding information of each room in the global variables by calling ``get_room_info(room_id)`` function, and modifies the ontology using ``add_room(room)``, ``add_door(door)``, ``assign_doors_to_room(room, doors)`` ``disjoint_individuals()`` and ``add_last_visit_time(room, visit_time)`` functions from ``topological_map.py`` helper script. Args: data(int32) """ global rooms_id global rooms_name global rooms_center global rooms_connections global rooms_doors if data.data not in rooms_id and data.data > 10 and data.data < 18: rooms_id.append(data.data) log_msg = 'Image id detected: %d ' % (data.data) rospy.loginfo(anm.tag_log(log_msg, LOG_TAG)) log_msg = 'Number of detected IDs: %d ' % (len(rooms_id)) rospy.loginfo(anm.tag_log(log_msg, LOG_TAG)) room_info = get_room_info(data.data) rooms_name.append(room_info.room) log_msg = 'Semantic map updated, room '+ room_info.room + ' detected' rospy.loginfo(anm.tag_log(log_msg, LOG_TAG)) tm.add_room(room_info.room) rooms_center.append([room_info.x, room_info.y]) log_msg = 'Center position is: [%f, %f]' % (room_info.x, room_info.y) rospy.loginfo(anm.tag_log(log_msg, LOG_TAG)) for i in range(len(room_info.connections)): rooms_connections.append(room_info.connections[i].connected_to) rooms_doors.append(room_info.connections[i].through_door) log_msg = 'Room ' + room_info.room + ' is connected to ' + room_info.connections[i].connected_to + ' through door ' + room_info.connections[i].through_door rospy.loginfo(anm.tag_log(log_msg, LOG_TAG)) tm.add_door(room_info.connections[i].through_door) tm.assign_doors_to_room(room_info.room, room_info.connections[i].through_door) tm.disjoint_individuals() tm.add_last_visit_time(room_info.room, str(room_info.visit_time))
[docs]def move_to_pose(pose): """ Action client function for ``move_base`` node, gets a pose as an argument and sends it as ``MoveBaseGoal.msg`` to the action server Args: pose(Point) Returns: result(MoveBaseResult.msg) """ client = actionlib.SimpleActionClient('move_base',MoveBaseAction) goal = MoveBaseGoal() goal.target_pose.header.frame_id = "map" goal.target_pose.header.stamp = rospy.Time.now() goal.target_pose.pose.position.x = pose.x goal.target_pose.pose.position.y = pose.y goal.target_pose.pose.orientation.w = 1.0 client.wait_for_server() client.send_goal(goal)
[docs]def check_target_reached(target_pose): """ Checks if the robot has reached to a specific point by computing the eucledian distance between robot current pose and target pose Args: target_pose(Point) Returns: target_reached(Bool) """ rospy.wait_for_service(anm.SERVER_GET_POSE) try: service = rospy.ServiceProxy(anm.SERVER_GET_POSE, GetPose) response = service() pose = response.pose if math.sqrt((target_pose.x - pose.x)**2 + (target_pose.y - pose.y)**2) < 1: target_reached = True else: target_reached = False log_msg = 'target reached state: ' + str(target_reached) rospy.loginfo(anm.tag_log(log_msg, LOG_TAG)) return target_reached except rospy.ServiceException as e: log_msg = f'Server cannot get current robot position: {e}' rospy.logerr(anm.tag_log(log_msg, LOG_TAG))
[docs]def get_room_pose(room): """ Detects the center postion by using the room information for corresponding room Args: room(string) Returns: room_pose(Point) """ global rooms_name global rooms_center room_pose = Point() room_index = rooms_name.index(room) room_pose.x = rooms_center[room_index][0] room_pose.y = rooms_center[room_index][1] return room_pose
[docs]def set_battery_level(battery_level): """ Service client function for ``/state/set_battery_level`` Update the current robot battery level stored in the ``robot-state`` node Args: battery_level(int) """ rospy.wait_for_service(anm.SERVER_SET_BATTERY_LEVEL) try: log_msg = f'Set current robot battery level to the `{anm.SERVER_SET_BATTERY_LEVEL}` node.' rospy.loginfo(anm.tag_log(log_msg, LOG_TAG)) service = rospy.ServiceProxy(anm.SERVER_SET_BATTERY_LEVEL, SetBatteryLevel) service(battery_level) except rospy.ServiceException as e: log_msg = f'Server cannot set current robot battery level: {e}' rospy.logerr(anm.tag_log(log_msg, LOG_TAG))
[docs]def set_arm_movement_state(arm_movement_state): """ Service client function for ``/move_arm``. Updates the current robot arm movement state stored in ``my_moveit`` node Args: arm_movement_state(bool) """ rospy.wait_for_service('/move_arm') try: log_msg = 'Setting robot arm movement state to ' + str(arm_movement_state) rospy.loginfo(anm.tag_log(log_msg, LOG_TAG)) service = rospy.ServiceProxy('move_arm', SetBool) service(arm_movement_state) except rospy.ServiceException as e: log_msg = f'Server cannot set current arm movement state: {e}' rospy.logerr(anm.tag_log(log_msg, LOG_TAG))
[docs]def set_base_movement_state(base_movement_state): """ Service client function for ``/base_movement_state``. Updates the current robot base movement state stored in ``robot-states`` node Args: base_movement_state(bool) """ rospy.wait_for_service(anm.SERVER_SET_BASE_MOVEMENT_STATE) try: log_msg = 'Setting robot base movement state to ' + str(base_movement_state) rospy.loginfo(anm.tag_log(log_msg, LOG_TAG)) service = rospy.ServiceProxy(anm.SERVER_SET_BASE_MOVEMENT_STATE, SetBaseMovementState) service(base_movement_state) except rospy.ServiceException as e: log_msg = f'Server cannot set current base movement state: {e}' rospy.logerr(anm.tag_log(log_msg, LOG_TAG))
[docs]class BuildSemanticMap(smach.State): """ Defines the initial state when robot builds the semantic map.It enables the robot arm to start moving on the desired trajectories as it is defined in ``my_moveit.cpp`` file using ``set_arm_movement_state(arm_movement_state)`` function, and then checks if the number of detected room ids are enough, then exits from state by returning ``map_ready`` """ def __init__(self): smach.State.__init__(self, outcomes=['map_ready'])
[docs] def execute(self, userdata): """ Implements the execution of the tasks while this state gets active. """ global mutex global rooms_id set_arm_movement_state(True) while not rospy.is_shutdown(): # Wait for stimulus from the other nodes of the architecture. mutex.acquire() try: if len(rooms_id) > 6: return 'map_ready' finally: mutex.release() rospy.sleep(LOOP_TIME)
[docs]class MoveToTargetRoom(smach.State): """ Defines the state when robot moves to target room found by the ontology, first enables battery consumption using ``set_base_movement_state(base_movement_state)`` function and then moves to target room using ``move_to_pose(pose)`` function. Additionally, it updates the ontology while it moves to target room using ``update_ontology(now)`` function, until it reaches the target room, then exits the state by returning ``target_reached``. If the battery level gets lower than threshold, it returns ``battery_low`` and target room will be canceled. """ def __init__(self): smach.State.__init__(self, outcomes=['target_reached', 'battery_low'])
[docs] def execute(self, userdata): """ Implements the execution of the tasks while this state gets active. """ global mutex global tm now = rospy.get_rostime() [target_room, battery_low] = tm.update_ontology(now) target_room_pose = get_room_pose(target_room) set_base_movement_state(True) move_to_pose(target_room_pose) while not rospy.is_shutdown(): # Wait for stimulus from the other nodes of the architecture. mutex.acquire() try: now = rospy.get_rostime() [next_target_room, battery_low] = tm.update_ontology(now) log_msg = 'target room: ' + target_room rospy.loginfo(anm.tag_log(log_msg, LOG_TAG)) if battery_low: return 'battery_low' else: target_reached = check_target_reached(target_room_pose) if target_reached: set_base_movement_state(False) return 'target_reached' finally: mutex.release() rospy.sleep(LOOP_TIME)
[docs]class ExploreRoom(smach.State): """ Defines the state when robot has reached the target room and then explores it. It enables robot arm movement like the initial state using ``set_arm_movement_state(arm_movement_state)`` functinon, then returns ``room_explored`` """ def __init__(self): smach.State.__init__(self, outcomes=['room_explored'])
[docs] def execute(self, userdata): """ Implements the execution of the tasks while this state gets active. """ global mutex global tm while not rospy.is_shutdown(): # Wait for stimulus from the other nodes of the architecture. mutex.acquire() try: set_arm_movement_state(True) return 'room_explored' finally: mutex.release()
[docs]class MoveToCharger(smach.State): """ Defines the state when battery level is low and moves to charger, first enables battery consumption using ``set_base_movement_state(movement_state)`` function and then moves to charger using ``move_to_pose(pose)`` function. Additionally, it updates the ontology while it moves to charger using ``update_ontology(now)`` function, until it reaches the charger. """ def __init__(self): smach.State.__init__(self, outcomes=['target_reached'])
[docs] def execute(self, userdata): """ Implements the execution of the tasks while this state gets active. """ global mutex global tm now = rospy.get_rostime() tm.update_ontology(now) target_room_pose = get_room_pose('E') set_base_movement_state(True) move_to_pose(target_room_pose) while not rospy.is_shutdown(): # Wait for stimulus from the other nodes of the architecture. mutex.acquire() try: now = rospy.get_rostime() tm.update_ontology(now) target_reached = check_target_reached(target_room_pose) if target_reached: set_base_movement_state(False) return 'target_reached' finally: mutex.release() rospy.sleep(LOOP_TIME)
[docs]class Recharging(smach.State): """ Defines the state when robot has reached the charger and chargers battery after some time using ``set_battery_level(battery_level)`` function and then returns ``robot_charged`` transition. """ def __init__(self): smach.State.__init__(self, outcomes=['robot_charged'])
[docs] def execute(self, userdata): """ Implements the execution of the tasks while this state gets active. """ global mutex global tm while not rospy.is_shutdown(): # Wait for stimulus from the other nodes of the architecture. mutex.acquire() try: now = rospy.get_rostime() tm.update_ontology(now) rospy.sleep(10) set_battery_level(1000) return 'robot_charged' finally: mutex.release()
[docs]def main(): """ The main function for finite_state_machine node, initialises the node and takes an instance of ``TopologicalMap`` class in the time instance now, defines the subscriner to the ``/image_id`` topic , defines the states and transitions of the finite state machine for topological map and finally starts the finite state machine process """ # Initialise this node. global tm global pub global mutex rospy.init_node(anm.NODE_FINITE_STATE_MACHINE, log_level=rospy.INFO) now = rospy.get_rostime() tm = TopologicalMap(LOG_TAG, now) # Get or create a new mutex. if mutex is None: mutex = Lock() else: mutex = mutex # Subscribe image id to get rooms information rospy.Subscriber('/image_id', Int32, marker_id_callback) # Create a SMACH state machine sm = smach.StateMachine(outcomes=[]) sm.userdata.sm_counter = 0 # Open the container with sm: # Add states to the container smach.StateMachine.add('BUILD_SEMANTIC_MAP', BuildSemanticMap(), transitions={'map_ready':'MOVE_TO_TARGET_ROOM'}) smach.StateMachine.add('MOVE_TO_TARGET_ROOM', MoveToTargetRoom(), transitions={'battery_low':'MOVE_TO_CHARGER', 'target_reached':'EXPLORE_ROOM'}) smach.StateMachine.add('MOVE_TO_CHARGER', MoveToCharger(), transitions={'target_reached':'RECHARGING'}) smach.StateMachine.add('EXPLORE_ROOM', ExploreRoom(), transitions={'room_explored':'MOVE_TO_TARGET_ROOM'}) smach.StateMachine.add('RECHARGING', Recharging(), transitions={'robot_charged':'MOVE_TO_TARGET_ROOM'}) # Create and start the introspection server for visualization sis = smach_ros.IntrospectionServer('server_name', sm, '/SM_ROOT') sis.start() # Execute the state machine outcome = sm.execute() # Wait for ctrl-c to stop the application rospy.spin() sis.stop()
if __name__ == '__main__': main()