# Jenks Natural Breaks Optimization

A co-worker was interested in segmenting a list of data points, and I went down a rabbit hole on one-dimensional segmentation. I found an article on the Jenks natural breaks optimization on Wikipedia, and another article that had some worked examples. The method is used to bin data points so that clusters are always binned together. There is an iterative method that takes unordered data, but this implementation just sorts the data before binning.

```#!/usr/local/bin/python3
# jenks.py

import copy
import numpy as np

class Jenks:
    """Compute Jenks' Natural Breaks.

    I've used the term "zone(s)" instead of "break(s)"
    because "break" is a reserved word in Python.

    A zone is a half-open index pair ``[left, right)`` into ``self.data``;
    the data is assumed to be sorted for the segmentation to be meaningful.
    """

    def __init__(self, data):
        self.data = data
        self.n_data = len(data)
        self.mu = np.mean(data)
        # compute this value once because it won't change later
        self._sdam = sum((x - self.mu) ** 2 for x in data)

    def __repr__(self):
        return f"zones: {self.zones}, data: {self.segmented_data()}, GVF: {self.gvf():1.3f}"

    def set_n_zones(self, n_zones) -> list:
        """Initialize ``n_zones`` roughly equal-width zones over the data.

        Returns:
            The freshly built list of ``[left, right]`` index pairs.
        """
        self.n_zones = n_zones
        zone_size = self.n_data // n_zones
        self.zones = [[i * zone_size, (i + 1) * zone_size] for i in range(n_zones)]
        # BUG FIX: push the last zone's *right bound* to the end of the data
        # (the original replaced the whole [left, right] pair with an int).
        self.zones[-1][1] = self.n_data
        return self.zones

    def set_zones(self, zones) -> None:
        """Set the "official" zones for the object."""
        self.zones = zones

    def segmented_data(self):
        """Segment the data according to the breaks/zones."""
        return list(self.groups())

    def _group(self, i):
        """Return the data in one zone.

        Raises:
            ValueError: if ``i`` is not a valid zone index.
        """
        if i >= self.n_zones:
            raise ValueError("Group index must be less than the number of zones")
        left_bound, right_bound = self.zones[i]
        return self.data[left_bound:right_bound]

    def groups(self):
        """Yield each zone's data slice in order."""
        for i in range(self.n_zones):
            yield self._group(i)

    def sdam(self):
        """Sum of Squared Deviations from the Array Mean."""
        return self._sdam

    def sdcm(self, zones=None):
        """Sum of Squared Deviations from the Class Means.

        Args:
            zones: optional zone list to evaluate; defaults to ``self.zones``.
        """
        if zones is None:
            zones = self.zones
        result = 0
        for left_bound, right_bound in zones:
            segment = self.data[left_bound:right_bound]
            # hoist the loop-invariant mean out of the inner loop
            segment_mean = np.mean(segment)
            result += sum((item - segment_mean) ** 2 for item in segment)
        return result

    def gvf(self, zones=None):
        """Goodness of Variance Fit; in [0, 1], higher is better."""
        if zones is None:
            zones = self.zones
        return (self.sdam() - self.sdcm(zones)) / self.sdam()

    def _expand_right(self, i: int):
        """Expand one zone to the right, and return that zone mapping.

        Returns an unchanged copy when ``i`` is the last zone, or when the
        right-hand neighbour would shrink below one element.

        Args:
            i: index of a zone in self.zones
        """
        result = copy.deepcopy(self.zones)
        if i >= self.n_zones - 1:
            return result
        # BUG FIX: compare the neighbour's width (right - left); the original
        # subtracted a value from itself, which is always 0.
        if result[i + 1][1] - result[i + 1][0] == 1:
            return result
        # BUG FIX: move the shared boundary, not the whole [left, right] pair.
        result[i][1] += 1
        result[i + 1][0] += 1
        return result

    def _expand_left(self, i):
        """Expand one zone to the left, and return that zone mapping.

        Returns an unchanged copy when ``i`` is the first zone, or when the
        left-hand neighbour would shrink below one element.

        Args:
            i: index of a zone in self.zones
        """
        result = copy.deepcopy(self.zones)
        if i == 0:
            return result
        if result[i - 1][1] - result[i - 1][0] == 1:
            return result
        result[i][0] -= 1
        result[i - 1][1] -= 1
        return result

    def solve(self):
        """Greedily find an optimal segmentation using Jenks' method.

        Repeatedly evaluates every single-boundary move and keeps the one
        with the best GVF, stopping once the GVF change drops below 1e-4.
        """
        gvfs = [self.gvf()]
        while True:
            if len(gvfs) > 1 and abs(gvfs[-2] - gvfs[-1]) < 0.0001:
                break
            options = []
            for i in range(self.n_zones):
                options.append(self._expand_left(i))
                options.append(self._expand_right(i))
            # BUG FIX: keep the score and the zone list separate; the original
            # assigned the whole (score, zones) tuple to self.zones and
            # appended the tuple (not the score) to gvfs.
            best_score, best_zones = max((self.gvf(option), option) for option in options)
            self.zones = best_zones
            gvfs.append(best_score)
        return {"zones": self.zones, "segmentation": self.segmented_data(), "GVF": self.gvf()}
```

I wanted to see how this would work in practice, so I wrote some tests.

```data = [1,2,4,5,6,10,11]
jenks = Jenks(data)

jenks.set_n_zones(2)
assert [[0, 3], [3, 7]] == jenks.zones, jenks.zones

jenks.set_n_zones(3)
assert [[0, 2], [2, 4], [4, 7]] == jenks.zones, jenks.zones
result = jenks.solve()
assert result["zones"] == [[0, 2], [2, 5], [5, 7]]
assert result["segmentation"] == [[1, 2], [4, 5, 6], [10, 11]]
# compare the float score with a tolerance rather than exact dict equality
assert abs(result["GVF"] - 0.965) < 1e-9

data = [1, 2, 4, 5, 6, 10]
jenks = Jenks(data)

jenks.set_n_zones(2)
assert [[0, 3], [3, 6]] == jenks.zones, jenks.zones

jenks.set_n_zones(3)
assert [[0, 2], [2, 4], [4, 6]] == jenks.zones, jenks.zones

expand_right = jenks._expand_right(1)
assert [[0, 2], [2, 4], [4, 6]] == jenks.zones, jenks.zones
assert [[0, 2], [2, 5], [5, 6]] == expand_right, expand_right

expand_left = jenks._expand_left(1)
assert [[0, 2], [2, 4], [4, 6]] == jenks.zones, jenks.zones
assert [[0, 1], [1, 4], [4, 6]] == expand_left, expand_left

result = jenks.solve()
assert result["zones"] == [[0, 2], [2, 5], [5, 6]]
# BUG FIX: the expected segmentation was missing the final [10] group
assert result["segmentation"] == [[1, 2], [4, 5, 6], [10]]
assert abs(result["GVF"] - 0.9512987012987013) < 1e-9

data = [4, 5, 9, 10]
# BUG FIX: the original wrote Jenks(data_test), but data_test was never defined
jenks = Jenks(data)
assert jenks.sdam() == 26.0

assert jenks.set_n_zones(2) == [[0, 2], [2, 4]]

assert jenks.sdcm(jenks.zones) == 1.0
assert jenks.sdcm(jenks._expand_left(1)) == 14.0
assert jenks.sdcm(jenks._expand_right(0)) == 14.0

assert np.abs(jenks.gvf(jenks.zones) - 0.96) < 0.01
assert np.abs(jenks.gvf(jenks._expand_left(1)) - 0.46) < 0.01
assert np.abs(jenks.gvf(jenks._expand_right(0)) - 0.46) < 0.01

result = jenks.solve()
assert result["zones"] == [[0, 2], [2, 4]]
assert result["segmentation"] == [[4, 5], [9, 10]]
assert abs(result["GVF"] - 0.9615384615384616) < 1e-9
```

Finally, I played around with testing this on Pareto-distributed data:

```data = [np.random.pareto(a=1.16) for i in range(1000)]
# sort once up front — this implementation assumes ordered data
jenks = Jenks(sorted(data))
jenks.set_n_zones(3)
# IPython magic: time the greedy search over the 1000 sampled points
%timeit jenks.solve()
```
```165 ms ± 9.33 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```