Jenk’s Natural Breaks Optimization

A co-worker was interested in segmenting a list of data points, and I went down a rabbit hole on one dimensional segmentation. I found an article on the Jenk’s natural breaks optimization on Wikipedia. I found another article that had some examples. This is used to bin data points so that clusters are always binned together. There is an iterative method that takes unordered data, but this implementation just sorts the data before binning.

#!/usr/local/bin/python3
# jenks.py

import copy
import numpy as np

class Jenks:
    """Compute Jenks' Natural Breaks
    
    I've used the term "zone(s)" instead of "break(s)" 
    because "break" is a reserved word in Python.
    """
    
    def __init__(self, data):
        self.data = data
        self.n_data = len(data)
        self.mu = np.mean(data)
        # compute this value once because it won't change later
        self._sdam = sum([(i-self.mu)**2 for i in data])
        
    def __repr__(self):
        return f"zones: {self.zones}, data: {self.segmented_data()}, GVF: {self.gvf():1.3f}"
        
    def set_n_zones(self, n_zones) -> None:
        """Set the number of zones we want to use
        """
        self.n_zones = n_zones
        _zone_size = self.n_data // self.n_zones
        self.zones = []
        for i in range(self.n_zones):
            zone = [i * _zone_size, (i + 1) * _zone_size]
            self.zones.append(zone)
        # push the last zone boundary to be the last index
        self.zones[-1][1] = self.n_data
        return self.zones
        
    def set_zones(self, zones) -> None:
        """Set the "official" zones for the object
        """
        self.zones = zones
        
    def segmented_data(self):
        """Segment the data according to the breaks/zones
        """
        result = []
        for group in self.groups():
            result.append(group)
        return result
        
    def _group(self, i):
        """Return the data in one zone
        """
        if i >= self.n_zones:
            raise ValueError("Group index be greater than or equal to the number of breaks")
        left_bound, right_bound = self.zones[i]
        return self.data[left_bound:right_bound]
    
    def groups(self):
        """Generator 
        """
        for i in range(self.n_zones):
            yield self._group(i)
    
    def sdam(self):
        """Sum of Squared Deviations from the Array Mean
        """
        return self._sdam
    
    def sdcm(self, zones=None):
        """Sum of Squared Deviations for Class Means
        """
        result = 0
        if zones is None:
            zones = self.zones
        for zone in zones:
            left_bound, right_bound = zone
            data = self.data[left_bound:right_bound]
            for item in data:
                result += (item - np.mean(data))**2
        return result
    
    def gvf(self, zones=None):
        """Goodness of Variance Fit
        """
        if zones is None:
            zones = self.zones
        return (self.sdam() - self.sdcm(zones)) / self.sdam()
    
    def _expand_right(self, i:int):
        """Expand one zone to the right, and return that zone mapping
        
        Args:
            i: index of a zone in self.zones
        """
        result = copy.deepcopy(self.zones)
        if i >= self.n_zones - 1:
            return result
        if result[i+1][1] - result[i+1][0] == 1:
            return result
        result[i][1] += 1
        result[i+1][0] += 1
        return result

    def _expand_left(self, i):
        """Expand one zone to the left, and return that zone mapping
        
        Args:
            i: index of a zone in self.zones
        """
        result = copy.deepcopy(self.zones)
        if i == 0:
            return result
        if result[i-1][1] - result[i-1][0] == 1:
            return result
        result[i][0] -= 1
        result[i-1][1] -= 1
        return result 
    
    def solve(self):
        """Greedily find an optimal segmentation using Jenks' method
        """
        gvfs = [self.gvf()]
        while True:
            if len(gvfs) > 1 and np.abs(gvfs[-2] - gvfs[-1]) < 0.0001:
                break
            options = []
            scores = []
            for i in range(self.n_zones):
                options.append(self._expand_left(i))
                options.append(self._expand_right(i))
            for option in options:
                scores.append(self.gvf(option))
            option_scores = []
            for score, option in zip(scores, options):
                option_scores.append((score, option))
            option_scores = sorted(option_scores)
            optimal = option_scores[-1]
            self.zones = optimal[1]
            gvfs.append(optimal[0])
        return {"zones": self.zones, "segmentation": self.segmented_data(), "GVF": self.gvf()}

I wanted to see how this would work in practice, so I wrote some tests.

data = [1,2,4,5,6,10,11]
jenks = Jenks(data)

jenks.set_n_zones(2)
assert [[0, 3], [3, 7]] == jenks.zones, jenks.zones

jenks.set_n_zones(3)
assert [[0, 2], [2, 4], [4, 7]] == jenks.zones, jenks.zones
assert jenks.solve() == {'zones': [[0, 2], [2, 5], [5, 7]], 'segmentation': [[1, 2], [4, 5, 6], [10, 11]], 'GVF': 0.965}

data = [1,2,4,5,6,10]
jenks = Jenks(data)

jenks.set_n_zones(2)
assert [[0, 3], [3, 6]] == jenks.zones, jenks.zones

jenks.set_n_zones(3)
assert [[0, 2], [2, 4], [4, 6]] == jenks.zones, jenks.zones

expand_right = jenks._expand_right(1)
assert [[0, 2], [2, 4], [4, 6]] == jenks.zones, jenks.zones
assert [[0, 2], [2, 5], [5, 6]] == expand_right, expand_right

expand_left = jenks._expand_left(1)
assert [[0, 2], [2, 4], [4, 6]] == jenks.zones, jenks.zones
assert [[0, 1], [1, 4], [4, 6]] == expand_left, expand_left

assert jenks.solve() == {'zones': [[0, 2], [2, 5], [5, 6]], 'segmentation': [[1, 2], [4, 5, 6], [10]], 'GVF': 0.9512987012987013}

data = [4, 5, 9, 10]
jenks = Jenks(data_test)
assert jenks.sdam() == 26.0

assert jenks.set_n_zones(2) == [[0, 2], [2, 4]]

assert jenks.sdcm(jenks.zones) == 1.0
assert jenks.sdcm(jenks._expand_left(1)) == 14.0
assert jenks.sdcm(jenks._expand_right(0)) == 14.0

assert np.abs(jenks.gvf(jenks.zones) - 0.96) < 0.01
assert np.abs(jenks.gvf(jenks._expand_left(1)) - 0.46) < 0.01
assert np.abs(jenks.gvf(jenks._expand_right(0)) - 0.46) < 0.01

assert jenks.solve() == {'zones': [[0, 2], [2, 4]], 'segmentation': [[4, 5], [9, 10]], 'GVF': 0.9615384615384616}

Finally, I played around with testing this on Pareto distributed data,

data = [np.random.pareto(a=1.16) for i in range(1000)]
jenks = Jenks(sorted(data))
jenks.set_n_zones(3)
%timeit jenks.solve()
165 ms ± 9.33 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)