线程池 痛定思痛。 2022-06-13 13:22 343阅读 0赞 1.所谓线程池,就是程序的初始化阶段,就预先创建一批线程,每个线程都做好准备干活; 2.然后有一个任务列表,一开始为空,当有任务来了,就往任务列表里面添加;这个任务列表由于那些线程们都会进行操作,所以需要做好同步工作。 3.任务列表里面有任务了,这时候那些等待的线程们就要抢活干了,怎么抢,使用各种线程同步手段(互斥量,临界区等),人品好的线程抢到任务后,从任务列表取出任务,就可以开始干活了。干完以后,就又继续回到初始等待状态,准备抢夺下一个任务。 /\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*/ 这样就好比你有一批小弟排队在那里等着,一旦有任务,他们会很守纪律的去抢着干,每个任务都会被一个小弟抢走,干完以后,小弟不用休息,继续等着抢下一个任务干活。这样当你的任务源源不断的到达,你的小弟们就一个个争先恐后的抢过来完成,绝不偷懒。 /\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*/ 相反,如果不使用线程池,每次等到任务来了,再临时创建线程。这样就相当于每次有任务时,你再临时招聘一个小弟过来,小弟完成任务后,就回家了。然后下次再有任务,又招聘一个小弟过来,完成任务后,回家。相比线程池,中间招聘小弟的时间就要额外耗费时间和精力了(创建和销毁线程中,cpu的时间,内存的分配)。 /\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*/ 所以,使用线程池,就省去了哪些额外的线程开销,从而连续的完成所有的任务。当然,线程池中用于线程同步的操作同样也有一定的消耗,但这个消耗是相对小的。另外,还可以对线程池中的线程根据当前的任务量进行动态的调整,从而更好的节省相关资源。 下面代码是网上找到的一个例子: threadpool.h /* Thread Pool implementation for unix / linux environments Copyright (C) 2008 Shobhit Gupta This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. */ #include <pthread.h> #include <semaphore.h> #include <iostream> #include <vector> using namespace std; /* WorkerThread class This class needs to be sobclassed by the user. */ class WorkerThread{ public: int id; unsigned virtual executeThis() { return 0; } WorkerThread(int id) : id(id) {} virtual ~WorkerThread(){} }; /* ThreadPool class manages all the ThreadPool related activities. This includes keeping track of idle threads and ynchronizations between all threads. */ class ThreadPool{ public: ThreadPool(); ThreadPool(int maxThreadsTemp); virtual ~ThreadPool(); void destroyPool(int maxPollSecs); bool assignWork(WorkerThread *worker); bool fetchWork(WorkerThread **worker); void initializeThreads(); static void *threadExecute(void *param); static pthread_mutex_t mutexSync; static pthread_mutex_t mutexWorkCompletion; private: int maxThreads; pthread_cond_t condCrit; sem_t availableWork; sem_t availableThreads; //WorkerThread ** workerQueue; vector<WorkerThread *> workerQueue; int topIndex; int bottomIndex; int incompleteWork; int queueSize; }; threadpool.cpp /* Thread Pool implementation for unix / linux environments Copyright (C) 2008 Shobhit Gupta This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. */ #include <stdlib.h> #include "threadpool.h" using namespace std; pthread_mutex_t ThreadPool::mutexSync = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t ThreadPool::mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER; ThreadPool::ThreadPool() { ThreadPool(2); } ThreadPool::ThreadPool(int maxThreads) { if (maxThreads < 1) maxThreads=1; //mutexSync = PTHREAD_MUTEX_INITIALIZER; //mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_lock(&mutexSync); this->maxThreads = maxThreads; this->queueSize = maxThreads; //workerQueue = new WorkerThread *[maxThreads]; workerQueue.resize(maxThreads, NULL); topIndex = 0; bottomIndex = 0; incompleteWork = 0; sem_init(&availableWork, 0, 0); sem_init(&availableThreads, 0, queueSize); pthread_mutex_unlock(&mutexSync); } void ThreadPool::initializeThreads() { for(int i = 0; i<maxThreads; ++i) { pthread_t tempThread; pthread_create(&tempThread, NULL, &ThreadPool::threadExecute, (void *) this ); //threadIdVec[i] = tempThread; } } ThreadPool::~ThreadPool() { workerQueue.clear(); } void ThreadPool::destroyPool(int maxPollSecs = 2) { while( incompleteWork>0 ) { //cout << "Work is still incomplete=" << incompleteWork << endl; sleep(maxPollSecs); } cout << "All Done!! Wow! That was a lot of work!" << endl; sem_destroy(&availableWork); sem_destroy(&availableThreads); pthread_mutex_destroy(&mutexSync); pthread_mutex_destroy(&mutexWorkCompletion); } bool ThreadPool::assignWork(WorkerThread *workerThread) { pthread_mutex_lock(&mutexWorkCompletion); incompleteWork++; //cout << "assignWork...incomapleteWork=" << incompleteWork << endl; pthread_mutex_unlock(&mutexWorkCompletion); sem_wait(&availableThreads); pthread_mutex_lock(&mutexSync); //workerVec[topIndex] = workerThread; workerQueue[topIndex] = workerThread; //cout << "Assigning Worker[" << workerThread->id << "] Address:[" << workerThread << "] to Queue index [" << topIndex << "]" << endl; if(queueSize !=1 ) topIndex = (topIndex+1) % (queueSize-1); sem_post(&availableWork); pthread_mutex_unlock(&mutexSync); return true; } bool ThreadPool::fetchWork(WorkerThread **workerArg) { sem_wait(&availableWork); pthread_mutex_lock(&mutexSync); WorkerThread * workerThread = workerQueue[bottomIndex]; workerQueue[bottomIndex] = NULL; *workerArg = workerThread; if(queueSize !=1 ) bottomIndex = (bottomIndex+1) % (queueSize-1); sem_post(&availableThreads); pthread_mutex_unlock(&mutexSync); return true; } void *ThreadPool::threadExecute(void *param) { WorkerThread *worker = NULL; while(((ThreadPool *)param)->fetchWork(&worker)) { if(worker) { worker->executeThis(); //cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl; delete worker; worker = NULL; } pthread_mutex_lock( &(((ThreadPool *)param)->mutexWorkCompletion) ); //cout << "Thread " << pthread_self() << " has completed a Job !" << endl; ((ThreadPool *)param)->incompleteWork--; pthread_mutex_unlock( &(((ThreadPool *)param)->mutexWorkCompletion) ); } return 0; } main.cpp /* Thread Pool implementation for unix / linux environments Copyright (C) 2008 Shobhit Gupta This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. */ #include <iostream> #include "threadpool.h" using namespace std; #define ITERATIONS 200 class SampleWorkerThread : public WorkerThread { public: int id; unsigned virtual executeThis() { // Instead of sleep() we could do anytime consuming work here. //Using ThreadPools is advantageous only when the work to be done is really time consuming. (atleast 1 or 2 seconds) sleep(2); return(0); } SampleWorkerThread(int id) : WorkerThread(id), id(id) { // cout << "Creating SampleWorkerThread " << id << "\t address=" << this << endl; } ~SampleWorkerThread() { // cout << "Deleting SampleWorkerThread " << id << "\t address=" << this << endl; } }; int main(int argc, char **argv) { //ThreadPool(N); //Create a Threadpool with N number of threads ThreadPool* myPool = new ThreadPool(25); myPool->initializeThreads(); //We will count time elapsed after initializeThreads() time_t t1=time(NULL); //Lets start bullying ThreadPool with tonnes of work !!! for(unsigned int i=0;i<ITERATIONS;i++){ SampleWorkerThread* myThread = new SampleWorkerThread(i); //cout << "myThread[" << myThread->id << "] = [" << myThread << "]" << endl; myPool->assignWork(myThread); } // destroyPool(int maxPollSecs) // Before actually destroying the ThreadPool, this function checks if all the pending work is completed. // If the work is still not done, then it will check again after maxPollSecs // The default value for maxPollSecs is 2 seconds. // And ofcourse the user is supposed to adjust it for his needs. myPool->destroyPool(2); time_t t2=time(NULL); cout << t2-t1 << " seconds elapsed\n" << endl; delete myPool; return 0; }
相关 Java 线程池、Runnable线程池、Callable线程池 线程池: 其实就是一个容纳多个线程的容器,其中的线程可以反复的使用,省去了频繁创建和销毁过程对象的操作,无需反复创建线程面消耗过多资源。 为什么要用线程池: 合理 青旅半醒/ 2023年02月26日 12:30/ 0 赞/ 71 阅读
相关 线程、线程池 创建线程的3种方法: package com.frank.threadPool.createThread; / @author 小石潭记 布满荆棘的人生/ 2022年10月22日 04:27/ 0 赞/ 405 阅读
相关 线程池 1.所谓线程池,就是程序的初始化阶段,就预先创建一批线程,每个线程都做好准备干活; 2.然后有一个任务列表,一开始为空,当有任务来了,就往任务列表里面添加;这个任务列表 痛定思痛。/ 2022年06月13日 13:22/ 0 赞/ 344 阅读
相关 线程池 西施越溪女,明艳光云海 最近用线程池和不用线程池做了个速度的测试,在这里备注下: 结果是速度不相上下; public static void main(Str 妖狐艹你老母/ 2022年05月20日 02:35/ 0 赞/ 295 阅读
相关 线程池 线程池 Java里面线程池的顶级接口是 java.util.concurrent.Executor , 但是严格意义上讲 Executor并不是一个线程池,而只是一个 迈不过友情╰/ 2022年03月06日 14:34/ 0 赞/ 418 阅读
相关 线程池 线程池 > 从字面义上来讲,是指管理一组同构工作线程的资源池。线程池是与工作队列密切相关的,其中在工作队列中(Worker Queue)保存了所有等待执行的任务。工作者( 清疚/ 2021年12月11日 03:35/ 0 赞/ 400 阅读
相关 线程池 可preStart一个或全部core thread 0,小于core则来一个任务建一个线程(firstTask),队列,额外线程,拒绝 一个AtomicInteger的 今天药忘吃喽~/ 2021年11月23日 03:40/ 0 赞/ 427 阅读
相关 线程池 1、先创建线程池 import java.util.concurrent.ArrayBlockingQueue; import java.util.concu 拼搏现实的明天。/ 2021年11月09日 14:28/ 0 赞/ 435 阅读
还没有评论,来说两句吧...