ACE/FAQ/APG/ThreadPools

Материал из Wiki.crossplatform.ru

Перейти к: навигация, поиск

Содержание


Futures

// $Id: Futures.cpp 94312 2011-07-11 00:39:42Z schmidt $
 
#include "ace/config-lite.h"
#if defined (ACE_HAS_THREADS)
 
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_time.h"
#include "ace/Task.h"
#include "ace/Unbounded_Queue.h"
#include "ace/Synch.h"
#include "ace/SString.h"
#include "ace/Method_Request.h"
#include "ace/Future.h"
#include "ace/Activation_Queue.h"
#include "ace/Condition_T.h"
 
#define OUTSTANDING_REQUESTS 20
 
// Listing 2 code/ch16
class CompletionCallBack: public ACE_Future_Observer<ACE_CString*>
{
public:
  virtual void update (const ACE_Future<ACE_CString*> & future)
  {
    ACE_CString *result = 0;
 
    // Block for the result.
    future.get (result);
    ACE_DEBUG ((LM_INFO, ACE_TEXT("%C\n"), result->c_str ()));
    delete result;
  }
};
// Listing 2
// Listing 1 code/ch16
class LongWork : public ACE_Method_Request
{
public:
  virtual int call (void)
  {
    ACE_TRACE ("LongWork::call");
    ACE_DEBUG
      ((LM_INFO, ACE_TEXT ("(%t) Attempting long work task\n")));
    ACE_OS::sleep (1);
 
    char buf[1024];
    ACE_OS::strcpy (buf, "Completed assigned task\n");
    ACE_CString *msg;
    ACE_NEW_RETURN
      (msg, ACE_CString (buf, ACE_OS::strlen (buf) + 1), -1);
    result_.set (msg);
    return 0;
  }
 
  ACE_Future<ACE_CString*> &future (void)
  {
    ACE_TRACE ("LongWork::future");
    return result_;
  }
 
  void attach (CompletionCallBack *cb)
  {
    result_.attach (cb);
  }
 
private:
  ACE_Future<ACE_CString*> result_;
};
// Listing 1
 
class Exit : public ACE_Method_Request
{
public:
  virtual int call (void)
  {
    ACE_TRACE ("Exit::call");
    return -1;
  }
};
 
class Worker;
 
class IManager
{
public:
  virtual ~IManager (void) { }
 
  virtual int return_to_work (Worker *worker) = 0;
};
 
// Listing 3 code/ch16
class Worker: public ACE_Task<ACE_MT_SYNCH>
{
public:
  Worker (IManager *manager)
    : manager_(manager), queue_ (msg_queue ())
  { }
 
  int perform (ACE_Method_Request *req)
  {
    ACE_TRACE ("Worker::perform");
    return this->queue_.enqueue (req);
  }
 
  virtual int svc (void)
  {
    thread_id_ = ACE_Thread::self ();
    while (1)
      {
        ACE_Method_Request *request = this->queue_.dequeue();
        if (request == 0)
          return -1;
 
        // Invoke the request
        int result = request->call ();
        if (result == -1)
          break;
 
        // Return to work.
        this->manager_->return_to_work (this);
      }
 
    return 0;
  }
 
  ACE_thread_t thread_id (void);
 
private:
  IManager *manager_;
  ACE_thread_t thread_id_;
  ACE_Activation_Queue queue_;
};
// Listing 3
 
ACE_thread_t Worker::thread_id (void)
{
  return thread_id_;
}
 
// Listing 4 code/ch16
class Manager : public ACE_Task_Base, private IManager
{
public:
  enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
 
  Manager ()
    : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
  {
    ACE_TRACE ("Manager");
  }
 
  int perform (ACE_Method_Request *req)
  {
    ACE_TRACE ("perform");
    return this->queue_.enqueue (req);
  }
 
  int svc (void)
  {
    ACE_TRACE ("svc");
 
    ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));
 
    // Create pool when you get in the first time.
    create_worker_pool ();
 
    while (!done ())
      {
        ACE_Time_Value tv ((long)MAX_TIMEOUT);
        tv += ACE_OS::time (0);
 
        // Get the next message
        ACE_Method_Request *request = this->queue_.dequeue (&tv);
        if (request == 0)
          {
            shut_down ();
            break;
          }
 
        // Choose a worker.
        Worker *worker = choose_worker ();
 
        // Ask the worker to do the job.
        worker->perform (request);
      }
 
    return 0;
  }
 
  int shut_down (void);
 
  virtual int return_to_work (Worker *worker)
  {
    ACE_GUARD_RETURN
      (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1);
    ACE_DEBUG
      ((LM_DEBUG, ACE_TEXT ("(%t) Worker returning to work.\n")));
    this->workers_.enqueue_tail (worker);
    this->workers_cond_.signal ();
 
    return 0;
  }
 
private:
  Worker *choose_worker (void)
  {
    ACE_GUARD_RETURN
      (ACE_Thread_Mutex, worker_mon, this->workers_lock_, 0)
 
      while (this->workers_.is_empty ())
        workers_cond_.wait ();
 
    Worker *worker = 0;
    this->workers_.dequeue_head (worker);
    return worker;
  }
 
  int create_worker_pool (void)
  {
    ACE_GUARD_RETURN
      (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1);
    for (int i = 0; i < POOL_SIZE; i++)
      {
        Worker *worker;
        ACE_NEW_RETURN (worker, Worker (this), -1);
        this->workers_.enqueue_tail (worker);
        worker->activate ();
      }
 
    return 0;
  }
 
  int done (void)
  {
    return (shutdown_ == 1);
  }
 
  ACE_thread_t thread_id (Worker *worker)
  {
    return worker->thread_id ();
  }
 
private:
  int shutdown_;
  ACE_Thread_Mutex workers_lock_;
  ACE_Condition<ACE_Thread_Mutex> workers_cond_;
  ACE_Unbounded_Queue<Worker* > workers_;
  ACE_Activation_Queue queue_;
};
// Listing 4
 
int
Manager::shut_down (void)
{
  ACE_TRACE ("Manager::shut_down");
  ACE_Unbounded_Queue<Worker* >::ITERATOR iter = this->workers_.begin ();
  Worker **worker_ptr = 0;
  do
    {
      iter.next (worker_ptr);
      Worker *worker = (*worker_ptr);
      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Attempting shutdown of %d\n"),
                  thread_id (worker)));
 
      Exit *req;
      ACE_NEW_RETURN (req, Exit(), -1);
 
      // Send the hangup message
      worker->perform (req);
 
      // Wait for the exit.
      worker->wait ();
 
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%t) Worker %d shut down.\n"),
                  thread_id (worker)));
 
      delete req;
      delete worker;
 
    }
  while (iter.advance ());
 
  shutdown_ = 1;
 
  return 0;
}
 
// Listing 5 code/ch16
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  Manager tp;
  tp.activate ();
 
  ACE_Time_Value tv;
  tv.msec (100);
 
  // Wait for a few seconds every time you send a message.
  CompletionCallBack cb;
  LongWork workArray[OUTSTANDING_REQUESTS];
  for (int i = 0; i < OUTSTANDING_REQUESTS; i++)
    {
      workArray[i].attach (&cb);
      ACE_OS::sleep (tv);
      tp.perform (&workArray[i]);
    }
 
  ACE_Thread_Manager::instance ()->wait ();
  return 0;
}
// Listing 5
 
#else
#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_OS::puts (ACE_TEXT ("This example requires threads."));
  return 0;
}
 
#endif /* ACE_HAS_THREADS */

Thread Pool

// $Id: LF_ThreadPool.cpp 94310 2011-07-09 19:10:06Z schmidt $
 
#include "ace/config-lite.h"
#if defined (ACE_HAS_THREADS)
 
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_sys_time.h"
#include "ace/Task.h"
#include "ace/Containers.h"
#include "ace/Synch.h"
#include "ace/Condition_T.h"
 
// Listing 4 code/ch16
class Follower
{
public:
  Follower (ACE_Thread_Mutex &leader_lock)
    : cond_(leader_lock)
  {
    owner_ = ACE_Thread::self ();
  }
 
  //FUZZ: disable check_for_lack_ACE_OS
  int wait (void)
  {
    return this->cond_.wait ();
  }
 
  int signal (void)
  {
    return this->cond_.signal ();
  }
  //FUZZ: enable check_for_lack_ACE_OS
 
  ACE_thread_t owner (void)
  {
    return this->owner_;
  }
 
private:
  ACE_Condition<ACE_Thread_Mutex> cond_;
  ACE_thread_t owner_;
};
// Listing 4
// Listing 1 code/ch16
class LF_ThreadPool : public ACE_Task<ACE_MT_SYNCH>
{
public:
  LF_ThreadPool () : shutdown_(0), current_leader_(0)
  {
    ACE_TRACE ("LF_ThreadPool::TP");
  }
 
  virtual int svc (void);
 
  void shut_down (void)
  {
    shutdown_ = 1;
  }
 
private:
  int become_leader (void);
 
  Follower *make_follower (void);
 
  int elect_new_leader (void);
 
  int leader_active (void)
  {
    ACE_TRACE ("LF_ThreadPool::leader_active");
    return this->current_leader_ != 0;
  }
 
  void leader_active (ACE_thread_t leader)
  {
    ACE_TRACE ("LF_ThreadPool::leader_active");
    this->current_leader_ = leader;
  }
 
  void process_message (ACE_Message_Block *mb);
 
  int done (void)
  {
    return (shutdown_ == 1);
  }
 
private:
  int shutdown_;
  ACE_thread_t current_leader_;
  ACE_Thread_Mutex leader_lock_;
  ACE_Unbounded_Queue<Follower*> followers_;
  ACE_Thread_Mutex followers_lock_;
  static long LONG_TIME;
};
// Listing 1
// Listing 2 code/ch16
int
LF_ThreadPool::svc (void)
{
  ACE_TRACE ("LF_ThreadPool::svc");
  while (!done ())
    {
      become_leader ();  // Block until this thread is the leader.
 
      ACE_Message_Block *mb = 0;
      ACE_Time_Value tv (LONG_TIME);
      tv += ACE_OS::gettimeofday ();
 
      // Get a message, elect new leader, then process message.
      if (this->getq (mb, &tv) < 0)
        {
          if (elect_new_leader () == 0)
            break;
          continue;
        }
 
      elect_new_leader ();
      process_message (mb);
    }
 
  return 0;
}
// Listing 2
// Listing 3 code/ch16
int
LF_ThreadPool::become_leader (void)
{
  ACE_TRACE ("LF_ThreadPool::become_leader");
 
  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);
  if (leader_active ())
    {
      Follower *fw = make_follower ();
      {
        // Wait until told to do so.
        while (leader_active ())
          fw->wait ();
      }
 
      delete fw;
    }
 
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Becoming the leader\n")));
 
  // Mark yourself as the active leader.
  leader_active (ACE_Thread::self ());
  return 0;
}
 
Follower*
LF_ThreadPool::make_follower (void)
{
  ACE_TRACE ("LF_ThreadPool::make_follower");
 
  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, follower_mon, this->followers_lock_, 0);
  Follower *fw;
  ACE_NEW_RETURN (fw, Follower (this->leader_lock_), 0);
  this->followers_.enqueue_tail (fw);
  return fw;
}
// Listing 3
// Listing 5 code/ch16
int
LF_ThreadPool::elect_new_leader (void)
{
  ACE_TRACE ("LF_ThreadPool::elect_new_leader");
 
  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);
  leader_active (0);
 
  // Wake up a follower
  if (!followers_.is_empty ())
    {
      ACE_GUARD_RETURN (ACE_Thread_Mutex,
                        follower_mon,
                        this->followers_lock_,
                        -1);
      // Get the old follower.
      Follower *fw;
      if (this->followers_.dequeue_head (fw) != 0)
        return -1;
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%t) Resigning and Electing %d\n"),
                  fw->owner ()));
      return (fw->signal () == 0) ? 0 : -1;
    }
  else
    {
      ACE_DEBUG
        ((LM_ERROR, ACE_TEXT ("(%t) Oops no followers left\n")));
      return -1;
    }
}
// Listing 5
 
void
LF_ThreadPool::process_message (ACE_Message_Block *mb)
{
  ACE_TRACE ("LF_ThreadPool::process_message");
  int msgId;
  ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
  mb->release ();
 
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Started processing message:%d\n"),
              msgId));
  ACE_OS::sleep (1);
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Finished processing message:%d\n"),
              msgId));
}
 
long LF_ThreadPool::LONG_TIME = 5L;
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  LF_ThreadPool tp;
  tp.activate (THR_NEW_LWP| THR_JOINABLE, 5);
 
  // Wait for a few seconds...
  ACE_OS::sleep (2);
  ACE_Time_Value tv (1L);
 
  ACE_Message_Block *mb = 0;
  for (int i = 0; i < 30; i++)
    {
      ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(int)), -1);
      ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
      ACE_OS::sleep (tv);
 
      // Add a new work item.
      tp.putq (mb);
    }
 
  ACE_Thread_Manager::instance ()->wait ();
 
  ACE_OS::sleep (10);
 
  return 0;
}
 
#else
#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_OS::puts (ACE_TEXT ("This example requires threads."));
  return 0;
}
 
#endif /* ACE_HAS_THREADS */

Request Handler

/**
 * $Id: Request_Handler.h 80826 2008-03-04 14:51:23Z wotte $
 *
 * Sample code from The ACE Programmer's Guide,
 * copyright 2003 Addison-Wesley. All Rights Reserved.
 */
 
#ifndef __REQUEST_HANDLER_H_
#define __REQUEST_HANDLER_H_
 
#include "ace/Svc_Handler.h"
#include "ace/SOCK_Stream.h"
 
ACE_BEGIN_VERSIONED_NAMESPACE_DECL
class ACE_Thread_Manager;
ACE_END_VERSIONED_NAMESPACE_DECL
 
class Request_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
  {
    // = TITLE
    //   This class is the Svc_Handler used by <Acceptor>.
  public:
    Request_Handler (ACE_Thread_Manager *tm = 0);
    // The default constructor makes sure the right reactor is used.
 
  protected:
    virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
    virtual int handle_close (ACE_HANDLE fd, ACE_Reactor_Mask = 0);
 
  private:
    size_t  nr_msgs_rcvd_;
  };
 
#endif /* __REQUEST_HANDLER_H_ */

Thread Pool Reactor

// == == == == == == == == == == == == == == == == == == == == == == ==
// $Id: TP_Reactor.cpp 82610 2008-08-12 19:46:36Z parsons $
// Stolen from $ACE_ROOT/tests/Thread_Pool_Reactor_Test.cpp
// Thread_Pool_Reactor_Test.cpp, v 1.29 2001/03/20 01:07:21 irfan Exp
// = AUTHOR
//      Irfan Pyarali <irfan@cs.wustl.edu> and
//      Nanbor Wang <nanbor@cs.wustl.edu>
// == == == == == == == == == == == == == == == == == == == == == == ==
 
#include "ace/config-lite.h"
#if defined (ACE_HAS_THREADS)
 
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_unistd.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/Acceptor.h"
#include "ace/Thread_Manager.h"
#include "ace/TP_Reactor.h"
#include "ace/Truncate.h"
 
#include "Request_Handler.h"
 
// Accepting end point.  This is actually "localhost:10010", but some
// platform couldn't resolve the name so we use the IP address
// directly here.
static const ACE_TCHAR *rendezvous = ACE_TEXT ("127.0.0.1:10010");
 
// Total number of server threads.
static size_t svr_thrno = 5;
 
// Total number of client threads.
static size_t cli_runs = 2;
 
// Total connection attemps of a client thread.
static size_t cli_conn_no = 2;
 
// Total requests a client thread sends.
static size_t cli_req_no = 5;
 
// Delay before a thread sending the next request (in msec.)
static int req_delay = 50;
 
 
typedef ACE_Strategy_Acceptor <Request_Handler, ACE_SOCK_ACCEPTOR> ACCEPTOR;
 
 
Request_Handler::Request_Handler (ACE_Thread_Manager *thr_mgr)
    : ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> (thr_mgr),
    nr_msgs_rcvd_(0)
{
  this->reactor (ACE_Reactor::instance ());
}
 
int
Request_Handler::handle_input (ACE_HANDLE fd)
{
  ACE_TCHAR buffer[BUFSIZ];
  ACE_TCHAR len = 0;
  ssize_t result = this->peer ().recv (&len, sizeof (ACE_TCHAR));
 
  if (result > 0
      && this->peer ().recv_n (buffer, len * sizeof (ACE_TCHAR))
      == static_cast<ssize_t> (len * sizeof (ACE_TCHAR)))
    {
      ++this->nr_msgs_rcvd_;
 
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%t) svr input; fd: 0x%x; input: %s\n"),
                  fd,
                  buffer));
      if (ACE_OS::strcmp (buffer, ACE_TEXT ("shutdown")) == 0)
        ACE_Reactor::instance()->end_reactor_event_loop ();
      return 0;
    }
  else
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Request_Handler: 0x%x peer closed (0x%x)\n"),
                this, fd));
  return -1;
}
 
int
Request_Handler::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) svr close; fd: 0x%x, rcvd %d msgs\n"),
              fd,
              this->nr_msgs_rcvd_));
 
  if (this->nr_msgs_rcvd_ != cli_req_no)
    ACE_ERROR((LM_ERROR,
               ACE_TEXT ("(%t) Handler 0x%x: Expected %d messages; got %d\n"),
               this,
               cli_req_no,
               this->nr_msgs_rcvd_));
 
  this->destroy ();
  return 0;
}
 
// Listing 2 code/ch16
static int
reactor_event_hook (ACE_Reactor *)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) handling events ....\n")));
 
  return 0;
}
 
class ServerTP : public ACE_Task_Base
{
public:
  virtual int svc (void)
  {
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Running the event loop\n")));
 
    int result =
      ACE_Reactor::instance ()->run_reactor_event_loop
        (&reactor_event_hook);
 
    if (result == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("(%t) %p\n"),
                         ACE_TEXT ("Error handling events")),
                        0);
 
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Done handling events.\n")));
 
    return 0;
  }
};
// Listing 2
 
class Client: public ACE_Task_Base
  {
  public:
    Client()
        :addr_(rendezvous)
    {}
 
    virtual int svc()
    {
      ACE_OS::sleep (3);
      const ACE_TCHAR *msg =
        ACE_TEXT ("Message from Connection worker");
 
      ACE_TCHAR buf [BUFSIZ];
      buf[0] =
        ACE_Utils::truncate_cast<ACE_TCHAR> (ACE_OS::strlen (msg) + 1);
      ACE_OS::strcpy (&buf[1], msg);
 
      for (size_t i = 0; i < cli_runs; i++)
        send_work_to_server(buf);
 
      shut_down();
 
      return 0;
    }
 
  private:
    void send_work_to_server(ACE_TCHAR* arg)
    {
      ACE_SOCK_Stream stream;
      ACE_SOCK_Connector connect;
      ACE_Time_Value delay (0, req_delay);
      size_t len = * reinterpret_cast<ACE_TCHAR *> (arg);
 
      for (size_t i = 0 ; i < cli_conn_no; i++)
        {
          if (connect.connect (stream, addr_) < 0)
            {
              ACE_ERROR ((LM_ERROR,
                          ACE_TEXT ("(%t) %p\n"),
                          ACE_TEXT ("connect")));
              continue;
            }
 
          for (size_t j = 0; j < cli_req_no; j++)
            {
              ACE_DEBUG ((LM_DEBUG,
                          ACE_TEXT ("Sending work to server on handle 0x%x, req %d\n"),
                          stream.get_handle (),
                          j+1));
              if (stream.send_n (arg,
                                 (len + 1) * sizeof (ACE_TCHAR)) == -1)
                {
                  ACE_ERROR ((LM_ERROR,
                              ACE_TEXT ("(%t) %p\n"),
                              ACE_TEXT ("send_n")));
                  continue;
                }
              ACE_OS::sleep (delay);
            }
 
          stream.close ();
        }
 
    }
 
    void shut_down()
    {
      ACE_SOCK_Stream stream;
      ACE_SOCK_Connector connect;
 
      if (connect.connect (stream, addr_) == -1)
        ACE_ERROR ((LM_ERROR,
                    ACE_TEXT ("(%t) %p Error while connecting\n"),
                    ACE_TEXT ("connect")));
 
      const ACE_TCHAR *sbuf = ACE_TEXT ("\011shutdown");
 
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("shutdown stream handle = %x\n"),
                  stream.get_handle ()));
 
      if (stream.send_n (sbuf, (ACE_OS::strlen (sbuf) + 1) * sizeof (ACE_TCHAR)) == -1)
        ACE_ERROR ((LM_ERROR,
                    ACE_TEXT ("(%t) %p\n"),
                    ACE_TEXT ("send_n")));
 
      stream.close ();
    }
  private:
    ACE_INET_Addr addr_;
  };
// Listing 1 code/ch16
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_TP_Reactor sr;
  ACE_Reactor new_reactor (&sr);
  ACE_Reactor::instance (&new_reactor);
 
  ACCEPTOR acceptor;
  ACE_INET_Addr accept_addr (rendezvous);
 
  if (acceptor.open (accept_addr) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p\n"),
                       ACE_TEXT ("open")),
                      1);
 
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Spawning %d server threads...\n"),
              svr_thrno));
 
  ServerTP serverTP;
  serverTP.activate (THR_NEW_LWP | THR_JOINABLE,
                     ACE_Utils::truncate_cast<int> (svr_thrno));
 
  Client client;
  client.activate ();
 
  ACE_Thread_Manager::instance ()->wait ();
 
  return 0;
}
// Listing 1
#else
#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_OS::puts (ACE_TEXT ("This example requires threads."));
  return 0;
}
 
#endif /* ACE_HAS_THREADS */

Task Thread Pool

// $Id: Task_ThreadPool.cpp 94310 2011-07-09 19:10:06Z schmidt $
 
#include "ace/config-lite.h"
#if defined (ACE_HAS_THREADS)
 
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_time.h"
#include "ace/Task.h"
#include "ace/Synch.h"
#include "ace/SString.h"
 
// Listing 2 code/ch16
class Workers : public ACE_Task<ACE_MT_SYNCH>
{
public:
  Workers ()
  { }
 
  virtual int svc (void)
  {
    while (1)
      {
        ACE_Message_Block *mb = 0;
        if (this->getq (mb) == -1)
          {
            ACE_DEBUG ((LM_INFO,
                        ACE_TEXT ("(%t) Shutting down\n")));
            break;
          }
 
        // Process the message.
        process_message (mb);
      }
 
    return 0;
  }
  // Listing 2
 
private:
  void process_message (ACE_Message_Block *mb)
  {
    ACE_TRACE ("Workers::process_message");
    int msgId;
    ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
    mb->release ();
 
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Started processing message %d\n"),
                msgId));
    ACE_OS::sleep (3);
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Finished processing message %d\n"),
                msgId));
  }
};
 
// Listing 1 code/ch16
class Manager : public ACE_Task<ACE_MT_SYNCH>
{
public:
  enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
 
  Manager () : shutdown_(0)
  {
    ACE_TRACE ("Manager::Manager");
  }
 
  int svc (void)
  {
    ACE_TRACE ("Manager::svc");
 
    ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));
 
    // Create pool.
    Workers pool;
    pool.activate (THR_NEW_LWP | THR_JOINABLE, POOL_SIZE);
 
    while (!done ())
      {
        ACE_Message_Block *mb = 0;
        ACE_Time_Value tv ((long)MAX_TIMEOUT);
        tv += ACE_OS::time (0);
 
        // Get a message request.
        if (this->getq (mb, &tv) < 0)
          {
            pool.msg_queue ()->deactivate ();
            pool.wait ();
            break;
          }
 
        // Ask the worker pool to do the job.
        pool.putq (mb);
      }
 
    return 0;
  }
 
private:
  int done (void);
 
  int shutdown_;
};
// Listing 1
 
int Manager::done (void)
{
  return (shutdown_ == 1);
}
 
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  Manager tp;
  tp.activate ();
 
  // Wait for a moment every time you send a message.
  ACE_Time_Value tv;
  tv.msec (100);
 
  ACE_Message_Block *mb = 0;
  for (int i = 0; i < 30; i++)
    {
      ACE_NEW_RETURN
        (mb, ACE_Message_Block(sizeof(int)), -1);
 
      ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
 
      ACE_OS::sleep (tv);
 
      // Add a new work item.
      tp.putq (mb);
    }
 
  ACE_Thread_Manager::instance ()->wait ();
  return 0;
}
 
#else
#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_OS::puts (ACE_TEXT ("This example requires threads."));
  return 0;
}
 
#endif /* ACE_HAS_THREADS */

Thread Pool

// $Id: ThreadPool.cpp 94310 2011-07-09 19:10:06Z schmidt $
 
#include "ace/config-lite.h"
#if defined (ACE_HAS_THREADS)
 
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_time.h"
#include "ace/Task.h"
#include "ace/Containers.h"
#include "ace/Synch.h"
#include "ace/SString.h"
#include "ace/Method_Request.h"
#include "ace/Future.h"
#include "ace/Activation_Queue.h"
#include "ace/Condition_T.h"
 
class Worker;
 
class IManager
{
public:
  virtual ~IManager (void) { }
 
  virtual int return_to_work (Worker *worker) = 0;
};
 
// Listing 2 code/ch16
class Worker : public ACE_Task<ACE_MT_SYNCH>
{
public:
  Worker (IManager *manager) : manager_(manager) { }
 
  virtual int svc (void)
  {
    ACE_Thread_ID id;
    thread_id_ = id;
    while (1)
      {
        ACE_Message_Block *mb = 0;
        if (this->getq (mb) == -1)
          ACE_ERROR_BREAK
            ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("getq")));
        if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
          {
            ACE_DEBUG ((LM_INFO,
                        ACE_TEXT ("(%t) Shutting down\n")));
            mb->release ();
            break;
          }
        // Process the message.
        process_message (mb);
        // Return to work.
        this->manager_->return_to_work (this);
      }
 
    return 0;
  }
  // Listing 2
 
  const ACE_Thread_ID& thread_id (void)
  {
    return this->thread_id_;
  }
 
private:
  void process_message (ACE_Message_Block *mb)
  {
    ACE_TRACE ("Worker::process_message");
    int msgId;
    ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
    mb->release ();
 
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Started processing message %d\n"),
                msgId));
    ACE_OS::sleep (3);
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Finished processing message %d\n"),
                msgId));
  }
 
  IManager *manager_;
  ACE_Thread_ID thread_id_;
};
 
// Listing 1 code/ch16
class Manager: public ACE_Task<ACE_MT_SYNCH>, private IManager
{
public:
  enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
 
  Manager ()
    : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
  {
    ACE_TRACE ("Manager::Manager");
  }
 
  int svc (void)
  {
    ACE_TRACE ("Manager::svc");
 
    ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));
 
    // Create pool.
    create_worker_pool ();
 
    while (!done ())
      {
        ACE_Message_Block *mb = 0;
        ACE_Time_Value tv ((long)MAX_TIMEOUT);
        tv += ACE_OS::time (0);
 
        // Get a message request.
        if (this->getq (mb, &tv) < 0)
          {
            shut_down ();
            break;
          }
 
        // Choose a worker.
        Worker *worker = 0;
        {
          ACE_GUARD_RETURN (ACE_Thread_Mutex,
                            worker_mon, this->workers_lock_, -1);
 
          while (this->workers_.is_empty ())
            workers_cond_.wait ();
 
          this->workers_.dequeue_head (worker);
        }
 
        // Ask the worker to do the job.
        worker->putq (mb);
      }
 
    return 0;
  }
 
  int shut_down (void);
 
  const ACE_Thread_ID& thread_id (Worker *worker);
 
  virtual int return_to_work (Worker *worker)
  {
    ACE_GUARD_RETURN (ACE_Thread_Mutex,
                      worker_mon, this->workers_lock_, -1);
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Worker %t returning to work.\n")));
    this->workers_.enqueue_tail (worker);
    this->workers_cond_.signal ();
 
    return 0;
  }
 
private:
  int create_worker_pool (void)
  {
    ACE_GUARD_RETURN (ACE_Thread_Mutex,
                      worker_mon,
                      this->workers_lock_,
                      -1);
    for (int i = 0; i < POOL_SIZE; i++)
      {
        Worker *worker = 0;
        ACE_NEW_RETURN (worker, Worker (this), -1);
        this->workers_.enqueue_tail (worker);
        worker->activate ();
      }
 
    return 0;
  }
 
  int done (void);
 
private:
  int shutdown_;
  ACE_Thread_Mutex workers_lock_;
  ACE_Condition<ACE_Thread_Mutex> workers_cond_;
  ACE_Unbounded_Queue<Worker* > workers_;
};
// Listing 1
 
int Manager::done (void)
{
  return (shutdown_ == 1);
}
 
int
Manager::shut_down (void)
{
  ACE_TRACE ("Manager::shut_down");
  ACE_Unbounded_Queue<Worker* >::ITERATOR iter =
    this->workers_.begin ();
  Worker **worker_ptr = 0;
  do
    {
      iter.next (worker_ptr);
      Worker *worker = (*worker_ptr);
      ACE_Thread_ID id = thread_id (worker);
      char buf [65];
      id.to_string (buf);
      ACE_DEBUG ((LM_DEBUG,
                 ACE_TEXT ("(%t) Attempting shutdown of %C\n"),
                 buf));
 
      // Send the hangup message.
      ACE_Message_Block *mb = 0;
      ACE_NEW_RETURN
        (mb,
         ACE_Message_Block(0,
                           ACE_Message_Block::MB_HANGUP),
         -1);
      worker->putq (mb);
 
      // Wait for the exit.
      worker->wait ();
 
      ACE_ASSERT (worker->msg_queue ()->is_empty ());
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%t) Worker %C shut down.\n"),
                  buf));
      delete worker;
    }
  while (iter.advance ());
 
  shutdown_ = 1;
 
  return 0;
}
 
const ACE_Thread_ID&
Manager::thread_id (Worker *worker)
{
  return worker->thread_id ();
}
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  Manager tp;
  tp.activate ();
 
  // Wait for a moment every time you send a message.
  ACE_Time_Value tv;
  tv.msec (100);
 
  ACE_Message_Block *mb = 0;
  for (int i = 0; i < 30; i++)
    {
      ACE_NEW_RETURN
        (mb, ACE_Message_Block(sizeof(int)), -1);
 
      ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
 
      ACE_OS::sleep (tv);
 
      // Add a new work item.
      tp.putq (mb);
    }
 
  ACE_Thread_Manager::instance ()->wait ();
  return 0;
}
 
#else
#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_OS::puts (ACE_TEXT ("This example requires threads."));
  return 0;
}
 
#endif /* ACE_HAS_THREADS */