A co-worker was interested in segmenting a list of data points, and I went down a rabbit hole on one-dimensional segmentation. I found an article on the Jenks natural breaks optimization on Wikipedia, and another article with some worked examples. This method bins data points so that clusters are always binned together. There is an iterative method that takes unordered data, but this implementation just sorts the data before binning.
#!/usr/local/bin/python3
# jenks.py
import copy
import numpy as np
class Jenks:
    """Compute Jenks' Natural Breaks for a one-dimensional dataset.

    I've used the term "zone(s)" instead of "break(s)"
    because "break" is a reserved word in Python.

    NOTE(review): the greedy boundary-shifting in solve() assumes ``data``
    is sorted ascending; sorting is the caller's responsibility.
    """

    def __init__(self, data):
        """
        Args:
            data: sequence of numbers, assumed sorted ascending.
        """
        self.data = data
        self.n_data = len(data)
        self.mu = np.mean(data)
        # SDAM depends only on the data, never on the zoning,
        # so compute this value once because it won't change later
        self._sdam = sum((i - self.mu) ** 2 for i in data)

    def __repr__(self):
        return f"zones: {self.zones}, data: {self.segmented_data()}, GVF: {self.gvf():1.3f}"

    def set_n_zones(self, n_zones) -> list:
        """Initialize ``n_zones`` equal-sized zones and return them.

        Each zone is a half-open ``[left, right)`` index pair into the
        data; the last zone absorbs any remainder.

        Args:
            n_zones: number of zones, between 1 and len(data) inclusive.

        Raises:
            ValueError: if n_zones is out of range (0 would divide by
                zero; more zones than data points yields empty zones).
        """
        if not 1 <= n_zones <= self.n_data:
            raise ValueError(f"n_zones must be between 1 and {self.n_data}")
        self.n_zones = n_zones
        _zone_size = self.n_data // self.n_zones
        self.zones = [[i * _zone_size, (i + 1) * _zone_size]
                      for i in range(self.n_zones)]
        # push the last zone boundary to be the last index
        self.zones[-1][1] = self.n_data
        return self.zones

    def set_zones(self, zones) -> None:
        """Set the "official" zones for the object."""
        self.zones = zones

    def segmented_data(self):
        """Segment the data according to the breaks/zones."""
        return list(self.groups())

    def _group(self, i):
        """Return the data in one zone.

        Raises:
            ValueError: if i is not less than the number of zones.
        """
        if i >= self.n_zones:
            raise ValueError("Group index must be less than the number of zones")
        left_bound, right_bound = self.zones[i]
        return self.data[left_bound:right_bound]

    def groups(self):
        """Yield the data of each zone, in order (generator)."""
        for i in range(self.n_zones):
            yield self._group(i)

    def sdam(self):
        """Sum of Squared Deviations from the Array Mean."""
        return self._sdam

    def sdcm(self, zones=None):
        """Sum of Squared Deviations from the Class Means.

        Args:
            zones: zone mapping to evaluate; defaults to self.zones.
        """
        if zones is None:
            zones = self.zones
        result = 0
        for left_bound, right_bound in zones:
            data = self.data[left_bound:right_bound]
            # hoist the class mean out of the inner loop -- it is the
            # same value for every item in the zone
            class_mean = np.mean(data)
            for item in data:
                result += (item - class_mean) ** 2
        return result

    def gvf(self, zones=None):
        """Goodness of Variance Fit, in [0, 1]; 1.0 is a perfect fit."""
        if zones is None:
            zones = self.zones
        return (self.sdam() - self.sdcm(zones)) / self.sdam()

    def _expand_right(self, i: int):
        """Expand one zone to the right, and return that zone mapping.

        The move is refused (an unchanged copy of self.zones is returned)
        when i is the last zone or when the right-hand neighbor would be
        emptied by the move.

        Args:
            i: index of a zone in self.zones
        """
        result = copy.deepcopy(self.zones)
        if i >= self.n_zones - 1:
            return result
        if result[i + 1][1] - result[i + 1][0] == 1:
            # neighbor holds a single element; don't empty it
            return result
        result[i][1] += 1
        result[i + 1][0] += 1
        return result

    def _expand_left(self, i):
        """Expand one zone to the left, and return that zone mapping.

        The move is refused (an unchanged copy of self.zones is returned)
        when i is the first zone or when the left-hand neighbor would be
        emptied by the move.

        Args:
            i: index of a zone in self.zones
        """
        result = copy.deepcopy(self.zones)
        if i == 0:
            return result
        if result[i - 1][1] - result[i - 1][0] == 1:
            # neighbor holds a single element; don't empty it
            return result
        result[i][0] -= 1
        result[i - 1][1] -= 1
        return result

    def solve(self):
        """Greedily find an optimal segmentation using Jenks' method.

        Each iteration scores every single-step boundary move (expanding
        each zone one element left or right) and keeps the move with the
        best GVF, stopping once the GVF improvement between iterations
        falls below 1e-4.

        Returns:
            dict with keys "zones", "segmentation", and "GVF".
        """
        gvfs = [self.gvf()]
        while True:
            if len(gvfs) > 1 and np.abs(gvfs[-2] - gvfs[-1]) < 0.0001:
                break
            options = []
            for i in range(self.n_zones):
                options.append(self._expand_left(i))
                options.append(self._expand_right(i))
            scores = [self.gvf(option) for option in options]
            # sort (score, zones) pairs; the last entry is the best move
            option_scores = sorted(zip(scores, options))
            optimal = option_scores[-1]
            self.zones = optimal[1]
            gvfs.append(optimal[0])
        return {"zones": self.zones, "segmentation": self.segmented_data(), "GVF": self.gvf()}
I wanted to see how this would work in practice, so I wrote some tests.
# Odd-length dataset: default zoning for 2 and 3 zones, then a full solve.
data = [1, 2, 4, 5, 6, 10, 11]
jenks = Jenks(data)
jenks.set_n_zones(2)
assert [[0, 3], [3, 7]] == jenks.zones, jenks.zones
jenks.set_n_zones(3)
assert [[0, 2], [2, 4], [4, 7]] == jenks.zones, jenks.zones
assert jenks.solve() == {'zones': [[0, 2], [2, 5], [5, 7]], 'segmentation': [[1, 2], [4, 5, 6], [10, 11]], 'GVF': 0.965}
# Even-length dataset: check the expand helpers return new mappings
# without mutating jenks.zones.
data = [1, 2, 4, 5, 6, 10]
jenks = Jenks(data)
jenks.set_n_zones(2)
assert [[0, 3], [3, 6]] == jenks.zones, jenks.zones
jenks.set_n_zones(3)
assert [[0, 2], [2, 4], [4, 6]] == jenks.zones, jenks.zones
expand_right = jenks._expand_right(1)
assert [[0, 2], [2, 4], [4, 6]] == jenks.zones, jenks.zones
assert [[0, 2], [2, 5], [5, 6]] == expand_right, expand_right
expand_left = jenks._expand_left(1)
assert [[0, 2], [2, 4], [4, 6]] == jenks.zones, jenks.zones
assert [[0, 1], [1, 4], [4, 6]] == expand_left, expand_left
assert jenks.solve() == {'zones': [[0, 2], [2, 5], [5, 6]], 'segmentation': [[1, 2], [4, 5, 6], [10]], 'GVF': 0.9512987012987013}
# Small dataset: exercise the scoring functions directly.
data = [4, 5, 9, 10]
jenks = Jenks(data)  # fixed: was Jenks(data_test), a NameError
assert jenks.sdam() == 26.0
assert jenks.set_n_zones(2) == [[0, 2], [2, 4]]
assert jenks.sdcm(jenks.zones) == 1.0
assert jenks.sdcm(jenks._expand_left(1)) == 14.0
assert jenks.sdcm(jenks._expand_right(0)) == 14.0
assert np.abs(jenks.gvf(jenks.zones) - 0.96) < 0.01
assert np.abs(jenks.gvf(jenks._expand_left(1)) - 0.46) < 0.01
assert np.abs(jenks.gvf(jenks._expand_right(0)) - 0.46) < 0.01
assert jenks.solve() == {'zones': [[0, 2], [2, 4]], 'segmentation': [[4, 5], [9, 10]], 'GVF': 0.9615384615384616}
Finally, I played around with testing this on Pareto-distributed data:
data = [np.random.pareto(a=1.16) for i in range(1000)]
jenks = Jenks(sorted(data))
jenks.set_n_zones(3)
%timeit jenks.solve()
165 ms ± 9.33 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)