numpy np.apply_along_axis 函数加速吗?
- 2025-04-10 09:45:00
- admin 原创
- 19
问题描述:
np.apply_along_axis() 函数似乎非常慢(15 分钟后没有输出)。有没有一种快速的方法可以在长数组上执行此函数而无需并行化操作?我特别指的是具有数百万个元素的数组。
下面是我尝试做的一个例子。请忽略 my_func 的简单定义,目标不是将数组乘以 55(当然,这无论如何都可以在原地完成),而是一个示例。实际上,my_func 稍微复杂一些,需要额外的参数,因此 a 的每个元素都会被不同地修改,即不仅仅是乘以 55。
>>> def my_func(a):
... return a[0]*55
>>> a = np.ones((200000000,1))
>>> np.apply_along_axis(my_func, 1, a)
编辑:
a = np.ones((20,1))
def my_func(a, i,j):
... b = np.zeros((2,2))
... b[0,0] = a[i]
... b[1,0] = a[i]
... b[0,1] = a[i]
... b[1,1] = a[j]
... return linalg.eigh(b)
>>> my_func(a,1,1)
(array([ 0., 2.]), array([[-0.70710678, 0.70710678],
[ 0.70710678, 0.70710678]]))
解决方案 1:
np.apply_along_axis
不是为了速度。
除了 AST 重写之外,没有办法将纯 Python函数应用于 Numpy 数组的每个元素,而无需多次调用它......
幸运的是,有解决方案:
向量化
虽然这通常很难,但通常是简单的解决方案。找到某种方式来表达您的计算,使其能够概括所有元素,这样您就可以一次处理整个矩阵。这将导致循环从 Python 中提升到高度优化的 C 和 Fortran 例程中。
JITing:Numba和Parakeet,以及较小程度上的PyPy和NumPyPy
Numba 和 Parakeet 都处理 Numpy 数据结构上的 JITing 循环,因此如果您将循环内联到函数中(这可以是包装函数),则可以几乎免费地获得巨大的速度提升。不过,这取决于所使用的数据结构。
符号求值器,例如Theano和numexpr
这些允许您使用嵌入式语言来表达计算,这甚至比矢量化版本更快。
Cython和C 扩展
如果一切都丢失了,您可以随时手动深入到 C。Cython 隐藏了很多复杂性并且还有很多可爱的魔力,所以它并不总是那么糟糕(尽管它有助于了解你在做什么)。
干得好。
这是我的测试“环境”(你真的应该提供这个:P):
import itertools
import numpy
a = numpy.arange(200).reshape((200,1)) ** 2
def my_func(a, i,j):
b = numpy.zeros((2,2))
b[0,0] = a[i]
b[1,0] = a[i]
b[0,1] = a[i]
b[1,1] = a[j]
return numpy.linalg.eigh(b)
eigvals = {}
eigvecs = {}
for i, j in itertools.combinations(range(a.size), 2):
eigvals[i, j], eigvecs[i, j] = my_func(a,i,j)
现在,获取所有排列而不是组合变得容易得多,因为您只需这样做即可:
# All *permutations*, not combinations
indexes = numpy.mgrid[:a.size, :a.size]
这看起来似乎有些浪费,但排列数只有两倍,所以没什么大不了的。
所以我们想使用这些索引来获取相关元素:
# Remove the extra dimension; it's not wanted here!
subs = a[:,0][indexes]
然后我们就可以创建矩阵:
target = numpy.array([
[subs[0], subs[0]],
[subs[0], subs[1]]
])
我们需要矩阵位于最后两个维度:
target.shape
#>>> (2, 2, 200, 200)
target = numpy.swapaxes(target, 0, 2)
target = numpy.swapaxes(target, 1, 3)
target.shape
#>>> (200, 200, 2, 2)
我们可以检查它是否有效:
target[10, 20]
#>>> array([[100, 100],
#>>> [100, 400]])
耶!
那么我们只需运行numpy.linalg.eigh
:
values, vectors = numpy.linalg.eigh(target)
看看,它成功了!
values[10, 20]
#>>> array([ 69.72243623, 430.27756377])
eigvals[10, 20]
#>>> array([ 69.72243623, 430.27756377])
因此我想您可能想要将它们连接起来:
numpy.concatenate([values[row, row+1:] for row in range(len(values))])
#>>> array([[ 0.00000000e+00, 1.00000000e+00],
#>>> [ 0.00000000e+00, 4.00000000e+00],
#>>> [ 0.00000000e+00, 9.00000000e+00],
#>>> ...,
#>>> [ 1.96997462e+02, 7.78160025e+04],
#>>> [ 3.93979696e+02, 7.80160203e+04],
#>>> [ 1.97997475e+02, 7.86070025e+04]])
numpy.concatenate([vectors[row, row+1:] for row in range(len(vectors))])
#>>> array([[[ 1. , 0. ],
#>>> [ 0. , 1. ]],
#>>>
#>>> [[ 1. , 0. ],
#>>> [ 0. , 1. ]],
#>>>
#>>> [[ 1. , 0. ],
#>>> [ 0. , 1. ]],
#>>>
#>>> ...,
#>>> [[-0.70890372, 0.70530527],
#>>> [ 0.70530527, 0.70890372]],
#>>>
#>>> [[-0.71070503, 0.70349013],
#>>> [ 0.70349013, 0.71070503]],
#>>>
#>>> [[-0.70889463, 0.7053144 ],
#>>> [ 0.7053144 , 0.70889463]]])
也可以在之后执行这个连接循环以numpy.mgrid
将工作量减半:
# All *permutations*, not combinations
indexes = numpy.mgrid[:a.size, :a.size]
# Convert to all *combinations* and reduce the dimensionality
indexes = numpy.concatenate([indexes[:, row, row+1:] for row in range(indexes.shape[1])], axis=1)
# Remove the extra dimension; it's not wanted here!
subs = a[:,0][indexes]
target = numpy.array([
[subs[0], subs[0]],
[subs[0], subs[1]]
])
target = numpy.rollaxis(target, 2)
values, vectors = numpy.linalg.eigh(target)
是的,最后一个样本就是您所需要的。
解决方案 2:
因此,a 中的每个元素都会被不同地修改
如果数组元素之间没有连接,那么 Veedracs 的回答总结了典型的策略。然而,通常可以找到一种矢量化方案,可以大大加快计算速度。如果您提供函数本身的相关代码片段,我们可以为您提供更有用的答案。
编辑:
以下代码说明了如何对示例函数进行矢量化。虽然它并不完整(块矩阵和特征值检索),但它应该为您提供了一些有关如何做到这一点的基本想法。查看函数中的每个矩阵和子矩阵,了解如何设置这样的计算。此外,我使用了密集矩阵,当使用数百万个元素a
和大量索引对时,它很可能不适合内存。但计算过程中的大多数矩阵都是稀疏的。因此,您始终可以将代码转换为使用稀疏矩阵。该函数现在采用向量a
和索引向量pairs
。
import numpy as np
def my_func(a,pairs):
#define mask matrix
g=np.zeros((4,2))
g[:3,0]=1
g[3,1]=1
# k is the number of index pairs which need calculation
k=pairs.shape[0]
h=np.kron(np.eye(k),g)
b=np.dot(h,a[pairs.ravel()[:2*k]]) # this matrix product generates your matrix b
b.shape=-1,2
out = np.zeros((2*k,2*k)) # pre allocate memory of the block diagonal matrix
# make block diagonal matrix
for i in xrange(k):
out[i*2:(i+1)*2, i*2:(i+1)*2] = b[i*2:(i+1)*2,:]
res = np.linalg.eigh(out) # the eigenvalues of each 2by2 matrix are the same as the ones of one large block diagonal matrix
# unfortunately eigh sorts the eigenvalues
# to retrieve the correct pairs of eigenvalues
# for each submatrix b, one has to inspect res[1] and pick
# corresponding eigenvalues
# I leave that for you, remember out=res[1] diag(res[0]) res[1].T
return res
#vektor a
a=np.arange(20)
#define index pairs for calculation
pairs=np.asarray([[1,3],[2,7],[1,7],[2,3]])
print my_func(a,pairs)
扫码咨询,免费领取项目管理大礼包!