// Starts the pool with `thread_count` worker threads.
//
// Only fragments of this function are present in this extract: a check of
// queue_.enabled() and the delegation to startInternal(). The body of the
// check is not visible here.
77 void start(uint32_t thread_count) {
// Guard on the queue already being enabled.
// NOTE(review): presumably this rejects a second start -- confirm against
// the full source, the branch body is not shown.
81 if (queue_.enabled()) {
// The actual thread creation is done by startInternal().
84 startInternal(thread_count);
// NOTE: the lines below are isolated statements from several public member
// functions whose headers are not part of this extract. Each comment describes
// only the statement that is visible.

// Guard on the queue being disabled; the enclosing function and the branch
// body are not visible here.
91 if (!queue_.enabled()) {
// Hands `item` to the queue's pushBack() and returns the queue's result.
103 return (queue_.pushBack(item));
// Hands `item` to the queue's pushFront() and returns the queue's result.
111 return (queue_.pushFront(item));
// Returns the number of items reported by the queue.
118 return (queue_.count());
// Checks whether the calling thread is one of the threads stored in threads_
// (see checkThreadId()). The branch taken on a match is not visible here.
126 auto id = std::this_thread::get_id();
127 if (checkThreadId(
id)) {
// Same calling-thread check as above, in a second function.
141 auto id = std::this_thread::get_id();
142 if (checkThreadId(
id)) {
// Delegates to the queue's timed wait, passing `seconds`, and returns its
// result.
145 return (queue_.wait(seconds));
// Reports the queue's enabled flag.
171 return (queue_.enabled());
// Reports the queue's paused flag.
181 return (queue_.paused());
// Forwards the new maximum queue size to the queue.
188 queue_.setMaxQueueSize(max_queue_size);
// Returns the maximum queue size currently stored by the queue.
195 return (queue_.getMaxQueueSize());
// Returns the number of thread objects currently held in threads_.
202 return (threads_.size());
// Returns the queue statistic selected by `which`; the queue's getQueueStat()
// throws InvalidParameter for unsupported values.
211 return (queue_.getQueueStat(which));
// Enables the queue for `thread_count` workers and creates that many
// std::thread objects, each running ThreadPool::run on this pool.
//
// NOTE: this extract omits several original lines, including the declarations
// of `sset` / `osset` and the control flow that separates the two mask
// restores at the end.
219 void startInternal(uint32_t thread_count) {
// Collect SIGCHLD, SIGINT, SIGHUP and SIGTERM into `sset`.
224 sigaddset(&sset, SIGCHLD);
225 sigaddset(&sset, SIGINT);
226 sigaddset(&sset, SIGHUP);
227 sigaddset(&sset, SIGTERM);
// Block those signals in the calling thread and remember the previous mask in
// `osset`. Under POSIX a new thread inherits its creator's signal mask, so
// presumably the intent is that the workers created below start with these
// signals blocked -- confirm against the full source.
228 pthread_sigmask(SIG_BLOCK, &sset, &osset);
// The queue is enabled before the first worker thread is created.
229 queue_.enable(thread_count);
// Create the workers; each std::thread is held through a boost::shared_ptr
// stored in threads_.
231 for (uint32_t i = 0; i < thread_count; ++i) {
232 threads_.push_back(boost::make_shared<std::thread>(&ThreadPool::run,
this));
// The caller's original signal mask is restored at two separate points.
// NOTE(review): presumably one is on an error path and one on the normal
// path; the surrounding try/catch (if any) is not visible in this extract.
236 pthread_sigmask(SIG_SETMASK, &osset, 0);
240 pthread_sigmask(SIG_SETMASK, &osset, 0);
// Stops the pool's threads.
//
// Throws MultiThreadingInvalidOperation when the calling thread is one of the
// threads stored in threads_.
//
// NOTE: the statements between the guard and the loop, and the loop body
// itself, are not part of this extract.
244 void stopInternal() {
// Refuse to run on a worker thread.
// NOTE(review): presumably because the loop below waits for every worker and
// a worker cannot wait for itself -- confirm against the full source.
245 auto id = std::this_thread::get_id();
246 if (checkThreadId(
id)) {
247 isc_throw(MultiThreadingInvalidOperation,
"thread pool stop called by worker thread");
// Visit every worker thread object (loop body not visible here).
250 for (
auto const& thread : threads_) {
// Tests whether `id` belongs to one of the threads stored in threads_.
//
// Performs a linear scan comparing `id` with each stored thread's get_id().
// Callers in this file use the result as an "is the caller a worker thread"
// test. The return statements are not part of this extract.
259 bool checkThreadId(std::thread::id
id) {
260 for (
auto const& thread : threads_) {
// A match means `id` identifies a thread owned by this pool.
261 if (
id == thread->get_id()) {
// Mutex-protected work queue shared between producers and the pool's worker
// threads.
//
// Template parameters:
//   Item           - type of the queued elements.
//   QueueContainer - container type used for storage.
//
// NOTE(review): the default QueueContainer is std::queue<Item>, but pushBack()
// and pushFront() below call queue_.push_back() / queue_.push_front(), which
// std::queue does not provide. The default is therefore only usable as long as
// those members are never instantiated with it; the ThreadPool member that
// uses this template passes its own Container explicitly.
//
// NOTE: this is an extract. Many original lines (function headers, branch
// bodies, closing braces, some data members) are missing, so the comments
// below describe only the statements that are visible.
279 template <
typename Item,
typename QueueContainer = std::queue<Item>>
280 struct ThreadPoolQueue {
// Constructor initializer list: the queue starts disabled and not paused, with
// a maximum size of 0 (the size checks below are skipped when it is 0), zeroed
// thread counters and zeroed statistics.
285 : enabled_(false), paused_(false), max_queue_size_(0), working_(0),
286 unavailable_(0), stat10(0.), stat100(0.), stat1000(0.) {
// Called by a worker when it starts (see the pool's thread loop). Takes the
// queue mutex; the bookkeeping done under the lock is not visible here.
298 void registerThread() {
299 std::lock_guard<std::mutex> lock(mutex_);
// Counterpart of registerThread(), called when a worker leaves its loop. Takes
// the queue mutex; the bookkeeping done under the lock is not visible here.
305 void unregisterThread() {
306 std::lock_guard<std::mutex> lock(mutex_);
// Stores a new maximum queue size under the queue mutex.
314 void setMaxQueueSize(
size_t max_queue_size) {
315 std::lock_guard<std::mutex> lock(mutex_);
316 max_queue_size_ = max_queue_size;
// Fragment of the matching getter: reads the maximum size under the mutex.
323 std::lock_guard<std::mutex> lock(mutex_);
324 return (max_queue_size_);
// Appends `item` at the back of the queue, under the queue mutex.
//
// When a non-zero maximum size is set, a loop runs for as long as the queue is
// at or above that size before the item is appended.
// NOTE(review): the loop body is not visible; presumably it discards queued
// items to make room and that is reflected in the bool result -- confirm.
338 bool pushBack(
const Item& item) {
344 std::lock_guard<std::mutex> lock(mutex_);
345 if (max_queue_size_ != 0) {
346 while (queue_.size() >= max_queue_size_) {
351 queue_.push_back(item);
// Inserts `item` at the front of the queue, under the queue mutex.
//
// When a non-zero maximum size is set and the queue is already at or above it,
// a separate branch is taken.
// NOTE(review): that branch is not visible; presumably the item is rejected
// and false is returned -- confirm.
365 bool pushFront(
const Item& item) {
370 std::lock_guard<std::mutex> lock(mutex_);
371 if ((max_queue_size_ != 0) &&
372 (queue_.size() >= max_queue_size_)) {
375 queue_.push_front(item);
// Fragment of the function that takes an item off the queue (header not
// visible). A unique_lock is used because the condition variable wait below
// needs to release and re-acquire the mutex.
396 std::unique_lock<std::mutex> lock(mutex_);
// Wake threads blocked in pause() once the queue is paused and no thread is
// working or unavailable.
399 if (paused_ && working_ == 0 && unavailable_ == 0) {
400 wait_threads_cv_.notify_all();
// Wake threads blocked in wait() once nothing is being worked on and the queue
// is empty.
403 if (working_ == 0 && queue_.empty()) {
404 wait_cv_.notify_all();
// Sleep until the queue is disabled, or it holds an item and is not paused.
407 cv_.wait(lock, [&]() {
return (!enabled_ || (!queue_.empty() && !paused_));});
// Snapshot of the current queue length.
// NOTE(review): presumably the input for the stat10/stat100/stat1000
// statistics; the computation is not visible here.
412 size_t length = queue_.size();
// Copy of the element at the front of the queue.
416 Item item = queue_.front();
// Fragment of the item-count accessor: reads the size under the mutex.
427 std::lock_guard<std::mutex> lock(mutex_);
428 return (queue_.size());
// Fragment of the untimed wait (header not visible): blocks until no item is
// being worked on and the queue is empty.
436 std::unique_lock<std::mutex> lock(mutex_);
438 wait_cv_.wait(lock, [&]() {
return (working_ == 0 && queue_.empty());});
// Timed variant of the wait above: waits at most `seconds` seconds for the
// queue to be empty with no item being worked on. wait_for() with a predicate
// yields the predicate's value on return, so `ret` is true only if that state
// was reached.
448 bool wait(uint32_t seconds) {
449 std::unique_lock<std::mutex> lock(mutex_);
451 bool ret = wait_cv_.wait_for(lock, std::chrono::seconds(seconds),
452 [&]() {
return (working_ == 0 && queue_.empty());});
// Pauses the queue. The visible wait blocks until no thread is working or
// unavailable.
// NOTE(review): whether that wait is conditional on the `wait` argument is not
// visible here. Also note that the parameter name hides the member functions
// named wait() inside this function.
461 void pause(
bool wait) {
462 std::unique_lock<std::mutex> lock(mutex_);
466 wait_threads_cv_.wait(lock, [&]() {
return (working_ == 0 && unavailable_ == 0);});
// Lock taken by another member function whose header and body are not visible
// (NOTE(review): presumably the counterpart of pause() -- confirm).
474 std::unique_lock<std::mutex> lock(mutex_);
// Returns a queue statistic selected by `which`, read under the queue mutex.
// Throws InvalidParameter for an unsupported `which`; per the message the
// supported values are 10, 100 and 1000.
484 double getQueueStat(
size_t which) {
485 std::lock_guard<std::mutex> lock(mutex_);
494 isc_throw(InvalidParameter,
"supported statistic for "
495 <<
"10/100/1000 only, not " << which);
// Fragment of the function that empties the queue (header not visible): the
// container is replaced by a default-constructed one under the mutex.
503 std::lock_guard<std::mutex> lock(mutex_);
504 queue_ = QueueContainer();
// Enables the queue for `thread_count` threads. Under the mutex, the count of
// unavailable threads is set to `thread_count`; other statements of this
// function are not visible here.
512 void enable(uint32_t thread_count) {
513 std::lock_guard<std::mutex> lock(mutex_);
515 unavailable_ = thread_count;
// Lock taken by another member function whose header and body are not visible.
523 std::lock_guard<std::mutex> lock(mutex_);
// Storage for the queued items.
553 QueueContainer queue_;
// Waited on by the item-removal code until the queue is disabled or has an
// item to hand out while not paused.
559 std::condition_variable cv_;
// Waited on by wait() until no item is being worked on and the queue is empty.
562 std::condition_variable wait_cv_;
// Waited on by pause() until no thread is working or unavailable.
565 std::condition_variable wait_threads_cv_;
// Enabled flag; read in the cv_ wait predicate.
570 std::atomic<bool> enabled_;
// Paused flag; read in the cv_ wait predicate and in the pause notification.
575 std::atomic<bool> paused_;
// Maximum number of queued items; 0 skips the size checks in pushBack() and
// pushFront().
579 size_t max_queue_size_;
// Thread counter set to the thread count in enable(); it must be 0 for the
// pause() wait to complete.
585 uint32_t unavailable_;
// Fragment of a worker thread's main loop (function header not visible;
// presumably ThreadPool::run, which startInternal() passes to std::thread).

// Announce this thread to the queue before entering the loop.
599 queue_.registerThread();
// `work` starts as true, so the first iteration always runs; after each
// iteration it is refreshed from queue_.enabled(), which ends the loop once
// the queue has been disabled. The loop body is not visible here.
600 for (
bool work =
true; work; work = queue_.enabled()) {
// Tell the queue this thread is leaving.
610 queue_.unregisterThread();
// Worker thread objects owned by the pool; filled by startInternal() and
// scanned by checkThreadId().
614 std::vector<boost::shared_ptr<std::thread>> threads_;
// Work queue shared by all workers. The container type is passed explicitly
// rather than relying on ThreadPoolQueue's default.
617 ThreadPoolQueue<WorkItemPtr, Container> queue_;