Spatial Statistical Hypothesis Testing

In this post I’ll consider performing a local hypothesis test for a difference in means with spatial data. I do not know if this is the optimal way to go about this sort of thing, but I have not yet found another solution. I think the best way to describe the problem is to consider some artificial data, and then wade through the code.

Data

I am thinking of a knot of test wells surrounded by a field of offset control wells. I’d like to determine whether the test wells are producing more than the offset wells, or whether the difference is a matter of natural variation. To simulate this I have created two normally distributed sets of points. The first two dimensions are the spatial dimensions, and the third dimension is the production. I have denoted the test set A and the offsets B, shown in red and green below.

In the code below, I have set the mean production of the test wells equal to that of the offset wells in order to assess the case where the null hypothesis is true. It turns out that fourteen of the fifteen test wells fail to reject the null hypothesis, so that checks out in this example.

import numpy as np
import scipy.stats
import random

# production means for the offset and test wells, resp.
base = 2000.
effect = 2000.
# standard deviation of test and offset production
Av, Bv = 100.0, 100.0
# size of test and offset sets
An, Bn = 15, 40
# test set
A = scipy.stats.norm((0,0,effect),(1,1,Av)).rvs((An,3))
# offset set
B = scipy.stats.norm((0,0,base),(3,3,Bv)).rvs((Bn,3))
# indicator vector
I = np.zeros((An+Bn,1))
# test := 1, offset := 0
I[:An] = 1.0
# stack test and offset
S = np.vstack((A,B))
# add the indicator vector
S = np.hstack((S,I))
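
At this point S has one row per well and four columns, which is the layout the helper functions below expect:

# An + Bn = 55 rows: the 15 test wells followed by the 40 offset wells
print( S.shape )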

Code

Here are some helper functions. The first is a Euclidean distance function. I’m assuming the wells are close enough together that I don’t need to account for the curvature of the Earth by using the Spherical Law of Cosines, though a sketch of that alternative follows the distance function below. Next is a function that returns the nearest neighbors of a given test well; it collects neighbors until it has accumulated n offset wells.

# distance function for two two dimensional points
ds = lambda p,q: np.sqrt( (p[0]-q[0])**2.0 + (p[1]-q[1])**2.0 )
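
If the wells were instead located by latitude and longitude over a wide area, a spherical distance could be dropped in for ds(). This is a minimal sketch under that assumption; ds_sphere() is hypothetical, and 6371.0 is the mean radius of the Earth in kilometers.

# hypothetical spherical distance, assuming p and q hold
# ( latitude, longitude ) in degrees
def ds_sphere( p, q, R=6371.0 ):
    lat1, lon1 = np.radians( p[0] ), np.radians( p[1] )
    lat2, lon2 = np.radians( q[0] ), np.radians( q[1] )
    c = np.sin( lat1 )*np.sin( lat2 ) + np.cos( lat1 )*np.cos( lat2 )*np.cos( lon2 - lon1 )
    # clip guards against round-off pushing the cosine slightly outside [-1, 1]
    return R * np.arccos( np.clip( c, -1.0, 1.0 ) )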

def nneighbors( S, idx, n ):
    '''
    Input:  (S)  NumPy array with four columns, the first two columns are
                 the x and y coordinates, the third column is production,
                 and the fourth column is an indicator vector; test wells
                 are encoded with a 1, and offset wells are encoded with a 0
           (idx) integer, the row of (S) to consider for the nearest neighbors
            (n)  the number of offset wells to be considered
    Output:      four column array of the nearest neighbors,
                 including n offset points
    '''
    Sn = S.shape[0]
    # compare each row of S to S[idx]
    nbhd = np.array( [ ds( S[i,:2], S[idx,:2] ) for i in range( Sn ) ] )
    # sort by distance
    nbhd = np.argsort( nbhd )
    # nbhd is now a re-ordered copy of S
    nbhd = S[nbhd]
    # take n offset neighbors; the bound on i prevents running off
    # the end of the array if S has fewer than n offset wells
    i, k = 0, 0
    while i < Sn:
        if nbhd[i,3] == 0:
            k += 1
        if k == n:
            break
        i += 1
    return nbhd[:i+1]
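
As a quick check, here is the neighborhood of the first test well. The row count varies by realization, since any test wells closer than the tenth offset are swept up as well:

# ten offsets, plus the test well itself and any nearby test wells
X = nneighbors( S, 0, 10 )
print( X.shape )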

The separate() function separates a set of wells into test wells and offset wells based on the indicator vector in the fourth column, dropping that indicator column in the process.

def separate( X ):
    '''
    Input:  (X) four column array with an indicator vector
                in the fourth column
    Output: (a) three column array (x, y, production) of test wells
            (b) three column array (x, y, production) of offset wells
    '''
    a = np.array( [ i[:3] for i in X if i[3] == 1 ] )
    b = np.array( [ i[:3] for i in X if i[3] == 0 ] )
    return a, b
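
For example, splitting the neighborhood found above; by construction b will hold exactly ten offset wells:

a, b = separate( nneighbors( S, 0, 10 ) )
# a and b have three columns each, the indicator column is gone
print( a.shape, b.shape )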

The ztest() function is from a previous post. It performs a z-test of the hypothesis that the sample has a given mean.

def ztest( dp, mu=0.0, sigma=None, n=None, alpha=0.05, twotail=True ):
    '''
    Input : (dp)      iterable or float, sample data points, or estimated mean
            (mu)      float, hypothesized population mean
            (sigma)   float, standard deviation of the sample
            (n)       integer, number of data points
            (alpha)   float, significance level
            (twotail) boolean, two-tailed test
    Output: (pv)      the p-value: the probability of observing data at
                      least as extreme as the observed values, given
                      that the null hypothesis is true
    '''
    # in case the data points are a NumPy array
    if isinstance( dp, np.ndarray ):
        n = dp.size
        x = np.mean( dp )
    # in case the data points are a list
    elif isinstance( dp, list ):
        n = len( dp )
        x = np.mean( dp )
    # in case we provide an estimated mean value
    else:
        x = dp
    # two-tail correction; the one-tailed p-value returned below
    # is compared against alpha/2
    if twotail:
        alpha /= 2.0
    # sample standard deviation
    if sigma is None:
        sigma = np.std( dp )
    # this prevents division by zero
    if sigma == 0.0:
        sigma = 0.01
    # z-score
    z = ( x - mu ) / float( sigma )
    # check n
    if n is None:
        print( 'Need a sample size, n.' )
        return None
    # test statistic
    ts = z * np.sqrt( n )
    # calculate the one-tailed p-value; cdf(-|ts|) covers both signs of ts
    return scipy.stats.norm( 0, 1 ).cdf( -abs( ts ) )
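
As a sanity check, a sample drawn from the hypothesized distribution should usually return a p-value well above 0.05; the parameters here just reuse those from the data section:

# forty points from N(2000, 100), tested against the true mean
sample = scipy.stats.norm( 2000., 100. ).rvs( 40 )
print( ztest( sample, mu=2000., sigma=100. ) )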

This function calculates the inverse distance weighted average of the production values of a set of points, weighted about some central point. Each weight is one over the distance to that central point, normalized so the weights sum to one, so closer wells count more toward the mean.

def invw_mean( X, cntr ):
    '''
    Input:  (X)    array with at least three columns: x, y, production
            (cntr) two dimensional point about
                   which to weight points in (X)
    Output:        an inverse distance weighted mean of the
                   production in (X), centered about (cntr)
    '''
    N = X.shape[0]
    w = np.zeros(( N, ))
    for i in range( N ):
        d = ds( X[i,:2], cntr[:2] )
        # guard against division by zero; a NumPy float divided by
        # zero yields inf rather than raising, so test explicitly
        w[i] = 1.0 / d if d > 0.0 else 0.0
    # if we have any zero-distance weights
    if np.min( w ) == 0.0:
        # make that weight double the largest weight
        w[ np.argmin( w ) ] = np.max( w ) * 2.0
    # normalize the weights to sum to one
    w /= np.sum( w )
    # calculate and return the weighted mean of the production values
    return np.dot( X[:,2], w )
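
For instance, weighting all of the offset wells about the origin, which is where the test cluster is centered:

a, b = separate( S )
# inverse distance weighted mean of offset production about (0, 0)
print( invw_mean( b, (0.0, 0.0) ) )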

This function ties together the ones listed above. First we find the nearest neighbors of a given test well, and then separate them into sample test and sample offset sets. If the neighborhood contains more test wells than offset wells, I’ll re-sample the test wells several times, perform the hypothesis test on each sub-sample, and return the maximum p-value.

def spztest( S, idx, n ):
    '''
    Input:  (S)   four column array of all wells
            (idx) integer, row of the test well to evaluate
            (n)   number of offset wells to include in the neighborhood
    Output:       the maximum p-value over the sub-samples
    '''
    # find the neighbors that include the n nearest offset wells
    X = nneighbors( S, idx, n )
    # separate into test and offset wells
    A, B = separate( X )
    T = list()
    # there are more test wells than offset wells
    if A.shape[0] > B.shape[0]:
        # take five random samples of size n
        for i in range( 5 ):
            rows = random.sample( range( A.shape[0] ), n )
            T.append( A[rows] )
    else:
        T.append( A )
    pv = list()
    # for each random sample...
    for t in T:
        # lump the test and offset data together
        xx = np.vstack( ( t, B ) )
        # calculate the standard deviation of the pooled production
        sd = np.std( xx[:,2] )
        # let the center be the center of mass of the test wells
        cntr = ( np.mean( t[:,0] ), np.mean( t[:,1] ) )
        # inverse distance weighted means of test and offset production
        tm = invw_mean( t, cntr )
        Bm = invw_mean( B, cntr )
        # perform classical hypothesis test
        # null hypothesis is that the means are equal
        pv.append( ztest( tm, mu=Bm, sigma=sd, n=n ) )
    # return the maximum p-value
    return np.max( pv )

Visualization and Analysis

To test the artificial data, I’ll loop through the An test wells in the data set S and collect the p-value for each one.

mx = list()
# recall, the first An points are the test wells
for i in range( An ):
    mx.append( spztest( S, i, 10 ) )
mx

Here are the p-values returned by one run:

[0.063200612925140162,
 0.46020042787985055,
 0.34728701624228769,
 0.28354222559091974,
 0.19241053536792413,
 0.41836201073014556,
 0.2482599810294695,
 0.074972425124645359,
 3.6310642413222143e-05,
 0.47695328351426147,
 0.48227773825391373,
 0.21425353985247148,
 0.19364552150046321,
 0.23721202001137376,
 0.48310373126710204]

We see that only one fell below the 0.05 significance level. Below is a plot of these values. The gray scale runs from zero (black) to 0.05 (white). This means that test wells failing to reject the null hypothesis will have a black border and a white center, while test wells that reject the null hypothesis of equal mean production will have a gray or black center. Offset wells are shown in green, as before.

import matplotlib.pyplot as plt

fig, ax = plt.subplots()
ax.scatter( B[:,0], B[:,1], marker='o', edgecolor='g', facecolor='none', alpha=0.75 )
ax.scatter( A[:,0], A[:,1], marker='o', c=mx, cmap='gray', vmin=0.0, vmax=0.05 )

As we can see, fourteen of the fifteen test wells fail to reject the null hypothesis that there is no difference in production, while one rejects it. That is about what we should expect: with fifteen tests at the 0.05 level, roughly one false rejection is likely even when the null hypothesis is true.