Skip to content
Snippets Groups Projects
Commit 7b8157e5 authored by Peter Alexander's avatar Peter Alexander
Browse files

Wait for LPJ yield data

parent 3af87a83
No related branches found
No related tags found
No related merge requests found
BASE_DIR=/Users/peteralexander/Documents/R_Workspace/UNPLUM
YIELD_DIR=/Users/peteralexander/Documents/LURG/LPJ/
\ No newline at end of file
...@@ -116,5 +116,6 @@ public class ModelConfig { ...@@ -116,5 +116,6 @@ public class ModelConfig {
// Output // Output
public static final boolean WRITE_JPEG_IMAGES = getBooleanProperty("WRITE_JPEG_IMAGES", Boolean.TRUE); public static final boolean WRITE_JPEG_IMAGES = getBooleanProperty("WRITE_JPEG_IMAGES", Boolean.TRUE);
public static final int LPJG_MONITOR_TIMEOUT_SEC = getIntProperty("LPJG_MONITOR_TIMEOUT", 60*60*2);
} }
\ No newline at end of file
package ac.ed.lurg.utils;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
public class WatchDir {
private final WatchService watcher;
private final Map<WatchKey,Path> keys;
private boolean recursive;
private final Callback callback;
private boolean stop = false;
public static interface Callback {
public void dirChange(Path path, Kind<?> kind);
}
public void stop() {
try {
watcher.close();
}
catch (IOException e) {
e.printStackTrace();
}
this.stop = true;
}
@SuppressWarnings("unchecked")
static <T> WatchEvent<T> cast(WatchEvent<?> event) {
return (WatchEvent<T>)event;
}
/**
* Register the given directory with the WatchService
*/
private void register(Path dir, Kind<?>... events) throws IOException {
if (events == null || events.length == 0) // if events are not specified call with all types
register(dir, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
else {
WatchKey key = dir.register(watcher, events);
keys.put(key, dir);
}
}
/**
* Register the given directory, and all its sub-directories, with the
* WatchService.
*/
private void registerAll(final Path start, final Kind<?>... events) throws IOException {
// register directory and sub-directories
Files.walkFileTree(start, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
register(dir, events);
return FileVisitResult.CONTINUE;
}
});
}
/**
* Creates a WatchService and registers the given directory
*/
public WatchDir(Path dir, boolean recursive, Callback callback, Kind<?>... events) throws IOException {
this.watcher = FileSystems.getDefault().newWatchService();
this.keys = new HashMap<WatchKey,Path>();
this.callback = callback;
this.recursive = recursive;
if (recursive) {
LogWriter.println(String.format("Scanning %s ...", dir));
registerAll(dir, events);
}
else {
register(dir, events);
}
}
/**
* Process all events for keys queued to the watcher
*/
public void processEvents() {
while (!stop) {
// wait for key to be signalled
WatchKey key;
try {
key = watcher.take();
} catch (InterruptedException x) {
return;
}
Path dir = keys.get(key);
if (dir == null) {
LogWriter.printlnError("WatchKey not recognized!!");
continue;
}
for (WatchEvent<?> event: key.pollEvents()) {
Kind<?> kind = event.kind();
// TBD - provide example of how OVERFLOW event is handled
if (kind == StandardWatchEventKinds.OVERFLOW) {
continue;
}
// Context for directory entry event is the file name of entry
WatchEvent<Path> ev = cast(event);
Path name = ev.context();
Path child = dir.resolve(name);
//LogWriter.format("%s: %s\n", kind.name(), child);
callback.dirChange(child, kind);
// if directory is created, and watching recursively, then
// register it and its sub-directories
if (recursive && (kind == StandardWatchEventKinds.ENTRY_CREATE)) {
try {
if (Files.isDirectory(child, LinkOption.NOFOLLOW_LINKS)) {
registerAll(child);
}
} catch (IOException x) {
LogWriter.printlnError("WatchDir. Error on event " + x);
}
}
}
// reset key and remove from set if directory no longer accessible
boolean valid = key.reset();
if (!valid) {
keys.remove(key);
// all directories are inaccessible
if (keys.isEmpty()) {
break;
}
}
}
}
}
\ No newline at end of file
package ac.ed.lurg.utils;
import java.io.File;
import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent.Kind;
import java.util.Observable;
public class WatchForFile extends Observable implements WatchDir.Callback {
private File fileToWaitFor;
private WatchDir watchDir;
private Thread thread;
public WatchForFile(final File fileToWaitFor) {
this.fileToWaitFor = fileToWaitFor;
if (fileToWaitFor.exists()) {
LogWriter.println("Marker file already exists: " + fileToWaitFor);
return;
}
thread = new Thread("Watch Directory Thread") {
@Override
public void run() {
Path dirToMonitor = fileToWaitFor.getParentFile().getParentFile().toPath(); // goes up a directory level as lowest directory not always exists before
try {
watchDir = new WatchDir(dirToMonitor, true, WatchForFile.this, StandardWatchEventKinds.ENTRY_CREATE);
checkForFile(); // just in case file has been created before we get here
watchDir.processEvents();
}
catch (IOException e) {
LogWriter.printlnError("Problem watching directory " + e.getMessage());
throw new RuntimeException(e);
}
catch (ClosedWatchServiceException e) {
// LogWriter.println("Service still updating when closed. Done it's job so doesn't matter.");
}
}
};
thread.start(); // the thread terminated by running through as while loop ended by watchDir.stop()
}
@Override
public void dirChange(Path path, Kind<?> kind) {
LogWriter.println(String.format("Got %s: %s", kind.name(), path));
checkForFile();
}
public synchronized boolean checkForFile() {
if (fileToWaitFor.exists()) {
LogWriter.println("Found our marker file");
stop();
notify();
return true;
}
else
return false;
}
private void stop() {
if (watchDir != null)
watchDir.stop();
}
public synchronized boolean await(long timeoutSec) {
if (!fileToWaitFor.exists()) {
try {
wait(timeoutSec * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
stop();
}
}
return fileToWaitFor.exists();
}
}
...@@ -4,11 +4,13 @@ import java.io.BufferedReader; ...@@ -4,11 +4,13 @@ import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileReader; import java.io.FileReader;
import ac.ed.lurg.ModelConfig;
import ac.ed.lurg.types.CropType; import ac.ed.lurg.types.CropType;
import ac.ed.lurg.types.FertiliserRate; import ac.ed.lurg.types.FertiliserRate;
import ac.ed.lurg.types.IrrigationRate; import ac.ed.lurg.types.IrrigationRate;
import ac.ed.lurg.types.YieldType; import ac.ed.lurg.types.YieldType;
import ac.ed.lurg.utils.LogWriter; import ac.ed.lurg.utils.LogWriter;
import ac.ed.lurg.utils.WatchForFile;
import ac.sac.raster.RasterHeaderDetails; import ac.sac.raster.RasterHeaderDetails;
public class LPJYieldResponseMapReader { public class LPJYieldResponseMapReader {
...@@ -19,7 +21,6 @@ public class LPJYieldResponseMapReader { ...@@ -19,7 +21,6 @@ public class LPJYieldResponseMapReader {
public LPJYieldResponseMapReader(String rootDir, RasterHeaderDetails rasterProj) { public LPJYieldResponseMapReader(String rootDir, RasterHeaderDetails rasterProj) {
this.rootDir = rootDir; this.rootDir = rootDir;
dataset = new YieldRaster(rasterProj); dataset = new YieldRaster(rasterProj);
} }
private String[] parseLine(String line) { private String[] parseLine(String line) {
...@@ -28,11 +29,21 @@ public class LPJYieldResponseMapReader { ...@@ -28,11 +29,21 @@ public class LPJYieldResponseMapReader {
public YieldRaster getRasterDataFromFile(FertiliserRate fr) { public YieldRaster getRasterDataFromFile(FertiliserRate fr) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
// wait
WatchForFile fileWatcher = new WatchForFile(new File(rootDir + File.separator + "done"));
boolean foundFile = fileWatcher.await(ModelConfig.LPJG_MONITOR_TIMEOUT_SEC);
if (!foundFile) {
LogWriter.printlnError("Not able to find marker file. May have timed out.");
throw new RuntimeException("Not able to find marker file. May have timed out.");
}
String filename = rootDir + File.separator + fr.getFileName();
YieldType noIrrigYieldType = YieldType.getYieldType(fr, IrrigationRate.NO_IRRIG); // just do this look up once per file YieldType noIrrigYieldType = YieldType.getYieldType(fr, IrrigationRate.NO_IRRIG); // just do this look up once per file
YieldType maxIrrigYieldType = YieldType.getYieldType(fr, IrrigationRate.MAX_IRRIG); YieldType maxIrrigYieldType = YieldType.getYieldType(fr, IrrigationRate.MAX_IRRIG);
int col=0; int col=0;
String filename = rootDir + File.separator + fr.getFileName();
try { try {
BufferedReader in = new BufferedReader(new FileReader(filename)); BufferedReader in = new BufferedReader(new FileReader(filename));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment