From be550ebac5eaf7284421fb1fca74c4f09b1aabe6 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Tue, 28 Feb 2023 20:17:25 +0100 Subject: [PATCH] do not use interruptions to abort execution --- .../org/lucares/pdb/api/AbortException.java | 13 +++- .../java/org/lucares/pdb/api/JobAborter.java | 13 ++++ .../visualization-page.component.ts | 10 +-- .../org/lucares/recommind/logs/Plotter.java | 2 + pdb-ui/build.gradle | 1 + .../java/org/lucares/pdbui/PdbController.java | 21 ++++-- .../main/java/org/lucares/pdbui/job/Job.java | 8 +++ .../org/lucares/pdbui/job/JobService.java | 64 +++++++++++++++---- 8 files changed, 108 insertions(+), 24 deletions(-) create mode 100644 pdb-api/src/main/java/org/lucares/pdb/api/JobAborter.java diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/AbortException.java b/pdb-api/src/main/java/org/lucares/pdb/api/AbortException.java index 0638932..50df990 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/AbortException.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/AbortException.java @@ -12,8 +12,19 @@ public class AbortException extends RuntimeException { super(cause); } + public static ThreadLocal IS_CANCELLED = new ThreadLocal<>(); + + public static void setAborter(final JobAborter jobAborter) { + IS_CANCELLED.set(jobAborter); + } + + public static void cancel() { + IS_CANCELLED.get().cancel(); + } + public static void abortIfInterrupted() throws AbortException { - if (Thread.interrupted()) { + final JobAborter aborter = IS_CANCELLED.get(); + if (aborter != null && Boolean.TRUE.equals(IS_CANCELLED.get().isCancelled())) { throw new AbortException(); } } diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/JobAborter.java b/pdb-api/src/main/java/org/lucares/pdb/api/JobAborter.java new file mode 100644 index 0000000..d244b3e --- /dev/null +++ b/pdb-api/src/main/java/org/lucares/pdb/api/JobAborter.java @@ -0,0 +1,13 @@ +package org.lucares.pdb.api; + +public class JobAborter { + private boolean isCancelled = false; + + public void cancel() { + isCancelled = true; + } + + public boolean isCancelled() { + return isCancelled; + } +} diff --git a/pdb-js/src/app/visualization-page/visualization-page.component.ts b/pdb-js/src/app/visualization-page/visualization-page.component.ts index c00017c..46aa59a 100644 --- a/pdb-js/src/app/visualization-page/visualization-page.component.ts +++ b/pdb-js/src/app/visualization-page/visualization-page.component.ts @@ -138,11 +138,11 @@ export class VisualizationPageComponent implements OnInit { abort() { this.plotService.abort(this.submitterId).subscribe({ complete: () => { - this.plotView.imageUrl = ''; - this.plotView.stats = null; - this.plotJobActive = false; - this.showError("Job aborted"); - document.dispatchEvent(new Event("invadersPause", {})); + //this.plotView.imageUrl = ''; + //this.plotView.stats = null; + //this.plotJobActive = false; + //this.showError("Job aborted"); + //document.dispatchEvent(new Event("invadersPause", {})); } }); } diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/Plotter.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/Plotter.java index 6d85bc1..d9ae001 100644 --- a/pdb-plotting/src/main/java/org/lucares/recommind/logs/Plotter.java +++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/Plotter.java @@ -96,6 +96,8 @@ public class Plotter { if (dataSerie.getValues() > 0) { dataSeries.add(dataSerie); } + } catch (final AbortException e) { + throw e; } catch (final Exception e) { throw new IllegalStateException(e); } diff --git a/pdb-ui/build.gradle b/pdb-ui/build.gradle index df69cd6..66813e3 100644 --- a/pdb-ui/build.gradle +++ b/pdb-ui/build.gradle @@ -10,6 +10,7 @@ applicationDefaultJvmArgs = [ ] dependencies { + implementation project(':pdb-api') implementation project(':performanceDb') implementation project(':pdb-plotting') implementation project(':pdb-js') diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java b/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java index 2361aad..7cd49d9 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java @@ -11,13 +11,13 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; +import org.lucares.pdb.api.AbortException; import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.QueryWithCaretMarker; import org.lucares.pdb.api.QueryWithCaretMarker.ResultMode; @@ -32,6 +32,7 @@ import org.lucares.pdbui.domain.PlotResponse; import org.lucares.pdbui.domain.PlotResponseStats; import org.lucares.pdbui.job.Job; import org.lucares.pdbui.job.JobService; +import org.lucares.pdbui.job.JobService.JobEntry; import org.lucares.performance.db.PerformanceDb; import org.lucares.recommind.logs.InternalPlottingException; import org.lucares.recommind.logs.NoDataPointsException; @@ -104,9 +105,7 @@ public class PdbController implements HardcodedValues, PropertyKeys { throw new BadRequest("The query must not be empty!"); } - // TODO the UI should cancel requests that are in flight before sending a plot - // request - final Future> future = jobService + final JobEntry> jobEntry = jobService .runJob(new Job>(request.getSubmitterId()) { @Override @@ -140,18 +139,26 @@ public class PdbController implements HardcodedValues, PropertyKeys { final long deadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5); while (System.currentTimeMillis() < deadline) { try { - return future.get(1, TimeUnit.SECONDS); + final ResponseEntity responseEntity = jobEntry.getFuture().get(1, TimeUnit.SECONDS); + return responseEntity; } catch (final TimeoutException e) { LOGGER.info("job for submitter {} still running", request.getSubmitterId()); } catch (final CancellationException e) { throw new PlotAbortedWebException(); + } catch (final ExecutionException e) { + if (e.getCause() instanceof AbortException) { + throw new PlotAbortedWebException(); + } else { + LOGGER.error(e.getMessage(), e); + throw new InternalServerError("Internal Server Error", e); + } } } LOGGER.info("job for submitter {} timed out, will cancel the job", request.getSubmitterId()); - future.cancel(true); + jobEntry.getJobThreadLocal().cancel(); throw new InternalServerError("Internal Server Error"); - } catch (InterruptedException | ExecutionException e) { + } catch (final InterruptedException e) { LOGGER.error(e.getMessage(), e); throw new InternalServerError("Internal Server Error", e); } diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/job/Job.java b/pdb-ui/src/main/java/org/lucares/pdbui/job/Job.java index 86123b6..c1d755e 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/job/Job.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/job/Job.java @@ -1,9 +1,13 @@ package org.lucares.pdbui.job; +import org.lucares.pdb.api.JobAborter; + public abstract class Job { private final String submitterId; + private final JobAborter jobAborter = new JobAborter(); + public Job(final String submitterId) { this.submitterId = submitterId; } @@ -13,4 +17,8 @@ public abstract class Job { } public abstract T executeJob(); + + public JobAborter getJobAborter() { + return jobAborter; + } } diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/job/JobService.java b/pdb-ui/src/main/java/org/lucares/pdbui/job/JobService.java index c36a588..8cc7b3a 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/job/JobService.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/job/JobService.java @@ -11,6 +11,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.lucares.pdb.api.AbortException; +import org.lucares.pdb.api.JobAborter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; @@ -21,23 +23,50 @@ public class JobService implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(JobService.class); - private static final int MAX_JOBS = 1; + private static final int MAX_JOBS = 5; + + public static class JobEntry { + private final Future future; + + private final Job job; + + private final JobAborter jobAborter; + + public JobEntry(final Future future, final Job job, final JobAborter jobThreadLocal) { + super(); + this.future = future; + this.job = job; + this.jobAborter = jobThreadLocal; + } + + public Future getFuture() { + return future; + } + + public Job getJob() { + return job; + } + + public JobAborter getJobThreadLocal() { + return jobAborter; + } + } private final ExecutorService threadPool = new ThreadPoolExecutor(0, MAX_JOBS, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new CustomizableThreadFactory("jobs")); - private final Map> jobs = new ConcurrentHashMap<>(); + private final Map> jobs = new ConcurrentHashMap<>(); @SuppressWarnings("unchecked") - public Future runJob(final Job job) { + public JobEntry runJob(final Job job) { - return (Future) jobs.compute(job.getSubmitterId(), (submitterId, oldFuture) -> { + return (JobEntry) jobs.compute(job.getSubmitterId(), (submitterId, oldJobEntry) -> { - if (oldFuture != null) { + if (oldJobEntry != null) { LOGGER.info("cancelling old job"); - oldFuture.cancel(true); + oldJobEntry.getJobThreadLocal().cancel(); try { - oldFuture.get(5, TimeUnit.SECONDS); + oldJobEntry.getFuture().get(5, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException e) { LOGGER.info("old job finished with exception", e); } catch (final CancellationException e) { @@ -46,15 +75,18 @@ public class JobService implements AutoCloseable { LOGGER.info("old job did not finish within 5 seconds"); } } + final JobAborter jobAborter = job.getJobAborter(); - return threadPool.submit(() -> { + final Future future = threadPool.submit(() -> { try { + AbortException.setAborter(jobAborter); LOGGER.info("starting job for submitter {}", job.getSubmitterId()); return job.executeJob(); } finally { jobs.remove(job.getSubmitterId()); } }); + return new JobEntry(future, job, jobAborter); }); } @@ -70,15 +102,25 @@ public class JobService implements AutoCloseable { jobs.computeIfPresent(submitterId, (_submitterId, jobFuture) -> { LOGGER.info("cancelling old job"); - jobFuture.cancel(true); + jobFuture.getJobThreadLocal().cancel(); + int waitTime = 0; try { - jobFuture.get(5, TimeUnit.SECONDS); + + final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5); + + while (System.currentTimeMillis() < deadline) { + waitTime = Math.min(waitTime + 2, 500); + jobFuture.getFuture().get(waitTime, TimeUnit.MILLISECONDS); + return null; + } + LOGGER.info("old job did not finish within 5 seconds"); } catch (InterruptedException | ExecutionException e) { LOGGER.info("old job finished with exception", e); } catch (final CancellationException e) { LOGGER.info("cancelled job for submitter {}", submitterId); } catch (final TimeoutException e) { - LOGGER.info("old job did not finish within 5 seconds"); + // LOGGER.info("old job did not finish within 5 seconds"); + LOGGER.info("polling wait time was {}", waitTime); } return null;