Python: Simple list merging based on intersection
- 2025-03-05 09:17:00
- admin (original)
Problem description:
Consider some lists of integers:
#--------------------------------------
0 [0,1,3]
1 [1,0,3,4,5,10,...]
2 [2,8]
3 [3,1,0,...]
...
n []
#--------------------------------------
The problem is to merge all lists that have at least one common element. So the result for just the part given above would be as follows:
#--------------------------------------
0 [0,1,3,4,5,10,...]
2 [2,8]
#--------------------------------------
What is the most efficient way to do this on big data (the elements are just numbers)? Is a tree structure something worth considering? I do the job now by converting the lists to sets and iterating over intersections, but it is slow! Furthermore, I have a feeling it is far too elementary! In addition, the implementation is missing something (unknown), because some lists sometimes remain unmerged! Having said that, if you propose a self-implementation, please be generous and provide a simple sample code [apparently Python is my favourite :)] or pseudo-code.
Update 1:
Here is the code I used:
#--------------------------------------
lsts = [[0, 1, 3],
        [1, 0, 3, 4, 5, 10, 11],
        [2, 8],
        [3, 1, 0, 16]]
#--------------------------------------
The function is (buggy!!):
#--------------------------------------
def merge(lsts):
    sts = [set(l) for l in lsts]
    i = 0
    while i < len(sts):
        j = i + 1
        while j < len(sts):
            if len(sts[i].intersection(sts[j])) > 0:
                sts[i] = sts[i].union(sts[j])
                sts.pop(j)
            else: j += 1  #---corrected
        i += 1
    lst = [list(s) for s in sts]
    return lst
#--------------------------------------
The result is:
#--------------------------------------
>>> merge(lsts)
[[0, 1, 3, 4, 5, 10, 11, 16], [8, 2]]
#--------------------------------------
Update 2: In my experience, the code given by Niklas Baumstark below turned out to be a bit faster for simple cases. I have not tested the method given by "Hooked" yet, since it is a completely different approach (by the way, it looks interesting). The testing procedure for all of these can be really hard, or even impossible, when it comes to being certain of the results. The real dataset I will use is very large and complex, so it is impossible to trace any error just by repetition. That is, I need to be 100% confident in the reliability of the method before pushing it into its place within a large code as a module. For now, Niklas's method is faster, and the answer for simple sets is of course correct.
However, how can I be sure that it works well for a really large dataset? Since I will not be able to trace the errors visually!
Update 3:
Note that for this problem the reliability of the method is far more important than its speed. I hope to eventually be able to translate the Python code to Fortran for maximum performance.
Update 4:
There are many interesting points in this post, along with generously given answers and constructive comments. I recommend reading all of them thoroughly. Please accept my appreciation for the development of the question, the amazing answers, and the constructive comments and discussions.
Solution 1:
My attempt:
def merge(lsts):
    sets = [set(lst) for lst in lsts if lst]
    merged = True
    while merged:
        merged = False
        results = []
        while sets:
            common, rest = sets[0], sets[1:]
            sets = []
            for x in rest:
                if x.isdisjoint(common):
                    sets.append(x)
                else:
                    merged = True
                    common |= x
            results.append(common)
        sets = results
    return sets
lst = [[65, 17, 5, 30, 79, 56, 48, 62],
       [6, 97, 32, 93, 55, 14, 70, 32],
       [75, 37, 83, 34, 9, 19, 14, 64],
       [43, 71],
       [],
       [89, 49, 1, 30, 28, 3, 63],
       [35, 21, 68, 94, 57, 94, 9, 3],
       [16],
       [29, 9, 97, 43],
       [17, 63, 24]]
print merge(lst)
Benchmark:
import random

# adapt parameters to your own usage scenario
class_count = 50
class_size = 1000
list_count_per_class = 100
large_list_sizes = list(range(100, 1000))
small_list_sizes = list(range(0, 100))
large_list_probability = 0.5

if False:  # change to true to generate the test data file (takes a while)
    with open("/tmp/test.txt", "w") as f:
        lists = []
        classes = [
            range(class_size * i, class_size * (i + 1)) for i in range(class_count)
        ]
        for c in classes:
            # distribute each class across ~300 lists
            for i in xrange(list_count_per_class):
                lst = []
                if random.random() < large_list_probability:
                    size = random.choice(large_list_sizes)
                else:
                    size = random.choice(small_list_sizes)
                nums = set(c)
                for j in xrange(size):
                    x = random.choice(list(nums))
                    lst.append(x)
                    nums.remove(x)
                random.shuffle(lst)
                lists.append(lst)
        random.shuffle(lists)
        for lst in lists:
            f.write(" ".join(str(x) for x in lst) + "\n")
setup = """
# Niklas'
def merge_niklas(lsts):
    sets = [set(lst) for lst in lsts if lst]
    merged = 1
    while merged:
        merged = 0
        results = []
        while sets:
            common, rest = sets[0], sets[1:]
            sets = []
            for x in rest:
                if x.isdisjoint(common):
                    sets.append(x)
                else:
                    merged = 1
                    common |= x
            results.append(common)
        sets = results
    return sets

# Rik's
def merge_rik(data):
    sets = (set(e) for e in data if e)
    results = [next(sets)]
    for e_set in sets:
        to_update = []
        for i, res in enumerate(results):
            if not e_set.isdisjoint(res):
                to_update.insert(0, i)
        if not to_update:
            results.append(e_set)
        else:
            last = results[to_update.pop(-1)]
            for i in to_update:
                last |= results[i]
                del results[i]
            last |= e_set
    return results

# katrielalex's
def pairs(lst):
    i = iter(lst)
    first = prev = item = i.next()
    for item in i:
        yield prev, item
        prev = item
    yield item, first

import networkx

def merge_katrielalex(lsts):
    g = networkx.Graph()
    for lst in lsts:
        for edge in pairs(lst):
            g.add_edge(*edge)
    return networkx.connected_components(g)

# agf's (optimized)
from collections import deque

def merge_agf_optimized(lists):
    sets = deque(set(lst) for lst in lists if lst)
    results = []
    disjoint = 0
    current = sets.pop()
    while True:
        merged = False
        newsets = deque()
        for _ in xrange(disjoint, len(sets)):
            this = sets.pop()
            if not current.isdisjoint(this):
                current.update(this)
                merged = True
                disjoint = 0
            else:
                newsets.append(this)
                disjoint += 1
        if sets:
            newsets.extendleft(sets)
        if not merged:
            results.append(current)
            try:
                current = newsets.pop()
            except IndexError:
                break
            disjoint = 0
        sets = newsets
    return results

# agf's (simple)
def merge_agf_simple(lists):
    newsets, sets = [set(lst) for lst in lists if lst], []
    while len(sets) != len(newsets):
        sets, newsets = newsets, []
        for aset in sets:
            for eachset in newsets:
                if not aset.isdisjoint(eachset):
                    eachset.update(aset)
                    break
            else:
                newsets.append(aset)
    return newsets

# alexis'
def merge_alexis(data):
    bins = range(len(data))  # Initialize each bin[n] == n
    nums = dict()
    data = [set(m) for m in data]  # Convert to sets
    for r, row in enumerate(data):
        for num in row:
            if num not in nums:
                # New number: tag it with a pointer to this row's bin
                nums[num] = r
                continue
            else:
                dest = locatebin(bins, nums[num])
                if dest == r:
                    continue  # already in the same bin
                if dest > r:
                    dest, r = r, dest  # always merge into the smallest bin
                data[dest].update(data[r])
                data[r] = None
                # Update our indices to reflect the move
                bins[r] = dest
                r = dest
    # Filter out the empty bins
    have = [m for m in data if m]
    return have

def locatebin(bins, n):
    while bins[n] != n:
        n = bins[n]
    return n

lsts = []
size = 0
num = 0
max = 0
for line in open("/tmp/test.txt", "r"):
    lst = [int(x) for x in line.split()]
    size += len(lst)
    if len(lst) > max:
        max = len(lst)
    num += 1
    lsts.append(lst)
"""
setup += """
print "%i lists, {class_count} equally distributed classes, average size %i, max size %i" % (num, size/num, max)
""".format(class_count=class_count)
import timeit
print "niklas"
print timeit.timeit("merge_niklas(lsts)", setup=setup, number=3)
print "rik"
print timeit.timeit("merge_rik(lsts)", setup=setup, number=3)
print "katrielalex"
print timeit.timeit("merge_katrielalex(lsts)", setup=setup, number=3)
print "agf (1)"
print timeit.timeit("merge_agf_optimized(lsts)", setup=setup, number=3)
print "agf (2)"
print timeit.timeit("merge_agf_simple(lsts)", setup=setup, number=3)
print "alexis"
print timeit.timeit("merge_alexis(lsts)", setup=setup, number=3)
These timings obviously depend on the specific parameters of the benchmark, such as the number of classes, the number of lists, the list sizes, and so on. Adapt those parameters to your needs to get more useful results.
Below are some example outputs on my machine for different sets of parameters. They show that all the algorithms have their strengths and weaknesses, depending on the kind of input they get:
=====================
# many disjoint classes, large lists
class_count = 50
class_size = 1000
list_count_per_class = 100
large_list_sizes = list(range(100, 1000))
small_list_sizes = list(range(0, 100))
large_list_probability = 0.5
=====================
niklas
5000 lists, 50 equally distributed classes, average size 298, max size 999
4.80084705353
rik
5000 lists, 50 equally distributed classes, average size 298, max size 999
9.49251699448
katrielalex
5000 lists, 50 equally distributed classes, average size 298, max size 999
21.5317108631
agf (1)
5000 lists, 50 equally distributed classes, average size 298, max size 999
8.61671280861
agf (2)
5000 lists, 50 equally distributed classes, average size 298, max size 999
5.18117713928
=> alexis
=> 5000 lists, 50 equally distributed classes, average size 298, max size 999
=> 3.73504281044
===================
# less number of classes, large lists
class_count = 15
class_size = 1000
list_count_per_class = 300
large_list_sizes = list(range(100, 1000))
small_list_sizes = list(range(0, 100))
large_list_probability = 0.5
===================
niklas
4500 lists, 15 equally distributed classes, average size 296, max size 999
1.79993700981
rik
4500 lists, 15 equally distributed classes, average size 296, max size 999
2.58237695694
katrielalex
4500 lists, 15 equally distributed classes, average size 296, max size 999
19.5465381145
agf (1)
4500 lists, 15 equally distributed classes, average size 296, max size 999
2.75445604324
=> agf (2)
=> 4500 lists, 15 equally distributed classes, average size 296, max size 999
=> 1.77850699425
alexis
4500 lists, 15 equally distributed classes, average size 296, max size 999
3.23530197144
===================
# less number of classes, smaller lists
class_count = 15
class_size = 1000
list_count_per_class = 300
large_list_sizes = list(range(100, 1000))
small_list_sizes = list(range(0, 100))
large_list_probability = 0.1
===================
niklas
4500 lists, 15 equally distributed classes, average size 95, max size 997
0.773697137833
rik
4500 lists, 15 equally distributed classes, average size 95, max size 997
1.0523750782
katrielalex
4500 lists, 15 equally distributed classes, average size 95, max size 997
6.04466891289
agf (1)
4500 lists, 15 equally distributed classes, average size 95, max size 997
1.20285701752
=> agf (2)
=> 4500 lists, 15 equally distributed classes, average size 95, max size 997
=> 0.714507102966
alexis
4500 lists, 15 equally distributed classes, average size 95, max size 997
1.1286110878
Solution 2:
I have tried to summarize everything that has been said and done about this topic, in this question and in the duplicate one.
I have tried to test and time every solution (all the code is here).
Testing
This is the TestCase from the testing module:
class MergeTestCase(unittest.TestCase):

    def setUp(self):
        with open('./lists/test_list.txt') as f:
            self.lsts = json.loads(f.read())
        self.merged = self.merge_func(deepcopy(self.lsts))

    def test_disjoint(self):
        """Check disjoint-ness of merged results"""
        from itertools import combinations
        for a, b in combinations(self.merged, 2):
            self.assertTrue(a.isdisjoint(b))

    def test_coverage(self):  # Credit to katrielalex
        """Check coverage original data"""
        merged_flat = set()
        for s in self.merged:
            merged_flat |= s
        original_flat = set()
        for lst in self.lsts:
            original_flat |= set(lst)
        self.assertTrue(merged_flat == original_flat)

    def test_subset(self):  # Credit to WolframH
        """Check that every original data is a subset"""
        for lst in self.lsts:
            self.assertTrue(any(set(lst) <= e for e in self.merged))
This test assumes the result to be a list of sets, so I couldn't test a couple of solutions that work with lists.
I couldn't test the following:
katrielalex
steabert
Among the ones I could test, two failed:
-- Going to test: agf (optimized) --
Check disjoint-ness of merged results ... FAIL
-- Going to test: robert king --
Check disjoint-ness of merged results ... FAIL
Timing
Performance is strongly related to the test data employed.
So far three answers have tried to time their own and other solutions. Since they used different test data, they got different results.
The Niklas benchmark is very tweakable. With his benchmark one can run different tests by changing some parameters.
I used the same three sets of parameters he used in his own answer, and I put them in three different files:
filename = './lists/timing_1.txt'
class_count = 50,
class_size = 1000,
list_count_per_class = 100,
large_list_sizes = (100, 1000),
small_list_sizes = (0, 100),
large_list_probability = 0.5,
filename = './lists/timing_2.txt'
class_count = 15,
class_size = 1000,
list_count_per_class = 300,
large_list_sizes = (100, 1000),
small_list_sizes = (0, 100),
large_list_probability = 0.5,
filename = './lists/timing_3.txt'
class_count = 15,
class_size = 1000,
list_count_per_class = 300,
large_list_sizes = (100, 1000),
small_list_sizes = (0, 100),
large_list_probability = 0.1,
Here are the results that I got:
From file: timing_1.txt
Timing with: >> Niklas << Benchmark
Info: 5000 lists, average size 305, max size 999
Timing Results:
10.434 -- alexis
11.476 -- agf
11.555 -- Niklas B.
13.622 -- Rik. Poggi
14.016 -- agf (optimized)
14.057 -- ChessMaster
20.208 -- katrielalex
21.697 -- steabert
25.101 -- robert king
76.870 -- Sven Marnach
133.399 -- hochl
From file: timing_2.txt
Timing with: >> Niklas << Benchmark
Info: 4500 lists, average size 305, max size 999
Timing Results:
8.247 -- Niklas B.
8.286 -- agf
8.637 -- Rik. Poggi
8.967 -- alexis
9.090 -- ChessMaster
9.091 -- agf (optimized)
18.186 -- katrielalex
19.543 -- steabert
22.852 -- robert king
70.486 -- Sven Marnach
104.405 -- hochl
From file: timing_3.txt
Timing with: >> Niklas << Benchmark
Info: 4500 lists, average size 98, max size 999
Timing Results:
2.746 -- agf
2.850 -- Niklas B.
2.887 -- Rik. Poggi
2.972 -- alexis
3.077 -- ChessMaster
3.174 -- agf (optimized)
5.811 -- katrielalex
7.208 -- robert king
9.193 -- steabert
23.536 -- Sven Marnach
37.436 -- hochl
With Sven's test data I got the following results:
Timing with: >> Sven << Benchmark
Info: 200 lists, average size 10, max size 10
Timing Results:
2.053 -- alexis
2.199 -- ChessMaster
2.410 -- agf (optimized)
3.394 -- agf
3.398 -- Rik. Poggi
3.640 -- robert king
3.719 -- steabert
3.776 -- Niklas B.
3.888 -- hochl
4.610 -- Sven Marnach
5.018 -- katrielalex
And finally, with Agf's benchmark I got:
Timing with: >> Agf << Benchmark
Info: 2000 lists, average size 246, max size 500
Timing Results:
3.446 -- Rik. Poggi
3.500 -- ChessMaster
3.520 -- agf (optimized)
3.527 -- Niklas B.
3.527 -- agf
3.902 -- hochl
5.080 -- alexis
15.997 -- steabert
16.422 -- katrielalex
18.317 -- robert king
1257.152 -- Sven Marnach
As I said at the beginning, all the code is available in that git repository. All the merging functions are in a file called core.py, and every function there whose name ends with _merge is automatically loaded during the tests, so it shouldn't be hard to add/test/improve your own solution.
Let me know also if there's something wrong - it's been a lot of coding and I could use a couple of fresh eyes :)
Solution 3:
Using matrix manipulations
Let me preface this answer with the following comment:
THIS IS THE WRONG WAY TO DO THIS. It is prone to numerical instability and is much slower than the other methods presented - use at your own risk.
That being said, I couldn't resist solving the problem from a dynamical point of view (and I hope you'll get a fresh perspective on the problem). In theory this should work all the time, but eigenvalue calculations can often fail. The idea is to think of your lists as a flow from rows to columns. If two rows share a common value, there is a connecting flow between them. If we think of these flows as water, we see that the flows cluster into little pools when there is a connecting path between them. For simplicity I'm going to use a smaller set, though it works with your dataset as well:
from numpy import where, newaxis
from scipy import linalg, array, zeros
X = [[0,1,3],[2],[3,1]]
We need to convert the data into a flow graph. If row i flows into value j, we put it in the matrix. Here we have 3 rows and 4 unique values:
A = zeros((4, len(X)), dtype=float)
for i, row in enumerate(X):
    for val in row:
        A[val, i] = 1
In general, you need to change the 4 to capture the number of unique values you have. If the sets are lists of integers starting from 0, as they are here, you can simply make it the largest number. We now perform the eigenvalue decomposition - an SVD to be exact, since our matrix is not square.
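As an aside (my own addition, not part of the original answer), you could derive that dimension from the data instead of hard-coding 4, assuming the values are non-negative integers as in the example:
from itertools import chain
n_unique = max(chain(*X)) + 1  # largest value + 1, since values start at 0
A = zeros((n_unique, len(X)), dtype=float)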
S = linalg.svd(A)
We only want to keep the 3x3 portion of this answer, since it represents the flow of the pools. In fact, we only want the absolute values of this matrix; we only care whether there is a flow in this cluster space.
M = abs(S[2])
We can treat this matrix M as a Markov matrix and make it explicit by row-normalizing. Once we have this, we compute the (left) eigenvalue decomposition of the matrix.
M /= M.sum(axis=1)[:,newaxis]
U,V = linalg.eig(M,left=True, right=False)
V = abs(V)
Now, a disconnected (non-ergodic) Markov matrix has the nice property that, for each non-connected cluster, there is an eigenvalue of unity. The eigenvectors associated with these unity values are the ones we want:
idx = where(U > .999)[0]
C = V.T[idx] > 0
I had to use .999 due to the aforementioned numerical instability. At this point, we're done! Each independent cluster can now pull out the corresponding rows:
for cluster in C:
    print where(A[:, cluster].sum(axis=1))[0]
Which gives, as expected:
[0 1 3]
[2]
Change X to your lst and you'll get: [ 0 1 3 4 5 10 11 16] [2 8].
Addendum
Why might this be useful? I don't know where your underlying data comes from, but what happens when the connections are not absolute? Say row 1 has entry 3 80% of the time - how would you generalize the problem? The flow method above would work just fine, and would be completely parametrized by that .999 value: the further it is from unity, the looser the association.
Visual representation
Since a picture is worth a thousand words, here are the plots of the matrices A and V for my example and for your lst respectively. Note how V splits into two clusters (after permutation it is a block-diagonal matrix with two blocks), since for each example there were only two unique lists!
Faster implementation
In hindsight, I realized that you can skip the SVD step and compute only a single decomposition:
from numpy import dot  # dot was not part of the imports above

M = dot(A.T, A)
M /= M.sum(axis=1)[:, newaxis]
U, V = linalg.eig(M, left=True, right=False)
The advantage of this method (besides speed) is that M is now symmetric, hence the computation can be faster and more accurate (no imaginary values to worry about).
Solution 4:
Here's my answer. I haven't checked it against today's batch of answers.
The intersection-based algorithms are O(N^2) since they check each new set against all the existing ones, so I used an approach that indexes each number and runs close to O(N) (if we accept that dictionary lookups are O(1)). Then I ran the benchmarks and felt like a complete idiot because it ran slower, but on closer inspection it turned out that the test data ends up with only a handful of distinct result sets, so the quadratic algorithms don't have much work to do. Test it with more than 10-15 distinct bins and my algorithm is much faster. Try test data with more than 50 distinct bins and it is enormously faster.
(Edit: There was also a problem with the way the benchmark is run, but my diagnosis was wrong. I altered my code to work with the way the repeated tests are run.)
def mergelists5(data):
    """Check each number in our arrays only once, merging when we find
    a number we have seen before.
    """
    bins = range(len(data))  # Initialize each bin[n] == n
    nums = dict()
    data = [set(m) for m in data]  # Convert to sets
    for r, row in enumerate(data):
        for num in row:
            if num not in nums:
                # New number: tag it with a pointer to this row's bin
                nums[num] = r
                continue
            else:
                dest = locatebin(bins, nums[num])
                if dest == r:
                    continue  # already in the same bin
                if dest > r:
                    dest, r = r, dest  # always merge into the smallest bin
                data[dest].update(data[r])
                data[r] = None
                # Update our indices to reflect the move
                bins[r] = dest
                r = dest
    # Filter out the empty bins
    have = [m for m in data if m]
    print len(have), "groups in result"
    return have


def locatebin(bins, n):
    """
    Find the bin where list n has ended up: Follow bin references until
    we find a bin that has not moved.
    """
    while bins[n] != n:
        n = bins[n]
    return n
Solution 5:
Edit: OK, the other question has been closed, so posting here.
Nice question! It's much simpler if you think of it as a connected-components problem in a graph. The following code uses the excellent networkx graph library and the pairs function from this question.
def pairs(lst):
    i = iter(lst)
    first = prev = item = i.next()
    for item in i:
        yield prev, item
        prev = item
    yield item, first

lists = [[1,2,3],[3,5,6],[8,9,10],[11,12,13]]

import networkx
g = networkx.Graph()
for sub_list in lists:
    for edge in pairs(sub_list):
        g.add_edge(*edge)

networkx.connected_components(g)
[[1, 2, 3, 5, 6], [8, 9, 10], [11, 12, 13]]
Explanation
We create a new (empty) graph g. For each sub-list in lists, consider its elements as nodes of the graph and add an edge between them. (Since we only care about connectedness, we don't need to add all the edges - only adjacent ones!) Note that add_edge takes two objects, treats them as nodes (adding them if they aren't already there), and adds an edge between them.
Then we just find the connected components of the graph - a solved problem! - and output them as our intersecting sets.
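One note that is not from the original answer: in networkx 2.x and later, connected_components returns a generator of sets rather than a list of lists, so you would collect the result yourself, for example:
# networkx 2.x+: connected_components yields sets, so materialize them explicitly
components = [sorted(c) for c in networkx.connected_components(g)]
print(components)  # e.g. [[1, 2, 3, 5, 6], [8, 9, 10], [11, 12, 13]]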
Solution 6:
This new function does only the minimum necessary number of disjointness tests, something the other similar solutions fail to do. It also uses a deque to avoid as many linear-time operations as possible, such as list slicing and deletion from early in the list.
from collections import deque

def merge(lists):
    sets = deque(set(lst) for lst in lists if lst)
    results = []
    disjoint = 0
    current = sets.pop()
    while True:
        merged = False
        newsets = deque()
        for _ in xrange(disjoint, len(sets)):
            this = sets.pop()
            if not current.isdisjoint(this):
                current.update(this)
                merged = True
                disjoint = 0
            else:
                newsets.append(this)
                disjoint += 1
        if sets:
            newsets.extendleft(sets)
        if not merged:
            results.append(current)
            try:
                current = newsets.pop()
            except IndexError:
                break
            disjoint = 0
        sets = newsets
    return results
The less overlap between the sets in a given dataset, the better this will do compared to the other functions.
Here is an example case. If you have 4 sets, you need to compare:
1, 2
1, 3
1, 4
2, 3
2, 4
3, 4
If 1 overlaps with 3, then 2 needs to be re-tested to see whether it now overlaps with 1, in order to safely skip testing 2 against 3.
There are two ways to deal with this. The first is to restart the testing of set 1 against the other sets after every overlap and merge. The second is to continue the testing by comparing 1 with 4, then going back and re-testing. The latter results in fewer disjointness tests, as more merges happen in a single pass, so on the re-test pass there are fewer sets left to test against.
The problem is keeping track of which sets have to be re-tested. In the above example, 1 needs to be re-tested against 2 but not against 4, since 1 was already in its current state before 4 was tested the first time.
The disjoint counter keeps track of this.
My answer doesn't help with the main problem of finding an improved algorithm for the re-coding into FORTRAN; it is simply what appears to me to be the simplest and most elegant way to implement the algorithm in Python.
According to my testing (or the test in the accepted answer), it's slightly (up to 10%) faster than the next fastest solution.
def merge0(lists):
    newsets, sets = [set(lst) for lst in lists if lst], []
    while len(sets) != len(newsets):
        sets, newsets = newsets, []
        for aset in sets:
            for eachset in newsets:
                if not aset.isdisjoint(eachset):
                    eachset.update(aset)
                    break
            else:
                newsets.append(aset)
    return newsets
There is no need for the un-Pythonic counters (i, range) or complicated mutation (del, pop, insert) used in the other implementations. It uses only simple iteration, merges the overlapping sets in the simplest manner, and builds a single new list on each pass through the data.
My (faster and simpler) version of the test code:
import random

tenk = range(10000)
lsts = [random.sample(tenk, random.randint(0, 500)) for _ in range(2000)]

setup = """
def merge0(lists):
    newsets, sets = [set(lst) for lst in lists if lst], []
    while len(sets) != len(newsets):
        sets, newsets = newsets, []
        for aset in sets:
            for eachset in newsets:
                if not aset.isdisjoint(eachset):
                    eachset.update(aset)
                    break
            else:
                newsets.append(aset)
    return newsets

def merge1(lsts):
    sets = [set(lst) for lst in lsts if lst]
    merged = 1
    while merged:
        merged = 0
        results = []
        while sets:
            common, rest = sets[0], sets[1:]
            sets = []
            for x in rest:
                if x.isdisjoint(common):
                    sets.append(x)
                else:
                    merged = 1
                    common |= x
            results.append(common)
        sets = results
    return sets

lsts = """ + repr(lsts)

import timeit
print timeit.timeit("merge0(lsts)", setup=setup, number=10)
print timeit.timeit("merge1(lsts)", setup=setup, number=10)
Solution 7:
Here's an implementation using a disjoint-set data structure (specifically a disjoint forest), thanks to comingstorm's hint about merging sets which have even one element in common. I'm using path compression for a slight (~5%) speed improvement; it's not entirely necessary (and it prevents find from being tail-recursive, which could slow things down). Note that I'm using a dict to represent the disjoint forest; given that the data are ints, an array would also work, although it might not be much faster.
def merge(data):
    parents = {}

    def find(i):
        j = parents.get(i, i)
        if j == i:
            return i
        k = find(j)
        if k != j:
            parents[i] = k
        return k

    for l in filter(None, data):
        parents.update(dict.fromkeys(map(find, l), find(l[0])))

    merged = {}
    for k, v in parents.items():
        merged.setdefault(find(v), []).append(k)

    return merged.values()
This approach performs comparably to the other best algorithms in Rik's benchmarks.
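A quick usage sketch (my own illustration, not part of the original answer) on the data from Update 1; the exact ordering inside each group may differ, since it comes from dict and set iteration order:
data = [[0, 1, 3], [1, 0, 3, 4, 5, 10, 11], [2, 8], [3, 1, 0, 16]]
print(merge(data))  # e.g. [[0, 1, 3, 4, 5, 10, 11, 16], [8, 2]]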
Solution 8:
This would be my updated approach:
def merge(data):
    sets = (set(e) for e in data if e)
    results = [next(sets)]
    for e_set in sets:
        to_update = []
        for i, res in enumerate(results):
            if not e_set.isdisjoint(res):
                to_update.insert(0, i)
        if not to_update:
            results.append(e_set)
        else:
            last = results[to_update.pop(-1)]
            for i in to_update:
                last |= results[i]
                del results[i]
            last |= e_set
    return results
Note: Empty lists are removed during the merging.
Update: Reliability.
You need two tests for a 100% success rate:
Check that all the resulting sets are mutually disjoint:
merged = [{0, 1, 3, 4, 5, 10, 11, 16}, {8, 2}, {8}]

from itertools import combinations
for a, b in combinations(merged, 2):
    if not a.isdisjoint(b):
        raise Exception(a, b)  # just an example
Check that the merged sets cover the original data (as suggested by katrielalex).
I think this will take some time, but maybe it's worth it if you want to be 100% sure.
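A minimal sketch of that second check (my own, mirroring the test_coverage idea from Solution 2; it assumes lsts is the original input and merged is the merge result):
# Every element of the input must appear in the union of the merged sets
original_flat = set()
for lst in lsts:
    original_flat |= set(lst)
merged_flat = set()
for s in merged:
    merged_flat |= s
if original_flat != merged_flat:
    raise Exception("merged result does not cover the original data")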
Solution 9:
Just for fun...
def merge(mylists):
    results, sets = [], [set(lst) for lst in mylists if lst]
    upd, isd, pop = set.update, set.isdisjoint, sets.pop
    while sets:
        if not [upd(sets[0], pop(i)) for i in xrange(len(sets)-1, 0, -1) if not isd(sets[0], sets[i])]:
            results.append(pop(0))
    return results
and my rewrite of the best answer:
def merge(lsts):
    sets = map(set, lsts)
    results = []
    while sets:
        first, rest = sets[0], sets[1:]
        merged = False
        sets = []
        for s in rest:
            if s and s.isdisjoint(first):
                sets.append(s)
            else:
                first |= s
                merged = True
        if merged: sets.append(first)
        else: results.append(first)
    return results
Solution 10:
Here's a function (Python 3.1) that checks whether the result of a merge function is OK. It checks:
Are the result sets disjoint? (number of elements of the union == sum of the numbers of elements)
Are the elements of the result sets the same as those of the input lists?
Is every input list a subset of a result set?
Is every result set minimal, i.e. is it impossible to split it into two smaller sets?
It does not check whether there are empty result sets - I don't know whether you want them...
from itertools import chain

def check(lsts, result):
    lsts = [set(s) for s in lsts]
    all_items = set(chain(*lsts))
    all_result_items = set(chain(*result))
    num_result_items = sum(len(s) for s in result)
    if num_result_items != len(all_result_items):
        print("Error: result sets overlap!")
        print(num_result_items, len(all_result_items))
        print(sorted(map(len, result)), sorted(map(len, lsts)))
    if all_items != all_result_items:
        print("Error: result doesn't match input lists!")
    if not all(any(set(s).issubset(t) for t in result) for s in lsts):
        print("Error: not all input lists are contained in a result set!")

    seen = set()
    todo = list(filter(bool, lsts))
    done = False
    while not done:
        deletes = []
        for i, s in enumerate(todo):  # intersection with seen, or with unseen result set, is OK
            if not s.isdisjoint(seen) or any(t.isdisjoint(seen) for t in result if not s.isdisjoint(t)):
                seen.update(s)
                deletes.append(i)
        for i in reversed(deletes):
            del todo[i]
        done = not deletes
    if todo:
        print("Error: A result set should be split into two or more parts!")
        print(todo)
Solution 11:
lists = [[1,2,3],[3,5,6],[8,9,10],[11,12,13]]

import networkx as nx
g = nx.Graph()
for sub_list in lists:
    for i in range(1, len(sub_list)):
        g.add_edge(sub_list[0], sub_list[i])

print nx.connected_components(g)
#[[1, 2, 3, 5, 6], [8, 9, 10], [11, 12, 13]]
Performance:
5000 lists, 5 classes, average size 74, max size 1000
15.2264976415
Performance of merge1 for comparison:
print timeit.timeit("merge1(lsts)", setup=setup, number=10)
5000 lists, 5 classes, average size 74, max size 1000
1.26998780571
So it's 11x slower than the fastest... but the code is much simpler and more readable!
Solution 12:
This is slower than the solution offered by Niklas (I got 3.9s on test.txt versus 0.5s for his solution), but it yields the same result and might be easier to implement in a language like Fortran, since it doesn't use sets, only sorting of the total amount of elements followed by a single run through all of them.
It returns a list with the ids of the merged lists, so it also keeps track of empty lists; they stay unmerged.
def merge(lsts):
    # this is an index list that stores the joined id for each list
    joined = range(len(lsts))
    # create an ordered list with indices
    indexed_list = sorted((el, index) for index, lst in enumerate(lsts) for el in lst)
    # loop through the ordered list, and if two elements are the same and
    # the lists are not yet joined, alter the list with the joined id
    el_0, idx_0 = None, None
    for el, idx in indexed_list:
        if el == el_0 and joined[idx] != joined[idx_0]:
            old = joined[idx]
            rep = joined[idx_0]
            joined = [rep if id == old else id for id in joined]
        el_0, idx_0 = el, idx
    return joined
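For example (my own illustration, not part of the original answer), with the lsts from Update 1 the function returns the joined id of each input list rather than the merged contents:
lsts = [[0, 1, 3], [1, 0, 3, 4, 5, 10, 11], [2, 8], [3, 1, 0, 16]]
print(merge(lsts))  # [0, 0, 2, 0] -> lists 0, 1 and 3 ended up in group 0, list 2 stayed on its own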
Solution 13:
Firstly, I'm not exactly sure the benchmarks are fair:
Adding the following code to the start of my function:
from collections import Counter
from itertools import chain

c = Counter(chain(*lists))
print c[1]
"88"
This means that of all the values in all the lists, there are only 88 distinct values. Usually in the real world duplicates are rare, and you would expect a lot more distinct values. (Of course I don't know where your data comes from, so I can't make assumptions.)
Because duplicates are more common, the sets are less likely to be disjoint. This means the set.isdisjoint() method will be much faster, because only after a few tests will it find that the sets aren't disjoint.
Having said all that, I do believe the methods presented that use isdisjoint are the fastest anyway; I'm just saying that instead of being 20x faster, maybe they should only be 10x faster than the other methods with different benchmark testing.
Anyway, I thought I would try a slightly different technique to solve this, but the merge sorting was too slow - this method is about 20x slower than the two fastest methods in the benchmarking:
I thought I would order everything.
import heapq
from itertools import chain

def merge6(lists):
    for l in lists:
        l.sort()
    one_list = heapq.merge(*[zip(l, [i]*len(l)) for i, l in enumerate(lists)])  # iterating through one_list takes 25 seconds!!
    previous = one_list.next()

    d = {i: i for i in range(len(lists))}
    for current in one_list:
        if current[0] == previous[0]:
            d[current[1]] = d[previous[1]]
        previous = current

    groups = [[] for i in range(len(lists))]
    for k in d:
        groups[d[k]].append(lists[k])  # add each list to its group

    return [set(chain(*g)) for g in groups if g]  # since each subgroup in each g is sorted, it would be faster to merge these subgroups removing duplicates along the way.

lists = [[1,2,3],[3,5,6],[8,9,10],[11,12,13]]
print merge6(lists)
"[set([1, 2, 3, 5, 6]), set([8, 9, 10]), set([11, 12, 13])]"
import timeit
print timeit.timeit("merge1(lsts)", setup=setup, number=10)
print timeit.timeit("merge4(lsts)", setup=setup, number=10)
print timeit.timeit("merge6(lsts)", setup=setup, number=10)
5000 lists, 5 classes, average size 74, max size 1000
1.26732238315
5000 lists, 5 classes, average size 74, max size 1000
1.16062907437
5000 lists, 5 classes, average size 74, max size 1000
30.7257182826
Solution 14:
I found @Niklas B.'s answer really helpful... but it took me a while to read through it and understand the logic. This is a re-write of exactly the same code with new variable names and more explanation... to help out the other n00bs!
def mergeUntilOnlyDisjointSetsRemain(_listsOfLists):
    """Function for merging lists until there are only disjoint sets"""

    """Imagine this algorithm as if it were processing train cars filled with
    integers. It takes the first car of the train, separates it from the rest,
    and then compares the first car to each subsequent car.
    Start by renaming the first car to 'common'
    If the two train cars have a common integer, you merge the two cars into
    common, and continue down the line until you reach the end of the train.
    Once you reach the end of the train, place the common car in results, (which
    is essentially a collection of train cars that have already been compared
    to all other cars).
    You can exit the loop as soon as you get to the end of the train without
    merging any of the cars. This is controlled by the continueMerge variable.
    This variable is only reset to True after a merge operation.
    """

    # Start by creating a trainCar (i.e. a set) from each list in our listOfLists
    freightTrain = [set(trainCar) for trainCar in _listsOfLists if trainCar]

    # This continueMerge means that we have not yet compared all cars in the train.
    continueMerge = True
    while continueMerge:
        # Reset the while loop trigger.
        continueMerge = False
        # Create a fresh empty list of cars that have already been cross checked
        crossCheckedCars = []

        # While there are still elements in freightTrain
        while freightTrain:
            # Split the freightTrain up, into first car vs all the remaining cars
            commonFirstTrainCar = freightTrain[0]
            remainingCars = freightTrain[1:]
            # The freightTrain is now empty
            freightTrain = []

            # Iterate over all the remaining traincars
            for currentTrainCar in remainingCars:
                # If there are not any common integers with the first car...
                if currentTrainCar.isdisjoint(commonFirstTrainCar):
                    # Add each train car back onto the freightTrain
                    freightTrain.append(currentTrainCar)

                # But if they share a common integer...
                else:
                    # Trigger the reset switch to continueMerging cars
                    continueMerge = True
                    # and join the cars together
                    commonFirstTrainCar |= currentTrainCar

            # Once we have checked commonFirstTrainCar, remove it from the
            # freightTrain and place it in crossCheckedCars
            crossCheckedCars.append(commonFirstTrainCar)

        # Now we have compared the first car to all subsequent cars
        # (... but we aren't finished because the 5th and 7th cars might have
        # had a common integer with each other... but only 1st and 5th cars
        # shared an integer the first time around... so the 1st and 5th cars
        # were merged, but the 7th car is still alone!)

        # Reset the system by creating a new freightTrain
        freightTrain = crossCheckedCars

    # Post-process the freight train to turn it into lists instead of sets
    listsForReturnTripHome = []
    for processedTraincar in freightTrain:
        listsForReturnTripHome.append(list(processedTraincar))
    return listsForReturnTripHome
Solution 15:
I needed this functionality today and stumbled upon this SO thread and the duplicate one. First of all, a big thank-you to everyone who participated, especially @alexis, whose answer heavily influenced mine, and @Rik Poggi, whose GitHub repository was extremely useful for testing and benchmarking my version. This is the kind of thread that makes SO invaluable!
A few notes:
I re-worked the repo to move "deepcopy(lsts)" out of the benchmarked execution and into the setup. Using timeit.repeat, we get exactly the same behaviour and safety, but the deepcopy doesn't affect the timings. This explains why the timings reported below are so low. I also added a profiler script to the repo, so if you add your function you can easily profile it against any of the lists.
My fork of @Rik Poggi's repo is here.
As mentioned above, my version is a re-write of @alexis' answer. We both derive from the union-find algorithm, but the main difference is that my version updates all the nodes' parents (bin_ref) upon merging, not only the main node. This lets me avoid the recursive tree traversal (locatebin) from @alexis' answer, and I think it makes the code clearer. It's a rather opaque way of achieving what we want (unless you're a mathematician, which I'm not), so I tried to make the code as readable as possible. I believe that, in this case, "readable" -> "maintainable".
My version doesn't make many assumptions about the input data. My requirement was list[tuple[Path]], so I made a version that doesn't care too much about the data inside. As long as you pass an Iterable[Iterable[T]] where T is hashable, you'll be fine.
About the results:
My version seems to perform better with big datasets. It under-performs slightly (2.5x slower than the best, in the worst case) as the dataset gets smaller. For mid-sized datasets, prefer @Rik Poggi's version. For small ones, prefer @alexis'.
from itertools import chain
from typing import Iterable, TypeVar

T = TypeVar('T')


def merge_intersections(lists: Iterable[Iterable[T]]) -> list[set[T]]:
    """
    Merges lists based on whether or not they intersect. Returns a list
    of mutually disjoint sets.

    How it works:
    The general idea is to iterate over the sequences and merge them in
    their "intersection-bins". To do so, for each sequence, we look for
    items that were already encountered during previous iteration. If we
    don't find any, this means the current sequence might be disjoint
    from the rest, so we put it in its own bin. If, on the other hand,
    we find previously encountered items, this means one or more bins
    already exist for these items. If more than one bin exists we merge
    them (updating the reference of all contained items). Finally we put
    the current sequence's items in the bin.

    Usage:
    ::
        >>> result = merge_intersections([[1, 2], [2, 4], [7, 8], [3, 2], [4, 5]])
        >>> result = sorted(sorted(s) for s in result)
        [[1, 2, 3, 4, 5], [7, 8]]
    """
    bins: dict[T, set[T]] = dict()
    bin_refs: dict[T, T] = dict()
    for lst in lists:
        if not lst:
            continue

        # Gather the bin refs of all items in the list that we have
        # already seen.
        encountered_items_bin_refs = {
            bin_refs[item]
            for item in lst
            if item in bin_refs
        }
        if len(encountered_items_bin_refs) >= 1:
            # Some of the items in `lst` have already been seen in a
            # previous iteration. They are therefore already attached
            # to a bin. Select any of their corresponding bin ref.
            bin_ref = encountered_items_bin_refs.pop()
            # If the previously-seen items were not all attached to the
            # same bin, their respective bins need to be merged into
            # the selected one.
            if len(encountered_items_bin_refs) > 0:
                to_merge_bins = [bins.pop(ref) for ref in encountered_items_bin_refs]
                bins[bin_ref].update(chain(*to_merge_bins))
                for item in chain(*to_merge_bins):
                    bin_refs[item] = bin_ref
            bins[bin_ref].update(lst)
        else:
            # None of the items in `lst` have already been seen in a
            # previous iteration. Therefore, we can safely pick any
            # item as our new bin ref and create the corresponding bin.
            bin_ref = next(iter(lst))
            bins[bin_ref] = set(lst)
            for item in lst:
                bin_refs[item] = bin_ref
    return list(bins.values())
Benchmark results:
Timing with: >> Niklas << Benchmark
Info: 5000 lists, average size 305, max size 999
(from file: ./lists/timing_1.txt)
0.200 (1x) -- takeshi
0.328 (1.6x) -- alexis
0.473 (2.4x) -- agf
0.746 (3.7x) -- Rik. Poggi
0.776 (3.9x) -- Niks rewrite
0.792 (4x) -- Niklas B.
1.419 (7.1x) -- agf (optimized)
1.480 (7.4x) -- ChessMaster
1.579 (7.9x) -- katrielalex
1.627 (8.2x) -- steabert
1.938 (9.7x) -- robert king
9.273 (46x) -- Sven Marnach
33.779 (169x) -- hochl
Timing with: >> Niklas << Benchmark
Info: 4500 lists, average size 305, max size 999
(from file: ./lists/timing_2.txt)
0.158 (1x) -- takeshi
0.166 (1x) -- agf
0.231 (1.5x) -- Niks rewrite
0.232 (1.5x) -- Rik. Poggi
0.233 (1.5x) -- Niklas B.
0.268 (1.7x) -- alexis
0.408 (2.6x) -- ChessMaster
0.422 (2.7x) -- agf (optimized)
1.408 (8.9x) -- katrielalex
1.483 (9.4x) -- steabert
1.924 (12x) -- robert king
8.622 (55x) -- Sven Marnach
25.952 (164x) -- hochl
Timing with: >> Niklas << Benchmark
Info: 4500 lists, average size 98, max size 999
(from file: ./lists/timing_3.txt)
0.059 (1x) -- takeshi
0.068 (1.1x) -- agf
0.094 (1.6x) -- alexis
0.095 (1.6x) -- Rik. Poggi
0.095 (1.6x) -- Niklas B.
0.097 (1.6x) -- Niks rewrite
0.183 (3.1x) -- ChessMaster
0.192 (3.2x) -- agf (optimized)
0.425 (7.2x) -- katrielalex
0.586 (9.9x) -- robert king
0.787 (13x) -- steabert
2.827 (48x) -- Sven Marnach
9.162 (155x) -- hochl
Timing with: >> Sven << Benchmark
Info: 200 lists, average size 10, max size 10
0.000 (1x) -- alexis
0.001 (1.3x) -- ChessMaster
0.001 (1.8x) -- agf (optimized)
0.001 (1.9x) -- takeshi
0.002 (3.7x) -- steabert
0.002 (3.7x) -- robert king
0.002 (4.2x) -- katrielalex
0.002 (4.6x) -- Rik. Poggi
0.002 (4.8x) -- agf
0.002 (5.1x) -- Niklas B.
0.002 (5.5x) -- hochl
0.003 (6.1x) -- Niks rewrite
0.003 (7.7x) -- Sven Marnach
Timing with: >> Agf << Benchmark
Info: 2000 lists, average size 253, max size 500
0.030 (1x) -- Rik. Poggi
0.035 (1.2x) -- ChessMaster
0.036 (1.2x) -- agf
0.036 (1.2x) -- agf (optimized)
0.037 (1.2x) -- Niks rewrite
0.038 (1.3x) -- Niklas B.
0.060 (2x) -- hochl
0.074 (2.5x) -- takeshi
0.118 (3.9x) -- alexis
0.520 (17x) -- katrielalex
0.528 (18x) -- steabert
0.694 (23x) -- robert king
34.746 (1158x) -- Sven Marnach
Test results:
### Going to test: takeshi ###
test_coverage (__main__.MergeTestCase)
Check coverage original data ... ok
test_disjoint (__main__.MergeTestCase)
Check disjoint-ness of merged results ... ok
test_subset (__main__.MergeTestCase)
Check that every original data is a subset ... ok
----------------------------------------------------------------------
Ran 3 tests in 0.008s
OK
Solution 16:
My solution works well on small lists and is quite readable with no dependencies.
def merge_list(starting_list):
    final_list = []
    for i, v in enumerate(starting_list[:-1]):
        if set(v) & set(starting_list[i+1]):
            starting_list[i+1].extend(list(set(v) - set(starting_list[i+1])))
        else:
            final_list.append(v)
    final_list.append(starting_list[-1])
    return final_list
Benchmark:
lists = [[1,2,3],[3,5,6],[8,9,10],[11,12,13]]
%timeit merge_list(lists)
100000 loops, best of 3: 4.9 µs per loop
Solution 17:
This can be solved in O(n) by using the union-find algorithm. Given the first two rows of your data, the edges to use in the union-find are the following pairs: (0,1), (1,3), (1,0), (0,3), (3,4), (4,5), (5,10)
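Since this answer gives no code, here is a minimal union-find sketch of that idea (my own, with hypothetical names); linking consecutive elements of each list is enough, exactly as in the pairs listed above:
def merge_union_find(lsts):
    parent = {}

    def find(x):
        # Follow parent pointers to the root, compressing the path on the way back
        root = x
        while parent[root] != root:
            root = parent[root]
        while parent[x] != root:
            parent[x], x = root, parent[x]
        return root

    def union(a, b):
        parent[find(a)] = find(b)

    for lst in lsts:
        for x in lst:
            parent.setdefault(x, x)
        # Linking consecutive elements is enough: (lst[0], lst[1]), (lst[1], lst[2]), ...
        for a, b in zip(lst, lst[1:]):
            union(a, b)

    groups = {}
    for x in parent:
        groups.setdefault(find(x), set()).add(x)
    return list(groups.values())

print(merge_union_find([[0, 1, 3], [1, 0, 3, 4, 5, 10, 11], [2, 8], [3, 1, 0, 16]]))
# two groups: {0, 1, 3, 4, 5, 10, 11, 16} and {2, 8}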
Solution 18:
Use a flag to ensure you end up with the final, mutually exclusive results:
def merge(lists):
    while True:
        flag = 0
        for i in range(0, len(lists)):
            for j in range(i + 1, len(lists)):
                # use set operations for the intersection/union tests
                if len(set(lists[i]) & set(lists[j])) != 0:
                    lists[i] = list(set(lists[i]) | set(lists[j]))
                    lists.pop(j)
                    flag += 1
                    break
            if flag != 0:
                break  # restart the scan after a merge, since the indices have shifted
        if flag == 0:
            break
    return lists
Solution 19:
from itertools import combinations


def merge(elements_list):
    d = {index: set(elements) for index, elements in enumerate(elements_list)}
    while any(not set.isdisjoint(d[i], d[j]) for i, j in combinations(d.keys(), 2)):
        merged = set()
        for i, j in combinations(d.keys(), 2):
            if not set.isdisjoint(d[i], d[j]):
                d[i] = set.union(d[i], d[j])
                merged.add(j)
        for k in merged:
            d.pop(k)
    return [v for v in d.values() if v]
lst = [[65, 17, 5, 30, 79, 56, 48, 62],
       [6, 97, 32, 93, 55, 14, 70, 32],
       [75, 37, 83, 34, 9, 19, 14, 64],
       [43, 71],
       [],
       [89, 49, 1, 30, 28, 3, 63],
       [35, 21, 68, 94, 57, 94, 9, 3],
       [16],
       [29, 9, 97, 43],
       [17, 63, 24]]

print(merge(lst))