在 python 中检查某个点是否位于多边形内的最快方法是什么
- 2025-01-07 08:44:00
- admin 原创
- 191
问题描述:
我发现了两种主要方法来查看一个点是否属于多边形内部。一种是使用这里使用的光线追踪方法,这是最推荐的答案,另一种是使用 matplotlib path.contains_points
(对我来说有点晦涩)。我将不得不连续检查很多点。有人知道这两种方法中哪一种比另一种更值得推荐,或者是否有更好的第三种选择吗?
我检查了这两种方法,matplotlib 看起来速度要快得多。
from time import time
import numpy as np
import matplotlib.path as mpltPath
# regular polygon for testing
lenpoly = 100
polygon = [[np.sin(x)+0.5,np.cos(x)+0.5] for x in np.linspace(0,2*np.pi,lenpoly)[:-1]]
# random points set of points to test
N = 10000
points = np.random.rand(N,2)
# Ray tracing
def ray_tracing_method(x,y,poly):
n = len(poly)
inside = False
p1x,p1y = poly[0]
for i in range(n+1):
p2x,p2y = poly[i % n]
if y > min(p1y,p2y):
if y <= max(p1y,p2y):
if x <= max(p1x,p2x):
if p1y != p2y:
xints = (y-p1y)*(p2x-p1x)/(p2y-p1y)+p1x
if p1x == p2x or x <= xints:
inside = not inside
p1x,p1y = p2x,p2y
return inside
start_time = time()
inside1 = [ray_tracing_method(point[0], point[1], polygon) for point in points]
print("Ray Tracing Elapsed time: " + str(time()-start_time))
# Matplotlib mplPath
start_time = time()
path = mpltPath.Path(polygon)
inside2 = path.contains_points(points)
print("Matplotlib contains_points Elapsed time: " + str(time()-start_time))
由此得出,
Ray Tracing Elapsed time: 0.441395998001
Matplotlib contains_points Elapsed time: 0.00994491577148
使用三角形而不是 100 边多边形也得到了相同的相对差异。我还将检查 shapely,因为它看起来是一个专门用于解决此类问题的软件包。
解决方案 1:
您可以考虑shapely:
from shapely.geometry import Point
from shapely.geometry.polygon import Polygon
point = Point(0.5, 0.5)
polygon = Polygon([(0, 0), (0, 1), (1, 1), (1, 0)])
print(polygon.contains(point))
从您提到的方法中,我只使用了第二种方法path.contains_points
,并且效果很好。无论如何,根据您测试所需的精度,我建议创建一个 numpy bool 网格,其中多边形内的所有节点都为 True(如果不是,则为 False)。如果您要对很多点进行测试,这可能会更快(但请注意,这取决于您在“像素”容差内进行测试):
from matplotlib import path
import matplotlib.pyplot as plt
import numpy as np
first = -3
size = (3-first)/100
xv,yv = np.meshgrid(np.linspace(-3,3,100),np.linspace(-3,3,100))
p = path.Path([(0,0), (0, 1), (1, 1), (1, 0)]) # square with legs length 1 and bottom left corner at the origin
flags = p.contains_points(np.hstack((xv.flatten()[:,np.newaxis],yv.flatten()[:,np.newaxis])))
grid = np.zeros((101,101),dtype='bool')
grid[((xv.flatten()-first)/size).astype('int'),((yv.flatten()-first)/size).astype('int')] = flags
xi,yi = np.random.randint(-300,300,100)/100,np.random.randint(-300,300,100)/100
vflag = grid[((xi-first)/size).astype('int'),((yi-first)/size).astype('int')]
plt.imshow(grid.T,origin='lower',interpolation='nearest',cmap='binary')
plt.scatter(((xi-first)/size).astype('int'),((yi-first)/size).astype('int'),c=vflag,cmap='Greens',s=90)
plt.show()
结果是这样的:
解决方案 2:
如果您需要的是速度,并且额外的依赖项不是问题,那么您可能会发现它numba
非常有用(现在在任何平台上都很容易安装)。您提出的经典方法可以通过使用装饰器并将多边形转换为 numpy 数组ray_tracing
轻松移植。代码应如下所示:numba
`numba @jit`
@jit(nopython=True)
def ray_tracing(x,y,poly):
n = len(poly)
inside = False
p2x = 0.0
p2y = 0.0
xints = 0.0
p1x,p1y = poly[0]
for i in range(n+1):
p2x,p2y = poly[i % n]
if y > min(p1y,p2y):
if y <= max(p1y,p2y):
if x <= max(p1x,p2x):
if p1y != p2y:
xints = (y-p1y)*(p2x-p1x)/(p2y-p1y)+p1x
if p1x == p2x or x <= xints:
inside = not inside
p1x,p1y = p2x,p2y
return inside
第一次执行将比任何后续调用花费更长的时间:
%%time
polygon=np.array(polygon)
inside1 = [numba_ray_tracing_method(point[0], point[1], polygon) for point in points]
CPU times: user 129 ms, sys: 4.08 ms, total: 133 ms
Wall time: 132 ms
经过编译后将减少到:
CPU times: user 18.7 ms, sys: 320 µs, total: 19.1 ms
Wall time: 18.4 ms
如果你需要在第一次调用函数时提高速度,那么可以使用 预编译模块中的代码pycc
。将函数存储在 src.py 中,如下所示:
from numba import jit
from numba.pycc import CC
cc = CC('nbspatial')
@cc.export('ray_tracing', 'b1(f8, f8, f8[:,:])')
@jit(nopython=True)
def ray_tracing(x,y,poly):
n = len(poly)
inside = False
p2x = 0.0
p2y = 0.0
xints = 0.0
p1x,p1y = poly[0]
for i in range(n+1):
p2x,p2y = poly[i % n]
if y > min(p1y,p2y):
if y <= max(p1y,p2y):
if x <= max(p1x,p2x):
if p1y != p2y:
xints = (y-p1y)*(p2x-p1x)/(p2y-p1y)+p1x
if p1x == p2x or x <= xints:
inside = not inside
p1x,p1y = p2x,p2y
return inside
if __name__ == "__main__":
cc.compile()
构建python src.py
并运行:
import nbspatial
import numpy as np
lenpoly = 100
polygon = [[np.sin(x)+0.5,np.cos(x)+0.5] for x in
np.linspace(0,2*np.pi,lenpoly)[:-1]]
# random points set of points to test
N = 10000
# making a list instead of a generator to help debug
points = zip(np.random.random(N),np.random.random(N))
polygon = np.array(polygon)
%%time
result = [nbspatial.ray_tracing(point[0], point[1], polygon) for point in points]
CPU times: user 20.7 ms, sys: 64 µs, total: 20.8 ms
Wall time: 19.9 ms
在我使用的 numba 代码中:b1(f8, f8, f8[:,:])
为了进行编译nopython=True
,每个变量都需要在循环之前声明for
。
在预构建 src 代码中,有以下行:
@cc.export('ray_tracing' , 'b1(f8, f8, f8[:,:])')
用于声明函数名称及其 I/O var 类型、一个布尔输出b1
和两个浮点数f8
以及一个二维浮点数组f8[:,:]
作为输入。
对于我的用例,我需要检查多个点是否位于单个多边形内 - 在这种情况下,利用 numba 并行功能循环遍历一系列点很有用。上面的示例可以更改为:
from numba import jit, njit
import numba
import numpy as np
@jit(nopython=True)
def pointinpolygon(x,y,poly):
n = len(poly)
inside = False
p2x = 0.0
p2y = 0.0
xints = 0.0
p1x,p1y = poly[0]
for i in numba.prange(n+1):
p2x,p2y = poly[i % n]
if y > min(p1y,p2y):
if y <= max(p1y,p2y):
if x <= max(p1x,p2x):
if p1y != p2y:
xints = (y-p1y)*(p2x-p1x)/(p2y-p1y)+p1x
if p1x == p2x or x <= xints:
inside = not inside
p1x,p1y = p2x,p2y
return inside
@njit(parallel=True)
def parallelpointinpolygon(points, polygon):
D = np.empty(len(points), dtype=numba.boolean)
for i in numba.prange(0, len(D)):
D[i] = pointinpolygon(points[i,0], points[i,1], polygon)
return D
注意:预编译上述代码不会启用 numba 的并行功能(编译不支持并行 CPU 目标pycc/AOT
)请参阅:https://github.com/numba/numba/issues/3336
测试:
import numpy as np
lenpoly = 100
polygon = [[np.sin(x)+0.5,np.cos(x)+0.5] for x in np.linspace(0,2*np.pi,lenpoly)[:-1]]
polygon = np.array(polygon)
N = 10000
points = np.random.uniform(-1.5, 1.5, size=(N, 2))
对于N=10000
72 核机器,返回:
%%timeit
parallelpointinpolygon(points, polygon)
# 480 µs ± 8.19 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
跟进 @mehdi 所做的比较,我在下面添加了一种基于 GPU 的方法。它使用point_in_polygon
来自cuspatial
库的方法:
import numpy as np
import cudf
import cuspatial
N = 100000002
lenpoly = 1000
polygon = [[np.sin(x)+0.5,np.cos(x)+0.5] for x in
np.linspace(0,2*np.pi,lenpoly)]
polygon = np.array(polygon)
points = np.random.uniform(-1.5, 1.5, size=(N, 2))
x_pnt = points[:,0]
y_pnt = points[:,1]
x_poly =polygon[:,0]
y_poly = polygon[:,1]
result = cuspatial.point_in_polygon(
x_pnt,
y_pnt,
cudf.Series([0], index=['geom']),
cudf.Series([0], name='r_pos', dtype='int32'),
x_poly,
y_poly,
)
按照@Mehdi 的比较。对于N=100000002
和lenpoly=1000
- 我得到了以下结果:
time_parallelpointinpolygon: 161.54760098457336
time_mpltPath: 307.1664695739746
time_ray_tracing_numpy_numba: 353.07356882095337
time_is_inside_sm_parallel: 37.45389246940613
time_is_inside_postgis_parallel: 127.13793849945068
time_is_inside_rapids: 4.246025562286377
硬件规格:
CPU 英特尔至强E1240
显卡 Nvidia GTX 1070
笔记:
该
cuspatial.point_in_poligon
方法非常强大,可以处理多个复杂的多边形(我猜是以牺牲性能为代价的)这些方法也可以在GPU
numba
上“移植” ——看看包括移植到@Mehdi()提到的最快方法的比较将会很有趣。cuda
`is_inside_sm`
解决方案 3:
不同方法的比较
我找到了其他方法来检查某个点是否位于多边形内(此处)。我只测试了其中两个(is_inside_sm 和 is_inside_postgis),结果与其他方法相同。
感谢@epifanio,我将代码并行化,并将其与@epifanio 和@user3274748 (ray_tracing_numpy) 方法进行了比较。请注意,这两种方法都有一个错误,因此我修复了它们,如下面的代码所示。
我还发现,用于创建多边形的代码不会生成封闭路径np.linspace(0,2*np.pi,lenpoly)[:-1]
。因此,上述 GitHub 存储库中提供的代码可能无法正常工作。因此,最好创建一条封闭路径(第一个点和最后一个点应该相同)。
代码
方法 1: parallelpointinpolygon
from numba import jit, njit
import numba
import numpy as np
@jit(nopython=True)
def pointinpolygon(x,y,poly):
n = len(poly)
inside = False
p2x = 0.0
p2y = 0.0
xints = 0.0
p1x,p1y = poly[0]
for i in numba.prange(n+1):
p2x,p2y = poly[i % n]
if y > min(p1y,p2y):
if y <= max(p1y,p2y):
if x <= max(p1x,p2x):
if p1y != p2y:
xints = (y-p1y)*(p2x-p1x)/(p2y-p1y)+p1x
if p1x == p2x or x <= xints:
inside = not inside
p1x,p1y = p2x,p2y
return inside
@njit(parallel=True)
def parallelpointinpolygon(points, polygon):
D = np.empty(len(points), dtype=numba.boolean)
for i in numba.prange(0, len(D)): #<-- Fixed here, must start from zero
D[i] = pointinpolygon(points[i,0], points[i,1], polygon)
return D
方法 2: ray_tracing_numpy_numba
@jit(nopython=True)
def ray_tracing_numpy_numba(points,poly):
x,y = points[:,0], points[:,1]
n = len(poly)
inside = np.zeros(len(x),np.bool_)
p2x = 0.0
p2y = 0.0
p1x,p1y = poly[0]
for i in range(n+1):
p2x,p2y = poly[i % n]
idx = np.nonzero((y > min(p1y,p2y)) & (y <= max(p1y,p2y)) & (x <= max(p1x,p2x)))[0]
if len(idx): # <-- Fixed here. If idx is null skip comparisons below.
if p1y != p2y:
xints = (y[idx]-p1y)*(p2x-p1x)/(p2y-p1y)+p1x
if p1x == p2x:
inside[idx] = ~inside[idx]
else:
idxx = idx[x[idx] <= xints]
inside[idxx] = ~inside[idxx]
p1x,p1y = p2x,p2y
return inside
方法 3: Matplotlib contains_points
path = mpltPath.Path(polygon,closed=True) # <-- Very important to mention that the path
# is closed (default is false)
方法 4: is_inside_sm(从这里获取)
@jit(nopython=True)
def is_inside_sm(polygon, point):
length = len(polygon)-1
dy2 = point[1] - polygon[0][1]
intersections = 0
ii = 0
jj = 1
while ii<length:
dy = dy2
dy2 = point[1] - polygon[jj][1]
# consider only lines which are not completely above/bellow/right from the point
if dy*dy2 <= 0.0 and (point[0] >= polygon[ii][0] or point[0] >= polygon[jj][0]):
# non-horizontal line
if dy<0 or dy2<0:
F = dy*(polygon[jj][0] - polygon[ii][0])/(dy-dy2) + polygon[ii][0]
if point[0] > F: # if line is left from the point - the ray moving towards left, will intersect it
intersections += 1
elif point[0] == F: # point on line
return 2
# point on upper peak (dy2=dx2=0) or horizontal line (dy=dy2=0 and dx*dx2<=0)
elif dy2==0 and (point[0]==polygon[jj][0] or (dy==0 and (point[0]-polygon[ii][0])*(point[0]-polygon[jj][0])<=0)):
return 2
ii = jj
jj += 1
#print 'intersections =', intersections
return intersections & 1
@njit(parallel=True)
def is_inside_sm_parallel(points, polygon):
ln = len(points)
D = np.empty(ln, dtype=numba.boolean)
for i in numba.prange(ln):
D[i] = is_inside_sm(polygon,points[i])
return D
方法 5: is_inside_postgis(从这里获取)
@jit(nopython=True)
def is_inside_postgis(polygon, point):
length = len(polygon)
intersections = 0
dx2 = point[0] - polygon[0][0]
dy2 = point[1] - polygon[0][1]
ii = 0
jj = 1
while jj<length:
dx = dx2
dy = dy2
dx2 = point[0] - polygon[jj][0]
dy2 = point[1] - polygon[jj][1]
F =(dx-dx2)*dy - dx*(dy-dy2);
if 0.0==F and dx*dx2<=0 and dy*dy2<=0:
return 2;
if (dy>=0 and dy2<0) or (dy2>=0 and dy<0):
if F > 0:
intersections += 1
elif F < 0:
intersections -= 1
ii = jj
jj += 1
#print 'intersections =', intersections
return intersections != 0
@njit(parallel=True)
def is_inside_postgis_parallel(points, polygon):
ln = len(points)
D = np.empty(ln, dtype=numba.boolean)
for i in numba.prange(ln):
D[i] = is_inside_postgis(polygon,points[i])
return D
基准
1000万积分时间:
parallelpointinpolygon Elapsed time: 4.0122294425964355
Matplotlib contains_points Elapsed time: 14.117807388305664
ray_tracing_numpy_numba Elapsed time: 7.908452272415161
sm_parallel Elapsed time: 0.7710440158843994
is_inside_postgis_parallel Elapsed time: 2.131121873855591
这是代码。
import matplotlib.pyplot as plt
import matplotlib.path as mpltPath
from time import time
import numpy as np
np.random.seed(2)
time_parallelpointinpolygon=[]
time_mpltPath=[]
time_ray_tracing_numpy_numba=[]
time_is_inside_sm_parallel=[]
time_is_inside_postgis_parallel=[]
n_points=[]
for i in range(1, 10000002, 1000000):
n_points.append(i)
lenpoly = 100
polygon = [[np.sin(x)+0.5,np.cos(x)+0.5] for x in np.linspace(0,2*np.pi,lenpoly)]
polygon = np.array(polygon)
N = i
points = np.random.uniform(-1.5, 1.5, size=(N, 2))
#Method 1
start_time = time()
inside1=parallelpointinpolygon(points, polygon)
time_parallelpointinpolygon.append(time()-start_time)
# Method 2
start_time = time()
path = mpltPath.Path(polygon,closed=True)
inside2 = path.contains_points(points)
time_mpltPath.append(time()-start_time)
# Method 3
start_time = time()
inside3=ray_tracing_numpy_numba(points,polygon)
time_ray_tracing_numpy_numba.append(time()-start_time)
# Method 4
start_time = time()
inside4=is_inside_sm_parallel(points,polygon)
time_is_inside_sm_parallel.append(time()-start_time)
# Method 5
start_time = time()
inside5=is_inside_postgis_parallel(points,polygon)
time_is_inside_postgis_parallel.append(time()-start_time)
plt.plot(n_points,time_parallelpointinpolygon,label='parallelpointinpolygon')
plt.plot(n_points,time_mpltPath,label='mpltPath')
plt.plot(n_points,time_ray_tracing_numpy_numba,label='ray_tracing_numpy_numba')
plt.plot(n_points,time_is_inside_sm_parallel,label='is_inside_sm_parallel')
plt.plot(n_points,time_is_inside_postgis_parallel,label='is_inside_postgis_parallel')
plt.xlabel("N points")
plt.ylabel("time (sec)")
plt.legend(loc = 'best')
plt.show()
结论
最快的算法是:
是否在 sm_parallel 内
是否在 postgis 内部并行
平行点多边形(@epifanio)
解决方案 4:
您的测试很好,但它仅测量一些特定情况:我们有一个具有许多顶点的多边形,以及一长串点来检查多边形内的情况。
此外,我认为你测量的不是 matplotlib-inside-polygon-method 与 ray-method,而是 matplotlib-somehow-optimized-iteration 与 simple-list-iteration
我们进行 N 次独立比较(N 对点和多边形)?
# ... your code...
lenpoly = 100
polygon = [[np.sin(x)+0.5,np.cos(x)+0.5] for x in np.linspace(0,2*np.pi,lenpoly)[:-1]]
M = 10000
start_time = time()
# Ray tracing
for i in range(M):
x,y = np.random.random(), np.random.random()
inside1 = ray_tracing_method(x,y, polygon)
print "Ray Tracing Elapsed time: " + str(time()-start_time)
# Matplotlib mplPath
start_time = time()
for i in range(M):
x,y = np.random.random(), np.random.random()
inside2 = path.contains_points([[x,y]])
print "Matplotlib contains_points Elapsed time: " + str(time()-start_time)
结果:
Ray Tracing Elapsed time: 0.548588991165
Matplotlib contains_points Elapsed time: 0.103765010834
Matplotlib 仍然好得多,但不是好 100 倍。现在让我们尝试更简单的多边形……
lenpoly = 5
# ... same code
结果:
Ray Tracing Elapsed time: 0.0727779865265
Matplotlib contains_points Elapsed time: 0.105288982391
解决方案 5:
我刚刚使用 numpy 重写了上面的代码:
def ray_tracing_numpy(x,y,poly):
n = len(poly)
inside = np.zeros(len(x),np.bool_)
p2x = 0.0
p2y = 0.0
xints = 0.0
p1x,p1y = poly[0]
for i in range(n+1):
p2x,p2y = poly[i % n]
idx = np.nonzero((y > min(p1y,p2y)) & (y <= max(p1y,p2y)) & (x <= max(p1x,p2x)))[0]
if p1y != p2y:
xints = (y[idx]-p1y)*(p2x-p1x)/(p2y-p1y)+p1x
if p1x == p2x:
inside[idx] = ~inside[idx]
else:
idxx = idx[x[idx] <= xints]
inside[idxx] = ~inside[idxx]
p1x,p1y = p2x,p2y
return inside
包裹ray_tracing
进
def ray_tracing_mult(x,y,poly):
return [ray_tracing(xi, yi, poly[:-1,:]) for xi,yi in zip(x,y)]
在100000个点上进行测试,结果:
ray_tracing_mult 0:00:00.850656
ray_tracing_numpy 0:00:00.003769
解决方案 6:
@Tatarize 的 inpoly 实现比我下面的光线追踪算法快几个数量级。对他的实现进行矢量化处理可以使大数加速约 40%
def build_edge_list_vectorized(polygon):
imag = polygon.imag
real = polygon.real
idx = np.arange(len(polygon) - 1)
num_pts = 2 * (len(polygon) - 1)
edge_list = np.empty(num_pts, dtype='complex64')
indexes = np.empty(num_pts, dtype='int32')
imag_compare = imag[:-1] < imag[1:]
real_compare = (imag[:-1] == imag[1:]) & (real[:-1] < real[1:])
mask = imag_compare | real_compare
mask_inv = ~mask
true_idxs = idx[mask]
true_idxs_in_edge_list = true_idxs*2
edge_list[true_idxs_in_edge_list] = polygon[:-1][mask]
indexes[true_idxs_in_edge_list] = true_idxs
edge_list[true_idxs_in_edge_list+1] = polygon[1:][mask]
indexes[true_idxs_in_edge_list+1] = ~true_idxs
false_idxs = idx[mask_inv]
false_idxs_in_edge_list = false_idxs*2
edge_list[false_idxs_in_edge_list] = polygon[:-1][mask_inv]
indexes[false_idxs_in_edge_list] = ~false_idxs
edge_list[false_idxs_in_edge_list+1] = polygon[1:][mask_inv]
indexes[false_idxs_in_edge_list+1] = false_idxs
sorting = np.lexsort((~indexes, edge_list.real, edge_list.imag))
edge_list = edge_list[sorting]
indexes = indexes[sorting]
return edge_list, indexes
def build_scanbeam_vectorized(edge_list, indexes):
imag = edge_list.imag
unique_imag_mask = np.empty(len(edge_list), dtype='bool')
unique_imag_mask[0] = True
unique_imag_mask[1:] = imag[:-1] != imag[1:]
events = imag[unique_imag_mask]
actives_table = []
actives = []
indexes = indexes.tolist()
for is_unique, index in zip(unique_imag_mask, indexes):
if is_unique:
actives_table.append(list(actives))
if index >= 0:
actives.append(index)
else:
actives.remove(~index)
actives_table.append(list(actives))
largest_actives = max(map(len, actives_table))
scan = np.full((len(actives_table), largest_actives), -1, dtype='int32')
for i, active in enumerate(actives_table):
scan[i, :len(active)] = active
return scan, events
def points_in_polygon_vectorized(polygon, points):
points_complex = points[:, 0] + points[:, 1] * 1j
polygon_complex = polygon[:, 0] + polygon[:, 1] * 1j
edge_list, indexes = build_edge_list_vectorized(polygon_complex)
scan, events = build_scanbeam_vectorized(edge_list, indexes)
pts_y = points_complex.imag
idx = np.searchsorted(events, pts_y)
actives = scan[idx]
a = polygon_complex[actives]
b = polygon_complex[actives + 1]
mask_non_actives = actives == -1
complex_nan = np.nan + np.nan * 1j
a = np.where(mask_non_actives, complex_nan, a)
b = np.where(mask_non_actives, complex_nan, b)
old_np_seterr = np.seterr(invalid="ignore", divide="ignore")
try:
# If horizontal slope is undefined. But, all x-ints are at x since x0=x1
m = (b.imag - a.imag) / (b.real - a.real)
y0 = a.imag - (m * a.real)
x_intercepts = np.where(~np.isinf(m), (points_complex.imag[:,None] - y0) / m, a.real)
finally:
np.seterr(**old_np_seterr)
results = np.sum(x_intercepts <= points_complex.real[:,None], axis=1)
results %= 2
mask = results.astype('bool')
return mask
对于尺寸为 1000 的多边形,1 mil 点的计时:
旧光线追踪算法:20.5 秒(由于 CPU 较新,时间比我以前的算法要短)
@Tatarize 的实现:0.19 秒
@Tatarize 的矢量化版本:0.12s
较慢的旧光线追踪算法
奇偶规则的纯 numpy 矢量化实现
其他答案要么是缓慢的 Python 循环,要么需要外部依赖或 cython 处理。
import numpy as np
def points_in_polygon(polygon, pts):
pts = np.asarray(pts,dtype='float32')
polygon = np.asarray(polygon,dtype='float32')
contour2 = np.vstack((polygon[1:], polygon[:1]))
test_diff = contour2-polygon
mask1 = (pts[:,None] == polygon).all(-1).any(-1)
m1 = (polygon[:,1] > pts[:,None,1]) != (contour2[:,1] > pts[:,None,1])
slope = ((pts[:,None,0]-polygon[:,0])*test_diff[:,1])-(test_diff[:,0]*(pts[:,None,1]-polygon[:,1]))
m2 = slope == 0
mask2 = (m1 & m2).any(-1)
m3 = (slope < 0) != (contour2[:,1] < polygon[:,1])
m4 = m1 & m3
count = np.count_nonzero(m4,axis=-1)
mask3 = ~(count%2==0)
mask = mask1 | mask2 | mask3
return mask
N = 1000000
lenpoly = 1000
polygon = [[np.sin(x)+0.5,np.cos(x)+0.5] for x in np.linspace(0,2*np.pi,lenpoly)]
polygon = np.array(polygon,dtype='float32')
points = np.random.uniform(-1.5, 1.5, size=(N, 2)).astype('float32')
mask = points_in_polygon(polygon, points)
1000 尺寸多边形的 1 mil 点耗时 44 秒。
它比其他实现慢几个数量级,但仍然比 python 循环快,并且仅使用 numpy。
解决方案 7:
inpoly
是在 Python 中进行多边形检查的黄金标准,并且可以处理大量查询:
https://github.com/dengwirda/inpoly-python
简单用法:
from inpoly import inpoly2
import numpy as np
xmin, xmax, ymin, ymax = 0, 1, 0, 1
x0, y0, x1, y1 = 0.5, 0.5, 0, 1
#define any n-sided polygon
p = np.array([[xmin, ymin],
[xmax, ymin],
[xmax, ymax],
[xmin, ymax],
[xmin, ymin]])
#define some coords
coords = np.array([[x0, y0],
[x1, y1]])
#get boolean mask for points if in or on polygon perimeter
isin, ison = inpoly2(coords, p)
后端的 C 实现速度极快
解决方案 8:
渐近最快的算法如下。
def build_edge_list(polygon):
edge_list = []
for i in range(0, len(polygon) - 1):
if (polygon[i].imag, polygon[i].real) < (
polygon[i + 1].imag,
polygon[i + 1].real,
):
edge_list.append((polygon[i], i))
edge_list.append((polygon[i + 1], ~i))
else:
edge_list.append((polygon[i], ~i))
edge_list.append((polygon[i + 1], i))
def sort_key(e):
return e[0].imag, e[0].real, ~e[1]
edge_list.sort(key=sort_key)
return edge_list
def build_scanbeam(edge_list):
actives = []
actives_table = []
events = []
y = -float("inf")
for pt, index in edge_list:
if y != pt.imag:
actives_table.append(list(actives))
events.append(pt.imag)
if index >= 0:
actives.append(index)
else:
actives.remove(~index)
y = pt.imag
actives_table.append(list(actives))
largest_actives = max([len(a) for a in actives_table])
scan = np.zeros((len(actives_table), largest_actives), dtype=int)
scan -= 1
for i, active in enumerate(actives_table):
scan[i, 0 : len(active)] = active
return scan, events
def points_in_polygon(polygon, point):
edge_list = build_edge_list(polygon)
scan, events = build_scanbeam(edge_list)
pts_y = np.imag(point)
idx = np.searchsorted(events, pts_y)
actives = scan[idx]
a = polygon[actives]
b = polygon[actives + 1]
a = np.where(actives == -1, np.nan + np.nan * 1j, a)
b = np.where(actives == -1, np.nan + np.nan * 1j, b)
old_np_seterr = np.seterr(invalid="ignore", divide="ignore")
try:
# If horizontal slope is undefined. But, all x-ints are at x since x0=x1
m = (b.imag - a.imag) / (b.real - a.real)
y0 = a.imag - (m * a.real)
ys = np.reshape(np.repeat(np.imag(point), y0.shape[1]), y0.shape)
x_intercepts = np.where(~np.isinf(m), (ys - y0) / m, a.real)
finally:
np.seterr(**old_np_seterr)
xs = np.reshape(np.repeat(np.real(point), y0.shape[1]), y0.shape)
results = np.sum(x_intercepts <= xs, axis=1)
results %= 2
return results
这实际上更快(在时间复杂度上比 inpoly 更快)。尽管 C++ 意味着它实际上直到达到数万亿个多边形时才会更快。(但是,我们说的是 1 个数量级,而不是接近)。
import time
import numpy as np
def points_in_polygon_Ta946(polygon, pts):
pts = np.asarray(pts, dtype="float32")
polygon = np.asarray(polygon, dtype="float32")
contour2 = np.vstack((polygon[1:], polygon[:1]))
test_diff = contour2 - polygon
mask1 = (pts[:, None] == polygon).all(-1).any(-1)
m1 = (polygon[:, 1] > pts[:, None, 1]) != (contour2[:, 1] > pts[:, None, 1])
slope = ((pts[:, None, 0] - polygon[:, 0]) * test_diff[:, 1]) - (
test_diff[:, 0] * (pts[:, None, 1] - polygon[:, 1])
)
m2 = slope == 0
mask2 = (m1 & m2).any(-1)
m3 = (slope < 0) != (contour2[:, 1] < polygon[:, 1])
m4 = m1 & m3
count = np.count_nonzero(m4, axis=-1)
mask3 = ~(count % 2 == 0)
mask = mask1 | mask2 | mask3
return mask
def build_edge_list(polygon):
edge_list = []
for i in range(0, len(polygon) - 1):
if (polygon[i].imag, polygon[i].real) < (
polygon[i + 1].imag,
polygon[i + 1].real,
):
edge_list.append((polygon[i], i))
edge_list.append((polygon[i + 1], ~i))
else:
edge_list.append((polygon[i], ~i))
edge_list.append((polygon[i + 1], i))
def sort_key(e):
return e[0].imag, e[0].real, ~e[1]
edge_list.sort(key=sort_key)
return edge_list
def build_scanbeam(edge_list):
actives = []
actives_table = []
events = []
y = -float("inf")
for pt, index in edge_list:
if y != pt.imag:
actives_table.append(list(actives))
events.append(pt.imag)
if index >= 0:
actives.append(index)
else:
actives.remove(~index)
y = pt.imag
actives_table.append(list(actives))
largest_actives = max([len(a) for a in actives_table])
scan = np.zeros((len(actives_table), largest_actives), dtype=int)
scan -= 1
for i, active in enumerate(actives_table):
scan[i, 0: len(active)] = active
return scan, events
def points_in_polygon(polygon, point):
edge_list = build_edge_list(polygon)
scan, events = build_scanbeam(edge_list)
pts_y = np.imag(point)
idx = np.searchsorted(events, pts_y)
actives = scan[idx]
a = polygon[actives]
b = polygon[actives + 1]
a = np.where(actives == -1, np.nan + np.nan * 1j, a)
b = np.where(actives == -1, np.nan + np.nan * 1j, b)
old_np_seterr = np.seterr(invalid="ignore", divide="ignore")
try:
# If horizontal slope is undefined. But, all x-ints are at x since x0=x1
m = (b.imag - a.imag) / (b.real - a.real)
y0 = a.imag - (m * a.real)
ys = np.reshape(np.repeat(np.imag(point), y0.shape[1]), y0.shape)
x_intercepts = np.where(~np.isinf(m), (ys - y0) / m, a.real)
finally:
np.seterr(**old_np_seterr)
xs = np.reshape(np.repeat(np.real(point), y0.shape[1]), y0.shape)
results = np.sum(x_intercepts <= xs, axis=1)
results %= 2
return results
N = 50000
lenpoly = 1000
polygon = [
[np.sin(x) + 0.5, np.cos(x) + 0.5]
for x in np.linspace(0, 2 * np.pi, lenpoly)
]
polygon = np.array(polygon, dtype="float32")
points = np.random.uniform(-1.5, 1.5, size=(N, 2)).astype("float32")
t = time.time()
mask = points_in_polygon_Ta946(polygon, points)
t1 = time.time() - t
# Convert to correct format.
t = time.time()
points = points[:, 0] + points[:, 1] * 1j
pg = polygon[:, 0] + polygon[:, 1] * 1j
r = points_in_polygon(pg, points)
t2 = time.time() - t
for i in range(N):
assert(bool(mask[i]) == bool(r[i]))
try:
print(
f"geomstr points in poly took {t2} seconds. Raytraced-numpy took {t1}. Speed-up {t1 / t2}x"
)
except ZeroDivisionError:
pass
在纯 Python(使用 numpy)的答案中,Ta946 的答案是迄今为止最好的。但是,作为答案,它仍然在进行光线追踪,这最终是一种速度慢得多的算法,O(NlogN)。inpoly
正在使用一种O(log(N)*K)
算法,而这里的算法是 O(log(N)log(K)) 算法。虽然 K 几乎总是 N 大小的一小部分,所以很难击败该解决方案在 C++ 中运行它。
geomstr points in poly took 0.03299069404602051 seconds. Raytraced-numpy took 3.781247615814209. Speed-up 114.61558251971121x
这是在 50k 个点时,样本量越大,差异就越明显。
解决方案 9:
你可以使用Geopandas sjoin。可能是最简单的
import geopandas as gpd
gpd.sjoin(points, polygon, op = 'within')
扫码咨询,免费领取项目管理大礼包!