Fleet Connector¶
Subclass inorbit_connector.connector.FleetConnector to manage multiple robots simultaneously.
Constructor¶
def __init__(self, config: ConnectorConfig, **kwargs) -> None
Parameters:
config(ConnectorConfig): The connector configuration containing the fleet
Keyword Arguments:
register_user_scripts(bool): Automatically register user scripts. Default:Falsedefault_user_scripts_dir(str): Default directory for user scripts. Default:~/.inorbit_connectors/connector-{class_name}/local/create_user_scripts_dir(bool): Create the user scripts directory if it doesn’t exist. Default:Falseregister_custom_command_handler(bool): Automatically register the command handler. Default:True
Required Methods¶
Subclasses must implement the same abstract methods as single-robot connectors, with one difference:
_inorbit_robot_command_handler()¶
Handle commands for a specific robot. The signature includes robot_id as the first parameter:
@override
async def _inorbit_robot_command_handler(
self, robot_id: str, command_name: str, args: list, options: dict
) -> None:
"""Handle InOrbit commands for a specific robot."""
if command_name == "start_mission":
result = await self._fleet_manager.send_command(
robot_id, "start_mission", args[0]
)
if result:
options["result_function"](CommandResultCode.SUCCESS)
else:
options["result_function"](
CommandResultCode.FAILURE,
stderr=f"Failed to start mission for {robot_id}"
)
Fleet Management¶
Accessing Robot IDs¶
Access the list of robot IDs in the fleet:
for robot_id in self.robot_ids:
# Process each robot
pass
Updating the Fleet¶
You can update the fleet configuration during _connect() by calling the update_fleet() method. This is useful for dynamically setting the robots list before the connector starts, for example, when provisioning robots from fleet manager data instead of hardcoded values in the config files:
@override
async def _connect(self) -> None:
"""Connect to fleet manager and update fleet."""
# Fetch robot list from fleet manager API
robots = await self._fleet_manager.get_robots()
# Update fleet configuration
fleet_config = [
RobotConfig(robot_id=robot.id, cameras=robot.cameras)
for robot in robots
]
self.update_fleet(fleet_config)
The update_fleet() method updates the fleet configuration and initializes sessions for all robots.
Publishing Methods¶
All publishing methods require a robot_id parameter. See the Publishing Guide for detailed information.
publish_robot_pose(robot_id, x, y, yaw, frame_id): Publish pose for a specific robotpublish_robot_odometry(robot_id, **kwargs): Publish odometry for a specific robotpublish_robot_key_values(robot_id, **kwargs): Publish key-values for a specific robotpublish_robot_system_stats(robot_id, **kwargs): Publish system stats for a specific robotpublish_robot_map(robot_id, frame_id, is_update=False): Publish map for a specific robot
Advanced Methods¶
_get_robot_session()¶
Access the underlying RobotSession from the InOrbit Edge SDK for a specific robot. Use this for advanced use cases not covered by the connector API.
def _get_robot_session(self, robot_id: str) -> RobotSession:
"""Get a robot session for a specific robot ID.
Args:
robot_id (str): The robot ID to get the session for
Returns:
RobotSession: The robot session for the specified robot
"""
Example:
async def _execution_loop(self) -> None:
for robot_id in self.robot_ids:
# Access the session directly for advanced features
session = self._get_robot_session(robot_id)
# Use Edge SDK methods directly
...
Robot Online Status¶
Override _is_fleet_robot_online() to provide custom online status checks:
@override
def _is_fleet_robot_online(self, robot_id: str) -> bool:
"""Check if a robot is online."""
# Check robot status via fleet manager API
return self._fleet_manager.is_robot_online(robot_id)
This is used by InOrbit to determine robot availability.
Example Execution Loop¶
@override
async def _execution_loop(self) -> None:
"""Main execution loop for fleet."""
for robot_id in self.robot_ids:
try:
# Fetch robot data from fleet manager
robot_data = await self._fleet_manager.get_robot_data(robot_id)
# Publish pose
self.publish_robot_pose(
robot_id,
robot_data.x,
robot_data.y,
robot_data.yaw,
robot_data.frame_id
)
# Publish telemetry
self.publish_robot_key_values(robot_id, **robot_data.telemetry)
except Exception as e:
self._logger.error(f"Error processing robot {robot_id}: {e}")
Lifecycle¶
The lifecycle methods are the same as single-robot connectors:
start(): Start the connectorjoin(): Block until stoppedstop(): Stop the connector
Examples¶
Simple fleet connector: examples/simple-fleet-connector/connector.py
Fleet connector (CLI): examples/fleet-connector/
Examples index: examples/README.md