do not use interruptions to abort execution

This commit is contained in:
2023-02-28 20:17:25 +01:00
parent eb9904a30b
commit be550ebac5
8 changed files with 108 additions and 24 deletions

View File

@@ -12,8 +12,19 @@ public class AbortException extends RuntimeException {
super(cause); super(cause);
} }
public static ThreadLocal<JobAborter> IS_CANCELLED = new ThreadLocal<>();
public static void setAborter(final JobAborter jobAborter) {
IS_CANCELLED.set(jobAborter);
}
public static void cancel() {
IS_CANCELLED.get().cancel();
}
public static void abortIfInterrupted() throws AbortException { public static void abortIfInterrupted() throws AbortException {
if (Thread.interrupted()) { final JobAborter aborter = IS_CANCELLED.get();
if (aborter != null && Boolean.TRUE.equals(IS_CANCELLED.get().isCancelled())) {
throw new AbortException(); throw new AbortException();
} }
} }

View File

@@ -0,0 +1,13 @@
package org.lucares.pdb.api;
public class JobAborter {
private boolean isCancelled = false;
public void cancel() {
isCancelled = true;
}
public boolean isCancelled() {
return isCancelled;
}
}

View File

@@ -138,11 +138,11 @@ export class VisualizationPageComponent implements OnInit {
abort() { abort() {
this.plotService.abort(this.submitterId).subscribe({ this.plotService.abort(this.submitterId).subscribe({
complete: () => { complete: () => {
this.plotView.imageUrl = ''; //this.plotView.imageUrl = '';
this.plotView.stats = null; //this.plotView.stats = null;
this.plotJobActive = false; //this.plotJobActive = false;
this.showError("Job aborted"); //this.showError("Job aborted");
document.dispatchEvent(new Event("invadersPause", {})); //document.dispatchEvent(new Event("invadersPause", {}));
} }
}); });
} }

View File

@@ -96,6 +96,8 @@ public class Plotter {
if (dataSerie.getValues() > 0) { if (dataSerie.getValues() > 0) {
dataSeries.add(dataSerie); dataSeries.add(dataSerie);
} }
} catch (final AbortException e) {
throw e;
} catch (final Exception e) { } catch (final Exception e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }

View File

@@ -10,6 +10,7 @@ applicationDefaultJvmArgs = [
] ]
dependencies { dependencies {
implementation project(':pdb-api')
implementation project(':performanceDb') implementation project(':performanceDb')
implementation project(':pdb-plotting') implementation project(':pdb-plotting')
implementation project(':pdb-js') implementation project(':pdb-js')

View File

@@ -11,13 +11,13 @@ import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.lucares.pdb.api.AbortException;
import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.QueryWithCaretMarker; import org.lucares.pdb.api.QueryWithCaretMarker;
import org.lucares.pdb.api.QueryWithCaretMarker.ResultMode; import org.lucares.pdb.api.QueryWithCaretMarker.ResultMode;
@@ -32,6 +32,7 @@ import org.lucares.pdbui.domain.PlotResponse;
import org.lucares.pdbui.domain.PlotResponseStats; import org.lucares.pdbui.domain.PlotResponseStats;
import org.lucares.pdbui.job.Job; import org.lucares.pdbui.job.Job;
import org.lucares.pdbui.job.JobService; import org.lucares.pdbui.job.JobService;
import org.lucares.pdbui.job.JobService.JobEntry;
import org.lucares.performance.db.PerformanceDb; import org.lucares.performance.db.PerformanceDb;
import org.lucares.recommind.logs.InternalPlottingException; import org.lucares.recommind.logs.InternalPlottingException;
import org.lucares.recommind.logs.NoDataPointsException; import org.lucares.recommind.logs.NoDataPointsException;
@@ -104,9 +105,7 @@ public class PdbController implements HardcodedValues, PropertyKeys {
throw new BadRequest("The query must not be empty!"); throw new BadRequest("The query must not be empty!");
} }
// TODO the UI should cancel requests that are in flight before sending a plot final JobEntry<ResponseEntity<PlotResponse>> jobEntry = jobService
// request
final Future<ResponseEntity<PlotResponse>> future = jobService
.runJob(new Job<ResponseEntity<PlotResponse>>(request.getSubmitterId()) { .runJob(new Job<ResponseEntity<PlotResponse>>(request.getSubmitterId()) {
@Override @Override
@@ -140,18 +139,26 @@ public class PdbController implements HardcodedValues, PropertyKeys {
final long deadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5); final long deadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
while (System.currentTimeMillis() < deadline) { while (System.currentTimeMillis() < deadline) {
try { try {
return future.get(1, TimeUnit.SECONDS); final ResponseEntity<PlotResponse> responseEntity = jobEntry.getFuture().get(1, TimeUnit.SECONDS);
return responseEntity;
} catch (final TimeoutException e) { } catch (final TimeoutException e) {
LOGGER.info("job for submitter {} still running", request.getSubmitterId()); LOGGER.info("job for submitter {} still running", request.getSubmitterId());
} catch (final CancellationException e) { } catch (final CancellationException e) {
throw new PlotAbortedWebException(); throw new PlotAbortedWebException();
} catch (final ExecutionException e) {
if (e.getCause() instanceof AbortException) {
throw new PlotAbortedWebException();
} else {
LOGGER.error(e.getMessage(), e);
throw new InternalServerError("Internal Server Error", e);
}
} }
} }
LOGGER.info("job for submitter {} timed out, will cancel the job", request.getSubmitterId()); LOGGER.info("job for submitter {} timed out, will cancel the job", request.getSubmitterId());
future.cancel(true); jobEntry.getJobThreadLocal().cancel();
throw new InternalServerError("Internal Server Error"); throw new InternalServerError("Internal Server Error");
} catch (InterruptedException | ExecutionException e) { } catch (final InterruptedException e) {
LOGGER.error(e.getMessage(), e); LOGGER.error(e.getMessage(), e);
throw new InternalServerError("Internal Server Error", e); throw new InternalServerError("Internal Server Error", e);
} }

View File

@@ -1,9 +1,13 @@
package org.lucares.pdbui.job; package org.lucares.pdbui.job;
import org.lucares.pdb.api.JobAborter;
public abstract class Job<T> { public abstract class Job<T> {
private final String submitterId; private final String submitterId;
private final JobAborter jobAborter = new JobAborter();
public Job(final String submitterId) { public Job(final String submitterId) {
this.submitterId = submitterId; this.submitterId = submitterId;
} }
@@ -13,4 +17,8 @@ public abstract class Job<T> {
} }
public abstract T executeJob(); public abstract T executeJob();
public JobAborter getJobAborter() {
return jobAborter;
}
} }

View File

@@ -11,6 +11,8 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.lucares.pdb.api.AbortException;
import org.lucares.pdb.api.JobAborter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
@@ -21,23 +23,50 @@ public class JobService implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(JobService.class); private static final Logger LOGGER = LoggerFactory.getLogger(JobService.class);
private static final int MAX_JOBS = 1; private static final int MAX_JOBS = 5;
public static class JobEntry<T> {
private final Future<T> future;
private final Job<T> job;
private final JobAborter jobAborter;
public JobEntry(final Future<T> future, final Job<T> job, final JobAborter jobThreadLocal) {
super();
this.future = future;
this.job = job;
this.jobAborter = jobThreadLocal;
}
public Future<T> getFuture() {
return future;
}
public Job<T> getJob() {
return job;
}
public JobAborter getJobThreadLocal() {
return jobAborter;
}
}
private final ExecutorService threadPool = new ThreadPoolExecutor(0, MAX_JOBS, 1L, TimeUnit.SECONDS, private final ExecutorService threadPool = new ThreadPoolExecutor(0, MAX_JOBS, 1L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), new CustomizableThreadFactory("jobs")); new ArrayBlockingQueue<>(1), new CustomizableThreadFactory("jobs"));
private final Map<String, Future<?>> jobs = new ConcurrentHashMap<>(); private final Map<String, JobEntry<?>> jobs = new ConcurrentHashMap<>();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T> Future<T> runJob(final Job<T> job) { public <T> JobEntry<T> runJob(final Job<T> job) {
return (Future<T>) jobs.compute(job.getSubmitterId(), (submitterId, oldFuture) -> { return (JobEntry<T>) jobs.compute(job.getSubmitterId(), (submitterId, oldJobEntry) -> {
if (oldFuture != null) { if (oldJobEntry != null) {
LOGGER.info("cancelling old job"); LOGGER.info("cancelling old job");
oldFuture.cancel(true); oldJobEntry.getJobThreadLocal().cancel();
try { try {
oldFuture.get(5, TimeUnit.SECONDS); oldJobEntry.getFuture().get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
LOGGER.info("old job finished with exception", e); LOGGER.info("old job finished with exception", e);
} catch (final CancellationException e) { } catch (final CancellationException e) {
@@ -46,15 +75,18 @@ public class JobService implements AutoCloseable {
LOGGER.info("old job did not finish within 5 seconds"); LOGGER.info("old job did not finish within 5 seconds");
} }
} }
final JobAborter jobAborter = job.getJobAborter();
return threadPool.submit(() -> { final Future<T> future = threadPool.submit(() -> {
try { try {
AbortException.setAborter(jobAborter);
LOGGER.info("starting job for submitter {}", job.getSubmitterId()); LOGGER.info("starting job for submitter {}", job.getSubmitterId());
return job.executeJob(); return job.executeJob();
} finally { } finally {
jobs.remove(job.getSubmitterId()); jobs.remove(job.getSubmitterId());
} }
}); });
return new JobEntry<T>(future, job, jobAborter);
}); });
} }
@@ -70,15 +102,25 @@ public class JobService implements AutoCloseable {
jobs.computeIfPresent(submitterId, (_submitterId, jobFuture) -> { jobs.computeIfPresent(submitterId, (_submitterId, jobFuture) -> {
LOGGER.info("cancelling old job"); LOGGER.info("cancelling old job");
jobFuture.cancel(true); jobFuture.getJobThreadLocal().cancel();
int waitTime = 0;
try { try {
jobFuture.get(5, TimeUnit.SECONDS);
final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5);
while (System.currentTimeMillis() < deadline) {
waitTime = Math.min(waitTime + 2, 500);
jobFuture.getFuture().get(waitTime, TimeUnit.MILLISECONDS);
return null;
}
LOGGER.info("old job did not finish within 5 seconds");
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
LOGGER.info("old job finished with exception", e); LOGGER.info("old job finished with exception", e);
} catch (final CancellationException e) { } catch (final CancellationException e) {
LOGGER.info("cancelled job for submitter {}", submitterId); LOGGER.info("cancelled job for submitter {}", submitterId);
} catch (final TimeoutException e) { } catch (final TimeoutException e) {
LOGGER.info("old job did not finish within 5 seconds"); // LOGGER.info("old job did not finish within 5 seconds");
LOGGER.info("polling wait time was {}", waitTime);
} }
return null; return null;