add job service to be able to cancel plot requests

This commit is contained in:
2023-02-18 17:36:54 +01:00
parent 8c410fac4a
commit ed448af78c
18 changed files with 296 additions and 38 deletions

View File

@@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.AbortException;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.Query;
import org.lucares.pdb.api.QueryWithCaretMarker;
@@ -225,6 +226,7 @@ public class DataStore implements AutoCloseable {
private List<PdbFile> toPdbFiles(final List<Doc> searchResult) {
final List<PdbFile> result = new ArrayList<>(searchResult.size());
for (final Doc document : searchResult) {
AbortException.abortIfInterrupted();
final ParititionId partitionId = document.getPartitionId();
final long rootBlockNumber = document.getRootBlockNumber();

View File

@@ -8,6 +8,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.AbortException;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.StringCompressor;
import org.lucares.pdb.api.Tag;
@@ -151,6 +152,7 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<PartitionLongLis
final long start = System.nanoTime();
final Set<ParititionId> availablePartitionIds = keyToValueToDocId.getAvailablePartitionIds(datePartitioner);
for (final ParititionId partitionId : availablePartitionIds) {
AbortException.abortIfInterrupted();
final List<LongList> docIdsForPartition = new ArrayList<>();
keyToValueToDocId.visitValues(partitionId, stringCompressor.createTag(propertyName, ""),
(tag, blockOffsetToDocIds) -> {

View File

@@ -0,0 +1,21 @@
package org.lucares.pdb.api;
public class AbortException extends RuntimeException {
private static final long serialVersionUID = 7614132985675048490L;
public AbortException() {
super();
}
public AbortException(final Throwable cause) {
super(cause);
}
public static void abortIfInterrupted() throws AbortException {
if (Thread.interrupted()) {
throw new AbortException();
}
}
}

View File

@@ -113,6 +113,8 @@ export class GalleryViewComponent implements OnInit {
showDetails = false;
submitterId!: string;
@ViewChild(GalleryFilterView)
filter! : GalleryFilterView;
@@ -146,6 +148,7 @@ export class GalleryViewComponent implements OnInit {
this.totalNumberImages = 0;
this.splitByValuesQueue = new Array<string>();
this.sequenceId++;
this.plotService.abort(this.submitterId).subscribe({next: () => {}});
}
filteredSortedGalleryItems(): Array<GalleryItem> {
@@ -235,20 +238,24 @@ export class GalleryViewComponent implements OnInit {
renderGallery(request: PlotRequest, splitByField: string) {
const that=this;
this.submitterId = request.submitterId;
this.galleryItems.length = 0;
this.splitByValuesQueue.length = 0;
request.generateThumbnail = true;
this.plotService.splitQuery(request.query, splitByField).subscribe(function(valuesForSplitBy){
console.log("valuesForSplitBy: " + JSON.stringify(valuesForSplitBy));
that.splitByValuesQueue = valuesForSplitBy;
that.progress = 0;
that.totalNumberImages = that.splitByValuesQueue.length;
that.renderGalleryRecursively(request, splitByField);
},
error => {
that.showError(error.error.message);
this.plotService.splitQuery(request.query, splitByField).subscribe({
next: function(valuesForSplitBy){
console.log("valuesForSplitBy: " + JSON.stringify(valuesForSplitBy));
that.splitByValuesQueue = valuesForSplitBy;
that.progress = 0;
that.totalNumberImages = that.splitByValuesQueue.length;
that.renderGalleryRecursively(request, splitByField);
},
error: error => {
console.log(JSON.stringify(error));
that.showError(error.error.message);
}
});
}

View File

@@ -51,6 +51,10 @@ export class PlotService {
return this.http.get<AutocompleteResult>('//'+window.location.hostname+':'+window.location.port+'/api/autocomplete', options);
}
abort(submitterId: string): Observable<void>{
return this.http.delete<void>('//'+window.location.hostname+':'+window.location.port+'/api/plots/'+submitterId)
}
sendPlotRequest(plotRequest: PlotRequest): Observable<PlotResponse>{
//console.log("send plot request: "+ JSON.stringify(plotRequest));
@@ -219,6 +223,7 @@ export class PlotRequest {
public generateThumbnail : boolean,
public intervalUnit: string,
public intervalValue: number,
public submitterId: string,
public renderBarChartTickLabels: boolean = false){}
copy(): PlotRequest {

View File

@@ -67,7 +67,8 @@
<div id="plot-button-bar">
<button
*ngIf="!enableGallery"
*ngIf="!enableGallery && !plotJobActive"
[disabled]="plotJobActive"
mat-button
matTooltip="Create Plot"
(click)="plot()">
@@ -75,7 +76,7 @@
Plot
</button>
<button
*ngIf="enableGallery"
*ngIf="enableGallery && !plotJobActive"
mat-button
matTooltip="Create Gallery"
(click)="gallery()"
@@ -83,6 +84,11 @@
<img src="assets/img/four-squares-line.svg" class="icon-inline" aria-hidden="true" title="Create Gallery (only active if 'Split' is set)" />
Gallery
</button>
<button
*ngIf="plotJobActive"
mat-button
(click)="abort()"
matTooltip="abort"><img src="assets/img/close.svg" class="icon-inline" /> Abort</button>
</div>
</div>
</div>

View File

@@ -54,6 +54,10 @@ export class VisualizationPageComponent implements OnInit {
intervalValue = 1;
renderBarChartTickLabels = false;
submitterId = crypto.randomUUID();
plotJobActive = false;
constructor(private plotService: PlotService, private snackBar: MatSnackBar) {
}
@@ -103,10 +107,9 @@ export class VisualizationPageComponent implements OnInit {
gallery(){
if (this.splitBy != null){
const that = this;
this.plotView.imageUrl = '';
that.plotView.stats = null;
that.galleryView.show=true;
this.plotView.stats = null;
this.galleryView.show=true;
const request = this.createPlotRequest();
this.galleryView.renderGallery(request, this.splitBy.name);
} else {
@@ -116,8 +119,8 @@ export class VisualizationPageComponent implements OnInit {
getAxes() : AxesTypes {
var x = new Array<DataType>();
var y = new Array<DataType>();
const x = new Array<DataType>();
const y = new Array<DataType>();
for(var i = 0; i < this.selectedPlotType.length; i++){
var plotType = this.selectedPlotType[i];
@@ -132,11 +135,24 @@ export class VisualizationPageComponent implements OnInit {
return new AxesTypes(x,y);
}
abort() {
this.plotService.abort(this.submitterId).subscribe({
complete: () => {
this.plotView.imageUrl = '';
this.plotView.stats = null;
this.plotJobActive = false;
this.showError("Job aborted");
document.dispatchEvent(new Event("invadersPause", {}));
}
});
}
plot(){
const that = this;
that.plotView.imageUrl = '';
that.plotView.stats = null;
this.plotJobActive = true;
this.plotView.axes = this.getAxes();
console.log(JSON.stringify(this.getAxes()));
that.galleryView.show=false;
@@ -148,11 +164,14 @@ export class VisualizationPageComponent implements OnInit {
next: (plotResponse: PlotResponse) => {
this.plotView.imageUrl = "http://"+window.location.hostname+':'+window.location.port+'/'+plotResponse.imageUrl;
this.plotView.stats = plotResponse.stats;
this.plotJobActive = false;
document.dispatchEvent(new Event("invadersPause", {}));
},
error: (error:any) => {
console.log(JSON.stringify(error));
this.plotView.imageUrl = '';
this.plotView.stats = null;
this.plotJobActive = false;
this.showError(error.error.message);
document.dispatchEvent(new Event("invadersPause", {}));
}
@@ -184,6 +203,7 @@ export class VisualizationPageComponent implements OnInit {
this.enableGallery, // generateThumbnail
this.intervalUnit,
this.intervalValue,
this.submitterId,
this.renderBarChartTickLabels);
return request;
}

View File

@@ -13,6 +13,7 @@ import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.lucares.pdb.api.AbortException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +38,8 @@ public class Gnuplot {
public void plot(final GnuplotSettings settings, final Collection<DataSeries> dataSeries)
throws IOException, InterruptedException {
AbortException.abortIfInterrupted();
final GnuplotFileGenerator generator = new GnuplotFileGenerator();
final String gnuplotFileContent = generator.generate(settings, dataSeries);

View File

@@ -17,6 +17,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.AbortException;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.GroupResult;
import org.lucares.pdb.api.Query;
@@ -145,7 +146,7 @@ public class Plotter {
return new PlotResult(outputFile, dataSeries, thumbnail);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Plotting was interrupted.");
throw new AbortException();
} catch (final IOException e) {
throw new InternalPlottingException("Plotting failed: " + e.getMessage(), e);
} finally {
@@ -176,6 +177,8 @@ public class Plotter {
final Iterator<LongList> it = timeValueStream.iterator();
while (it.hasNext()) {
AbortException.abortIfInterrupted();
final LongList entry = it.next();
for (int i = 0; i < entry.size(); i += 2) {

View File

@@ -8,6 +8,10 @@ public class InternalServerError extends RuntimeException {
private static final long serialVersionUID = 548651821080252932L;
public InternalServerError(final String message) {
super(message);
}
public InternalServerError(final String message, final Throwable cause) {
super(message, cause);
}

View File

@@ -9,10 +9,12 @@ import java.util.Locale;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
@@ -28,6 +30,8 @@ import org.lucares.pdbui.domain.FilterDefaults;
import org.lucares.pdbui.domain.PlotRequest;
import org.lucares.pdbui.domain.PlotResponse;
import org.lucares.pdbui.domain.PlotResponseStats;
import org.lucares.pdbui.job.Job;
import org.lucares.pdbui.job.JobService;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.recommind.logs.InternalPlottingException;
import org.lucares.recommind.logs.NoDataPointsException;
@@ -65,8 +69,6 @@ public class PdbController implements HardcodedValues, PropertyKeys {
private final Plotter plotter;
private final PerformanceDb db;
private final ReentrantLock plotterLock = new ReentrantLock();
@Value("${" + DEFAULTS_QUERY_EXAMPLES + ":}")
private String queryExamples;
@@ -78,10 +80,14 @@ public class PdbController implements HardcodedValues, PropertyKeys {
private final CsvUploadHandler csvUploadHandler;
public PdbController(final PerformanceDb db, final Plotter plotter, final CsvUploadHandler csvUploadHandler) {
private final JobService jobService;
public PdbController(final PerformanceDb db, final Plotter plotter, final CsvUploadHandler csvUploadHandler,
final JobService jobService) {
this.db = db;
this.plotter = plotter;
this.csvUploadHandler = csvUploadHandler;
this.jobService = jobService;
}
@RequestMapping(path = "/plots", //
@@ -100,32 +106,65 @@ public class PdbController implements HardcodedValues, PropertyKeys {
// TODO the UI should cancel requests that are in flight before sending a plot
// request
if (plotterLock.tryLock(5, TimeUnit.SECONDS)) {
try {
final PlotResult result = plotter.plot(plotSettings);
final Future<ResponseEntity<PlotResponse>> future = jobService
.runJob(new Job<ResponseEntity<PlotResponse>>(request.getSubmitterId()) {
final String imageUrl = WEB_IMAGE_OUTPUT_PATH + "/" + result.getImageName();
LOGGER.trace("image url: {}", imageUrl);
@Override
public ResponseEntity<PlotResponse> executeJob() {
try {
final PlotResult result = plotter.plot(plotSettings);
final String thumbnailUrl = result.getThumbnailPath() != null
? WEB_IMAGE_OUTPUT_PATH + "/" + result.getThumbnailName()
: "img/no-thumbnail.png";
final String imageUrl = WEB_IMAGE_OUTPUT_PATH + "/" + result.getImageName();
LOGGER.trace("image url: {}", imageUrl);
final PlotResponseStats stats = PlotResponseStats.fromDataSeries(result.getDataSeries());
final PlotResponse plotResponse = new PlotResponse(stats, imageUrl, thumbnailUrl);
final String thumbnailUrl = result.getThumbnailPath() != null
? WEB_IMAGE_OUTPUT_PATH + "/" + result.getThumbnailName()
: "img/no-thumbnail.png";
return ResponseEntity.ok().body(plotResponse);
} catch (final NoDataPointsException e) {
throw new NotFoundException("No data was found. Try another query, or change the date range.", e);
} finally {
plotterLock.unlock();
final PlotResponseStats stats = PlotResponseStats.fromDataSeries(result.getDataSeries());
final PlotResponse plotResponse = new PlotResponse(stats, imageUrl, thumbnailUrl);
return ResponseEntity.ok().body(plotResponse);
} catch (final NoDataPointsException e) {
throw new NotFoundException(
"No data was found. Try another query, or change the date range.", e);
} catch (final InternalPlottingException e) {
throw new InternalServerError("Internal Server Error", e);
} catch (final RejectedExecutionException e) {
throw new ServiceUnavailableException("Too many parallel requests!");
}
}
});
try {
final long deadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
while (System.currentTimeMillis() < deadline) {
try {
return future.get(1, TimeUnit.SECONDS);
} catch (final TimeoutException e) {
LOGGER.info("job for submitter {} still running", request.getSubmitterId());
} catch (final CancellationException e) {
throw new PlotAbortedWebException();
}
}
LOGGER.info("job for submitter {} timed out, will cancel the job", request.getSubmitterId());
future.cancel(true);
throw new InternalServerError("Internal Server Error");
} else {
throw new ServiceUnavailableException("Too many parallel requests!");
} catch (InterruptedException | ExecutionException e) {
LOGGER.error(e.getMessage(), e);
throw new InternalServerError("Internal Server Error", e);
}
}
@RequestMapping(path = "/plots/{submitterId}", //
method = RequestMethod.DELETE)
@ResponseStatus(HttpStatus.NO_CONTENT)
public void cancelJob(@PathVariable("submitterId") final String submitterId)
throws InternalPlottingException, InterruptedException {
jobService.cancel(submitterId);
}
/*
* @RequestMapping(path = "/plots", // method = RequestMethod.GET, // produces =
* MediaType.APPLICATION_OCTET_STREAM_VALUE // ) StreamingResponseBody

View File

@@ -0,0 +1,11 @@
package org.lucares.pdbui;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;
@ResponseStatus(value = HttpStatus.CONFLICT, reason = "Request Aborted")
public class PlotAbortedWebException extends RuntimeException {
private static final long serialVersionUID = -601662865253785050L;
}

View File

@@ -39,6 +39,8 @@ public class PlotRequest {
private boolean renderBarChartTickLabels;
private String submitterId;
public String getQuery() {
return query;
}
@@ -179,4 +181,13 @@ public class PlotRequest {
public void setRenderBarChartTickLabels(final boolean renderBarChartTickLabels) {
this.renderBarChartTickLabels = renderBarChartTickLabels;
}
public String getSubmitterId() {
return submitterId;
}
public void setSubmitterId(final String submitterId) {
this.submitterId = submitterId;
}
}

View File

@@ -0,0 +1,16 @@
package org.lucares.pdbui.job;
public abstract class Job<T> {
private final String submitterId;
public Job(final String submitterId) {
this.submitterId = submitterId;
}
public String getSubmitterId() {
return submitterId;
}
public abstract T executeJob();
}

View File

@@ -0,0 +1,14 @@
package org.lucares.pdbui.job;
public class JobContext {
private JobState jobState = JobState.ACTIVE;
public void cancelJob() {
jobState = JobState.CANCELLED;
}
public boolean isCancelled() {
return jobState == JobState.CANCELLED;
}
}

View File

@@ -0,0 +1,87 @@
package org.lucares.pdbui.job;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
@Component
public class JobService implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(JobService.class);
private static final int MAX_JOBS = 1;
private final ExecutorService threadPool = new ThreadPoolExecutor(0, MAX_JOBS, 1L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), new CustomizableThreadFactory("jobs"));
private final Map<String, Future<?>> jobs = new ConcurrentHashMap<>();
@SuppressWarnings("unchecked")
public <T> Future<T> runJob(final Job<T> job) {
return (Future<T>) jobs.compute(job.getSubmitterId(), (submitterId, oldFuture) -> {
if (oldFuture != null) {
LOGGER.info("cancelling old job");
oldFuture.cancel(true);
try {
oldFuture.get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
LOGGER.info("old job finished with exception", e);
} catch (final CancellationException e) {
LOGGER.info("cancelled job for submitter {}", submitterId);
} catch (final TimeoutException e) {
LOGGER.info("old job did not finish within 5 seconds");
}
}
return threadPool.submit(() -> {
try {
LOGGER.info("starting job for submitter {}", job.getSubmitterId());
return job.executeJob();
} finally {
jobs.remove(job.getSubmitterId());
}
});
});
}
@Override
public void close() throws Exception {
threadPool.shutdownNow();
threadPool.awaitTermination(5, TimeUnit.SECONDS);
}
public void cancel(final String submitterId) {
jobs.computeIfPresent(submitterId, (_submitterId, jobFuture) -> {
LOGGER.info("cancelling old job");
jobFuture.cancel(true);
try {
jobFuture.get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
LOGGER.info("old job finished with exception", e);
} catch (final CancellationException e) {
LOGGER.info("cancelled job for submitter {}", submitterId);
} catch (final TimeoutException e) {
LOGGER.info("old job did not finish within 5 seconds");
}
return null;
});
}
}

View File

@@ -0,0 +1,5 @@
package org.lucares.pdbui.job;
public enum JobState {
ACTIVE, CANCELLED
}

View File

@@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.AbortException;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.GroupResult;
import org.lucares.pdb.api.Query;
@@ -192,6 +193,7 @@ public class PerformanceDb implements AutoCloseable {
private Result toResult(final Grouping grouping) {
final List<GroupResult> groupResults = new ArrayList<>();
for (final Group group : grouping.getGroups()) {
AbortException.abortIfInterrupted();
final Stream<LongList> stream = PdbFile.toStream(group.getFiles(), dataStore.getDiskStorage());
final GroupResult groupResult = new GroupResult(stream, group.getTags());
groupResults.add(groupResult);