Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 218 additions & 0 deletions cflib/crazyflie/supervisor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
# -*- coding: utf-8 -*-
#
# ,---------, ____ _ __
# | ,-^-, | / __ )(_) /_______________ _____ ___
# | ( O ) | / __ / / __/ ___/ ___/ __ `/_ / / _ \
# | / ,--' | / /_/ / / /_/ /__/ / / /_/ / / /_/ __/
# +------` /_____/_/\__/\___/_/ \__,_/ /___/\___/
#
# Copyright (C) 2025 Bitcraze AB
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, in version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Provides access to the supervisor modue of the Crazyflie platform.
"""
import logging
import time

from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.crtp.crtpstack import CRTPPacket
from cflib.crtp.crtpstack import CRTPPort

__author__ = 'Bitcraze AB'
__all__ = ['SupervisorState']

logger = logging.getLogger(__name__)

# Bit positions
BIT_CAN_BE_ARMED = 0
BIT_IS_ARMED = 1
BIT_IS_AUTO_ARMED = 2
BIT_CAN_FLY = 3
BIT_IS_FLYING = 4
BIT_IS_TUMBLED = 5
BIT_IS_LOCKED = 6
BIT_IS_CRASHED = 7
BIT_HL_CONTROL_ACTIVE = 8
BIT_HL_TRAJ_FINISHED = 9
BIT_HL_CONTROL_DISABLED = 10

CMD_GET_STATE_BITFIELD = 0x0C
SUPERVISOR_CH_INFO = 0


class SupervisorState:
"""
This class is used to easily get the state of your Crazyflie through
the supervisor. Check the `reading_supervisor.py` example in
examples/supervisor/ to better understand how to use it.
"""
STATES = [
'Can be armed',
'Is armed',
'Is auto armed',
'Can fly',
'Is flying',
'Is tumbled',
'Is locked',
'Is crashed',
'HL control is active',
'Finished HL trajectory',
'HL control is disabled'
]

def __init__(self, crazyflie):
if isinstance(crazyflie, SyncCrazyflie):
self._cf = crazyflie.cf
else:
self._cf = crazyflie

self._cache_timeout = 0.1 # seconds
self._last_fetch_time = 0
self._bitfield = None
self._cf.add_port_callback(CRTPPort.SUPERVISOR, self._supervisor_callback)
self._response_received = False

def _supervisor_callback(self, pk: CRTPPacket):
"""
Called when a packet is received.
"""
if len(pk.data) < 1:
return

cmd = pk.data[0]
if cmd & 0x80: # high bit = response
orig_cmd = cmd & 0x7F
if orig_cmd == CMD_GET_STATE_BITFIELD:
self._bitfield = int.from_bytes(pk.data[1:], byteorder='little')
self._response_received = True
logger.info(f'Supervisor bitfield received: 0x{self._bitfield:04X}')

def _fetch_bitfield(self, timeout=0.2):
"""
Request the bitfield and wait for response (blocking).
Uses time-based cache to avoid sending packages too frequently.
"""
now = time.time()

# Return cached value if it's recent enough
if self._bitfield is not None and (now - self._last_fetch_time) < self._cache_timeout:
return self._bitfield

# Send a new request
self._response_received = False
pk = CRTPPacket()
pk.set_header(CRTPPort.SUPERVISOR, SUPERVISOR_CH_INFO)
pk.data = [CMD_GET_STATE_BITFIELD]
self._cf.send_packet(pk)

# Wait for response
start_time = now
while not self._response_received:
if time.time() - start_time > timeout:
logger.warning('Timeout waiting for supervisor bitfield response')
return self._bitfield or 0 # still return last known value
time.sleep(0.01)

# Update timestamp
self._last_fetch_time = time.time()
return self._bitfield or 0

def _bit(self, position):
bitfield = self._fetch_bitfield()
return bool((bitfield >> position) & 0x01)

def read_bitfield(self):
"""
Directly get the full bitfield value.
"""
return self._fetch_bitfield()

def read_state_list(self):
"""
Reads the bitfield and returns the list of all active states.
"""
bitfield = self.read_bitfield()
list = self.decode_bitfield(bitfield)
return list

def decode_bitfield(self, value):
"""
Given a bitfield integer `value` and a list of `self.STATES`,
returns the names of all states whose bits are set.
Bit 0 corresponds to states[0], Bit 1 to states[1], etc.
* Bit 0 = Can be armed - the system can be armed and will accept an arming command.
* Bit 1 = Is armed - the system is armed.
* Bit 2 = Is auto armed - the system is configured to automatically arm.
* Bit 3 = Can fly - the Crazyflie is ready to fly.
* Bit 4 = Is flying - the Crazyflie is flying.
* Bit 5 = Is tumbled - the Crazyflie is up side down.
* Bit 6 = Is locked - the Crazyflie is in the locked state and must be restarted.
* Bit 7 = Is crashed - the Crazyflie has crashed.
* Bit 8 = High level control is actively flying the drone.
* Bit 9 = High level trajectory has finished.
* Bit 10 = High level control is disabled and not producing setpoints.
"""
if value < 0:
raise ValueError('value must be >= 0')

result = []
for bit_index, name in enumerate(self.STATES):
if value & (1 << bit_index):
result.append(name)

return result

@property
def can_be_armed(self):
return self._bit(BIT_CAN_BE_ARMED)

@property
def is_armed(self):
return self._bit(BIT_IS_ARMED)

@property
def is_auto_armed(self):
return self._bit(BIT_IS_AUTO_ARMED)

@property
def can_fly(self):
return self._bit(BIT_CAN_FLY)

@property
def is_flying(self):
return self._bit(BIT_IS_FLYING)

@property
def is_tumbled(self):
return self._bit(BIT_IS_TUMBLED)

@property
def is_locked(self):
return self._bit(BIT_IS_LOCKED)

@property
def is_crashed(self):
return self._bit(BIT_IS_CRASHED)

@property
def hl_control_active(self):
return self._bit(BIT_HL_CONTROL_ACTIVE)

@property
def hl_traj_finished(self):
return self._bit(BIT_HL_TRAJ_FINISHED)

@property
def hl_control_disabled(self):
return self._bit(BIT_HL_CONTROL_DISABLED)
1 change: 1 addition & 0 deletions cflib/crtp/crtpstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class CRTPPort:
LOCALIZATION = 0x06
COMMANDER_GENERIC = 0x07
SETPOINT_HL = 0x08
SUPERVISOR = 0x09
PLATFORM = 0x0D
LINKCTRL = 0x0F
ALL = 0xFF
Expand Down
89 changes: 89 additions & 0 deletions examples/supervisor/flying_with_supervisor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
#
# || ____ _ __
# +------+ / __ )(_) /_______________ _____ ___
# | 0xBC | / __ / / __/ ___/ ___/ __ `/_ / / _ \
# +------+ / /_/ / / /_/ /__/ / / /_/ / / /_/ __/
# || || /_____/_/\__/\___/_/ \__,_/ /___/\___/
#
# Copyright (C) 2025 Bitcraze AB
#
# Crazyflie Nano Quadcopter Client
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Simple example showing how to fly the Crazyflie using supervisor state information.
Based on its current state, the Crazyflie will arm (if it can be armed), take
off (if it can fly), and land (if it is flying). Before each action, we call
the supervisor to check if the Crazyflie is crashed, locked or tumbled.
Tested with the Flow deck V2 and the Lighthouse positioning system.
"""
import time

import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.supervisor import SupervisorState
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.utils import uri_helper


# URI to the Crazyflie to connect to
uri = uri_helper.uri_from_env(default='radio://0/30/2M/BADC0DE013')


def safety_check(sup: SupervisorState):
if sup.is_crashed:
raise Exception('Crazyflie crashed!')
if sup.is_locked:
raise Exception('Crazyflie locked!')
if sup.is_tumbled:
raise Exception('Crazyflie tumbled!')


def run_sequence(scf: SyncCrazyflie, sup: SupervisorState):
commander = scf.cf.high_level_commander

try:
safety_check(sup)
if sup.can_be_armed:
print('The Crazyflie can be armed...arming!')
safety_check(sup)
scf.cf.platform.send_arming_request(True)
time.sleep(1)

safety_check(sup)
if sup.can_fly:
print('The Crazyflie can fly...taking off!')
commander.takeoff(1.0, 2.0)
time.sleep(3)

safety_check(sup)
if sup.is_flying:
print('The Crazyflie is flying...landing!')
commander.land(0.0, 2.0)
time.sleep(3)

safety_check(sup)

except Exception as e:
print(f'Safety check failed: {e}')


if __name__ == '__main__':
cflib.crtp.init_drivers()

with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf:
time.sleep(1)
supervisor = SupervisorState(scf)
time.sleep(1)
run_sequence(scf, supervisor)
63 changes: 63 additions & 0 deletions examples/supervisor/reading_supervisor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
#
# || ____ _ __
# +------+ / __ )(_) /_______________ _____ ___
# | 0xBC | / __ / / __/ ___/ ___/ __ `/_ / / _ \
# +------+ / /_/ / / /_/ /__/ / / /_/ / / /_/ __/
# || || /_____/_/\__/\___/_/ \__,_/ /___/\___/
#
# Copyright (C) 2025 Bitcraze AB
#
# Crazyflie Nano Quadcopter Client
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Simple, non-flying example demonstrating how to read the Crazyflie's state
through the supervisor. Hold the Crazyflie in your hand and tilt it upside
down to observe the state changes. Once the tilt exceeds ~90°, the can_fly
state becomes False and the is_tumbled state becomes True.
"""
import logging
import time

import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.supervisor import SupervisorState
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.utils import uri_helper

logging.basicConfig(level=logging.INFO)

# URI to the Crazyflie to connect to
uri = uri_helper.uri_from_env(default='radio://0/30/2M/BADC0DE010')


if __name__ == '__main__':
cflib.crtp.init_drivers()

with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf:
time.sleep(1)
supervisor = SupervisorState(scf)
time.sleep(1)
try:
while True:
print('==============================================================================')
print(f'Can fly: {supervisor.can_fly}')
print(f'Is tumbled: {supervisor.is_tumbled}')
print(f'Bitfield: {supervisor.read_bitfield()}')
print(f'State list: {supervisor.read_state_list()}')
print('==============================================================================')
time.sleep(0.5)

except KeyboardInterrupt:
print('Script terminated')