Fix: also poison the MediawikiImporter output queue after an ExecutionException

in worker thread.
The importer's writer needs a poison record to close the file. On an exception (e.g. OOM),
add a poison marker in the outermost try/catch to ensure the output queue terminates
in this condition too (and closes + renames the surrogate/in/xxx.prt file).
This commit is contained in:
reger 2015-12-28 02:32:00 +01:00
parent a7591d3ed0
commit 46ac0867ff

View File

@ -146,6 +146,11 @@ public class MediawikiImporter extends Thread implements Importer {
@Override
public void run() {
this.start = System.currentTimeMillis();
final int threads = Math.max(2, Runtime.getRuntime().availableProcessors() - 1);
// out keeps an output file open until poisoned; to make sure the underlying thread gets the end condition
// regardless of any exception (e.g. out of memory), a put(poison) is placed in the outermost finally block
final BlockingQueue<wikiparserrecord> out = new ArrayBlockingQueue<wikiparserrecord>(threads * 10);
final wikiparserrecord poison = newRecord();
try {
String targetstub = this.sourcefile.getName();
int p = targetstub.lastIndexOf("\\.");
@ -161,10 +166,7 @@ public class MediawikiImporter extends Thread implements Importer {
StringBuilder sb = new StringBuilder();
boolean page = false, text = false;
String title = null;
final wikiparserrecord poison = newRecord();
final int threads = Math.max(2, Runtime.getRuntime().availableProcessors() - 1);
final BlockingQueue<wikiparserrecord> in = new ArrayBlockingQueue<wikiparserrecord>(threads * 10);
final BlockingQueue<wikiparserrecord> out = new ArrayBlockingQueue<wikiparserrecord>(threads * 10);
final ExecutorService service = Executors.newCachedThreadPool();
final convertConsumer[] consumers = new convertConsumer[threads];
final Future<?>[] consumerResults = (Future<?>[]) Array.newInstance(Future.class, threads);
@ -261,8 +263,6 @@ public class MediawikiImporter extends Thread implements Importer {
for (int i = 0; i < threads; i++) {
consumerResults[i].get(10000, TimeUnit.MILLISECONDS);
}
out.put(poison);
writerResult.get(10000, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
ConcurrentLog.logException(e);
} catch (final ExecutionException e) {
@ -271,11 +271,18 @@ public class MediawikiImporter extends Thread implements Importer {
ConcurrentLog.logException(e);
} catch (final Exception e) {
ConcurrentLog.logException(e);
} finally {
out.put(poison); // output thread condition (for file.close)
writerResult.get(10000, TimeUnit.MILLISECONDS);
}
} catch (final IOException e) {
ConcurrentLog.logException(e);
} catch (final Exception e) {
ConcurrentLog.logException(e);
} finally {
try {
out.put(poison); // out keeps output file open until poisened, to close file if exception happend in this block
} catch (InterruptedException ex) { }
}
}
@ -733,10 +740,12 @@ public class MediawikiImporter extends Thread implements Importer {
ConcurrentLog.logException(e);
} finally {
try {
this.osw.write(SurrogateReader.SURROGATES_MAIN_ELEMENT_CLOSE + "\n");
this.osw.close();
final String finalfilename = this.targetstub + "." + this.fc + ".xml";
new File(this.targetdir, this.outputfilename).renameTo(new File(this.targetdir, finalfilename));
if (osw != null) { // maybe null on poison (immediately)
this.osw.write(SurrogateReader.SURROGATES_MAIN_ELEMENT_CLOSE + "\n");
this.osw.close();
final String finalfilename = this.targetstub + "." + this.fc + ".xml";
new File(this.targetdir, this.outputfilename).renameTo(new File(this.targetdir, finalfilename));
}
} catch (final IOException e) {
ConcurrentLog.logException(e);
}