Single-Robot Connector

Subclass inorbit_connector.connector.Connector to create a connector for a single robot.

Constructor

def __init__(self, robot_id: str, config: ConnectorConfig, **kwargs) -> None

Parameters:

  • robot_id (str): The InOrbit robot ID

  • config (ConnectorConfig): The connector configuration

Keyword Arguments:

  • register_user_scripts (bool): Automatically register user scripts. Default: False

  • default_user_scripts_dir (str): Default directory for user scripts. Default: ~/.inorbit_connectors/connector-{robot_id}/local/

  • create_user_scripts_dir (bool): Create the user scripts directory if it doesn’t exist. Default: False

  • register_custom_command_handler (bool): Automatically register the command handler. Default: True

Required Methods

Subclasses must implement the following abstract methods:

_connect()

Set up external services and connections. This is called once when the connector starts, before the execution loop begins.

@override
async def _connect(self) -> None:
    """Connect to robot services."""
    # Initialize robot API client and/or other related services
    # e.g. initialize a REST API client and start a polling loop 
    pass

_execution_loop()

The main execution loop that runs periodically. This is where you fetch robot data and publish it to InOrbit.

Refer to the publishing guide for more details on publishing data to InOrbit.

With polling-based connectors, it is advisable to run polling loops concurrently with the execution loop to avoid long running _execution_loop calls. See the robot-connector and fleet-connector examples for more details.

@override
async def _execution_loop(self) -> None:
    """Main execution loop."""
    # Fetch robot pose
    pose = await self._get_robot_pose()
    self.publish_pose(pose.x, pose.y, pose.yaw, pose.frame_id)
    
    # Fetch and publish telemetry
    telemetry = await self._get_robot_telemetry()
    self.publish_key_values(**telemetry)

The loop runs at the frequency specified by config.update_freq (default: 1.0 Hz).

_disconnect()

Clean up resources and disconnect from external services. This is called when the connector stops.

@override
async def _disconnect(self) -> None:
    """Disconnect from robot services."""
    # Close robot API connections
    # Clean up resources
    pass

_inorbit_command_handler()

Handle commands received from InOrbit. This method is automatically registered if register_custom_command_handler is True (default). For more details see the Command Handling section below.

@override
async def _inorbit_command_handler(
    self, command_name: str, args: list, options: dict
) -> None:
    """Handle InOrbit commands."""
    if command_name == "start_mission":
        result = await self._robot.start_mission(args[0])
        if result:
            options["result_function"](CommandResultCode.SUCCESS)
        else:
            options["result_function"](
                CommandResultCode.FAILURE,
                stderr="Failed to start mission"
            )

The options dictionary contains a result_function that must be called to report the command result:

  • CommandResultCode.SUCCESS: Command executed successfully

  • CommandResultCode.FAILURE: Command failed

Lifecycle Methods

start()

Starts the connector in a background thread. Creates an async event loop and begins the connection process.

connector = MyConnector(robot_id, config)
connector.start()

join()

Blocks until the connector thread finishes. Use this to keep your main thread alive.

connector.join()

stop()

Signals the connector to stop and waits for shutdown. This calls _disconnect() and cleans up resources.

connector.stop()

Publishing Methods

See the Publishing Guide for detailed information on publishing methods.

Advanced Methods

_get_session()

Access the underlying RobotSession from the InOrbit Edge SDK for advanced use cases not covered by the connector API.

def _get_session(self) -> RobotSession:
    """Get the edge-sdk robot session for the current robot."""

Example:

async def _execution_loop(self) -> None:
    # Access the session directly for advanced features
    session = self._get_session()
    # Use Edge SDK methods directly
    ...

_is_robot_online()

Override this method to provide custom robot health checks. The default implementation assumes the robot is online if the connector is running.

def _is_robot_online(self) -> bool:
    """Check if the robot is online.
    
    Returns:
        bool: True if robot is online, False otherwise.
    """

Example:

@override
def _is_robot_online(self) -> bool:
    """Check robot connectivity via API."""
    try:
        return self._robot_api.is_connected()
    except Exception:
        return False

Command Handling

Commands from InOrbit are automatically routed to your _inorbit_command_handler() method. The handler receives:

  • command_name (str): The name of the command

  • args (list): Command arguments

  • options (dict): Options including result_function to report results

You must call options["result_function"] with a CommandResultCode to report the command result.

User Scripts

User scripts allow executing custom shell scripts from InOrbit. To enable:

  1. Set user_scripts_dir in your configuration

  2. Pass register_user_scripts=True to the constructor

  3. Place .sh scripts in the user scripts directory

Scripts are automatically registered and can be executed from InOrbit.

Examples