In this post I’ll consider performing a local hypothesis test for a difference in means with spatial data. I do not know whether this is the optimal way to go about this sort of thing, but I have not yet found another solution. I think the best way to describe the problem is to consider some artificial data, and then wade through the code.
Data
I am thinking of a knot of test wells, surrounded by a field of offset control wells. I’d like to determine whether the test wells are producing more than the offset wells, or whether the difference in production is a matter of natural variation. To simulate this I have created two normally distributed data sets. The first two columns are the spatial coordinates, and the third column is the production. I have denoted the test set A, and the offsets B, shown in red and green below.
In the code below, I have set the mean production of the test wells equal to that of the offset wells, so we are assessing the case where the null hypothesis is true. It turns out that fourteen of the fifteen test wells fail to reject the null hypothesis, so that checks out in this example.
import numpy as np
import scipy.stats
import random

# production means for the offset and test wells, resp.
base = 2000.
effect = 2000.
# standard deviation of test and offset production
Av, Bv = 100.0, 100.0
# size of test and offset sets
An, Bn = 15, 40
# test set
A = scipy.stats.norm( (0,0,effect), (1,1,Av) ).rvs( (An,3) )
# offset set
B = scipy.stats.norm( (0,0,base), (3,3,Bv) ).rvs( (Bn,3) )
# indicator vector
I = np.zeros((An+Bn,1))
# test := 1, offset := 0
I[:An] = 1.0
# stack test and offset
S = np.vstack((A,B))
# add the indicator vector
S = np.hstack((S,I))
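Before moving on, here is a quick sanity check I’ve added (not part of the original analysis) to confirm that the stacked array S has the expected layout:

# illustrative sanity check on the stacked array
assert S.shape == ( An + Bn, 4 )
# the first An rows should be flagged as test wells
assert np.all( S[:An,3] == 1.0 )
# the remaining rows should be offset wells
assert np.all( S[An:,3] == 0.0 )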
Code
Here are some helper functions. The first is a Euclidean distance function. I’m assuming the wells are close enough together that I do not need to account for the curvature of the Earth by using the Spherical Law of Cosines. Next is a function that returns the nearest neighbors of a given test well. It will return enough neighbors to accumulate n offset wells.
# distance function for two two-dimensional points
ds = lambda p,q: np.sqrt( (p[0]-q[0])**2.0 + (p[1]-q[1])**2.0 )

def nneighbors( S, idx, n ):
    '''
    Input:  (S)   NumPy array with four columns: the first two
                  columns are the x and y coordinates, the third
                  column is production, and the fourth column is
                  an indicator vector; test wells are encoded
                  with a 1, and offset wells with a 0
            (idx) integer, the row of (S) to consider for the
                  nearest neighbors
            (n)   the number of offset wells to be considered
    Output: array of the nearest neighbors, containing
            n offset points
    '''
    Sn = S.shape[0]
    # distance from each row of S to S[idx]
    nbhd = np.array( [ ds( S[i,:2], S[idx,:2] ) for i in range( Sn ) ] )
    # indices that sort the rows by distance
    nbhd = np.argsort( nbhd )
    # nbhd is now a re-ordered copy of S
    nbhd = S[nbhd]
    # walk outward until we accumulate n offset neighbors
    i, k = 0, 0
    while True:
        if nbhd[i,3] == 0:
            k += 1
        if k == n:
            break
        i += 1
    return nbhd[:i+1]
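As a usage sketch of my own, here is the neighborhood of the first test well; the returned array should contain exactly ten offset rows:

# neighborhood of the first test well, containing ten offset wells
nbhd = nneighbors( S, 0, 10 )
# exactly ten of the returned rows should be offsets
print( int( np.sum( nbhd[:,3] == 0 ) ) )   # 10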
The separate() function separates a set of wells into test wells and offset wells, based on the indicator vector in the fourth column.
def separate( X ):
    '''
    Input:  (X) array with four columns, where the fourth
                column is the test/offset indicator
    Output: (a) array of test wells (x, y, production)
            (b) array of offset wells (x, y, production)
    '''
    a = np.array( [ i[:3] for i in X if i[3] == 1 ] )
    b = np.array( [ i[:3] for i in X if i[3] == 0 ] )
    return a, b
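Continuing the sketch from above, we can split that neighborhood back into its test and offset parts:

# split the neighborhood from the nneighbors() example
a, b = separate( nbhd )
print( a.shape, b.shape )   # b has ten rows, one per offset well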
The ztest() function is from a previous post. It performs a z-test of the hypothesis that the sample has a given mean.
def ztest( dp, mu=0.0, sigma=None, n=None, alpha=0.05, twotail=True ):
    '''
    Input : (dp)      iterable or float, sample data points,
                      or an estimated mean
            (mu)      float, hypothesized population mean
            (sigma)   float, standard deviation of the sample
            (n)       integer, number of data points
            (alpha)   float, significance level
            (twotail) boolean, two-tailed test
    Output: (pv) the p-value: the probability, under the null
            hypothesis, of observing data at least as extreme
            as the observed values
    '''
    # in case the data points are a NumPy array
    if isinstance( dp, np.ndarray ):
        n = dp.size
        x = np.mean( dp )
    # in case the data points are a list
    elif isinstance( dp, list ):
        n = len( dp )
        x = np.mean( dp )
    # in case we provide an estimated mean value
    else:
        x = dp
    # two-tail correction
    if twotail:
        alpha /= 2.0
    # sample standard deviation
    if sigma is None:
        sigma = np.std( dp )
    # this prevents division by zero
    if sigma == 0.0:
        sigma = 0.01
    # z-score
    z = ( x - mu ) / float( sigma )
    # check n
    if n is None:
        print( 'Need a sample size, n.' )
        return None
    # test statistic
    ts = z * np.sqrt( n )
    # calculate the p-value
    if ts <= 0:
        return scipy.stats.norm( 0, 1 ).cdf( ts )
    elif ts > 0:
        return scipy.stats.norm( 0, 1 ).cdf( -ts )
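As an illustration I’ve added (not from the previous post), a sample drawn around 2100 and tested against a hypothesized mean of 2000 should usually produce a small p-value:

# a sample centered at 2100 tested against a hypothesized mean of 2000
sample = scipy.stats.norm( 2100, 100 ).rvs( 30 )
print( ztest( sample, mu=2000.0 ) )   # typically well below 0.05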
This function calculates the inverse distance weighted average of a set of points, given some central point.
def invw_mean( X, cntr ):
    '''
    Input:  (X)    array whose first two columns are x and y
                   coordinates and whose third column is production
            (cntr) two dimensional point about which to weight
                   the points in (X)
    Output: an inverse distance weighted mean of the production
            in (X), centered about the point (cntr)
    '''
    N = X.shape[0]
    w = np.zeros(( N, ))
    for i in range( N ):
        d = ds( X[i,:2], cntr[:2] )
        if d > 0.0:
            # calculate the inverse weights
            w[i] = 1.0 / d
        else:
            # flag zero-distance points; they are handled below
            # (NumPy division by zero warns rather than raises,
            # so an explicit check is safer than try/except)
            w[i] = 0.0
    # if we have any zero-distance weights
    if np.min( w ) == 0.0:
        # make that weight double the largest weight
        w[ np.argmin( w ) ] = np.max( w ) * 2.0
    # normalize the weights to sum to one
    w /= np.sum( w )
    # calculate and return the weighted mean
    return np.dot( X[:,2], w )
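Here is a small worked example of my own: with two wells equidistant from the center, the weighted mean reduces to the ordinary average, and moving the center toward one well pulls the mean toward that well’s production.

# two wells, at x = -1 and x = 1, producing 1000 and 3000
pts = np.array( [ [-1.0, 0.0, 1000.0], [1.0, 0.0, 3000.0] ] )
# the midpoint weights them equally
print( invw_mean( pts, (0.0, 0.0) ) )   # 2000.0
# a center nearer the second well pulls the mean toward 3000
print( invw_mean( pts, (0.5, 0.0) ) )   # 2500.0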
This function ties together the ones listed above. First we find the nearest neighbors of a given test well, and then separate them into sample test and sample offset sets. If there are more test wells than offset wells in the neighborhood, I’ll take several random subsamples of the test wells, perform the hypothesis test on each, and return the maximum p-value; taking the maximum is the conservative choice, since it makes it harder to reject the null hypothesis.
def spztest( S, idx, n ):
    # find the neighborhood containing n offset wells
    X = nneighbors( S, idx, n )
    # separate into test and offset wells
    A, B = separate( X )
    T = list()
    # there are more test wells than offset wells
    if A.shape[0] > B.shape[0]:
        # take five random samples of size n
        for i in range( 5 ):
            rows = random.sample( range( A.shape[0] ), n )
            T.append( A[rows] )
    else:
        T.append( A )
    pv = list()
    # for each random sample...
    for t in T:
        # lump the test and offset data together
        xx = np.vstack( ( t, B ) )
        # calculate the standard deviation of the test and offset wells
        sd = np.std( xx[:,2] )
        # let the center be the center of mass of the test wells
        cntr = ( np.mean( t[:,0] ), np.mean( t[:,1] ) )
        # inverse distance weighted means of test and offset production
        tm = invw_mean( t, cntr )
        Bm = invw_mean( B, cntr )
        # perform classical hypothesis test
        # null hypothesis is that the means are equal
        pv.append( ztest( tm, mu=Bm, sigma=sd, n=n ) )
    # return the maximum p-value
    return np.max( pv )
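As a quick illustrative call (my addition), here is the p-value for the first test well against its ten nearest offsets:

# p-values above 0.05 are consistent with equal mean production
print( spztest( S, 0, 10 ) )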
Visualization and Analysis
To test the artificial data, I’ll loop through the An test wells in the data set S and collect the p-value of each test.
mx = list()
# recall, the first An points are the test wells
for i in range( An ):
    smx = spztest( S, i, 10 )
    mx.append( smx )
mx
Here are the p-values returned by one run of the tests:
[0.063200612925140162,
 0.46020042787985055,
 0.34728701624228769,
 0.28354222559091974,
 0.19241053536792413,
 0.41836201073014556,
 0.2482599810294695,
 0.074972425124645359,
 3.6310642413222143e-05,
 0.47695328351426147,
 0.48227773825391373,
 0.21425353985247148,
 0.19364552150046321,
 0.23721202001137376,
 0.48310373126710204]
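As a small added check, we can count the rejections programmatically:

# count the wells whose p-value falls below the 0.05 significance level
print( sum( 1 for p in mx if p < 0.05 ) )   # 1 for the values above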
We see that only one fell below the 0.05 significance level. Below is a plot of these values. The grey scale runs from zero (black) to 0.05 (white); p-values above 0.05 are clipped to white. This means that test wells failing to reject the null hypothesis will have a black border and a white center, while test wells that reject the null hypothesis of equal mean production will have a grey or black center. Offset wells are shown in green, as before.
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
# offset wells in green
ax.scatter( B[:,0], B[:,1], marker='o', edgecolor='g', facecolor='none', alpha=0.75 )
# test wells shaded by p-value, clipped to [0, 0.05]
ax.scatter( A[:,0], A[:,1], marker='o', c=mx, cmap='gray', vmin=0.0, vmax=0.05 )
As we can see, fourteen of the fifteen test wells fail to reject the null hypothesis that there is no difference in production, while one rejects it. Since the null hypothesis is true by construction, about one false positive in fifteen tests at the 0.05 level is what we would expect, so this seems pretty reasonable.