`
bianku
  • 浏览: 72396 次
  • 性别: Icon_minigender_1
  • 来自: 常州
社区版块
存档分类
最新评论

并行排序算法

阅读更多
今天早晨看到 蛙蛙池塘 的这篇博客 谁能把这个程序的性能提升一倍?---并行排序算法 。促使我写了一个并行排序算法,这个排序算法充分利用多核CPU进行并行计算,从而提高排序的效率。 

先简单说一下蛙蛙池塘 给的A,B,C 三种算法(见上面引用的那篇博客),A算法将耗时的平方和开平方计算放到比较函数中,导致Array.Sort 时,每次亮亮比较都要执行平方和开平方计算,其平均算法复杂度为 O(nlog2n) 。 而B 将平方和开平方计算提取出来,算法复杂度降低到 O(n) ,这也就是为什么B比A效率要高很多的缘故。C 和 B 相比,将平方函数替换成了 x*x ,由于少了远程函数调用和Pow函数本身的开销,效率有提高了不少。我在C的基础上编写了D算法,D算法采用并行计算技术,在我的双核笔记本电脑上数据量比较大的情况下,其排序效率较C要提高30%左右。 

下面重点介绍这个并行排序算法。算法思路其实很简单,就是将要排序的数组按照处理器数量等分成若干段,然后用和处理器数量等同的线程并行对各个小段进行排序,排序结束和,再在单一线程中对这若干个已经排序的小段进行归并排序,最后输出完整的排序结果。考虑到和.Net 2.0 兼容,我没有用微软提供的并行库,而是用多线程来实现。 

下面是测试结果: 



n A B C D 
32768 0.7345 0.04122 0.0216 0.0254 
65535 1.5464 0.08863 0.05139 0.05149 
131072 3.2706 0.1858 0.118 0.108 
262144 6.8423 0.4056 0.29586 0.21849 
524288 15.0342 0.9689 0.7318 0.4906 
1048576 31.6312 1.9978 1.4646 1.074 
2097152 66.9134 4.1763 3.0828 2.3095 



从测试结果上看,当要排序的数组长度较短时,并行排序的效率甚至还没有不进行并行排序高,这主要是多线程的开销造成的。当数组长度增大到25万以上时,并行排序的优势开始体现出来,随着数组长度的增长,排序时间最后基本稳定在但线程排序时间的 74% 左右,其中并行排序的消耗大概在50%左右,归并排序的消耗在 14%左右。由此也可以推断,如果在4CPU的机器上,其排序时间最多可以减少到单线程的 14 + 25 = 39%。8 CPU 为 14 + 12.5 = 26.5% 

目前这个算法在归并算法上可能还有提高的余地,如果哪位高手能够进一步提高这个算法,不妨贴出来一起交流交流。 

下面分别给出并行排序和归并排序的代码: 

并行排序类 ParallelSort 

Paralletsort 类是一个通用的泛型,调用起来非常简单,下面给一个简单的int型数组的排序示例: 



class IntComparer : IComparer < int > 
{ 
IComparer Members #region IComparer<int> Members 

public int Compare( int x, int y) 
{ 
return x.CompareTo(y); 
} 

#endregion 
} 

public void SortInt( int [] array) 
{ 
Sort.ParallelSort < int > parallelSort = new Sort.ParallelSort < int > (); 
parallelSort.Sort(array, new IntComparer()); 
} 

只要实现一个T类型两两比较的接口,然后调用ParallelSort 的 Sort 方法就可以了,是不是很简单? 

下面是 ParallelSort类的代码 

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 

namespace Sort 
{ 
/**/ /// <summary> 
/// ParallelSort 
/// </summary> 
/// <typeparam name="T"></typeparam> 
public class ParallelSort < T > 
{ 
enum Status 
{ 
Idle = 0 , 
Running = 1 , 
Finish = 2 , 
} 

class ParallelEntity 
{ 
public Status Status; 
public T[] Array; 
public IComparer < T > Comparer; 

public ParallelEntity(Status status, T[] array, IComparer < T > comparer) 
{ 
Status = status; 
Array = array; 
Comparer = comparer; 
} 
} 

private void ThreadProc(Object stateInfo) 
{ 
ParallelEntity pe = stateInfo as ParallelEntity; 

lock (pe) 
{ 
pe.Status = ParallelSort < T > .Status.Running; 

Array.Sort(pe.Array, pe.Comparer); 

pe.Status = ParallelSort < T > .Status.Finish; 
} 
} 

public void Sort(T[] array, IComparer < T > comparer) 
{ 
// Calculate process count 
int processorCount = Environment.ProcessorCount; 

// If array.Length too short, do not use Parallel sort 
if (processorCount == 1 || array.Length < processorCount) 
{ 
Array.Sort(array, comparer); 
return ; 
} 

// Split array 
ParallelEntity[] partArray = new ParallelEntity[processorCount]; 

int remain = array.Length; 
int partLen = array.Length / processorCount; 

// Copy data to splited array 
for ( int i = 0 ; i < processorCount; i ++ ) 
{ 
if (i == processorCount - 1 ) 
{ 
partArray[i] = new ParallelEntity(Status.Idle, new T[remain], comparer); 
} 
else 
{ 
partArray[i] = new ParallelEntity(Status.Idle, new T[partLen], comparer); 

remain -= partLen; 
} 

Array.Copy(array, i * partLen, partArray[i].Array, 0 , partArray[i].Array.Length); 
} 

// Parallel sort 
for ( int i = 0 ; i < processorCount - 1 ; i ++ ) 
{ 
ThreadPool.QueueUserWorkItem( new WaitCallback(ThreadProc), partArray[i]); 
} 

ThreadProc(partArray[processorCount - 1 ]); 
} 

private static void A(Vector[] vectors) 
{ 
Array.Sort(vectors, new VectorComparer()); 
} 

private static void B(Vector[] vectors) 
{ 
int n = vectors.Length; 
for ( int i = 0 ; i < n; i ++ ) 
{ 
Vector c1 = vectors[i]; 
c1.T = Math.Sqrt(Math.Pow(c1.X, 2 ) 
+ Math.Pow(c1.Y, 2 ) 
+ Math.Pow(c1.Z, 2 ) 
+ Math.Pow(c1.W, 2 )); 
} 
Array.Sort(vectors, new VectorComparer2()); 
} 

private static void C(Vector[] vectors) 
{ 
int n = vectors.Length; 
for ( int i = 0 ; i < n; i ++ ) 
{ 
Vector c1 = vectors[i]; 
c1.T = Math.Sqrt(c1.X * c1.X 
+ c1.Y * c1.Y 
+ c1.Z * c1.Z 
+ c1.W * c1.W); 
} 
Array.Sort(vectors, new VectorComparer2()); 
} 

private static void D(Vector[] vectors) 
{ 
int n = vectors.Length; 
for ( int i = 0 ; i < n; i ++ ) 
{ 
Vector c1 = vectors[i]; 
c1.T = Math.Sqrt(c1.X * c1.X 
+ c1.Y * c1.Y 
+ c1.Z * c1.Z 
+ c1.W * c1.W); 
} 

Sort.ParallelSort < Vector > parallelSort = new Sort.ParallelSort < Vector > (); 
parallelSort.Sort(vectors, new VectorComparer2()); 
} 

} 
}  
 
// Wait all threads finish 
for ( int i = 0 ; i < processorCount; i ++ ) 
{ 
while ( true ) 
{ 
lock (partArray[i]) 
{ 
if (partArray[i].Status == ParallelSort < T > .Status.Finish) 
{ 
break ; 
} 
} 

Thread.Sleep( 0 ); 
} 
} 

// Merge sort 
MergeSort < T > mergeSort = new MergeSort < T > (); 

List < T[] > source = new List < T[] > (processorCount); 

foreach (ParallelEntity pe in partArray) 
{ 
source.Add(pe.Array); 
} 

mergeSort.Sort(array, source, comparer); 
} 
} 
} 



多路归并排序类 MergeSort 

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 

namespace Sort 
{ 
/**/ /// <summary> 
/// MergeSort 
/// </summary> 
/// <typeparam name="T"></typeparam> 
public class MergeSort < T > 
{ 
public void Sort(T[] destArray, List < T[] > source, IComparer < T > comparer) 
{ 
// Merge Sort 
int [] mergePoint = new int [source.Count]; 

for ( int i = 0 ; i < source.Count; i ++ ) 
{ 
mergePoint[i] = 0 ; 
} 

int index = 0 ; 

while (index < destArray.Length) 
{ 
int min = - 1 ; 

for ( int i = 0 ; i < source.Count; i ++ ) 
{ 
if (mergePoint[i] >= source[i].Length) 
{ 
continue ; 
} 

if (min < 0 ) 
{ 
min = i; 
} 
else 
{ 
if (comparer.Compare(source[i][mergePoint[i]], source[min][mergePoint[min]]) < 0 ) 
{ 
min = i; 
} 
} 
} 

if (min < 0 ) 
{ 
continue ; 
} 

destArray[index ++ ] = source[min][mergePoint[min]]; 
mergePoint[min] ++ ; 
} 
} 

} 
} 



主函数及测试代码 在蛙蛙池塘 代码基础上修改 



using System; 
using System.Collections.Generic; 
using System.Diagnostics; 

namespace Vector4Test 
{ 
public class Vector 
{ 
public double W; 
public double X; 
public double Y; 
public double Z; 
public double T; 
} 

internal class VectorComparer : IComparer < Vector > 
{ 
public int Compare(Vector c1, Vector c2) 
{ 
if (c1 == null || c2 == null ) 
throw new ArgumentNullException( " Both objects must not be null " ); 
double x = Math.Sqrt(Math.Pow(c1.X, 2 ) 
+ Math.Pow(c1.Y, 2 ) 
+ Math.Pow(c1.Z, 2 ) 
+ Math.Pow(c1.W, 2 )); 
double y = Math.Sqrt(Math.Pow(c2.X, 2 ) 
+ Math.Pow(c2.Y, 2 ) 
+ Math.Pow(c2.Z, 2 ) 
+ Math.Pow(c2.W, 2 )); 
if (x > y) 
return 1 ; 
else if (x < y) 
return - 1 ; 
else 
return 0 ; 
} 
} 

internal class VectorComparer2 : IComparer < Vector > 
{ 
public int Compare(Vector c1, Vector c2) 
{ 
if (c1 == null || c2 == null ) 
throw new ArgumentNullException( " Both objects must not be null " ); 
if (c1.T > c2.T) 
return 1 ; 
else if (c1.T < c2.T) 
return - 1 ; 
else 
return 0 ; 
} 
} 

internal class Program 
{ 
private static void Print(Vector[] vectors) 
{ 
// foreach (Vector v in vectors) 
// { 
// Console.WriteLine(v.T); 
// } 
} 

private static void Main( string [] args) 
{ 
Vector[] vectors = GetVectors(); 

Console.WriteLine( string .Format( " n = {0} " , vectors.Length)); 

Stopwatch watch1 = new Stopwatch(); 
watch1.Start(); 
A(vectors); 
watch1.Stop(); 
Console.WriteLine( " A sort time: " + watch1.Elapsed); 
Print(vectors); 

vectors = GetVectors(); 
watch1.Reset(); 
watch1.Start(); 
B(vectors); 
watch1.Stop(); 
Console.WriteLine( " B sort time: " + watch1.Elapsed); 
Print(vectors); 

vectors = GetVectors(); 
watch1.Reset(); 
watch1.Start(); 
C(vectors); 
watch1.Stop(); 
Console.WriteLine( " C sort time: " + watch1.Elapsed); 
Print(vectors); 

vectors = GetVectors(); 
watch1.Reset(); 
watch1.Start(); 
D(vectors); 
watch1.Stop(); 
Console.WriteLine( " D sort time: " + watch1.Elapsed); 
Print(vectors); 

Console.ReadKey(); 
} 

private static Vector[] GetVectors() 
{ 
int n = 1 << 21 ; 
Vector[] vectors = new Vector[n]; 
Random random = new Random(); 

for ( int i = 0 ; i < n; i ++ ) 
{ 
vectors[i] = new Vector(); 
vectors[i].X = random.NextDouble(); 
vectors[i].Y = random.NextDouble(); 
vectors[i].Z = random.NextDouble(); 
vectors[i].W = random.NextDouble(); 
} 
return vectors; 

 

分享到:
评论

相关推荐

    各种并行排序算法的C语言实现代码

    本文将深入探讨并行排序算法的原理以及如何用C语言来实现。 一、并行排序的重要性 在大数据处理和高性能计算领域,排序是一项基础且耗时的任务。传统的串行排序算法如冒泡排序、插入排序、快速排序等在面对大规模...

    面向大数据的可扩展正则采样并行排序算法

    在大数据时代,随着数据量的极速增长,并行排序算法受到广泛关注。现有的并行排序算法普遍存在通信开销过大、负载不均衡等问题,导致算法难以大规模扩展。针对以上问题,提出一种大规模可扩展的正则采样并行排序...

    LogGP Analysis of Parallel Sorting Algorithms 并行排序算法的LogGP分析.doc

    ### 并行排序算法的LogGP分析 #### 摘要与引言 本文通过《LogGP Analysis of Parallel Sorting Algorithms并行排序算法的LogGP分析》文档探讨了并行计算模型——LogGP在工作站网络(NOW)中对并行排序算法进行性能...

    多核处理器中一种改进的并行排序算法.pdf

    并行排序算法在多核处理器环境下对于提升计算效率至关重要,特别是在大数据处理和高性能计算领域。传统的并行排序算法,如基于SIMD(Single Instruction Multiple Data)指令的算法,虽然能够生成较短的有序子序列,...

    java设计模式四大常用架构迭代模型并行排序算法.pdf

    Java 设计模式四大常用架构迭代模型并行排序算法 Java 设计模式是软件工程中的一种思想,旨在提高软件的重用性和可维护性。1995 年,著名的书籍《Design Pattern》出版,总结了 23 种经典的设计模式,成为软件设计...

    FPGA并行快速排序算法-位宽可设

    在本文中,我们将深入探讨基于FPGA的并行快速排序算法,特别关注“位宽可设”的特性。这种算法能够高效地处理大量数据,并且在硬件实现上具有很高的灵活性。我们将从以下几个方面来阐述这个主题: 一、快速排序算法...

    Verilog/C++实现排序算法:Verilog/C++实现排序算法:冒泡排序、选择排序、并行全比较排序、串行全比较排序

    本文将探讨如何使用这两种语言实现几种基本的排序算法:冒泡排序、选择排序,以及两种全比较排序(并行和串行)。 首先,让我们了解一下排序算法。排序是计算机科学中最基础的操作之一,它涉及到将一组数据按照特定...

    java设计模式_四大常用架构_迭代模型_并行排序算法

    - **并行排序算法**:随着计算机硬件的发展,多核处理器已经成为常态,利用多核处理器的并行计算能力可以显著提高排序等数据处理任务的性能。并行排序算法是利用多个处理器同时工作的能力来提高排序速度的一种算法。...

    多种排序的并行算法(具体)

    【多种排序的并行算法】 排序是计算机科学中不可或缺的一部分,尤其在大数据处理和高性能计算领域,高效排序算法显得尤为重要。...在设计并行排序算法时,需要充分考虑这些因素,以达到最佳的性能表现。

    FPGA并行全排序算法RTL代码

    在FPGA中,由于其硬件并行性,可以设计出能在同一时钟周期内对多个数据进行操作的并行排序算法,显著提升效率。 这个压缩包中的"RTL"目录可能包含了以下文件: 1. `top_module.v`:这是整个设计的顶层模块,它封装...

    用于并行计算的并行 Julia 排序算法_julia_代码_下载

    并行排序算法通过将任务分解到多个处理器或节点上同时进行,可以显著提高排序速度。 Julia 语言为并行计算提供了强大的支持。它内置了多线程和分布式计算框架,使得开发者可以方便地编写高效的并行程序。在 Julia ...

    高维度数据的并行快速排序算法.pptx

    #### 一、并行排序算法简介 **1.1 并行排序算法概述** - **并行排序的概念**:并行排序算法是一种利用多处理器协同工作的算法模式,它通过将排序任务拆分成更小的子任务,并将这些子任务分配给不同的处理器或线程...

    使用MPI实现的PSRS并行排序算法

    使用MPI计算的完整的PSRS(并行排序(parallel sorting by regular sampling))代码。并行计算课实验所用代码。

    各种 排序 的并行化

    因此,并行排序算法应运而生,通过利用多处理器或多核架构的并行计算能力,极大地提高了排序效率。 #### 二、枚举排序 ##### 2.1 枚举排序及其串行算法 枚举排序,也被称为秩排序,是一种基于比较次数的简单排序...

    整理最全资料:并行计算大作业:矩阵乘法,排序算法,代码+课件+报告超详细

    并行排序算法是并行计算中的另一大应用,目的是快速对大量数据进行排序。经典的并行排序算法有快速排序、归并排序和基数排序等的并行版本。其中,快速排序的并行化可以通过分治策略实现,归并排序则利用合并操作的...

    奇偶排序算法的并行实现

    利用mpi进行奇偶排序的实现,有较好的性能和可扩展性

    基于FPGA的并行全比较排序算法.pdf

    文章首先对传统的排序算法进行了分析,指出它们多依赖于软件的串行方式实现,如冒泡法、选择法、计数法等,这些方法效率较低,无法满足工程应用中对实时性的高要求。接着,文章提出了一种基于FPGA硬件技术的并行全...

    MPI实现并行的快速排序

    利用MPI实现快速排序的并行算法,算法使用C语言实现

Global site tag (gtag.js) - Google Analytics