From 7b8157e5110c886c4f7ec7d063b482787037cc29 Mon Sep 17 00:00:00 2001 From: Peter Alexander <peter@blackhillock.co.uk> Date: Tue, 4 Aug 2015 15:45:37 +0100 Subject: [PATCH] Wait for LPJ yield data --- config.properties | 2 + src/ac/ed/lurg/ModelConfig.java | 1 + src/ac/ed/lurg/utils/WatchDir.java | 152 ++++++++++++++++++ src/ac/ed/lurg/utils/WatchForFile.java | 83 ++++++++++ .../lurg/yield/LPJYieldResponseMapReader.java | 15 +- 5 files changed, 251 insertions(+), 2 deletions(-) create mode 100644 config.properties create mode 100644 src/ac/ed/lurg/utils/WatchDir.java create mode 100644 src/ac/ed/lurg/utils/WatchForFile.java diff --git a/config.properties b/config.properties new file mode 100644 index 00000000..c87e4957 --- /dev/null +++ b/config.properties @@ -0,0 +1,2 @@ +BASE_DIR=/Users/peteralexander/Documents/R_Workspace/UNPLUM +YIELD_DIR=/Users/peteralexander/Documents/LURG/LPJ/ \ No newline at end of file diff --git a/src/ac/ed/lurg/ModelConfig.java b/src/ac/ed/lurg/ModelConfig.java index 827d3bbd..3fbba9a1 100644 --- a/src/ac/ed/lurg/ModelConfig.java +++ b/src/ac/ed/lurg/ModelConfig.java @@ -116,5 +116,6 @@ public class ModelConfig { // Output public static final boolean WRITE_JPEG_IMAGES = getBooleanProperty("WRITE_JPEG_IMAGES", Boolean.TRUE); + public static final int LPJG_MONITOR_TIMEOUT_SEC = getIntProperty("LPJG_MONITOR_TIMEOUT", 60*60*2); } \ No newline at end of file diff --git a/src/ac/ed/lurg/utils/WatchDir.java b/src/ac/ed/lurg/utils/WatchDir.java new file mode 100644 index 00000000..a6bb75fa --- /dev/null +++ b/src/ac/ed/lurg/utils/WatchDir.java @@ -0,0 +1,152 @@ +package ac.ed.lurg.utils; + +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.LinkOption; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchEvent.Kind; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.HashMap; +import java.util.Map; + +public class WatchDir { + + private final WatchService watcher; + private final Map<WatchKey,Path> keys; + private boolean recursive; + private final Callback callback; + private boolean stop = false; + + public static interface Callback { + public void dirChange(Path path, Kind<?> kind); + } + + public void stop() { + try { + watcher.close(); + } + catch (IOException e) { + e.printStackTrace(); + } + this.stop = true; + } + + @SuppressWarnings("unchecked") + static <T> WatchEvent<T> cast(WatchEvent<?> event) { + return (WatchEvent<T>)event; + } + + /** + * Register the given directory with the WatchService + */ + private void register(Path dir, Kind<?>... events) throws IOException { + + if (events == null || events.length == 0) // if events are not specified call with all types + register(dir, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY); + else { + WatchKey key = dir.register(watcher, events); + keys.put(key, dir); + } + } + /** + * Register the given directory, and all its sub-directories, with the + * WatchService. + */ + private void registerAll(final Path start, final Kind<?>... events) throws IOException { + // register directory and sub-directories + Files.walkFileTree(start, new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { + register(dir, events); + return FileVisitResult.CONTINUE; + } + }); + } + + /** + * Creates a WatchService and registers the given directory + */ + public WatchDir(Path dir, boolean recursive, Callback callback, Kind<?>... events) throws IOException { + this.watcher = FileSystems.getDefault().newWatchService(); + this.keys = new HashMap<WatchKey,Path>(); + this.callback = callback; + this.recursive = recursive; + + if (recursive) { + LogWriter.println(String.format("Scanning %s ...", dir)); + registerAll(dir, events); + } + else { + register(dir, events); + } + } + + /** + * Process all events for keys queued to the watcher + */ + public void processEvents() { + while (!stop) { + + // wait for key to be signalled + WatchKey key; + try { + key = watcher.take(); + } catch (InterruptedException x) { + return; + } + + Path dir = keys.get(key); + if (dir == null) { + LogWriter.printlnError("WatchKey not recognized!!"); + continue; + } + + for (WatchEvent<?> event: key.pollEvents()) { + Kind<?> kind = event.kind(); + + // TBD - provide example of how OVERFLOW event is handled + if (kind == StandardWatchEventKinds.OVERFLOW) { + continue; + } + + // Context for directory entry event is the file name of entry + WatchEvent<Path> ev = cast(event); + Path name = ev.context(); + Path child = dir.resolve(name); + + //LogWriter.format("%s: %s\n", kind.name(), child); + callback.dirChange(child, kind); + + // if directory is created, and watching recursively, then + // register it and its sub-directories + if (recursive && (kind == StandardWatchEventKinds.ENTRY_CREATE)) { + try { + if (Files.isDirectory(child, LinkOption.NOFOLLOW_LINKS)) { + registerAll(child); + } + } catch (IOException x) { + LogWriter.printlnError("WatchDir. Error on event " + x); + } + } + } + + // reset key and remove from set if directory no longer accessible + boolean valid = key.reset(); + if (!valid) { + keys.remove(key); + + // all directories are inaccessible + if (keys.isEmpty()) { + break; + } + } + } + } +} \ No newline at end of file diff --git a/src/ac/ed/lurg/utils/WatchForFile.java b/src/ac/ed/lurg/utils/WatchForFile.java new file mode 100644 index 00000000..4ac0f3e2 --- /dev/null +++ b/src/ac/ed/lurg/utils/WatchForFile.java @@ -0,0 +1,83 @@ +package ac.ed.lurg.utils; + +import java.io.File; +import java.io.IOException; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent.Kind; +import java.util.Observable; + +public class WatchForFile extends Observable implements WatchDir.Callback { + + private File fileToWaitFor; + private WatchDir watchDir; + private Thread thread; + + public WatchForFile(final File fileToWaitFor) { + this.fileToWaitFor = fileToWaitFor; + + if (fileToWaitFor.exists()) { + LogWriter.println("Marker file already exists: " + fileToWaitFor); + return; + } + + thread = new Thread("Watch Directory Thread") { + @Override + public void run() { + + Path dirToMonitor = fileToWaitFor.getParentFile().getParentFile().toPath(); // goes up a directory level as lowest directory not always exists before + + try { + watchDir = new WatchDir(dirToMonitor, true, WatchForFile.this, StandardWatchEventKinds.ENTRY_CREATE); + checkForFile(); // just in case file has been created before we get here + watchDir.processEvents(); + } + catch (IOException e) { + LogWriter.printlnError("Problem watching directory " + e.getMessage()); + throw new RuntimeException(e); + } + catch (ClosedWatchServiceException e) { + // LogWriter.println("Service still updating when closed. Done it's job so doesn't matter."); + } + } + }; + thread.start(); // the thread terminated by running through as while loop ended by watchDir.stop() + } + + @Override + public void dirChange(Path path, Kind<?> kind) { + LogWriter.println(String.format("Got %s: %s", kind.name(), path)); + checkForFile(); + } + + public synchronized boolean checkForFile() { + if (fileToWaitFor.exists()) { + LogWriter.println("Found our marker file"); + stop(); + notify(); + return true; + } + else + return false; + } + + private void stop() { + if (watchDir != null) + watchDir.stop(); + } + + public synchronized boolean await(long timeoutSec) { + if (!fileToWaitFor.exists()) { + try { + wait(timeoutSec * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + finally { + stop(); + } + } + return fileToWaitFor.exists(); + } +} diff --git a/src/ac/ed/lurg/yield/LPJYieldResponseMapReader.java b/src/ac/ed/lurg/yield/LPJYieldResponseMapReader.java index 5a7e1573..4fe146ff 100644 --- a/src/ac/ed/lurg/yield/LPJYieldResponseMapReader.java +++ b/src/ac/ed/lurg/yield/LPJYieldResponseMapReader.java @@ -4,11 +4,13 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileReader; +import ac.ed.lurg.ModelConfig; import ac.ed.lurg.types.CropType; import ac.ed.lurg.types.FertiliserRate; import ac.ed.lurg.types.IrrigationRate; import ac.ed.lurg.types.YieldType; import ac.ed.lurg.utils.LogWriter; +import ac.ed.lurg.utils.WatchForFile; import ac.sac.raster.RasterHeaderDetails; public class LPJYieldResponseMapReader { @@ -19,7 +21,6 @@ public class LPJYieldResponseMapReader { public LPJYieldResponseMapReader(String rootDir, RasterHeaderDetails rasterProj) { this.rootDir = rootDir; dataset = new YieldRaster(rasterProj); - } private String[] parseLine(String line) { @@ -28,11 +29,21 @@ public class LPJYieldResponseMapReader { public YieldRaster getRasterDataFromFile(FertiliserRate fr) { long startTime = System.currentTimeMillis(); + + // wait + WatchForFile fileWatcher = new WatchForFile(new File(rootDir + File.separator + "done")); + boolean foundFile = fileWatcher.await(ModelConfig.LPJG_MONITOR_TIMEOUT_SEC); + if (!foundFile) { + LogWriter.printlnError("Not able to find marker file. May have timed out."); + throw new RuntimeException("Not able to find marker file. May have timed out."); + } + + String filename = rootDir + File.separator + fr.getFileName(); + YieldType noIrrigYieldType = YieldType.getYieldType(fr, IrrigationRate.NO_IRRIG); // just do this look up once per file YieldType maxIrrigYieldType = YieldType.getYieldType(fr, IrrigationRate.MAX_IRRIG); int col=0; - String filename = rootDir + File.separator + fr.getFileName(); try { BufferedReader in = new BufferedReader(new FileReader(filename)); -- GitLab