diff --git a/cflib/crazyflie/supervisor.py b/cflib/crazyflie/supervisor.py new file mode 100644 index 000000000..08d15353d --- /dev/null +++ b/cflib/crazyflie/supervisor.py @@ -0,0 +1,218 @@ +# -*- coding: utf-8 -*- +# +# ,---------, ____ _ __ +# | ,-^-, | / __ )(_) /_______________ _____ ___ +# | ( O ) | / __ / / __/ ___/ ___/ __ `/_ / / _ \ +# | / ,--' | / /_/ / / /_/ /__/ / / /_/ / / /_/ __/ +# +------` /_____/_/\__/\___/_/ \__,_/ /___/\___/ +# +# Copyright (C) 2025 Bitcraze AB +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, in version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Provides access to the supervisor modue of the Crazyflie platform. +""" +import logging +import time + +from cflib.crazyflie.syncCrazyflie import SyncCrazyflie +from cflib.crtp.crtpstack import CRTPPacket +from cflib.crtp.crtpstack import CRTPPort + +__author__ = 'Bitcraze AB' +__all__ = ['SupervisorState'] + +logger = logging.getLogger(__name__) + +# Bit positions +BIT_CAN_BE_ARMED = 0 +BIT_IS_ARMED = 1 +BIT_IS_AUTO_ARMED = 2 +BIT_CAN_FLY = 3 +BIT_IS_FLYING = 4 +BIT_IS_TUMBLED = 5 +BIT_IS_LOCKED = 6 +BIT_IS_CRASHED = 7 +BIT_HL_CONTROL_ACTIVE = 8 +BIT_HL_TRAJ_FINISHED = 9 +BIT_HL_CONTROL_DISABLED = 10 + +CMD_GET_STATE_BITFIELD = 0x0C +SUPERVISOR_CH_INFO = 0 + + +class SupervisorState: + """ + This class is used to easily get the state of your Crazyflie through + the supervisor. Check the `reading_supervisor.py` example in + examples/supervisor/ to better understand how to use it. + """ + STATES = [ + 'Can be armed', + 'Is armed', + 'Is auto armed', + 'Can fly', + 'Is flying', + 'Is tumbled', + 'Is locked', + 'Is crashed', + 'HL control is active', + 'Finished HL trajectory', + 'HL control is disabled' + ] + + def __init__(self, crazyflie): + if isinstance(crazyflie, SyncCrazyflie): + self._cf = crazyflie.cf + else: + self._cf = crazyflie + + self._cache_timeout = 0.1 # seconds + self._last_fetch_time = 0 + self._bitfield = None + self._cf.add_port_callback(CRTPPort.SUPERVISOR, self._supervisor_callback) + self._response_received = False + + def _supervisor_callback(self, pk: CRTPPacket): + """ + Called when a packet is received. + """ + if len(pk.data) < 1: + return + + cmd = pk.data[0] + if cmd & 0x80: # high bit = response + orig_cmd = cmd & 0x7F + if orig_cmd == CMD_GET_STATE_BITFIELD: + self._bitfield = int.from_bytes(pk.data[1:], byteorder='little') + self._response_received = True + logger.info(f'Supervisor bitfield received: 0x{self._bitfield:04X}') + + def _fetch_bitfield(self, timeout=0.2): + """ + Request the bitfield and wait for response (blocking). + Uses time-based cache to avoid sending packages too frequently. + """ + now = time.time() + + # Return cached value if it's recent enough + if self._bitfield is not None and (now - self._last_fetch_time) < self._cache_timeout: + return self._bitfield + + # Send a new request + self._response_received = False + pk = CRTPPacket() + pk.set_header(CRTPPort.SUPERVISOR, SUPERVISOR_CH_INFO) + pk.data = [CMD_GET_STATE_BITFIELD] + self._cf.send_packet(pk) + + # Wait for response + start_time = now + while not self._response_received: + if time.time() - start_time > timeout: + logger.warning('Timeout waiting for supervisor bitfield response') + return self._bitfield or 0 # still return last known value + time.sleep(0.01) + + # Update timestamp + self._last_fetch_time = time.time() + return self._bitfield or 0 + + def _bit(self, position): + bitfield = self._fetch_bitfield() + return bool((bitfield >> position) & 0x01) + + def read_bitfield(self): + """ + Directly get the full bitfield value. + """ + return self._fetch_bitfield() + + def read_state_list(self): + """ + Reads the bitfield and returns the list of all active states. + """ + bitfield = self.read_bitfield() + list = self.decode_bitfield(bitfield) + return list + + def decode_bitfield(self, value): + """ + Given a bitfield integer `value` and a list of `self.STATES`, + returns the names of all states whose bits are set. + Bit 0 corresponds to states[0], Bit 1 to states[1], etc. + * Bit 0 = Can be armed - the system can be armed and will accept an arming command. + * Bit 1 = Is armed - the system is armed. + * Bit 2 = Is auto armed - the system is configured to automatically arm. + * Bit 3 = Can fly - the Crazyflie is ready to fly. + * Bit 4 = Is flying - the Crazyflie is flying. + * Bit 5 = Is tumbled - the Crazyflie is up side down. + * Bit 6 = Is locked - the Crazyflie is in the locked state and must be restarted. + * Bit 7 = Is crashed - the Crazyflie has crashed. + * Bit 8 = High level control is actively flying the drone. + * Bit 9 = High level trajectory has finished. + * Bit 10 = High level control is disabled and not producing setpoints. + """ + if value < 0: + raise ValueError('value must be >= 0') + + result = [] + for bit_index, name in enumerate(self.STATES): + if value & (1 << bit_index): + result.append(name) + + return result + + @property + def can_be_armed(self): + return self._bit(BIT_CAN_BE_ARMED) + + @property + def is_armed(self): + return self._bit(BIT_IS_ARMED) + + @property + def is_auto_armed(self): + return self._bit(BIT_IS_AUTO_ARMED) + + @property + def can_fly(self): + return self._bit(BIT_CAN_FLY) + + @property + def is_flying(self): + return self._bit(BIT_IS_FLYING) + + @property + def is_tumbled(self): + return self._bit(BIT_IS_TUMBLED) + + @property + def is_locked(self): + return self._bit(BIT_IS_LOCKED) + + @property + def is_crashed(self): + return self._bit(BIT_IS_CRASHED) + + @property + def hl_control_active(self): + return self._bit(BIT_HL_CONTROL_ACTIVE) + + @property + def hl_traj_finished(self): + return self._bit(BIT_HL_TRAJ_FINISHED) + + @property + def hl_control_disabled(self): + return self._bit(BIT_HL_CONTROL_DISABLED) diff --git a/cflib/crtp/crtpstack.py b/cflib/crtp/crtpstack.py index 2cb9118e7..8204764cd 100644 --- a/cflib/crtp/crtpstack.py +++ b/cflib/crtp/crtpstack.py @@ -47,6 +47,7 @@ class CRTPPort: LOCALIZATION = 0x06 COMMANDER_GENERIC = 0x07 SETPOINT_HL = 0x08 + SUPERVISOR = 0x09 PLATFORM = 0x0D LINKCTRL = 0x0F ALL = 0xFF diff --git a/examples/supervisor/flying_with_supervisor.py b/examples/supervisor/flying_with_supervisor.py new file mode 100644 index 000000000..304fe6717 --- /dev/null +++ b/examples/supervisor/flying_with_supervisor.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# +# || ____ _ __ +# +------+ / __ )(_) /_______________ _____ ___ +# | 0xBC | / __ / / __/ ___/ ___/ __ `/_ / / _ \ +# +------+ / /_/ / / /_/ /__/ / / /_/ / / /_/ __/ +# || || /_____/_/\__/\___/_/ \__,_/ /___/\___/ +# +# Copyright (C) 2025 Bitcraze AB +# +# Crazyflie Nano Quadcopter Client +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Simple example showing how to fly the Crazyflie using supervisor state information. +Based on its current state, the Crazyflie will arm (if it can be armed), take +off (if it can fly), and land (if it is flying). Before each action, we call +the supervisor to check if the Crazyflie is crashed, locked or tumbled. +Tested with the Flow deck V2 and the Lighthouse positioning system. +""" +import time + +import cflib.crtp +from cflib.crazyflie import Crazyflie +from cflib.crazyflie.supervisor import SupervisorState +from cflib.crazyflie.syncCrazyflie import SyncCrazyflie +from cflib.utils import uri_helper + + +# URI to the Crazyflie to connect to +uri = uri_helper.uri_from_env(default='radio://0/30/2M/BADC0DE013') + + +def safety_check(sup: SupervisorState): + if sup.is_crashed: + raise Exception('Crazyflie crashed!') + if sup.is_locked: + raise Exception('Crazyflie locked!') + if sup.is_tumbled: + raise Exception('Crazyflie tumbled!') + + +def run_sequence(scf: SyncCrazyflie, sup: SupervisorState): + commander = scf.cf.high_level_commander + + try: + safety_check(sup) + if sup.can_be_armed: + print('The Crazyflie can be armed...arming!') + safety_check(sup) + scf.cf.platform.send_arming_request(True) + time.sleep(1) + + safety_check(sup) + if sup.can_fly: + print('The Crazyflie can fly...taking off!') + commander.takeoff(1.0, 2.0) + time.sleep(3) + + safety_check(sup) + if sup.is_flying: + print('The Crazyflie is flying...landing!') + commander.land(0.0, 2.0) + time.sleep(3) + + safety_check(sup) + + except Exception as e: + print(f'Safety check failed: {e}') + + +if __name__ == '__main__': + cflib.crtp.init_drivers() + + with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf: + time.sleep(1) + supervisor = SupervisorState(scf) + time.sleep(1) + run_sequence(scf, supervisor) diff --git a/examples/supervisor/reading_supervisor.py b/examples/supervisor/reading_supervisor.py new file mode 100644 index 000000000..7dcbc46dc --- /dev/null +++ b/examples/supervisor/reading_supervisor.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# +# || ____ _ __ +# +------+ / __ )(_) /_______________ _____ ___ +# | 0xBC | / __ / / __/ ___/ ___/ __ `/_ / / _ \ +# +------+ / /_/ / / /_/ /__/ / / /_/ / / /_/ __/ +# || || /_____/_/\__/\___/_/ \__,_/ /___/\___/ +# +# Copyright (C) 2025 Bitcraze AB +# +# Crazyflie Nano Quadcopter Client +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Simple, non-flying example demonstrating how to read the Crazyflie's state +through the supervisor. Hold the Crazyflie in your hand and tilt it upside +down to observe the state changes. Once the tilt exceeds ~90°, the can_fly +state becomes False and the is_tumbled state becomes True. +""" +import logging +import time + +import cflib.crtp +from cflib.crazyflie import Crazyflie +from cflib.crazyflie.supervisor import SupervisorState +from cflib.crazyflie.syncCrazyflie import SyncCrazyflie +from cflib.utils import uri_helper + +logging.basicConfig(level=logging.INFO) + +# URI to the Crazyflie to connect to +uri = uri_helper.uri_from_env(default='radio://0/30/2M/BADC0DE010') + + +if __name__ == '__main__': + cflib.crtp.init_drivers() + + with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf: + time.sleep(1) + supervisor = SupervisorState(scf) + time.sleep(1) + try: + while True: + print('==============================================================================') + print(f'Can fly: {supervisor.can_fly}') + print(f'Is tumbled: {supervisor.is_tumbled}') + print(f'Bitfield: {supervisor.read_bitfield()}') + print(f'State list: {supervisor.read_state_list()}') + print('==============================================================================') + time.sleep(0.5) + + except KeyboardInterrupt: + print('Script terminated')