Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ public AgentTestStartData registerAgentForJob(AgentData agentData) {
// TODO: figure out controller restarts
if (jobInfo != null) { // jobInfo is null if the controller has been restarted
synchronized (jobInfo) {
ret = new AgentTestStartData(jobInfo.scripts, jobInfo.getUsers(agentData), jobInfo.jobRequest.getRampTime());
try {
ret = new AgentTestStartData(jobInfo.scripts, jobInfo.getUsers(agentData), jobInfo.jobRequest.getRampTime());
ret.setAgentInstanceNum(jobInfo.agentData.size());
ret.setDataFiles(getDataFileRequests(jobInfo));
ret.setJobId(agentData.getJobId());
Expand All @@ -172,14 +173,21 @@ public AgentTestStartData registerAgentForJob(AgentData agentData) {
ret.setUserIntervalIncrement(jobInfo.jobRequest.getUserIntervalIncrement());
ret.setTargetRampRate(jobInfo.jobRequest.getTargetRatePerAgent());
jobInfo.agentData.add(agentData);
LOG.info(new ObjectMessage(Map.of("Message", "Agent " + agentData.getInstanceId() + " added to job " + agentData.getJobId() + ". Total agents now: " + jobInfo.agentData.size() + ", isFilled: " + jobInfo.isFilled())));
CloudVmStatus status = vmTracker.getStatus(agentData.getInstanceId());
if(status != null) {
status.setVmStatus(VMStatus.pending);
vmTracker.setStatus(status);
}
if (jobInfo.isFilled()) {
if (jobInfo.isFilled() && !jobInfo.startTestCalled) {
jobInfo.startTestCalled = true;
LOG.info(new ObjectMessage(Map.of("Message", "Job " + agentData.getJobId() + " is filled with " + jobInfo.agentData.size() + " agents. Spawning thread to start test.")));
new Thread( () -> { startTest(jobInfo); }).start();
}
} catch (Exception e) {
LOG.error(new ObjectMessage(Map.of("Message", "CRITICAL ERROR: Exception in registerAgentForJob for agent " + agentData.getInstanceId() + " in job " + agentData.getJobId())), e);
throw e; // Re-throw to ensure caller knows registration failed
}
}
}
return ret;
Expand All @@ -189,6 +197,7 @@ private void startTest(final JobInfo info) {
ControllerLoggingConfig.setupThreadContext();
String jobId = info.jobRequest.getId();
LOG.info(new ObjectMessage(Map.of("Message","Sleeping for 30 seconds before starting test, to give time for last agent to process AgentTestStartData.")));
LOG.info(new ObjectMessage(Map.of("Message", "Job " + jobId + " has " + info.agentData.size() + " agents registered: " + info.agentData.stream().map(AgentData::getInstanceId).collect(Collectors.toList()))));
try {
Thread.sleep(RETRY_SLEEP);// 30 seconds
} catch (InterruptedException ignored) { }
Expand All @@ -208,6 +217,9 @@ private void startTest(final JobInfo info) {
}
LOG.info(new ObjectMessage(Map.of("Message", "Start agents command received - Sending start commands for job " + jobId + " asynchronously to following agents: " +
info.agentData.stream().collect(Collectors.toMap(AgentData::getInstanceId, AgentData::getInstanceUrl)))));
} else {
LOG.info(new ObjectMessage(Map.of("Message", "Sending start commands for job " + jobId + " to " + info.agentData.size() + " agents: " +
info.agentData.stream().collect(Collectors.toMap(AgentData::getInstanceId, AgentData::getInstanceUrl)))));
}
info.agentData.parallelStream()
.map(agentData -> agentData.getInstanceUrl() + AgentCommand.start.getPath())
Expand Down Expand Up @@ -367,6 +379,7 @@ private static class JobInfo {
private Map<RegionRequest, Integer> userMap = new HashMap<RegionRequest, Integer>();
private int numberOfMachines;
private final CountDownLatch latch = new CountDownLatch(1);
private boolean startTestCalled = false;

public JobInfo(JobRequest jobRequest) {
super();
Expand Down Expand Up @@ -400,12 +413,20 @@ public int getUsers(AgentData agent) {
if (region != r.getRegion()) continue;

if (workloadType.equals(IncrementStrategy.increasing) && Integer.parseInt(r.getUsers()) > 0) {
int numUsersRemaining = userMap.get(r);
Integer numUsersRemaining = userMap.get(r);
if (numUsersRemaining == null) {
LOG.error(new ObjectMessage(Map.of("Message", "CRITICAL BUG: Agent " + agent.getInstanceId() + " from region " + region + " not found in userMap! userMap keys: " + userMap.keySet() + ", jobRequest regions: " + jobRequest.getRegions())));
return 0; // Return 0 users to prevent NPE, but this agent won't be added
}
ret = Math.min(agent.getCapacity(), numUsersRemaining);
userMap.put(r, numUsersRemaining - ret);
break;
} else if (Integer.parseInt(r.getPercentage()) > 0) {
int numAgentsRemaining = userMap.get(r);
Integer numAgentsRemaining = userMap.get(r);
if (numAgentsRemaining == null) {
LOG.error(new ObjectMessage(Map.of("Message", "CRITICAL BUG: Agent " + agent.getInstanceId() + " from region " + region + " not found in userMap for percentage-based workload! userMap keys: " + userMap.keySet())));
return 0;
}
userMap.put(r, numAgentsRemaining - 1);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,17 @@ private void addStatusToJobContainer(CloudVmStatus status, CloudVmStatusContaine
// look up the job
JobInstance job = jobInstanceDao.get().findById(Integer.parseInt(status.getJobId()));
for (CloudVmStatus s : cloudVmStatusContainer.getStatuses()) {
VMStatus vmStatus = s.getVmStatus();

// Skip inactive instances (terminated, stopped, shutting_down, stopping)
if (vmStatus == VMStatus.terminated || vmStatus == VMStatus.stopped ||
vmStatus == VMStatus.shutting_down || vmStatus == VMStatus.stopping) {
LOG.debug(new ObjectMessage(Map.of("Message",
"Skipping inactive instance " + s.getInstanceId() +
" (VMStatus=" + vmStatus + ") in job status calculation")));
continue;
}

JobStatus jobStatus = s.getJobStatus();
if (jobStatus != JobStatus.Completed) { // If no VMs are Completed
isFinished = false;
Expand Down