Skip to content

Commit 7d8bb18

Browse files
committed
Merge pull request #570 from dronekit/tcr-writethread
Splits outbound messages into its own thread.
2 parents 04f7d1b + 42244b5 commit 7d8bb18

File tree

2 files changed

+82
-30
lines changed

2 files changed

+82
-30
lines changed

dronekit/__init__.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1452,9 +1452,10 @@ def remove_message_listener(self, name, fn):
14521452
"""
14531453
name = str(name)
14541454
if name in self._message_listeners:
1455-
self._message_listeners[name].remove(fn)
1456-
if len(self._message_listeners[name]) == 0:
1457-
del self._message_listeners[name]
1455+
if fn in self._message_listeners[name]:
1456+
self._message_listeners[name].remove(fn)
1457+
if len(self._message_listeners[name]) == 0:
1458+
del self._message_listeners[name]
14581459

14591460
def notify_message_listeners(self, name, msg):
14601461
for fn in self._message_listeners.get(name, []):
@@ -2605,7 +2606,11 @@ def clear(self):
26052606

26062607
# Add home point again.
26072608
self.wait_ready()
2608-
home = self._vehicle._wploader.wp(0)
2609+
home = None
2610+
try:
2611+
home = self._vehicle._wploader.wp(0)
2612+
except:
2613+
pass
26092614
self._vehicle._wploader.clear()
26102615
if home:
26112616
self._vehicle._wploader.add(home, comment='Added by DroneKit')
@@ -2701,7 +2706,8 @@ def connect(ip,
27012706
rate=4,
27022707
baud=115200,
27032708
heartbeat_timeout=30,
2704-
source_system=255):
2709+
source_system=255,
2710+
use_native=False):
27052711
"""
27062712
Returns a :py:class:`Vehicle` object connected to the address specified by string parameter ``ip``.
27072713
Connection string parameters (``ip``) for different targets are listed in the :ref:`getting started guide <get_started_connecting>`.
@@ -2737,6 +2743,7 @@ def connect(ip,
27372743
:param int heartbeat_timeout: Connection timeout value in seconds (default is 30s).
27382744
If a heartbeat is not detected within this time an exception will be raised.
27392745
:param int source_system: The MAVLink ID of the :py:class:`Vehicle` object returned by this method (by default 255).
2746+
:param bool use_native: Use precompiled MAVLink parser.
27402747
27412748
.. note::
27422749
@@ -2755,7 +2762,7 @@ def connect(ip,
27552762
if not vehicle_class:
27562763
vehicle_class = Vehicle
27572764

2758-
handler = MAVConnection(ip, baud=baud, source_system=source_system)
2765+
handler = MAVConnection(ip, baud=baud, source_system=source_system, use_native=use_native)
27592766
vehicle = vehicle_class(handler)
27602767

27612768
if status_printer:

dronekit/mavlink.py

Lines changed: 69 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121

2222

2323
class MAVWriter(object):
24+
"""
25+
Indirection layer to take messages written to MAVlink and send them all
26+
on the same thread.
27+
"""
28+
2429
def __init__(self, queue):
2530
self.queue = queue
2631

@@ -33,7 +38,7 @@ def read(self):
3338

3439

3540
class MAVConnection(object):
36-
def __init__(self, ip, baud=115200, target_system=0, source_system=255):
41+
def __init__(self, ip, baud=115200, target_system=0, source_system=255, use_native=False):
3742
self.master = mavutil.mavlink_connection(ip, baud=baud, source_system=source_system)
3843

3944
# TODO get rid of "master" object as exposed,
@@ -42,7 +47,7 @@ def __init__(self, ip, baud=115200, target_system=0, source_system=255):
4247
self.master.mav = mavutil.mavlink.MAVLink(
4348
MAVWriter(self.out_queue),
4449
srcSystem=self.master.source_system,
45-
use_native=False)
50+
use_native=use_native)
4651

4752
# Monkey-patch MAVLink object for fix_targets.
4853
sendfn = self.master.mav.send
@@ -72,7 +77,42 @@ def onexit():
7277

7378
atexit.register(onexit)
7479

75-
def mavlink_thread():
80+
def mavlink_thread_out():
81+
# Huge try catch in case we see http://bugs.python.org/issue1856
82+
try:
83+
while self._alive:
84+
try:
85+
msg = self.out_queue.get(True, timeout=0.01)
86+
self.master.write(msg)
87+
except Empty:
88+
continue
89+
except socket.error as error:
90+
# If connection reset (closed), stop polling.
91+
if error.errno == ECONNABORTED:
92+
raise APIException('Connection aborting during read')
93+
raise
94+
except Exception as e:
95+
errprinter('>>> mav send error:', e)
96+
break
97+
except APIException as e:
98+
errprinter('>>> ' + str(e.message))
99+
self._alive = False
100+
self.master.close()
101+
self._death_error = e
102+
103+
except Exception as e:
104+
# http://bugs.python.org/issue1856
105+
if not self._alive:
106+
pass
107+
else:
108+
self._alive = False
109+
self.master.close()
110+
self._death_error = e
111+
112+
# Explicitly clear out buffer so .close closes.
113+
self.out_queue = Queue()
114+
115+
def mavlink_thread_in():
76116
# Huge try catch in case we see http://bugs.python.org/issue1856
77117
try:
78118
while True:
@@ -83,21 +123,6 @@ def mavlink_thread():
83123
for fn in self.loop_listeners:
84124
fn(self)
85125

86-
while True:
87-
try:
88-
msg = self.out_queue.get_nowait()
89-
self.master.write(msg)
90-
except socket.error as error:
91-
# If connection reset (closed), stop polling.
92-
if error.errno == ECONNABORTED:
93-
raise APIException('Connection aborting during read')
94-
raise
95-
except Empty:
96-
break
97-
except Exception as e:
98-
errprinter('>>> mav send error:', e)
99-
break
100-
101126
while self._accept_input:
102127
try:
103128
msg = self.master.recv_msg()
@@ -140,9 +165,13 @@ def mavlink_thread():
140165
self.master.close()
141166
self._death_error = e
142167

143-
t = Thread(target=mavlink_thread)
168+
t = Thread(target=mavlink_thread_in)
169+
t.daemon = True
170+
self.mavlink_thread_in = t
171+
172+
t = Thread(target=mavlink_thread_out)
144173
t.daemon = True
145-
self.mavlink_thread = t
174+
self.mavlink_thread_out = t
146175

147176
def reset(self):
148177
self.out_queue = Queue()
@@ -173,8 +202,10 @@ def forward_message(self, fn):
173202
self.message_listeners.append(fn)
174203

175204
def start(self):
176-
if not self.mavlink_thread.is_alive():
177-
self.mavlink_thread.start()
205+
if not self.mavlink_thread_in.is_alive():
206+
self.mavlink_thread_in.start()
207+
if not self.mavlink_thread_out.is_alive():
208+
self.mavlink_thread_out.start()
178209

179210
def close(self):
180211
# TODO this can block forever if parameters continue to be added
@@ -189,13 +220,27 @@ def pipe(self, target):
189220
# vehicle -> self -> target
190221
@self.forward_message
191222
def callback(_, msg):
192-
target.out_queue.put(msg.pack(target.master.mav))
223+
try:
224+
target.out_queue.put(msg.pack(target.master.mav))
225+
except:
226+
try:
227+
assert len(msg.get_msgbuf()) > 0
228+
target.out_queue.put(msg.get_msgbuf())
229+
except:
230+
errprinter('>>> Could not pack this object on receive: %s' % type(msg))
193231

194232
# target -> self -> vehicle
195233
@target.forward_message
196234
def callback(_, msg):
197235
msg = copy.copy(msg)
198236
target.fix_targets(msg)
199-
self.out_queue.put(msg.pack(self.master.mav))
237+
try:
238+
self.out_queue.put(msg.pack(self.master.mav))
239+
except:
240+
try:
241+
assert len(msg.get_msgbuf()) > 0
242+
self.out_queue.put(msg.get_msgbuf())
243+
except:
244+
errprinter('>>> Could not pack this object on forward: %s' % type(msg))
200245

201246
return target

0 commit comments

Comments
 (0)