Introduction to Parallel Computing and a Multi-core CPU Programming Demo -- Mountain Notebook

          2006 was the year dual-core went mainstream: shipments of dual-core processors began to exceed those of single-core processors. In November 2006 Intel started shipping quad-core parts; AMD will also release a quad-core this year and plans an eight-core for the second half of the year. According to an Intel document: "Assume a 13 mm processor die built on a 22 nm process, carrying 4 billion transistors and 48 MB of cache, and consuming 100 W. With that many transistors we could design a processor with 12 larger cores, 48 medium-sized (multi-core) cores, or 144 small cores (many cores)." Intel has also completed a prototype 80-core processor that reaches one trillion floating-point operations per second.
   In the past, software performance often improved automatically whenever the CPU was upgraded; with multi-core CPUs that free lunch is over. Future CPUs may also integrate special-purpose cores (quite possibly designed in a fairly general-purpose way), such as GPU cores, image-processing cores, or vector-processing cores. Let's first look at the parallelism inside a single CPU:
  Common forms of parallelism within a single CPU: multi-stage pipelining (the key tool for raising CPU clock frequency), superscalar execution (multiple pipelines issuing several instructions at once), out-of-order execution (instruction reordering), single-instruction multiple-data (SIMD), and very long instruction word (VLIW) processors (which rely on compiler analysis).
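For illustration only, here is a minimal SIMD sketch using SSE intrinsics (it is not part of the original demo; the function add4 and its parameters are made up for this example). It adds two float arrays four elements per instruction:

#include <xmmintrin.h>  //SSE intrinsics

//adds two float arrays, four elements per instruction
void add4(const float* a,const float* b,float* out,long count)
{
    long i=0;
    for (;i+4<=count;i+=4)
    {
        __m128 va=_mm_loadu_ps(&a[i]);             //unaligned load of 4 floats
        __m128 vb=_mm_loadu_ps(&b[i]);
        _mm_storeu_ps(&out[i],_mm_add_ps(va,vb));  //4 additions at once
    }
    for (;i<count;++i)                             //scalar tail for the remainder
        out[i]=a[i]+b[i];
}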
Introduction to parallel computing
  Communication models of parallel platforms: shared data (POSIX threads, Windows threads, OpenMP), message passing (MPI, PVM)
  Models of parallel algorithms: the data-parallel model, the task-dependency-graph model, the work-pool model, the manager-worker model, and the producer-consumer model
  Issues a parallel task may involve: task decomposition, task dependencies, task granularity and its assignment, degree of concurrency, and task interaction
  Common measures of parallel algorithm performance: parallel overhead, speedup, efficiency (speedup / number of CPUs), and cost (parallel running time × number of CPUs).
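As a quick worked example, the metrics above can be computed like this (the timing numbers here are purely hypothetical, not measurements from the demo):

#include <stdio.h>

int main()
{
    double t_serial  =8.0;  //assumed serial running time, in seconds
    double t_parallel=2.2;  //assumed parallel running time on 4 CPUs
    int    cpu_count =4;

    double speedup   =t_serial/t_parallel;   //about 3.64
    double efficiency=speedup/cpu_count;     //speedup per CPU, about 0.91
    double cost      =t_parallel*cpu_count;  //about 8.8 CPU-seconds
    double overhead  =cost-t_serial;         //extra work added by parallelization, about 0.8

    printf("speedup=%.2f efficiency=%.2f cost=%.2f overhead=%.2f\n",
           speedup,efficiency,cost,overhead);
    return 0;
}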
A simple multi-core computation
The main work done in the demo is the following function (the work itself is not meaningful; it simply consumes some time to stand in for real work that needs to be done).
Code:
double Sum0(double* data,long data_count)
{
    double result=0;
    for (long i=0;i<data_count;++i)
    {
        data[i]=sqrt(1-(data[i]*data[i]));
        result+=data[i];
    }
    return  result;
}

This function is then parallelized with OpenMP (supported by the VC and ICC compilers; function SumOpenMP) and with a small hand-written threading tool (function SumWTP), and the speedup is measured.
(The demo has to run on a multi-core CPU for the advantage of multi-CPU parallelism to show.)
OpenMP is a parallel programming standard based on compiler directives and the shared-data model; it can currently be used from C/C++ and Fortran. OpenMP directives provide support for concurrency, synchronization, and data access.
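For example, here is a minimal OpenMP sketch (illustrative only, not part of the demo; it assumes a compiler with OpenMP support enabled, such as VC2005 with /openmp) showing a parallel loop with a reduction and a critical section:

#include <stdio.h>
#include <omp.h>

int main()
{
    double sum=0;
    //the loop iterations are divided among the threads; reduction(+:sum)
    //gives each thread a private copy of sum and combines them at the end
    #pragma omp parallel for reduction(+:sum)
    for (long i=1;i<=1000;++i)
        sum+=1.0/i;

    #pragma omp parallel
    {
        //a critical section: only one thread at a time may enter
        #pragma omp critical
        printf("hello from thread %d\n",omp_get_thread_num());
    }
    printf("sum=%f\n",sum);
    return 0;
}

The full source of the demo follows: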
//The compiler used for testing was VC2005.
//Multithreading and OpenMP support must be enabled in the project properties.
//TestWTP.cpp
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>
#include <vector>

#define _IS_TEST_OpenMP
//Testing OpenMP requires a compiler that supports it, with OpenMP enabled in the build settings.
#ifdef _IS_TEST_OpenMP
  #include <omp.h>
#endif

//A simple demo that uses CWorkThreadPool to carry out the computation on multiple CPUs.
#include "WorkThreadPool.h"
double Sum0(double* data,long data_count);   //single-threaded execution
double SumWTP(double* data,long data_count); //parallel execution with multiple threads, chosen dynamically from the number of CPUs
#ifdef _IS_TEST_OpenMP
    double SumOpenMP(double* data,long data_count); //parallel execution with OpenMP
#endif
const long g_data_count=200000;
double g_data[g_data_count];

int main()
{
    long i;
    double start0, start1, start2;
    const long test_count=200*2;
    double sumresult;

    //init
    for (i=0;i<g_data_count;++i)
        g_data[i]=rand()*(1.0/RAND_MAX);

    //single-threaded run
    start0=(double)clock();
    sumresult=0;
    for( i=0; i<test_count; ++i)
    {
        sumresult+=Sum0(g_data,g_data_count);
    }
    start0=((double)clock()-start0)/CLOCKS_PER_SEC;
    printf ("\n");
    printf ("  result  = %10.7f \n",sumresult);
    printf ("  Seconds = %10.7f \n",start0 );
#ifdef _IS_TEST_OpenMP
    //OpenMP run
    start1=clock();
    sumresult=0;
    for( i=0; i<test_count; ++i)
    {
        sumresult+=SumOpenMP(g_data,g_data_count);
    }
    start1=((double)clock()-start1)/CLOCKS_PER_SEC;
    printf ("\n");
    printf ("  result  = %10.7f \n",sumresult);
    printf ("  Seconds = %10.7f \n",start1);
    printf ("\n");
    printf ("%10.7f/%10.7f  = %2.4f \n",start0,start1,start0/start1);
#endif

    //CWorkThreadPool run
    start2=clock();
 sumresult=0;
    for( i=0; i<test_count; ++i)
    {
        sumresult+=SumWTP(g_data,g_data_count);
    }
    start2=((double)clock()-start2)/CLOCKS_PER_SEC;
    printf (" ",CWorkThreadPool::best_work_count());
    printf ("  result  = %10.7f ",sumresult);
    printf ("  Seconds = %10.7f ",start2);    printf (" ");
    printf ("%10.7f/%10.7f  = %2.4f ",start0,start2,start0/start2);
    
    
    printf ("  ---------  ok ! ---------");
    getchar();
    return 0;
}
double Sum0(double* data,long data_count)
{
    double result=0;
    for (long i=0;i<data_count;++i)
    {
        data[i]=sqrt(1-(data[i]*data[i]));
        result+=data[i];
    }
    return  result;
}

#ifdef _IS_TEST_OpenMP
double SumOpenMP(double* data,long data_count)
{
    double result=0;
    //reduction(+:result) gives each thread a private partial sum and combines
    //them at the end; without it the += on result would be a data race
    #pragma omp parallel for schedule(static) reduction(+:result)
    for (long i=0;i<data_count;++i)
    {
        data[i]=sqrt(1-(data[i]*data[i]));
        result+=data[i];
    }
    return result;
}
#endif

struct TWorkData
{
    long    ibegin;
    long    iend;
    double* data;
    double  result;
};

void sum_callback(TWorkData* wd)
{
    wd->result=Sum0( &wd->data[wd->ibegin],(wd->iend-wd->ibegin) );
}

double SumWTP(double* data,long data_count)
{
    static long work_count=CWorkThreadPool::best_work_count();
    static std::vector<TWorkData>   work_list(work_count);
    static std::vector<TWorkData*>  pwork_list(work_count);
    long i;

    static bool IS_init=false;
    if (!IS_init)  //partition the work once: contiguous chunks of nearly equal size, one per CPU
    {
        for (i=0;i<work_count;++i)
        {
            work_list[i].data=data;
            if (0==i) work_list[i].ibegin=0;
            else work_list[i].ibegin=work_list[i-1].iend;
            work_list[i].iend=data_count*(i+1)/work_count;
        }
        for (i=0;i<work_count;++i)
            pwork_list[i]=&work_list[i];
        IS_init=true;
    }

    //execute the work items in parallel and wait for them to finish
    CWorkThreadPool::work_execute((TThreadCallBack)sum_callback,(void**)&pwork_list[0],(int)pwork_list.size());

    double result=0;
    for (i=0;i<work_count;++i)
        result+=work_list[i].result;
    return result;
}

//Declaration file for CWorkThreadPool: WorkThreadPool.h
/////////////////////////////////////////////////////////////
//Work thread pool CWorkThreadPool
//Splits one task into several thread tasks so that multiple CPUs can be used
//HouSisong@263.net
////////////////////////////
//todo: switch to a mode in which the threads claim tasks themselves
//todo: adjust the helper threads' priority so that it is inherited from the main thread
//Requirements: 1. When a task is split, the pieces should be of roughly equal size
//              2. The pieces must not be too small, otherwise the threading overhead may outweigh the gain from parallelism
//              3. The number of pieces is preferably a multiple of the number of CPUs
//              4. The main thread must not run at too high a priority, otherwise the helper threads may not get any time slices

#ifndef _WorkThreadPool_H_
#define _WorkThreadPool_H_

typedef void (*TThreadCallBack)(void * pData);

class CWorkThreadPool
{
public:
    static long best_work_count();  //returns the optimal number of work pieces; the current implementation returns the number of CPUs
    static void work_execute(const TThreadCallBack work_proc,void** word_data_list,int work_count);  //execute the work items in parallel and wait for all of them to finish
    static void work_execute(const TThreadCallBack* work_proc_list,void** word_data_list,int work_count); //same as above, but each work item calls its own function
    static void work_execute_single_thread(const TThreadCallBack work_proc,void** word_data_list,int work_count)  //execute the work items on a single thread and wait for them to finish; for debugging etc.
 {
  for (long i=0;i<work_count;++i)
   work_proc(word_data_list[i]);
 }
    static void work_execute_single_thread(const TThreadCallBack* work_proc_list,void** word_data_list,int work_count)  //execute the work items on a single thread and wait for them to finish; for debugging etc.
 {
  for (long i=0;i<work_count;++i)
   work_proc_list[i](word_data_list[i]);
 }
};
 #endif //_WorkThreadPool_H_
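To make the interface concrete, here is an illustrative usage sketch (the names scale_parallel, scale_callback and TMyWork are invented for this example; the demo's real usage is SumWTP above). It follows the requirements listed in the header comments: contiguous chunks of nearly equal size, one per CPU:

#include <vector>
#include "WorkThreadPool.h"

struct TMyWork { double* data; long ibegin; long iend; };

void scale_callback(void* pdata)
{
    TMyWork* w=(TMyWork*)pdata;
    for (long i=w->ibegin;i<w->iend;++i)
        w->data[i]*=2.0;                        //the per-chunk work
}

void scale_parallel(double* data,long count)
{
    long n=CWorkThreadPool::best_work_count();  //one chunk per CPU
    std::vector<TMyWork>  works(n);
    std::vector<void*>    pworks(n);
    for (long i=0;i<n;++i)                      //chunks of nearly equal size
    {
        works[i].data  =data;
        works[i].ibegin=count*i/n;
        works[i].iend  =count*(i+1)/n;
        pworks[i]=&works[i];
    }
    CWorkThreadPool::work_execute(scale_callback,&pworks[0],(int)pworks.size());
}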
//Implementation file for CWorkThreadPool: WorkThreadPool.cpp
/////////////////////////////////////////////////////////////
//Work thread pool TWorkThreadPool
#include <vector>
#include <process.h>
#include <string.h>
#include "windows.h"
#include "WorkThreadPool.h"

//#define _IS_SetThreadAffinity_
//If this flag is defined, each worker thread is bound to a different CPU to reduce thread-switching overhead; not encouraged.
class TWorkThreadPool;

//thread states
enum TThreadState{ thrStartup=0, thrReady,  thrBusy, thrTerminate, thrDeath };

class TWorkThread
{
public:
    volatile HANDLE             thread_handle;
    volatile enum TThreadState  state;
    volatile TThreadCallBack    func;
    volatile void *             pdata;  //work data    
    volatile HANDLE             waitfor_event;
    TWorkThreadPool*            pool;
    volatile DWORD              thread_ThreadAffinityMask;

    TWorkThread() { memset(this,0,sizeof(TWorkThread));  }
};

void do_work_end(TWorkThread* thread_data);
void __cdecl thread_dowork(TWorkThread* thread_data) //void __stdcall thread_dowork(TWorkThread* thread_data)
{
    volatile TThreadState& state=thread_data->state;
 #ifdef _IS_SetThreadAffinity_
  SetThreadAffinityMask(GetCurrentThread(),thread_data->thread_ThreadAffinityMask);
 #endif
    state = thrStartup;

    while(true)
    {
        WaitForSingleObject(thread_data->waitfor_event, -1);
        if(state == thrTerminate)
            break;

        state = thrBusy;
        volatile TThreadCallBack& func=thread_data->func;
        if (func!=0)
            func((void *)thread_data->pdata);
        do_work_end(thread_data);
    }
    state = thrDeath;
    _endthread();
    //ExitThread(0);
}

class TWorkThreadPool
{
private:
    volatile HANDLE             thread_event;
    volatile HANDLE             new_thread_event;
    std::vector<TWorkThread>    work_threads;
 mutable long                 cpu_count;
 inline long get_cpu_count() const {
  if (cpu_count>0) return cpu_count;

  SYSTEM_INFO SystemInfo;
  GetSystemInfo(&SystemInfo);
  cpu_count=SystemInfo.dwNumberOfProcessors;
  return cpu_count;
 }
    inline long passel_count() const { return (long)work_threads.size()+1; }
    void init_threads() {
  long best_count =get_cpu_count();
        long newthrcount=best_count - 1;   //the main thread also does one work item, so create one helper thread fewer than the number of CPUs
        work_threads.resize(newthrcount);
        thread_event = CreateSemaphore(0, 0,newthrcount , 0);
        new_thread_event = CreateSemaphore(0, 0,newthrcount , 0);
        long i;
        for( i= 0; i < newthrcount; ++i)
        {
            work_threads[i].waitfor_event=thread_event;
            work_threads[i].state = thrTerminate;
            work_threads[i].pool=this;
   work_threads[i].thread_ThreadAffinityMask=1<<(i+1);
            work_threads[i].thread_handle =(HANDLE)_beginthread((void (__cdecl *)(void *))thread_dowork, 0, (void*)&work_threads[i]);
           //CreateThread(0, 0, (LPTHREAD_START_ROUTINE)thread_dowork,(void*) &work_threads[i], 0, &thr_id);
   //todo: error handling for _beginthread
        }
  #ifdef _IS_SetThreadAffinity_
   SetThreadAffinityMask(GetCurrentThread(),0x01);
  #endif
        for(i = 0; i < newthrcount; ++i)
        {
            while(true) {
                if (work_threads[i].state == thrStartup) break;
                else Sleep(0);
            }
            work_threads[i].state = thrReady;
        }
    }
    void free_threads(void)
    {
        long thr_count=(long)work_threads.size();
        long i;
        for(i = 0; i < thr_count; ++i)
        {
            while(true) { 
                if (work_threads[i].state == thrReady) break;
                else Sleep(0);
            }
            work_threads[i].state=thrTerminate;
        }
        if (thr_count>0)
            ReleaseSemaphore(thread_event,thr_count, 0);
        for(i = 0; i < thr_count; ++i)
        {
            while(true) { 
                if (work_threads[i].state == thrDeath) break;
                else Sleep(0);
            }
        }
        CloseHandle(thread_event);
        CloseHandle(new_thread_event);
        work_threads.clear();
    }
    void passel_work(const TThreadCallBack* work_proc,int work_proc_inc,void** word_data_list,int work_count)    {
     if (work_count==1)
     {
   (*work_proc)(word_data_list[0]);
     }
     else
     {
   const TThreadCallBack* pthwork_proc=work_proc;
   pthwork_proc+=work_proc_inc;
  
   long i;
   long thr_count=(long)work_threads.size();
   for(i = 0; i < work_count-1; ++i)
   {
    work_threads[i].func  = *pthwork_proc;
    work_threads[i].pdata  =word_data_list[i+1];
    work_threads[i].state = thrBusy;
    pthwork_proc+=work_proc_inc;
   }
   for(i =  work_count-1; i < thr_count; ++i)
   {
    work_threads[i].func  = 0;
    work_threads[i].pdata  =0;
    work_threads[i].state = thrBusy;
   }
   if (thr_count>0)
     ReleaseSemaphore(thread_event,thr_count, 0);

   //the current thread does one work item itself
   (*work_proc)(word_data_list[0]);

   //wait for the helper threads to finish their work
   for(i = 0; i < thr_count; ++i)
   {
    while(true) { 
     if (work_threads[i].state == thrReady) break;
     else Sleep(0);
    }
   }
   //the helper threads are now waiting on new_thread_event, so the two
   //semaphores are swapped so that the next batch releases the correct one
   std::swap(thread_event,new_thread_event);
  }
    }
    void private_work_execute(TThreadCallBack* pwork_proc,int work_proc_inc,void** word_data_list,int work_count)    {       
     while (work_count>0)
        {
            long passel_work_count;
            if (work_count>=passel_count())
                passel_work_count=passel_count();
            else
                passel_work_count=work_count;

            passel_work(pwork_proc,work_proc_inc,word_data_list,passel_work_count);
            pwork_proc+=(work_proc_inc*passel_work_count);
            word_data_list=&word_data_list[passel_work_count];
            work_count-=passel_work_count;
        }
    }
public:
   explicit TWorkThreadPool():thread_event(0),work_threads(),cpu_count(0) {   init_threads();    }
    ~TWorkThreadPool() {  free_threads(); }
    inline long best_work_count() const { return passel_count(); }
    inline void DoWorkEnd(TWorkThread* thread_data){
        thread_data->waitfor_event=new_thread_event;
        thread_data->func=0;
        thread_data->state = thrReady;
    }

    inline void work_execute(TThreadCallBack* pwork_proc,void** word_data_list,int work_count)    {
  private_work_execute(pwork_proc,1,word_data_list,work_count);
    }
    inline void work_execute(TThreadCallBack work_proc,void** word_data_list,int work_count)    {  
  private_work_execute(&work_proc,0,word_data_list,work_count);
    }
};
void do_work_end(TWorkThread* thread_data)
{
    thread_data->pool->DoWorkEnd(thread_data);
}
//TWorkThreadPool end;
////////////////////////////////////////

TWorkThreadPool g_work_thread_pool;  //the global work thread pool

long CWorkThreadPool::best_work_count() {  return g_work_thread_pool.best_work_count();  }

void CWorkThreadPool::work_execute(const TThreadCallBack work_proc,void** word_data_list,int work_count)
{
 g_work_thread_pool.work_execute(work_proc,word_data_list,work_count);
}

void CWorkThreadPool::work_execute(const TThreadCallBack* work_proc_list,void** word_data_list,int work_count)
{
 g_work_thread_pool.work_execute((TThreadCallBack*)work_proc_list,word_data_list,work_count);
}