TOP-K 问题的终极算法 - BFPRT 算法

TOP-K 问题的终极算法 - BFPRT 算法

TOP-K 问题,从一堆无序数据里面找到前 K 大(当然也可以是前 K 小的数。我可以用堆排序或者快速排序可以做到,但是时间复杂度为 O(NlogN), 这里就不多说了。BFPRT 算法,该算法于1973年由 Blum、Floyd、Pratt、Rivest 和 Tarjan 联合发明,其中蕴含的深刻思想改变了世界。BFPRT 算法解决了这样一个问题,在时间复杂度 O(N)内,从无序的数组中找到第 K 小的数。

1.BFPRT 算法的核心思想

假设 BFPRT 算法的函数是selectK(des []int, start int, end int, i int) int,该函数的功能为在arr中找到第k小的数,然后返回该数。selectK(des []int, start int, end int, i int) int的过程如下:

  1. 将arr中的n个元素划分成n/5组,每组5个元素,如果最后的组不够5个元素,那么最后剩下的元素为一组(n%5个元素)。
  2. 对每个组进行插入排序,只针对每个组最多5个元素之间的组内排序,组与组之间并不排序。排序后找到每个组的中位数,如果组的元素个数为偶数,这里规定找到下中位数(位置处于排好序数组右边的中位数)。
  3. 步骤2中一共会找到n/5个中位数,让这些中位数组成一个新的数组,记为 midArray 。递归调用selectK (midArray, 0, len(midArray)-1, len(midArray)/2),意义是找到 mArr 数组中的中位数,即 midArray 中第(midArray.length/2)小的数。
  4. 假设步骤3中递归调用selectK (midArray, 0, len(midArray)-1, len(midArray)/2)后,返回的数为 x 。根据这个 x 划分整个 arr 数组(partition过程),划分的过程为:在arr中,比 x 小的数都在 x 的左边,大于 x 的数都在 x 的右边,x 在中间。划分完成后,x 在 arr 中的位置记为 i。
  5. 如果 i==k ,说明x为整个数组中第 k 小的数,直接返回。
    • 如果 i<k,说明 x 处在第 k 小的数的左边,应该在 x 的右边寻找第 k 小的数,所以递归调用selectK函数,在右半区寻找第 i 小的数。
    • 如果 i>k,说明 x 处在第 k 小的数的右边,应该在 x 的左边寻找第k小的数,所以递归调用selectK函数,在左半区寻找第 k 小的数。

2.BFPRT 算法怎么找到第K大的数

BFPRT 算法又叫中位数的中位数算法,一次中位数的运算可以找到第50%的数,根据当前中位数在数组中的位置K’,可以判断当前第50%大的数是不是第K
的数。如果 K’ < K ,则往右半区查找。因为在合并后,K’位置左边的数一定比所有小于K’位置的数小,K’位置右边的数一定比所有小于K’位置的数大。所以当 K’ = K 时,就是第 K 大的数。

3.BFPRT 算法实现

主函数过程如下:从当前数组中找到第 K 大中位数,然后对数组进行划分,比 x 小的数都在x的左边,大于 x 的数都在 x 的右边,x在中间。x在arr中的位置记为i。如果 i==k,说明x为整个数组中第 k 小的数,直接返回。如果 i<k,说明 x 处在第 k 小的数的左边,应该在 x 的右边寻找第 k 小的数,所以递归调用selectK函数,在右半区寻找第 i 小的数。如果 i>k,说明 x 处在第 k 小的数的右边,应该在 x 的左边寻找第k小的数,所以递归调用selectK函数,在左半区寻找第 k 小的数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func selectK(arr []int, start int, end int, i int) int {
if start == end {
return arr[start]
}
midNumber := getMinNumberByArray(arr, start, end)
rangeMid := getRangMidPosition(arr, start, end, midNumber)
//println("=========================")
//fmt.Println("arr = ",arr)
//fmt.Println("midNumber = ",midNumber)
//println("pivotRange = ", rangeMid[0], " ", rangeMid[1])
if i >= rangeMid[0] && i <= rangeMid[1] {
return arr[i]
} else if i >= rangeMid[0] {
return selectK(arr, rangeMid[1]+1, end, i)
} else {
return selectK(arr, start, rangeMid[0]-1, i)
}
}

从原始数组中找到算出中位数数组的长度大小,如果最后剩余长度小于5,则单独划分成一组,通过getMid函数,得到每小组的中位数。然后再对中位数数组继续调用selectK方法,找到中位数组的中位数,直到start == end为止,返回的数 x 就是当前数组中位数的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func getMinNumberByArray(arr []int, begin, end int) int {
var midArray []int
num := end - begin + 1
if num%5 == 0 {
midArray = make([]int, num/5)
} else {
midArray = make([]int, num/5+1)
}
for i := 0; i < len(midArray); i++ {
s := begin + i*5
e := s + 4
midArray[i] = getMid(arr, s, int(math.Min(float64(e), float64(end))))
}

return selectK(midArray, 0, len(midArray)-1, len(midArray)/2)
}

得到当前段数组的中位数过程,首先进行插入排序,保证这是一个有序数组,如果当前数组是偶数,sum/2得出的就直接是下中位数,如果数组不是偶数,则使用下中位数mid := sum/2 + sum%2

1
2
3
4
5
6
func getMid(arr []int, s int, e int) int {
insertSort(arr, s, e)
sum := s + e
mid := sum/2 + sum%2
return arr[mid]
}

插入排序过程如下

1
2
3
4
5
6
7
8
9
10
11
func insertSort(arr []int, s int, e int) {
for i := s + 1; i != e+1; i++ {
for j := i; j != s; j-- {
if arr[j-1] > arr[j] {
arr[j-1], arr[j] = arr[j], arr[j-1]
} else {
break
}
}
}
}

得到数组的中位数后,然后对数组进行划分,比 x 小的数都在 x 的左边,大于 x 的数都在x的右边,值为 x 在中间 small 到 big的区域,这样就不用重复计算这些数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func getRangMidPosition(arr []int, start int, end int, midNumber int) [2]int {
small := start
big := end
cur := start
for cur != big+1 {
if arr[cur] < midNumber {
arr[cur], arr[small] = arr[small], arr[cur]
cur++
small++
} else if arr[cur] > midNumber {
arr[cur], arr[big] = arr[big], arr[cur]
big--
} else {
cur++
}
}
return [2]int{small, big}
}

我们通过一组数据来具体的说明算法是如何工作的,下面给出一组测试案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func Test_getMinKByBF(t *testing.T) {
type args struct {
arr []int
k int
}
tests := []struct {
name string
args args
want int
}{
{name: "", args: struct {
arr []int
k int
}{arr: []int{6, 9, 1, 3, 1, 2, 2, 5, 6, 1, 3, 5, 9, 7, 2, 5, 6, 1, 9}, k: 16}, want: 7},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getMinKByBFPRT(tt.args.arr, tt.args.k); got != tt.want {
t.Errorf("getMinKByBFPRT() = %v, want %v", got, tt.want)
}
})
}
}

因此数组是从 0 开始的,那么在数组中的位置是 15

1
2
3
4
5
func getMinKByBFPRT(arr []int, k int) int {
des := make([]int, len(arr))
copy(des, arr)
return selectK(des, 0, len(arr)-1, k-1)
}

运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
=========================
arr = [2 3 5 6]
midNumber = 5
pivotRange = 2 2
i = 2
=========================
arr = [1 1 3 1 1 2 2 2 3 5 5 5 7 9 6 9 6 9 6]
midNumber = 5
pivotRange = 9 11
i = 15
=========================
arr = [7 9]
midNumber = 9
pivotRange = 1 1
i = 1
=========================
arr = [1 1 3 1 1 2 2 2 3 5 5 5 6 6 7 6 9 9 9]
midNumber = 9
pivotRange = 16 18
i = 15
=========================
arr = [1 1 3 1 1 2 2 2 3 5 5 5 6 6 6 7 9 9 9]
midNumber = 6
pivotRange = 12 14
i = 15
  1. 首先找到无序数组6, 9, 1, 3, 1, 2, 2, 5, 6, 1, 3, 5, 9, 7, 2, 5, 6, 1, 9的中位数,进入递归,递归后的划分原来的中位数数组为[2 3 5 6],并且已经被中位数 5 划分了,返回 5,5 就是原来无序数组的中位数。
  2. 从递归回到主函数后,根据找到的中位数 5 ,划分数组,划分后的位置在[9,11]比我们要找到第 16 大数小,所以排除左边区域,从右边开始找。
  3. 和上面解释一样,找到了 新无序数组的中位数组的中位数,返回 9 ,位置在[16,18]比 15 大 ,往前查找,最终找到了位置[12,14]比 15 大,往前后查找,start == end,此时start位置 15 就是第 16 小的数。
#

评论

`
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×