scipy: fastest code to compute distances between points in a 2D numpy array with cyclic (periodic) boundary conditions

q0qdq0h2 · asked 2023-10-20
5 answers · 181 views

I know how to compute the Euclidean distances between points in an array using scipy.spatial.distance.cdist,
similar to the answer to this question: Calculate Distances Between One Point in Matrix From All Other Points.
However, I would like the calculation to assume *cyclic boundary conditions*, so that in this case the point [0,0] is at distance 1 from the point [0,n-1], not at distance n-1. (I will then make a mask for all points within a threshold distance of a target cell, but that is not the core of the question.)
The only way I can think of is to repeat the calculation 9 times, with the domain indices offset by -n, 0 and +n in the x, y and x&y directions, then stack the results and take the minimum over the 9 slices. To illustrate why the 9 repeats are needed, I put together a simple schematic with just one J point, marked with a circle, showing an example where the cell marked with a triangle has its nearest neighbour in the domain reflected at the top left.
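For context, the 9-copy search is equivalent to applying the minimum-image convention along each axis independently; a minimal sketch of that idea (my own illustration, not the code from the question):

```python
import numpy as np

def periodic_dist(p, q, n):
    """Euclidean distance between two points on an n x n torus."""
    d = np.abs(np.asarray(p) - np.asarray(q))
    d = np.minimum(d, n - d)   # take the shorter way around on each axis
    return float(np.sqrt((d * d).sum()))

# [0,0] and [0,n-1] are distance 1 apart, not n-1:
print(periodic_dist([0, 0], [0, 4], n=5))  # -> 1.0
```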

Here is the code I developed using cdist:

import numpy as np
from scipy import spatial
    
n=5 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n)) 
i=np.argwhere(a>-1)  # all points, for each loc we want distance to nearest J 
j=np.argwhere(a>0.85) # set of J locations to find distance to.

# this will be used in the KDtree soln 
global maxdist
maxdist=2.0

def dist_v1(i,j):
    dist=[]
    # 3x3 search required for periodic boundaries.
    for xoff in [-n,0,n]:
        for yoff in [-n,0,n]:
            jo=j.copy()
            jo[:,0]-=xoff
            jo[:,1]-=yoff
            dist.append(np.amin(spatial.distance.cdist(i,jo,metric='euclidean'),1)) 
    dist=np.amin(np.stack(dist),0).reshape([n,n])
    return(dist)

This works and produces, for example:

print(dist_v1(i,j))

[[1.41421356 1.         1.41421356 1.41421356 1.        ]
 [2.23606798 2.         1.41421356 1.         1.41421356]
 [2.         2.         1.         0.         1.        ]
 [1.41421356 1.         1.41421356 1.         1.        ]
 [1.         0.         1.         1.         0.        ]]

The 0s obviously mark the J points, and the distances are correct (this edit corrects my earlier attempt, which was wrong).
Note that if you change the last two lines to stack the raw distances and then take just a single minimum, like this:

def dist_v2(i,j):
    dist=[]
    # 3x3 search required for periodic boundaries.
    for xoff in [-n,0,n]:
        for yoff in [-n,0,n]:
            jo=j.copy()
            jo[:,0]-=xoff
            jo[:,1]-=yoff
            dist.append(spatial.distance.cdist(i,jo,metric='euclidean')) 
    dist=np.amin(np.dstack(dist),(1,2)).reshape([n,n])
    return(dist)

it is faster for small n (<10), but considerably slower for larger arrays (n>10)
...but either way it is slow for my large arrays (n=500, with ~70 J points); this search takes up about 99% of the computation time (and using loops is a bit ugly too). Is there a better/faster way?
Other options I have considered are:
1. scipy.spatial.KDTree.query_ball_point
Searching further, I found the function scipy.spatial.KDTree.query_ball_point, which directly computes the coordinates within a radius of my J points, but it does not seem to have any facility for periodic boundaries, so I presume I would still need to use the 3x3 loop, stack, and then take amin as above, so I am not sure whether this would be any faster.
I coded up a solution using this function without worrying about the periodic boundary conditions (i.e. this does not answer my question):

def dist_v3(n,j):
    x, y = np.mgrid[0:n, 0:n]
    points = np.c_[x.ravel(), y.ravel()]
    tree=spatial.KDTree(points)
    mask=np.zeros([n,n])
    for results in tree.query_ball_point((j), maxdist):
        mask[points[results][:,0],points[results][:,1]]=1
    return(mask)

Perhaps I am not using it in the most efficient way, but even without periodic boundaries this is already as slow as my cdist-based solutions. Including the masking step in the two cdist solutions, i.e. replacing return(dist) with return(np.where(dist<=maxdist,1,0)) in those functions, and then using timeit, I get the following timings for n=100:

from timeit import timeit

print("cdist v1:",timeit(lambda: dist_v1(i,j), number=3)*100)
print("cdist v2:",timeit(lambda: dist_v2(i,j), number=3)*100)
print("KDtree:", timeit(lambda: dist_v3(n,j), number=3)*100)

cdist v1: 181.80927299981704
cdist v2: 554.8205785999016
KDtree: 605.119637199823

2. Build an array of relative coordinates for points within a set distance of [0,0], then manually loop over the J points setting a mask with this list of relative points. This has the advantage that the "relative distance" calculation is only performed once (my J points change at every timestep), but I suspect the loop will be very slow.
3. Precompute a set of masks for every point in the 2D domain, so that at each timestep of the model integration I just pick out the mask for the J points and apply it. This would use a lot of memory (scaling with n^4) and might still be slow, since you need to loop over the J points to merge the masks.
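Worth noting as a further option (not mentioned in the question): newer SciPy versions support periodic boundaries directly via the boxsize argument of scipy.spatial.cKDTree, which removes the need for the 3x3 replication entirely. A minimal sketch using the question's setup:

```python
import numpy as np
from scipy import spatial

n = 5
np.random.seed(1)                 # same setup as in the question
a = np.random.uniform(size=(n, n))
i = np.argwhere(a > -1)           # all grid points
j = np.argwhere(a > 0.85)         # J points to measure distance to

# boxsize=n imposes a toroidal topology: coordinates wrap modulo n
tree = spatial.cKDTree(j, boxsize=n)
dist, _ = tree.query(i)           # nearest-J distance for every point
print(dist.reshape(n, n))
```

Note that all input coordinates must lie within [0, boxsize), which the argwhere indices satisfy by construction.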

rsaldnfx1#

I will show another approach, from an image-processing angle, which you may find interesting regardless of whether it is the fastest. For convenience I have only implemented it for odd n.
Rather than thinking of a set of nxn points i, think of the nxn box. We can treat it as a binary image, with every point in j being a positive pixel in that image. For n=5 it looks like this (image omitted):

Now consider another image-processing concept: dilation. For any input pixel, the output pixel is 1 if there is a positive pixel anywhere in its neighborhood. The neighborhood is defined by a so-called structuring element: a boolean kernel whose 1s indicate which neighbors to consider.
Here is how I define the SE for this problem:

Y, X = np.ogrid[-n:n+1, -n:n+1]
SQ = X*X+Y*Y

H = SQ == r

Intuitively, H is a mask of all the points that satisfy the equation x*x+y*y = r relative to the center. That is, all points in H lie at distance sqrt(r) from the center. Looking at the images makes it very clear (images omitted):

These are expanding circles of pixels. Each white pixel in each mask marks a point at distance exactly sqrt(r) from the center pixel. You can also see that if we increase the value of r iteratively, we steadily cover all pixel positions around a given location, eventually covering the whole image. (Some values of r give no response, because no pair of points lies at distance sqrt(r); we skip those values of r, e.g. 3.)
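To see concretely which squared radii r actually produce a ring (only integers expressible as a sum of two squares do), here is a quick sanity check of my own on the structuring elements:

```python
import numpy as np

n = 5
Y, X = np.ogrid[-n:n+1, -n:n+1]
SQ = X*X + Y*Y          # squared distance of each pixel from the center

# count the pixels on each ring; r=3 gives none, so it gets skipped
for r in range(6):
    print(r, np.count_nonzero(SQ == r))
```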
That is what the main algorithm does.

  • We increment the value of r from 0 up to some upper bound.
  • At each step, if any position (x,y) in the image responds to the dilation, it means there is a j point at distance exactly sqrt(r) from it!
  • We may find a match multiple times; we keep only the first match and discard any later ones. We continue until every pixel (x,y) has found its minimum distance / first match.

So you could say the algorithm depends on the number of unique distance pairs in an nxn image.
This also means that the algorithm actually gets faster the more points you have in j, which defies common sense!
The worst case for this dilation algorithm is when you have the minimum number of points (exactly one point in j), because then r needs to go up to a very high value to get a match from a far-away point.
On the implementation side:

import numpy as np
import matplotlib.pyplot as plt

n=5 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n)) 
I=np.argwhere(a>-1)  # all points, for each loc we want distance to nearest J 
J=np.argwhere(a>0.85)

Y, X = np.ogrid[-n:n+1, -n:n+1]
SQ = X*X+Y*Y

point_space = np.zeros((n, n))
point_space[J[:,0], J[:,1]] = 1

C1 = point_space[:, :n//2]
C2 = point_space[:, n//2+1:]
C = np.hstack([C2, point_space, C1])

D1 = point_space[:n//2, :]
D2 = point_space[n//2+1:, :]
D2_ = np.hstack([point_space[n//2+1:, n//2+1:],D2,point_space[n//2+1:, :n//2]])
D1_ = np.hstack([point_space[:n//2:, n//2+1:],D1,point_space[:n//2, :n//2]])
D = np.vstack([D2_, C, D1_])
p = (3*n-len(D))//2
D = np.pad(D, (p,p), constant_values=(0,0))

plt.imshow(D, cmap='gray')
plt.title(f'n={n}')

If you look at the image for n=5 you can tell what I did: I simply padded the image with the four quadrants to represent the cyclic space, and then added some extra zero padding to account for the worst-case search boundary.

import numba as nb

@nb.jit
def dilation(image, output, kernel, N, i0, i1):
    for i in range(i0,i1):
        for j in range(i0, i1):
            a_0 = i-(N//2)
            a_1 = a_0+N
            b_0 = j-(N//2)
            b_1 = b_0+N
            neighborhood = image[a_0:a_1, b_0:b_1]*kernel
            if np.any(neighborhood):
                output[i-i0,j-i0] = 1
    return output

@nb.njit(cache=True)
def progressive_dilation(point_space, out, total, dist, SQ, n, N_):
    for i in range(N_):
        if not np.any(total): 
            break
            
        H = SQ == i

        rows, cols = np.nonzero(H)
        if len(rows) == 0: continue
        
        rmin, rmax = rows.min(), rows.max()
        cmin, cmax = cols.min(), cols.max()

        H_ = H[rmin:rmax+1, cmin:cmax+1]
        
        out[:] = False 
        out = dilation(point_space, out, H_, len(H_), n, 2*n)
        
        idx = np.logical_and(out, total)
        
        for a, b in  zip(*np.where(idx)):
            dist[a, b] = i
        
        total = total * np.logical_not(out)
    return dist

def dilateWrap(D, SQ, n):
    out = np.zeros((n,n), dtype=bool)
    total = np.ones((n,n), dtype=bool)
    dist=-1*np.ones((n,n))
    dist = progressive_dilation(D, out, total, dist, SQ, n, 2*n*n+1)
    return dist

dout = dilateWrap(D, SQ, n)

If we visualize dout, we actually get a nice visual representation of the distances.

The black dots are essentially the positions where j points exist; the brightest points are naturally the ones farthest from any j. Note that I keep the values in squared form to get an integer image; the actual distance is still a square root away. The results match the output of the ball-tree algorithm.

# after resetting n = 501 and rerunning the first block

from sklearn.neighbors import BallTree

N = J.copy()
NE = J.copy()
E = J.copy()
SE = J.copy()
S = J.copy()
SW = J.copy()
W = J.copy()
NW = J.copy()

N[:,1] = N[:,1] - n
NE[:,0] = NE[:,0] - n
NE[:,1] = NE[:,1] - n
E[:,0] = E[:,0] - n
SE[:,0] = SE[:,0] - n
SE[:,1] = SE[:,1] + n
S[:,1] = S[:,1] + n
SW[:,0] = SW[:,0] + n
SW[:,1] = SW[:,1] + n
W[:,0] = W[:,0] + n
NW[:,0] = NW[:,0] + n
NW[:,1] = NW[:,1] - n

def distBP(I,J):
    tree = BallTree(np.concatenate([J,N,E,S,W,NE,SE,SW,NW]), leaf_size=15, metric='euclidean')
    dist = tree.query(I, k=1, return_distance=True)
    minimum_distance = dist[0].reshape(n,n)
    return minimum_distance

print(np.array_equal(distBP(I,J), np.sqrt(dilateWrap(D, SQ, n))))

Output:

True

Now for a timing check, with n=501.

from timeit import timeit
nl=1
print("ball tree:",timeit(lambda: distBP(I,J),number=nl))
print("dilation:",timeit(lambda: dilateWrap(D, SQ, n),number=nl))

Output:

ball tree: 1.1706031339999754
dilation: 1.086665302000256

I would say they are roughly equal, though dilation has a very slight edge. In fact, the dilation is still missing a square-root operation, so let's add it.

from timeit import timeit
nl=1
print("ball tree:",timeit(lambda: distBP(I,J),number=nl))
print("dilation:",timeit(lambda: np.sqrt(dilateWrap(D, SQ, n)),number=nl))

Output:

ball tree: 1.1712950239998463
dilation: 1.092416919000243

The square root has an essentially negligible effect on the time.
I said earlier that dilation becomes faster when there are *more* points in j. So let's increase the number of points in j.

n=501 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n)) 
I=np.argwhere(a>-1)  # all points, for each loc we want distance to nearest J 
J=np.argwhere(a>0.4) # previously a>0.85

Now look at the timings:

from timeit import timeit
nl=1
print("ball tree:",timeit(lambda: distBP(I,J),number=nl))
print("dilation:",timeit(lambda: np.sqrt(dilateWrap(D, SQ, n)),number=nl))

Output:

ball tree: 3.3354218500007846
dilation: 0.2178608220001479

The ball tree actually got slower, while the dilation got faster! That is because with many j points, we can find all the distances quickly with just a few repeated dilations. I find this effect quite interesting: normally you would expect runtimes to degrade as the number of points grows, but here the opposite happens.
Conversely, if we reduce j, we see the dilation get slower:

#Setting a>0.9
print("ball tree:",timeit(lambda: distBP(I,J),number=nl))
print("dilation:",timeit(lambda: np.sqrt(dilateWrap(D, SQ, n)),number=nl))

Output:

ball tree: 1.010353464000218
dilation: 1.4776274510004441

I think we can safely conclude that convolution/kernel-based methods offer better gains on this particular problem than point- or tree-based methods.
Finally, I mentioned it at the start and will mention it again: this whole implementation only handles odd values of n; I did not have the patience to work out the proper padding for even n. (If you are familiar with image processing, you have probably run into this before: everything is easier with odd sizes.)
This could also be optimized further, as I only dabble in this occasionally.

bxjv4tth2#

[EDIT] - I found a bug in the way the code tracked which points were already done; it is fixed with mask_kernel. The pure-python version of the new code is 1.5x slower, but the numba version is slightly faster (thanks to some other optimizations).
[Current best: ~100x to 120x the original speed]
First of all, thanks for posting this question, I had a lot of fun optimizing it!
My current best solution relies on the assumption that the grid is regular and that the "source" points (the points to which we need to compute distances) are roughly uniformly distributed.
The idea here is that all the distances will be 1, sqrt(2), sqrt(3), etc., so we can do the numerical computation in advance. We then simply place these values in a matrix and copy that matrix onto each source point (making sure to keep the minimum value found at each point). This covers the vast majority of the points (>99%). Then we apply another, more "classical" method to the remaining 1%.
Here is the code:

import numpy as np

def sq_distance(x1, y1, x2, y2, n): 
    # computes the pairwise squared distance between 2 sets of points (with periodicity)
    # x1, y1 : coordinates of the first set of points (source)
    # x2, y2 : same
    dx = np.abs((np.subtract.outer(x1, x2) + n//2)%(n) - n//2)
    dy = np.abs((np.subtract.outer(y1, y2) + n//2)%(n) - n//2)
    d  = (dx*dx + dy*dy)
    return d

def apply_kernel(sources, sqdist, kern_size, n, mask):
    ker_i, ker_j = np.meshgrid(np.arange(-kern_size, kern_size+1), np.arange(-kern_size, kern_size+1), indexing="ij")
    kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
    mask_kernel = kernel > kern_size**2

    for pi, pj in sources:
        ind_i = (pi+ker_i)%n
        ind_j = (pj+ker_j)%n
        sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
        mask[ind_i,ind_j] *= mask_kernel

def dist_vf(sources, n, kernel_size):
    sources = np.asfortranarray(sources) #for memory contiguity

    kernel_size = min(kernel_size, n//2)
    kernel_size = max(kernel_size, 1)

    sqdist = np.full((n,n), 10*n**2, dtype=np.int32) #preallocate with a huge distance (>max**2)
    mask   = np.ones((n,n), dtype=bool)              #which points have not been reached?

    #main code
    apply_kernel(sources, sqdist, kernel_size, n, mask) 

    #remaining points
    rem_i, rem_j = np.nonzero(mask)
    if len(rem_i) > 0:
        sq_d = sq_distance(sources[:,0], sources[:,1], rem_i, rem_j, n).min(axis=0)
        sqdist[rem_i, rem_j] = sq_d

    #eff = 1-rem_i.size/n**2
    #print("covered by kernel :", 100*eff, "%")
    #print("overlap :", sources.shape[0]*(1+2*kernel_size)**2/n**2)
    #print()

    return np.sqrt(sqdist)
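As a quick sanity check of the periodic wrap inside sq_distance (my own test, reproducing the function standalone; not part of the answer):

```python
import numpy as np

def sq_distance(x1, y1, x2, y2, n):
    # same periodic squared distance as in the answer above
    dx = np.abs((np.subtract.outer(x1, x2) + n//2) % (n) - n//2)
    dy = np.abs((np.subtract.outer(y1, y2) + n//2) % (n) - n//2)
    return dx*dx + dy*dy

n = 10
# (0,0) and (0,9) are neighbours across the boundary: squared distance 1
print(sq_distance(np.array([0]), np.array([0]),
                  np.array([0]), np.array([9]), n))  # -> [[1]]
```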

Testing this version:

n=500  # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n)) 
all_points=np.argwhere(a>-1)  # all points, for each loc we want distance to nearest J 
source_points=np.argwhere(a>1-70/n**2) # set of J locations to find distance to.

#
# code for dist_v1 and dist_vf
#

overlap=5.2
kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)

print("cdist v1      :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
print("kernel version:", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")

cdist v1      : 1148.6694 ms
kernel version: 69.21876999999998 ms

That is already a ~17x speedup! I also implemented a numba version of sq_distance and apply_kernel: [this is the new, corrected version]

from numba import njit

@njit(cache=True)
def sq_distance(x1, y1, x2, y2, n):
    m1 = x1.size
    m2 = x2.size
    n2 = n//2
    d = np.empty((m1,m2), dtype=np.int32)
    for i in range(m1):
        for j in range(m2):
            dx = np.abs(x1[i] - x2[j] + n2)%n - n2
            dy = np.abs(y1[i] - y2[j] + n2)%n - n2
            d[i,j]  = (dx*dx + dy*dy)
    return d

@njit(cache=True)
def apply_kernel(sources, sqdist, kern_size, n, mask):
    # creating the kernel
    kernel = np.empty((2*kern_size+1, 2*kern_size+1))
    vals = np.arange(-kern_size, kern_size+1)**2
    for i in range(2*kern_size+1):
        for j in range(2*kern_size+1):
            kernel[i,j] = vals[i] + vals[j]
    mask_kernel = kernel > kern_size**2

    I = sources[:,0]
    J = sources[:,1]

    # applying the kernel for each point
    for l in range(sources.shape[0]):
        pi = I[l]
        pj = J[l]

        if pj - kern_size >= 0 and pj + kern_size<n: #if we are in the middle, no need to do the modulo for j
            for i in range(2*kern_size+1):
                ind_i = np.mod((pi+i-kern_size), n)
                for j in range(2*kern_size+1):
                    ind_j = (pj+j-kern_size)
                    sqdist[ind_i,ind_j] = np.minimum(kernel[i,j], sqdist[ind_i,ind_j])
                    mask[ind_i,ind_j] = mask_kernel[i,j] and mask[ind_i,ind_j]

        else:
            for i in range(2*kern_size+1):
                ind_i = np.mod((pi+i-kern_size), n)
                for j in range(2*kern_size+1):
                    ind_j = np.mod((pj+j-kern_size), n)
                    sqdist[ind_i,ind_j] = np.minimum(kernel[i,j], sqdist[ind_i,ind_j])
                    mask[ind_i,ind_j] = mask_kernel[i,j] and mask[ind_i,ind_j]
    return

And testing,

overlap=5.2
kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)

print("cdist v1                :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
print("kernel numba (first run):", timeit(lambda: dist_vf(source_points, n, kernel_size), number=1)*1000, "ms") #first run = compilation = long
print("kernel numba            :", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")

which gave the following results:

cdist v1                : 1163.0742 ms
kernel numba (first run): 2060.0802 ms
kernel numba            : 8.80377000000001 ms

The first run is very slow because of the JIT compilation, but beyond that, it is a 120x improvement!
More can be squeezed out of this algorithm by tuning the kernel_size parameter (or overlap). The current choice of kernel_size is only efficient for a small number of source points. For example, this choice fails with source_points=np.argwhere(a>0.85) (13 seconds), whereas manually setting kernel_size=5 gives the answer in 22 ms.

  • I hope my post is not (needlessly) too convoluted; I honestly did not know how to organize it better.

[EDIT 2]

I gave some more attention to the non-numba parts of the code and managed a quite significant speedup, getting very close to what numba can achieve. Here is the new version of the apply_kernel function:

def apply_kernel(sources, sqdist, kern_size, n, mask):
    ker_i = np.arange(-kern_size, kern_size+1).reshape((2*kern_size+1,1))
    ker_j = np.arange(-kern_size, kern_size+1).reshape((1,2*kern_size+1))

    kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
    mask_kernel = kernel > kern_size**2

    for pi, pj in sources:

        imin = pi-kern_size
        jmin = pj-kern_size
        imax = pi+kern_size+1
        jmax = pj+kern_size+1
        if imax < n and jmax < n and imin >=0 and jmin >=0: # we are inside
            sqdist[imin:imax,jmin:jmax] = np.minimum(kernel, sqdist[imin:imax,jmin:jmax])
            mask[imin:imax,jmin:jmax] *= mask_kernel

        elif imax < n and imin >=0:
            ind_j = (pj+ker_j.ravel())%n
            sqdist[imin:imax,ind_j] = np.minimum(kernel, sqdist[imin:imax,ind_j])
            mask[imin:imax,ind_j] *= mask_kernel

        elif jmax < n and jmin >=0:
            ind_i = (pi+ker_i.ravel())%n
            sqdist[ind_i,jmin:jmax] = np.minimum(kernel, sqdist[ind_i,jmin:jmax])
            mask[ind_i,jmin:jmax] *= mask_kernel

        else :
            ind_i = (pi+ker_i)%n
            ind_j = (pj+ker_j)%n
            sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
            mask[ind_i,ind_j] *= mask_kernel

The main optimizations are

  • using sliced indexing (instead of dense arrays)
  • using sparse indexing (how did I not think of that earlier)

Testing with

overlap=5.4
kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)

print("cdist v1  :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
print("kernel v2 :", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")

cdist v1  : 1209.8163000000002 ms
kernel v2 : 11.319049999999997 ms

That is a good 100x improvement over cdist, a ~5.5x improvement over the previous numpy-only version, and only ~25% slower than what I could achieve with numba.

hpcdzsge3#

Here is a fixed version of your code, plus a different approach that is a bit faster. They give the same results, so I am reasonably confident they are correct:

import numpy as np
from scipy.spatial.distance import squareform, pdist, cdist
from numpy.linalg import norm

def pb_OP(A, p=1.0):
    distl = []
    for *offs, ct in [(0, 0, 0), (0, p, 1), (p, 0, 1), (p, p, 1), (-p, p, 1)]:
        B = A - offs
        distl.append(cdist(B, A, metric='euclidean'))
        if ct:
            distl.append(distl[-1].T)
    return np.amin(np.dstack(distl), 2)

def pb_pp(A, p=1.0):
    out = np.empty((2, A.shape[0]*(A.shape[0]-1)//2))
    for o, i in zip(out, A.T):
        pdist(i[:, None], 'cityblock', out=o)
    out[out > p/2] -= p
    return squareform(norm(out, axis=0))

test = np.random.random((1000, 2))

assert np.allclose(pb_OP(test), pb_pp(test))

from timeit import timeit
t_OP = timeit(lambda: pb_OP(test), number=10)*100
t_pp = timeit(lambda: pb_pp(test), number=10)*100
print('OP', t_OP)
print('pp', t_pp)

Sample run, 1000 points:

OP 210.11001259903423
pp 22.288734700123314

We see that my approach is 9x faster, which coincidentally is the number of offset configurations the OP's version has to check. It uses pdist on the individual coordinates to get the absolute differences. Where these are larger than half the grid period, we subtract one period. All that remains is taking the Euclidean norm and unpacking the storage.
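The per-coordinate wrap in pb_pp can be illustrated on a toy pair of points that are close across the x-boundary (my own sketch, following the same steps as pb_pp):

```python
import numpy as np
from scipy.spatial.distance import pdist, squareform
from numpy.linalg import norm

p = 1.0  # period of the box
A = np.array([[0.05, 0.5],
              [0.95, 0.5]])  # two points close across the x-boundary

# one condensed distance row per coordinate axis
out = np.empty((2, A.shape[0] * (A.shape[0] - 1) // 2))
for o, coord in zip(out, A.T):
    pdist(coord[:, None], 'cityblock', out=o)  # |dx| and |dy| per pair
out[out > p / 2] -= p                          # wrap: take the shorter way round
D_wrapped = squareform(norm(out, axis=0))
print(D_wrapped[0, 1])  # ~0.1, not 0.9
```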

u1ehiz5o4#

For computing many distances, I think it is hard to beat a simple BallTree (or similar).
I did not quite understand the cyclic boundaries, or at least why you need the 3x3 copies; since the domain behaves like a torus, I thought it would be enough to make 5 copies. Update: you do indeed need the 3x3 copies for the corners. I have updated the code.
To make sure my minimum_distance is correct, I ran np.all( minimum_distance == dist_v1(i,j) ) for n = 200 and got True.
For n = 500 generated with the provided code, %%time for a cold start gives:

CPU times: user 1.12 s, sys: 0 ns, total: 1.12 s
Wall time: 1.11 s

So I generated the n = 500 data points,

import numpy as np

n=500 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n)) 
i=np.argwhere(a>-1)  # all points, for each loc we want distance to nearest J 
j=np.argwhere(a>0.85) # set of J locations to find distance to.

and used a ball tree:

import numpy as np
from sklearn.neighbors import BallTree

N = j.copy()
NE = j.copy()
E = j.copy()
SE = j.copy()
S = j.copy()
SW = j.copy()
W = j.copy()
NW = j.copy()

N[:,1] = N[:,1] - n
NE[:,0] = NE[:,0] - n
NE[:,1] = NE[:,1] - n
E[:,0] = E[:,0] - n
SE[:,0] = SE[:,0] - n
SE[:,1] = SE[:,1] + n
S[:,1] = S[:,1] + n
SW[:,0] = SW[:,0] + n
SW[:,1] = SW[:,1] + n
W[:,0] = W[:,0] + n
NW[:,0] = NW[:,0] + n
NW[:,1] = NW[:,1] - n

tree = BallTree(np.concatenate([j,N,E,S,W,NE,SE,SW,NW]), leaf_size=15, metric='euclidean')

dist = tree.query(i, k=1, return_distance=True)

minimum_distance = dist[0].reshape(n,n)

Update:
Note that here I copy the data to the N, E, S, W, NE, SE, SW and NW of the box to handle the boundary conditions. Again, for n = 200 this gives the same result. You can tune leaf_size, but I found this setting fine.
Performance is sensitive to the number of points in j.

tpxzln5u5#

Here are the 8 different solutions I timed, some of my own and some posted in answer to my question, using 4 broad approaches:

1. spatial cdist
2. spatial KDTree
3. sklearn BallTree
4. kernel approach

This is the code containing the 8 test routines:

import numpy as np
from scipy import spatial
from sklearn.neighbors import BallTree

n=500 # size of 2D box
f=200./(n*n) # first number is rough number of target cells...

np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n))
i=np.argwhere(a>-1)  # all points, we want to know distance to nearest point
j=np.argwhere(a>1.0-f) # set of locations to find distance to.

# long array of 3x3 j points:
for xoff in [0,n,-n]:
    for yoff in [0,-n,n]:
        if xoff==0 and yoff==0:
            j9=j.copy()
        else:
            jo=j.copy()
            jo[:,0]+=xoff
            jo[:,1]+=yoff
            j9=np.vstack((j9,jo))

global maxdist
maxdist=10
overlap=5.2
kernel_size=int(np.sqrt(overlap*n**2/j.shape[0])/2)

print("no points",len(j))

# repeat cdist over each member of 3x3 block
def dist_v1(i,j):
    dist=[]
    # 3x3 search required for periodic boundaries.
    for xoff in [-n,0,n]:
        for yoff in [-n,0,n]:
            jo=j.copy()
            jo[:,0]+=xoff
            jo[:,1]+=yoff
            dist.append(np.amin(spatial.distance.cdist(i,jo,metric='euclidean'),1))
    dist=np.amin(np.stack(dist),0).reshape([n,n])
    #dmask=np.where(dist<=maxdist,1,0)
    return(dist)

# same as v1, but taking one amin function at the end
def dist_v2(i,j):
    dist=[]
    # 3x3 search required for periodic boundaries.
    for xoff in [-n,0,n]:
        for yoff in [-n,0,n]:
            jo=j.copy()
            jo[:,0]+=xoff
            jo[:,1]+=yoff
            dist.append(spatial.distance.cdist(i,jo,metric='euclidean'))
    dist=np.amin(np.dstack(dist),(1,2)).reshape([n,n])
    #dmask=np.where(dist<=maxdist,1,0)
    return(dist)

# using a KDTree query ball points, looping over j9 points as in online example
def dist_v3(n,j):
    x,y=np.mgrid[0:n,0:n]
    points=np.c_[x.ravel(), y.ravel()]
    tree=spatial.KDTree(points)
    mask=np.zeros([n,n])
    for results in tree.query_ball_point((j), 2.1):
        mask[points[results][:,0],points[results][:,1]]=1
    return(mask)

# using ckdtree query on the j9 long array
def dist_v4(i,j):
    tree=spatial.cKDTree(j)
    dist,minid=tree.query(i)
    return(dist.reshape([n,n]))

# back to using Cdist, but on the long j9 3x3 array, rather than on each element separately
def dist_v5(i,j):
    # 3x3 search required for periodic boundaries.
    dist=np.amin(spatial.distance.cdist(i,j,metric='euclidean'),1)
    #dmask=np.where(dist<=maxdist,1,0)
    return(dist)

def dist_v6(i,j):
    tree = BallTree(j,leaf_size=5,metric='euclidean')
    dist = tree.query(i, k=1, return_distance=True)
    mindist = dist[0].reshape(n,n)
    return(mindist)

def sq_distance(x1, y1, x2, y2, n):
    # computes the pairwise squared distance between 2 sets of points (with periodicity)
    # x1, y1 : coordinates of the first set of points (source)
    # x2, y2 : same
    dx = np.abs((np.subtract.outer(x1, x2) + n//2)%(n) - n//2)
    dy = np.abs((np.subtract.outer(y1, y2) + n//2)%(n) - n//2)
    d  = (dx*dx + dy*dy)
    return d

def apply_kernel1(sources, sqdist, kern_size, n, mask):
    ker_i, ker_j = np.meshgrid(np.arange(-kern_size, kern_size+1), np.arange(-kern_size, kern_size+1), indexing="ij")
    kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
    mask_kernel = kernel > kern_size**2

    for pi, pj in sources:
        ind_i = (pi+ker_i)%n
        ind_j = (pj+ker_j)%n
        sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
        mask[ind_i,ind_j] *= mask_kernel

def apply_kernel2(sources, sqdist, kern_size, n, mask):
    ker_i = np.arange(-kern_size, kern_size+1).reshape((2*kern_size+1,1))
    ker_j = np.arange(-kern_size, kern_size+1).reshape((1,2*kern_size+1))

    kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
    mask_kernel = kernel > kern_size**2

    for pi, pj in sources:

        imin = pi-kern_size
        jmin = pj-kern_size
        imax = pi+kern_size+1
        jmax = pj+kern_size+1
        if imax < n and jmax < n and imin >=0 and jmin >=0: # we are inside
            sqdist[imin:imax,jmin:jmax] = np.minimum(kernel, sqdist[imin:imax,jmin:jmax])
            mask[imin:imax,jmin:jmax] *= mask_kernel

        elif imax < n and imin >=0:
            ind_j = (pj+ker_j.ravel())%n
            sqdist[imin:imax,ind_j] = np.minimum(kernel, sqdist[imin:imax,ind_j])
            mask[imin:imax,ind_j] *= mask_kernel

        elif jmax < n and jmin >=0:
            ind_i = (pi+ker_i.ravel())%n
            sqdist[ind_i,jmin:jmax] = np.minimum(kernel, sqdist[ind_i,jmin:jmax])
            mask[ind_i,jmin:jmax] *= mask_kernel

        else :
            ind_i = (pi+ker_i)%n
            ind_j = (pj+ker_j)%n
            sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
            mask[ind_i,ind_j] *= mask_kernel

def dist_v7(sources, n, kernel_size,method):
    sources = np.asfortranarray(sources) #for memory contiguity
    kernel_size = min(kernel_size, n//2)
    kernel_size = max(kernel_size, 1)
    sqdist = np.full((n,n), 10*n**2, dtype=np.int32) #preallocate with a huge distance (>max**2)
    mask   = np.ones((n,n), dtype=bool)              #which points have not been reached?
    #main code
    if (method==1):
        apply_kernel1(sources, sqdist, kernel_size, n, mask)
    else:
        apply_kernel2(sources, sqdist, kernel_size, n, mask)
      
    #remaining points
    rem_i, rem_j = np.nonzero(mask)
    if len(rem_i) > 0:
        sq_d = sq_distance(sources[:,0], sources[:,1], rem_i, rem_j, n).min(axis=0)
        sqdist[rem_i, rem_j] = sq_d
    return np.sqrt(sqdist)


from timeit import timeit
nl=10
print ("-----------------------")
print ("Timings for ",nl,"loops")
print ("-----------------------")
print("1. cdist looped amin:",timeit(lambda: dist_v1(i,j),number=nl))
print("2. cdist single amin:",timeit(lambda: dist_v2(i,j),number=nl))
print("3. KDtree ball pt:", timeit(lambda: dist_v3(n,j9),number=nl))
print("4. KDtree query:",timeit(lambda: dist_v4(i,j9),number=nl))
print("5. cdist long list:",timeit(lambda: dist_v5(i,j9),number=nl))
print("6. ball tree:",timeit(lambda: dist_v6(i,j9),number=nl))
print("7. kernel orig:", timeit(lambda: dist_v7(j, n, kernel_size,1), number=nl))
print("8. kernel optimised:", timeit(lambda: dist_v7(j, n, kernel_size,2), number=nl))

Output (in seconds) on my 12-core Linux desktop (48 GB RAM), for n=350 and 63 points:

no points 63
-----------------------
Timings for  10 loops
-----------------------
1. cdist looped amin: 3.2488364999881014
2. cdist single amin: 6.494611179979984
3. KDtree ball pt: 5.180531410995172
4. KDtree query: 0.9377906009904109
5. cdist long list: 3.906166430999292
6. ball tree: 3.3540162370190956
7. kernel orig: 0.7813036740117241
8. kernel optimised: 0.17046571199898608

For n=500 and npts=176:

no points 176
-----------------------
Timings for  10 loops
-----------------------
1. cdist looped amin: 16.787221198988846
2. cdist single amin: 40.97849371898337
3. KDtree ball pt: 9.926229109987617
4. KDtree query: 0.8417396580043714
5. cdist long list: 14.345821461000014
6. ball tree: 1.8792325239919592
7. kernel orig: 1.0807358759921044
8. kernel optimised: 0.5650744160229806

So, to summarize, I came to the following conclusions:
1. If you have a reasonably large problem, avoid cdist, as it is very slow compared to the other methods (more than an order of magnitude!).
2. If your problem is not too constrained on computation time, I would recommend the "KDtree query" approach, as it takes just two lines without periodic boundaries, plus a few lines to set up the j9 array with periodic boundaries. So simple, so fast!
3. For best performance (e.g. a long model integration where this is needed at every timestep, as in my case), the kernel solution is now the fastest option: heavier code, but another roughly 2x speedup.
