-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathSTU3_ROS_Comm.py
More file actions
319 lines (260 loc) · 13.5 KB
/
STU3_ROS_Comm.py
File metadata and controls
319 lines (260 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# DON'T CHANGE ANY OF THE BELOW; NECESSARY FOR JOINING SIMULATION
import os, sys, time, datetime, traceback
import spaceteams as st
def custom_exception_handler(exctype, value, tb):
error_message = "".join(traceback.format_exception(exctype, value, tb))
st.logger_fatal(error_message)
sys.excepthook = custom_exception_handler
st.connect_to_sim(sys.argv)
import numpy as np
# DON'T CHANGE ANY OF THE ABOVE; NECESSARY FOR JOINING SIMULATION
################################################################
st.OnScreenLogMessage("ROS_Comm started properly.", "ROS_Comm", st.Severity.Info)
import copy
################################# Entity and frame setup #################################
this = st.GetThisSystem()
controlledEntity: st.Entity = this.GetParam(st.VarType.entityRef, "ControlledEntity")
# Reference frames
mars: st.Entity = this.GetParam(st.VarType.entityRef, "MarsReferenceFrame")
marsFrame = mars.GetBodyFixedFrame()
local: st.Entity = this.GetParam(st.VarType.entityRef, "LocalFrame")
localFrame = local.GetBodyFixedFrame()
################################# Waypoint data setup #################################
def check_waypoint() -> bool:
waypoints: list[int] = controlledEntity.GetParamArray(st.VarType.int32, ["Waypoints", "WaypointIDs"])
visitedWaypoints: list[int] = controlledEntity.GetParamArray(st.VarType.int32, ["Waypoints", "VisitedWaypointIDs"])
tolerance = 1.5 * controlledEntity.GetParam(st.VarType.double, ["Waypoints", "WaypointVisitTolerance_m"]) # Give it some wiggle room
roverLoc = controlledEntity.getLocation().WRT_ExprIn(localFrame)
# Invalid before waypoints are initialized
if len(waypoints) == 0:
st.OnScreenLogMessage("Warning: there are no waypoints to sample at in this simulation.",
"WaypointSamplingWarning", st.Severity.Warning)
return False
closestWaypoint = waypoints[0]
waypointParamMap = controlledEntity.GetParamMap("Waypoints")
minDistance = 1e6 # meters
for i in range(len(waypoints)):
waypoint = waypoints[i]
waypointEntity: st.Entity = waypointParamMap.GetParam(st.VarType.entityRef, f"Waypoint{waypoint}")
waypointLoc = waypointEntity.getLocation().WRT_ExprIn(localFrame)
distance = np.linalg.norm(waypointLoc - roverLoc)
if distance < minDistance:
minDistance = copy.deepcopy(distance)
closestWaypoint = copy.deepcopy(waypoint)
# Can't get credit for revisiting the same waypoints
if closestWaypoint in visitedWaypoints:
st.OnScreenLogMessage("Warning: you tried sampling at a waypoint you already visited.",
"WaypointSamplingWarning", st.Severity.Warning)
return False
if minDistance <= tolerance:
visitedWaypoints.append(closestWaypoint)
controlledEntity.SetParamArray(st.VarType.int32, ["Waypoints", "VisitedWaypointIDs"], visitedWaypoints)
st.OnScreenLogMessage(f"Success! You're collecting core samples at waypoint #{closestWaypoint}.",
"WaypointSamplingSuccess", st.Severity.Info)
return True
else:
distancePrint = f"Distance to nearest waypoint: {minDistance:.3f} meters, which is greater than the {tolerance:.3f}-meter tolerance."
st.OnScreenLogMessage("Warning: you tried to sample too far from a waypoint. " + distancePrint,
"WaypointSamplingWarning", st.Severity.Warning)
return False
################################# ROS setup #################################
import roslibpy
st.logger_info("Starting ROS Communication Server...")
try:
ros = roslibpy.Ros(host='localhost', port=9090)
ros.run()
except Exception as e:
st.logger_error(f"Error connecting to ROS: {str(e)}")
st.OnScreenLogMessage("ROS_Comm failed to connect to ROS. Please restart the sim after ROSBridge is running.", "ROS_Comm", st.Severity.Error)
exit_flag = False
while not exit_flag:
# Loop forever just so we don't (sometimes) crash the sim by leaving early
time.sleep(0.2)
st.leave_sim()
time.sleep(0.5) # Wait a bit so the messages send reliably before the socket closes
exit(1)
st.logger_info("Connected to ROS")
############################## Control Services #################################
# Logger service
Logger_service = roslibpy.Service(ros, '/log_message', 'space_teams_definitions/String')
def handle_logger_request(request, response):
try:
log_message = request['data']
st.OnScreenLogMessage(f"ROS Log: {log_message}", "ROSLog", st.Severity.Info)
response['success'] = True
return True
except Exception as e:
st.OnScreenLogMessage(f"ROS Log Error: {str(e)}", "ROSLogError", st.Severity.Error)
response['success'] = False
return False
Logger_service.advertise(handle_logger_request)
# Steer_service
steer_service = roslibpy.Service(ros, '/Steer', 'space_teams_definitions/Float')
def handle_steer_request(request, response):
try:
data = request['data']
if controlledEntity.GetParam(st.VarType.bool, "IsActive") == True:
controlledEntity.SetParam(st.VarType.double, ["ControlCmd", "SteerRight"], data)
if not controlledEntity.GetParam(st.VarType.bool, ["State", "HasReceivedFirstROSCommand"]):
controlledEntity.SetParam(st.VarType.bool, ["State", "HasReceivedFirstROSCommand"], True)
response['success'] = True
return True
except Exception as e:
st.logger_fatal(f"Error handling Steer request: {str(e)}")
response['success'] = False
return False
steer_service.advertise(handle_steer_request)
# Accelerator service
Accelerator_service = roslibpy.Service(ros, '/Accelerator', 'space_teams_definitions/Float')
def handle_accelerator_request(request, response):
try:
accel_value = request['data']
if controlledEntity.GetParam(st.VarType.bool, "IsActive") == True:
controlledEntity.SetParam(st.VarType.double, ["ControlCmd", "Accelerator"], accel_value)
if not controlledEntity.GetParam(st.VarType.bool, ["State", "HasReceivedFirstROSCommand"]):
controlledEntity.SetParam(st.VarType.bool, ["State", "HasReceivedFirstROSCommand"], True)
response['success'] = True
return True
except Exception as e:
st.logger_fatal(f"Error handling Accelerator request: {str(e)}")
response['success'] = False
return False
Accelerator_service.advertise(handle_accelerator_request)
# Reverse service
Reverse_service = roslibpy.Service(ros, '/Reverse', 'space_teams_definitions/Float')
def handle_reverse_request(request, response):
try:
reverse_value = request['data']
if controlledEntity.GetParam(st.VarType.bool, "IsActive") == True:
controlledEntity.SetParam(st.VarType.double, ["ControlCmd", "Brake"], reverse_value)
if not controlledEntity.GetParam(st.VarType.bool, ["State", "HasReceivedFirstROSCommand"]):
controlledEntity.SetParam(st.VarType.bool, ["State", "HasReceivedFirstROSCommand"], True)
response['success'] = True
return True
except Exception as e:
st.logger_fatal(f"Error handling Reverse request: {str(e)}")
response['success'] = False
return False
Reverse_service.advertise(handle_reverse_request)
# Brake service
Brake_service = roslibpy.Service(ros, '/Brake', 'space_teams_definitions/Float')
def handle_brake_request(request, response):
try:
brake_value = request['data']
if controlledEntity.GetParam(st.VarType.bool, "IsActive") == True:
controlledEntity.SetParam(st.VarType.bool, ["ControlCmd", "Handbrake"], brake_value > 0.5)
if not controlledEntity.GetParam(st.VarType.bool, ["State", "HasReceivedFirstROSCommand"]):
controlledEntity.SetParam(st.VarType.bool, ["State", "HasReceivedFirstROSCommand"], True)
response['success'] = True
return True
except Exception as e:
st.logger_fatal(f"Error handling Brake request: {str(e)}")
response['success'] = False
return False
Brake_service.advertise(handle_brake_request)
# CoreSample service
CoreSample_service = roslibpy.Service(ros, '/CoreSample', 'space_teams_definitions/Float')
def handle_core_sample_request(request, response):
try:
validSampling = check_waypoint()
validControlledEntity = controlledEntity.GetParam(st.VarType.bool, "IsActive") == True
if validSampling and validControlledEntity:
beginTimeWarpPayload: st.ParamMap = st.ParamMap()
beginTimeWarpPayload.AddParam(st.VarType.entityRef, "Rover", controlledEntity)
st.SimGlobals.DispatchEvent("BeginTimeWarp", beginTimeWarpPayload)
if not controlledEntity.GetParam(st.VarType.bool, ["State", "HasReceivedFirstROSCommand"]):
controlledEntity.SetParam(st.VarType.bool, ["State", "HasReceivedFirstROSCommand"], True)
response['success'] = True
return True
except Exception as e:
st.logger_fatal(f"Error handling CoreSample request: {str(e)}")
response['success'] = False
return False
CoreSample_service.advertise(handle_core_sample_request)
# CoreSamplingComplete publisher
coreSamplingComplete_publisher = roslibpy.Topic(ros, '/CoreSamplingComplete', 'geometry_msgs/Point')
def publish_coreSamplingComplete(payload: st.ParamMap, time: st.timestamp):
# Junk message that has no actual meaning; just need the event itself:
coreSamplingComplete_msg = roslibpy.Message({'x': 0.0, 'y': 0.0, 'z': 0.0})
try:
coreSamplingComplete_publisher.publish(coreSamplingComplete_msg)
except Exception as e:
st.logger_fatal(f"Error publishing CoreSamplingComplete to ROS: {str(e)}")
st.SimGlobals.AddEventListener("EndTimeWarp", publish_coreSamplingComplete)
#################################### Camera Control Services #################################'
# get camera entity
camera: st.Entity = st.GetThisSystem().GetParam(st.VarType.entityRef, "Camera")
# NOTE: FOV CHANGING DISABLED; WOULD REQUIRE REPLACEMENT CameraInfo IN ROS
# # change FOV service
# minimum_fov = 40.0
# maximum_fov = 90.0
# ChangeFOV_service = roslibpy.Service(ros, '/ChangeFOV', 'space_teams_definitions/Float')
# def handle_change_fov_request(request, response):
# try:
# fov_value = request['data']
# clamped_fov = max(minimum_fov, min(fov_value, maximum_fov))
# camera.SetParam(st.VarType.double, "FOV", clamped_fov)
# response['success'] = True
# st.logger_info(f"Camera FOV set to {camera.GetParam(st.VarType.double, 'FOV')}")
# return True
# except Exception as e:
# st.logger_error(f"Error handling ChangeFOV request: {str(e)}")
# response['success'] = False
# return False
# finally:
# pass
# ChangeFOV_service.advertise(handle_change_fov_request)
# change Exposure service
minimum_exposure = 5.0
maximum_exposure = 20.0
ChangeExposure_service = roslibpy.Service(ros, '/ChangeExposure', 'space_teams_definitions/Float')
def handle_change_exposure_request(request, response):
try:
exposure_value = request['data']
clamped_exposure = max(minimum_exposure, min(exposure_value, maximum_exposure))
camera.SetParam(st.VarType.double, "Exposure", clamped_exposure)
response['success'] = True
st.logger_info(f"Camera exposure set to {camera.GetParam(st.VarType.double, 'Exposure')}")
return True
except Exception as e:
st.logger_fatal(f"Error handling ChangeExposure request: {str(e)}")
response['success'] = False
return False
ChangeExposure_service.advertise(handle_change_exposure_request)
# change cam freq service
max_freq_rgb = 15.0
ChangeRGBFreq_service = roslibpy.Service(ros, '/ChangeRGBFreq', 'space_teams_definitions/Float')
def handle_change_rgb_freq_request(request, response):
try:
rgb_freq_value = request['data']
clamped_rgb_freq = max(0.01, min(rgb_freq_value, max_freq_rgb))
camera.SetParam(st.VarType.double, "RGB_FreqHz", clamped_rgb_freq)
response['success'] = True
st.logger_info(f"RGB Camera frequency set to {camera.GetParam(st.VarType.double, 'RGB_FreqHz')}")
return True
except Exception as e:
st.logger_fatal(f"Error handling ChangeRGBFreq request: {str(e)}")
response['success'] = False
return False
ChangeRGBFreq_service.advertise(handle_change_rgb_freq_request)
# change depth freq service
max_freq_depth = 5.0
ChangeDepthFreq_service = roslibpy.Service(ros, '/ChangeDepthFreq', 'space_teams_definitions/Float')
def handle_change_depth_freq_request(request, response):
try:
depth_freq_value = request['data']
clamped_depth_freq = max(0.01, min(depth_freq_value, max_freq_depth))
camera.SetParam(st.VarType.double, "Depth_FreqHz", clamped_depth_freq)
response['success'] = True
st.logger_info(f"Depth Camera frequency set to {camera.GetParam(st.VarType.double, 'Depth_FreqHz')}")
return True
except Exception as e:
st.logger_fatal(f"Error handling ChangeDepthFreq request: {str(e)}")
response['success'] = False
return False
ChangeDepthFreq_service.advertise(handle_change_depth_freq_request)
exit_flag = False
while not exit_flag:
time.sleep(1.0 / st.GetThisSystem().GetParam(st.VarType.double, "LoopFreqHz"))
st.leave_sim()
time.sleep(0.5) # Wait a bit so the messages send reliably before the socket closes