A co-worker was interested in segmenting a list of data points, and I went down a rabbit hole on one-dimensional segmentation. I found an article on the Jenks natural breaks optimization on Wikipedia, and another article that had some worked examples. The method is used to bin data points so that clustered values always end up binned together. There is an iterative method that takes unordered data, but this implementation just sorts the data before binning.
#!/usr/local/bin/python3
# jenks.py
import copy

import numpy as np


class Jenks:
    """Compute Jenks' Natural Breaks for a sorted 1-D data set.

    I've used the term "zone(s)" instead of "break(s)" because "break"
    is a reserved word in Python.  A zone is a half-open index pair
    ``[left, right)`` into ``self.data``.
    """

    def __init__(self, data):
        """Store the data and precompute its global statistics.

        Args:
            data: a sorted, one-dimensional sequence of numbers.
        """
        self.data = data
        self.n_data = len(data)
        self.mu = np.mean(data)
        # SDAM depends only on the data, never on the zones,
        # so compute this value once — it won't change later.
        self._sdam = sum((x - self.mu) ** 2 for x in data)

    def __repr__(self):
        return f"zones: {self.zones}, data: {self.segmented_data()}, GVF: {self.gvf():1.3f}"

    def set_n_zones(self, n_zones) -> list:
        """Initialize ``n_zones`` equal-sized zones and return them.

        Args:
            n_zones: number of zones (1 <= n_zones <= len(data)).

        Returns:
            The list of ``[left, right]`` zone boundaries.

        Raises:
            ValueError: if ``n_zones`` cannot partition the data.
        """
        if n_zones < 1 or n_zones > self.n_data:
            raise ValueError("n_zones must be between 1 and the number of data points")
        self.n_zones = n_zones
        zone_size = self.n_data // n_zones
        self.zones = [[i * zone_size, (i + 1) * zone_size] for i in range(n_zones)]
        # Integer division may leave a remainder; stretch the last zone
        # so its right boundary is the last index.
        self.zones[-1][1] = self.n_data
        return self.zones

    def set_zones(self, zones) -> None:
        """Set the "official" zones for the object.

        Also refreshes ``n_zones`` so that ``groups()``/``_group()``
        stay consistent when the zone count changes (the original
        implementation left ``n_zones`` stale).
        """
        self.zones = zones
        self.n_zones = len(zones)

    def segmented_data(self):
        """Return the data split into one list per zone."""
        return list(self.groups())

    def _group(self, i):
        """Return the data that falls in zone ``i``.

        Raises:
            ValueError: if ``i`` is not a valid zone index.
        """
        if i >= self.n_zones:
            # Original message was garbled; the index must be *less than*
            # the number of zones.
            raise ValueError("Group index must be less than the number of zones")
        left_bound, right_bound = self.zones[i]
        return self.data[left_bound:right_bound]

    def groups(self):
        """Generator yielding the data for each zone, in order."""
        for i in range(self.n_zones):
            yield self._group(i)

    def sdam(self):
        """Sum of Squared Deviations from the Array Mean."""
        return self._sdam

    def sdcm(self, zones=None):
        """Sum of Squared Deviations from the Class Means.

        Args:
            zones: zone boundaries to evaluate; defaults to ``self.zones``.
        """
        if zones is None:
            zones = self.zones
        result = 0
        for left_bound, right_bound in zones:
            segment = self.data[left_bound:right_bound]
            # Skip empty zones: they contribute nothing, and np.mean([])
            # would emit a nan warning.
            if len(segment) == 0:
                continue
            # Hoist the mean out of the inner loop — the original
            # recomputed np.mean(segment) once per element (O(n^2)).
            segment_mean = np.mean(segment)
            for item in segment:
                result += (item - segment_mean) ** 2
        return result

    def gvf(self, zones=None):
        """Goodness of Variance Fit: (SDAM - SDCM) / SDAM.

        1.0 is a perfect fit; 0.0 means the zones explain no variance.
        NOTE(review): a constant data set gives SDAM == 0 and this
        divides by zero — callers should avoid that case.
        """
        if zones is None:
            zones = self.zones
        return (self.sdam() - self.sdcm(zones)) / self.sdam()

    def _expand_right(self, i: int):
        """Expand one zone to the right, and return that zone mapping.

        The object's own zones are never mutated; a deep copy is
        returned (unchanged when the move is not possible).

        Args:
            i: index of a zone in self.zones
        """
        result = copy.deepcopy(self.zones)
        if i >= self.n_zones - 1:
            return result  # the last zone has no right neighbour
        if result[i + 1][1] - result[i + 1][0] == 1:
            return result  # never shrink a neighbour to zero width
        result[i][1] += 1
        result[i + 1][0] += 1
        return result

    def _expand_left(self, i: int):
        """Expand one zone to the left, and return that zone mapping.

        The object's own zones are never mutated; a deep copy is
        returned (unchanged when the move is not possible).

        Args:
            i: index of a zone in self.zones
        """
        result = copy.deepcopy(self.zones)
        if i == 0:
            return result  # the first zone has no left neighbour
        if result[i - 1][1] - result[i - 1][0] == 1:
            return result  # never shrink a neighbour to zero width
        result[i][0] -= 1
        result[i - 1][1] -= 1
        return result

    def solve(self):
        """Greedily find an optimal segmentation using Jenks' method.

        Repeatedly evaluate every single-step boundary move (each zone
        expanded one element to the left or right) and commit the move
        with the best GVF, stopping once the GVF changes by less than
        1e-4 between iterations.

        Returns:
            dict with the final ``zones``, the ``segmentation`` and the
            achieved ``GVF``.
        """
        gvfs = [self.gvf()]
        while True:
            if len(gvfs) > 1 and abs(gvfs[-2] - gvfs[-1]) < 0.0001:
                break
            # Candidate moves: every zone nudged one step each way.
            candidates = []
            for i in range(self.n_zones):
                candidates.append(self._expand_left(i))
                candidates.append(self._expand_right(i))
            # max over (score, zones) tuples selects exactly what the
            # original sorted(...)[-1] did, tie-breaking included.
            best_score, best_zones = max((self.gvf(z), z) for z in candidates)
            self.zones = best_zones
            gvfs.append(best_score)
        return {"zones": self.zones,
                "segmentation": self.segmented_data(),
                "GVF": self.gvf()}
I wanted to see how this would work in practice, so I wrote some tests.
# --- 7-point data set: even initial split, then full solve ------------------
data = [1, 2, 4, 5, 6, 10, 11]
jenks = Jenks(data)
jenks.set_n_zones(2)
assert [[0, 3], [3, 7]] == jenks.zones, jenks.zones
jenks.set_n_zones(3)
assert [[0, 2], [2, 4], [4, 7]] == jenks.zones, jenks.zones
assert jenks.solve() == {'zones': [[0, 2], [2, 5], [5, 7]],
                         'segmentation': [[1, 2], [4, 5, 6], [10, 11]],
                         'GVF': 0.965}

# --- 6-point data set: expand helpers must not mutate the stored zones ------
data = [1, 2, 4, 5, 6, 10]
jenks = Jenks(data)
jenks.set_n_zones(2)
assert [[0, 3], [3, 6]] == jenks.zones, jenks.zones
jenks.set_n_zones(3)
assert [[0, 2], [2, 4], [4, 6]] == jenks.zones, jenks.zones
expand_right = jenks._expand_right(1)
assert [[0, 2], [2, 4], [4, 6]] == jenks.zones, jenks.zones  # unchanged
assert [[0, 2], [2, 5], [5, 6]] == expand_right, expand_right
expand_left = jenks._expand_left(1)
assert [[0, 2], [2, 4], [4, 6]] == jenks.zones, jenks.zones  # unchanged
assert [[0, 1], [1, 4], [4, 6]] == expand_left, expand_left
assert jenks.solve() == {'zones': [[0, 2], [2, 5], [5, 6]],
                         'segmentation': [[1, 2], [4, 5, 6], [10]],
                         'GVF': 0.9512987012987013}

# --- 4-point data set: check the intermediate statistics --------------------
data = [4, 5, 9, 10]
# Bug fix: the original called Jenks(data_test), but no `data_test` variable
# exists anywhere — the list above is named `data`.
jenks = Jenks(data)
assert jenks.sdam() == 26.0
assert jenks.set_n_zones(2) == [[0, 2], [2, 4]]
assert jenks.sdcm(jenks.zones) == 1.0
assert jenks.sdcm(jenks._expand_left(1)) == 14.0
assert jenks.sdcm(jenks._expand_right(0)) == 14.0
assert np.abs(jenks.gvf(jenks.zones) - 0.96) < 0.01
assert np.abs(jenks.gvf(jenks._expand_left(1)) - 0.46) < 0.01
assert np.abs(jenks.gvf(jenks._expand_right(0)) - 0.46) < 0.01
assert jenks.solve() == {'zones': [[0, 2], [2, 4]],
                         'segmentation': [[4, 5], [9, 10]],
                         'GVF': 0.9615384615384616}
Finally, I played around with testing this on Pareto-distributed data:
data = [np.random.pareto(a=1.16) for i in range(1000)] jenks = Jenks(sorted(data)) jenks.set_n_zones(3) %timeit jenks.solve()
165 ms ± 9.33 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)