Skip to content

Commit 15df47f

Browse files
committed
[core] graph : Manage nodeStatus file monitoring
1 parent bcbdb65 commit 15df47f

File tree

5 files changed

+147
-81
lines changed

5 files changed

+147
-81
lines changed

meshroom/core/desc/node.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from inspect import getfile
33
from pathlib import Path
44
import logging
5-
import os
65
import psutil
76
import shlex
87
import shutil
@@ -29,6 +28,7 @@ def __init__(self):
2928
signal.signal(signal.SIGTERM, self.exit)
3029

3130
def addSubprocess(self, process):
31+
print(f"[ExitCleanup] (addSubprocess) register subprocess {process}")
3232
self._subprocesses.append(process)
3333

3434
def exit(self, signum, frame):
@@ -40,7 +40,7 @@ def exit(self, signum, frame):
4040
proc.wait(timeout=5)
4141
except subprocess.TimeoutExpired:
4242
proc.kill()
43-
raise RuntimeError("Process has been killed")
43+
sys.exit(0)
4444

4545
exitCleanup = ExitCleanup()
4646

meshroom/core/graph.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1478,6 +1478,9 @@ def update(self):
14781478
self.dirtyTopology = False
14791479

14801480
self.updated.emit()
1481+
1482+
def updateMonitoredFiles(self):
1483+
self.statusUpdated.emit()
14811484

14821485
def markNodesDirty(self, fromNode):
14831486
"""
@@ -1601,6 +1604,7 @@ def setVerbose(self, v):
16011604
cacheDirChanged = Signal()
16021605
cacheDir = Property(str, cacheDir.fget, cacheDir.fset, notify=cacheDirChanged)
16031606
updated = Signal()
1607+
statusUpdated = Signal()
16041608
canComputeLeavesChanged = Signal()
16051609
canComputeLeaves = Property(bool, lambda self: self._canComputeLeaves, notify=canComputeLeavesChanged)
16061610

meshroom/core/node.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class Status(Enum):
5050
STOPPED = 4
5151
KILLED = 5
5252
SUCCESS = 6
53+
# SKIPPED = 7
5354
INPUT = 7 # Special status for input nodes
5455

5556

@@ -94,12 +95,6 @@ def setNode(self, node):
9495
self.nodeName = node.name
9596
self.setNodeType(node)
9697

97-
def initStartCompute(self):
98-
pass # TODO
99-
100-
def initIsolatedCompute(self):
101-
pass # TODO
102-
10398
def initExternSubmit(self):
10499
"""
105100
When submitting a node, we reset the status information to ensure that we do not keep
@@ -125,12 +120,10 @@ def initLocalSubmit(self):
125120
self.status = Status.SUBMITTED
126121
self.execMode = ExecMode.LOCAL
127122

128-
def initEndCompute(self):
129-
pass # TODO
130-
131123
def setComputationStatusToInheritChunks(self):
124+
# TODO
132125
self.status: Status = Status.NONE
133-
126+
134127
def setNodeType(self, node):
135128
"""
136129
Set the node type and package information from the given node.
@@ -525,6 +518,15 @@ def logger(self):
525518

526519
def getExecModeName(self):
527520
return self._status.execMode.name
521+
522+
def shouldMonitorChanges(self):
523+
""" Check whether we should monitor changes in minimal mode
524+
Only chunks that are run externally or local_isolated should be monitored,
525+
when run locally, status changes are already notified.
526+
Chunks with an ERROR status may be re-submitted externally and should thus still be monitored
527+
"""
528+
return (self.isExtern() and self._status.status in (Status.SUBMITTED, Status.RUNNING, Status.ERROR)) or \
529+
(self.node.getMrNodeType() == MrNodeType.NODE and self._status.status in (Status.SUBMITTED, Status.RUNNING))
528530

529531
def updateStatusFromCache(self):
530532
"""
@@ -838,6 +840,7 @@ def __init__(self, nodeType: str, position: Position = None, parent: BaseObject
838840

839841
self._nodeStatus: NodeStatusData = NodeStatusData(self._name, nodeType, self.packageName,
840842
self.packageVersion, self.getMrNodeType())
843+
self.nodeStatusFileLastModTime = -1
841844

842845
self.globalStatusChanged.connect(self.updateDuplicatesStatusAndLocked)
843846

@@ -1352,6 +1355,15 @@ def isExtern(self):
13521355
interrupted, its execution mode will always be local, even if computations resume
13531356
externally.
13541357
"""
1358+
if not self._chunksCreated:
1359+
if self._nodeStatus.execMode == ExecMode.EXTERN:
1360+
return True
1361+
# TODO : use sessionID
1362+
# elif self._nodeStatus.execMode == ExecMode.LOCAL:
1363+
# if self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING):
1364+
# return meshroom.core.sessionUid not in (self._nodeStatus.submitterSessionUid, self._nodeStatus.sessionUid)
1365+
# return False
1366+
return False
13551367
if len(self._chunks) == 0:
13561368
return False
13571369
return any(chunk.isExtern() for chunk in self._chunks)
@@ -1512,11 +1524,23 @@ def sourceCodeFolder(self):
15121524
@property
15131525
def nodeStatusFile(self):
15141526
return os.path.join(self.graph.cacheDir, self.internalFolder, "nodeStatus")
1527+
1528+
def shouldMonitorChanges(self):
1529+
""" Check whether we should monitor changes in minimal mode
1530+
Only chunks that are run externally or local_isolated should be monitored,
1531+
when run locally, status changes are already notified.
1532+
Chunks with an ERROR status may be re-submitted externally and should thus still be monitored
1533+
"""
1534+
if self._chunksCreated:
1535+
# Only monitor when chunks are not created (in this case monitor chunk status files instead)
1536+
return False
1537+
return (self.isExtern() and self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING, Status.ERROR)) or \
1538+
(self.getMrNodeType() == MrNodeType.NODE and self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING))
15151539

15161540
def updateNodeStatusFromCache(self):
15171541
"""
15181542
Update node status based on status file content/existence.
1519-
# TODO : integrate statusFileLastModTime ?
1543+
# TODO : integrate nodeStatusFileLastModTime ?
15201544
Returns True if a change on the chunk setup has been detected
15211545
"""
15221546
chunksRangeHasChanged = False
@@ -1525,8 +1549,10 @@ def updateNodeStatusFromCache(self):
15251549
self._nodeStatus.loadFromCache(self.nodeStatusFile)
15261550
if self._nodeStatus.chunks != oldChunkSetup:
15271551
chunksRangeHasChanged = True
1552+
self.nodeStatusFileLastModTime = os.path.getmtime(self.nodeStatusFile)
15281553
else:
15291554
# No status file => reset status to Status.None
1555+
self.nodeStatusFileLastModTime = -1
15301556
self._nodeStatus.reset()
15311557
self._nodeStatus.setNodeType(self)
15321558
return chunksRangeHasChanged

meshroom/core/taskManager.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,9 @@ def submit(self, graph, submitter=None, toNodes=None, submitLabel="{projectName}
479479
))
480480

481481
# Update task manager's lists
482+
print("[TaskManager] (submit) updateNodes")
482483
self.updateNodes()
484+
print("[TaskManager] (submit) graph.update")
483485
graph.update()
484486

485487
# Check dependencies of toNodes
@@ -500,6 +502,13 @@ def submit(self, graph, submitter=None, toNodes=None, submitLabel="{projectName}
500502
self.checkCompatibilityNodes(graph, nodesToProcess, "SUBMITTING") # name of the context is important for QML
501503
self.checkDuplicates(nodesToProcess, "SUBMITTING") # name of the context is important for QML
502504

505+
# Update nodes status
506+
for node in nodesToProcess:
507+
node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name))
508+
node.initStatusOnSubmit()
509+
print("[TaskManager] (submit) graph.updateMonitoredFiles")
510+
graph.updateMonitoredFiles()
511+
503512
flowEdges = graph.flowEdges(startNodes=toNodes)
504513
edgesToProcess = set(edgesToProcess).intersection(flowEdges)
505514

@@ -508,11 +517,13 @@ def submit(self, graph, submitter=None, toNodes=None, submitLabel="{projectName}
508517

509518
try:
510519
res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath, submitLabel=submitLabel)
511-
for node in nodesToProcess:
512-
node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name))
513-
node.initStatusOnSubmit() # update node status
520+
if res:
514521
if isinstance(res, BaseSubmittedJob):
515522
jobManager.addJob(res, nodesToProcess)
523+
else:
524+
for node in nodesToProcess:
525+
# TODO : Notify the node that there was an issue on submit
526+
pass
516527
self._nodes.update(nodesToProcess)
517528
self._nodesExtern.extend(nodesToProcess)
518529

0 commit comments

Comments
 (0)