import csv import math import os import re from collections import deque import matplotlib.pyplot as plot def load(directory): def _read(iterable): for x, y in iterable: yield float(x), float(y) def _load(filename): with open(filename) as fd: reader = csv.reader(fd) return list(_read(reader)) def _files(directory): for file in os.listdir(directory): match = re.match(r"SWI_(-?\d+)\.csv", file) if match: yield int(match.group(1)), os.path.join(directory, file) return [(x, _load(y)) for x, y in sorted(_files(directory), key=lambda x: x[0])] def look_downwards(data, x, start): for i in range(start, 0, -1): if data[i - 1][0] < x: break else: raise IndexError return i - 1 def look_upwards(data, x, start): for i in range(start, len(data)): if data[i + 1][0] > x: break else: raise IndexError return i def find_segment(data, x): width = data[-1][0] - data[0][0] relative = x - data[0][0] candidate = math.floor(relative / width * len(data)) look = look_downwards if data[candidate][0] > x else look_upwards # May raise IndexError candidate = look(data, x, candidate) return candidate, candidate + 1 def find_boundary_curves(swis, x, y): segments = deque() for index, data in swis: i, j = find_segment(data, x) if data[i][1] > y and data[j][1] > y: segments.append((index, data, i, j)) break if data[i][1] < y and data[j][1] < y: if segments: segments.popleft() segments.append((index, data, i, j)) if len(segments) == 3: middle = segments[1][1] run = middle[j][0] - middle[i][0] if run == 0: raise RuntimeError # tidy up dataset slope = (middle[j][1] - middle[i][1]) / run intercept = middle[j][1] - slope * middle[j][0] value = slope * x + intercept if value == y: raise RuntimeError # Exactly on point; SWI == index if value < y: segments.popleft() else: segments.pop() if len(segments) == 1: raise RuntimeError # SWI == -10 return segments def dist(x1, y1, x2, y2): return math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2) def calculate_swi(segments, x, y): low = segments[0] high = segments[1] dist_to_low = min(dist(p[0], p[1], x, y) for p in (low[1][low[2]], low[1][low[2]])) dist_to_high = min(dist(p[0], p[1], x, y) for p in (high[1][high[2]], high[1][high[2]])) return dist_to_low / (dist_to_low + dist_to_high) * (high[0] - low[0]) + low[0] swis = load("dataset") def _swi(x, y): # dt, depth segments = find_boundary_curves(swis, x, y) return calculate_swi(segments, x, y) def grid(start, end, steps): step = (end - start) / steps i = start while i <= end: yield i i += step C = [] X = list(grid(0, 40, 1000)) Y = list(grid(0, 50000, 1000)) for y_scaled in range(0, 1000): y = y_scaled * 50 row = [] for x_scaled in range(0, 1000): x = x_scaled / 25 try: swi = _swi(x, y) except (IndexError, RuntimeError): swi = -10 row.append(swi) C.append(row) plot.pcolormesh(X, Y, C, cmap='viridis', vmin=-10, vmax=10, rasterized=True) for _, data in swis: plot.plot([x[0] for x in data], [x[1] for x in data]) plot.show()