ACE/FAQ/APG/ThreadManagement

Материал из Wiki.crossplatform.ru

Перейти к: навигация, поиск

Содержание


[править] Async Cancel

// $Id: Async_Cancel.cpp 94362 2011-08-04 15:54:26Z mesnier_p $
 
#include "ace/OS_NS_unistd.h"
#include "ace/Task.h"
#include "ace/Log_Msg.h"
 
#if defined (ACE_HAS_PTHREADS) && !defined (ACE_LACKS_PTHREAD_CANCEL)
// Only works on Pthreads...
 
// Listing 1 code/ch13
class CanceledTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
  virtual int svc (void)
  {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Starting thread\n")));
 
    if (this->set_cancel_mode () < 0)
      return -1;
 
    while (1)
      {
        // Put this thread in a compute loop.. no
        // cancellation points are available.
      }
  }
 
  int set_cancel_mode (void)
  {
    cancel_state new_state;
 
    // Set the cancel state to asynchronous and enabled.
    new_state.cancelstate = PTHREAD_CANCEL_ENABLE;
    new_state.canceltype  = PTHREAD_CANCEL_ASYNCHRONOUS;
    if (ACE_Thread::setcancelstate (new_state, 0) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("cancelstate")), -1);
    return 0;
  }
};
// Listing 1
// Listing 2 code/ch13
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  CanceledTask task;
  task.activate ();
  ACE_OS::sleep (1);
  ACE_Thread_Manager::instance ()->cancel_task (&task, 1);
  task.wait ();
 
  return 0;
}
// Listing 2
 
#else  /* ACE_HAS_PTHREADS */
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_OS::puts ("This example works on Pthreads platforms.\n");
  return 0;
}
#endif /* ACE_HAS_PTHREADS */

[править] Coop Cancel

// $Id: Coop_Cancel.cpp 84565 2009-02-23 08:20:39Z johnnyw $
 
#include "ace/config-lite.h"
#if defined (ACE_HAS_THREADS)
 
#include "ace/OS_NS_time.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Task.h"
#include "ace/Log_Msg.h"
 
// Listing 1 code/ch13
class CanceledTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
 
  virtual int svc (void)
  {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting up\n")));
 
    // Cache our ACE_Thread_Manager pointer.
    ACE_Thread_Manager *mgr = this->thr_mgr ();
    while (1)
      {
        if (mgr->testcancel (mgr->thr_self ()))
          return 0;
 
        ACE_Message_Block *mb = 0;
        ACE_Time_Value tv (0, 1000);
        tv += ACE_OS::time (0);
        int result = this->getq (mb, &tv);
        if (result == -1 && errno == EWOULDBLOCK)
          continue;
        else
          {
            // Do real work.
          }
      }
 
    ACE_NOTREACHED (return 0);
  }
};
// Listing 1
 
// Listing 2 code/ch13
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  CanceledTask task;
  task.activate ();
 
  ACE_OS::sleep (1);
 
  ACE_Thread_Manager::instance ()->cancel_task (&task);
  task.wait ();
  return 0;
}
// Listing 2
 
#else
#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_OS::puts (ACE_TEXT ("This example requires threads."));
  return 0;
}
 
#endif /* ACE_HAS_THREADS */

[править] ExitHandler

// $Id: ExitHandler.cpp 84565 2009-02-23 08:20:39Z johnnyw $
 
// Listing 1 code/ch13
#include "ace/Task.h"
#include "ace/Log_Msg.h"
 
class ExitHandler : public ACE_At_Thread_Exit
{
public:
  virtual void apply (void)
  {
    ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) is exiting\n")));
 
    // Shut down all devices.
  }
};
// Listing 1
// Listing 2 code/ch13
class HA_CommandHandler : public ACE_Task_Base
{
public:
  HA_CommandHandler(ExitHandler& eh) : eh_(eh)
  { }
 
  virtual int svc (void)
  {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting up\n")));
 
    this->thr_mgr ()->at_exit (eh_);
 
    // Do something.
 
    // Forcefully exit.
    ACE_Thread::exit ();
 
    // NOT REACHED
    return 0;
  }
 
private:
  ExitHandler& eh_;
};
// Listing 2
// Listing 3 code/ch13
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ExitHandler eh;
 
  HA_CommandHandler handler (eh);
  handler.activate ();
 
  ACE_Thread_Manager::instance ()->wait ();
  return 0;
}
// Listing 3
#if 0
// Listing 4 code/ch13
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ExitHandler eh;
  ACE_Thread_Manager tm;
 
  HA_CommandHandler handler (eh);
  handler.thr_mgr (&tm);
  handler.activate ();
 
  tm.wait();
  return 0;
}
// Listing 4
#endif

[править] Pool

// $Id: Pool.cpp 91626 2010-09-07 10:59:20Z johnnyw $
 
#include "ace/config-lite.h"
#if defined (ACE_HAS_THREADS)
 
#include "ace/Task.h"
#include "ace/Log_Msg.h"
 
// Listing 1 code/ch13
class HA_CommandHandler : public ACE_Task<ACE_MT_SYNCH>
{
public:
  virtual int svc (void)
  {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting up\n")));
    ACE_Message_Block *mb = 0;
    if (this->getq (mb) == -1)
      return -1;
    // ... do something with the message.
    return 0;
  }
};
// Listing 1
// Listing 2 code/ch13
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  HA_CommandHandler handler;
 
  // Create 4 threads.
  handler.activate (THR_NEW_LWP | THR_JOINABLE, 4);
  handler.wait ();
  return 0;
}
// Listing 2
 
#else
#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_OS::puts (ACE_TEXT ("This example requires threads."));
  return 0;
}
 
#endif /* ACE_HAS_THREADS */

[править] Priorities

// $Id: Priorities.cpp 80826 2008-03-04 14:51:23Z wotte $
 
#include "ace/config-lite.h"
 
#if defined (ACE_HAS_THREADS)
 
#include "ace/Task.h"
#include "ace/Log_Msg.h"
#include "ace/OS_NS_unistd.h"
 
// Listing 2 code/ch13
class HA_CommandHandler : public ACE_Task<ACE_MT_SYNCH>
{
public:
  HA_CommandHandler (const char *name) : name_ (name)
  { }
 
  virtual int svc (void)
  {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting up %C\n"),
                name_));
 
    ACE_OS::sleep (2);
    ACE_Message_Block *mb = 0;
    while (this->getq (mb) != -1)
      {
        if (mb->msg_type () == ACE_Message_Block::MB_BREAK)
          {
            mb->release ();
            break;
          }
        process_message (mb);
        mb->release ();
      }
    return 0;
  }
 
  void process_message (ACE_Message_Block *)
  {
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Processing message %C\n"),
                name_));
    // Simulate compute bound task.
    for (int i = 0; i < 100; i++)
      ;
  }
 
private:
  const char *name_;
};
// Listing 2
 
#if !defined (ACE_THR_PRI_OTHER_MAX)
// This should be fixed in ACE... There's no _MAX, _MIN values for
// thread priorities.
#if defined (ACE_WIN32)
#  define ACE_THR_PRI_OTHER_MAX ((ACE_THR_PRI_OTHER_DEF) + 1)
#elif defined (VXWORKS)
#  define ACE_THR_PRI_OTHER_MAX 0
#endif
#endif
 
// Listing 1 code/ch13
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  HA_CommandHandler hp_handler ("HighPriority");
  hp_handler.activate (THR_NEW_LWP | THR_JOINABLE,
                       1, 1, ACE_THR_PRI_OTHER_MAX);
 
  HA_CommandHandler lp_handler ("LowPriority");
  lp_handler.activate (THR_NEW_LWP | THR_JOINABLE,
                       1, 1, ACE_THR_PRI_OTHER_DEF);
 
  ACE_Message_Block mb;
  for (int i = 0; i < 100; i++)
    {
      ACE_Message_Block *mb_hp, *mb_lp;
      mb_hp = mb.clone ();
      mb_lp = mb.clone ();
      hp_handler.putq (mb_hp);
      lp_handler.putq (mb_lp);
    }
 
  ACE_Message_Block stop (0, ACE_Message_Block::MB_BREAK);
  hp_handler.putq (stop.clone ());
  lp_handler.putq (stop.clone ());
  hp_handler.wait ();
  lp_handler.wait ();
 
  return 0;
}
// Listing 1
 
#else
#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_OS::puts (ACE_TEXT ("This example requires threads."));
  return 0;
}
 
#endif /* ACE_HAS_THREADS */

[править] SecurityContext.h

/**
 * $Id: SecurityContext.h 80826 2008-03-04 14:51:23Z wotte $
 *
 * Sample code from The ACE Programmer's Guide,
 * copyright 2003 Addison-Wesley. All Rights Reserved.
 */
 
#ifndef __SECURITYCONTEXT_H_
#define __SECURITYCONTEXT_H_
 
struct SecurityContext
  {
    const char * user;
  };
 
#endif /* __SECURITYCONTEXT_H_ */

[править] Signals

// $Id: Signals.cpp 80826 2008-03-04 14:51:23Z wotte $
 
#include "ace/config-lite.h"
#if defined (ACE_HAS_THREADS)
 
#include "ace/OS_NS_time.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Task.h"
#include "ace/Log_Msg.h"
#include "ace/Signal.h"
#include "ace/Sig_Handler.h"
 
// Listing 1 code/ch13
class SignalableTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
  virtual int handle_signal (int signum,
                             siginfo_t *  = 0,
                             ucontext_t * = 0)
  {
    if (signum == SIGUSR1)
      {
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("(%t) received a %S signal\n"),
                    signum));
        handle_alert ();
      }
    return 0;
  }
 
  virtual int svc (void)
  {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Starting thread\n")));
 
    while (1)
      {
        ACE_Message_Block* mb = 0;
        ACE_Time_Value tv (0, 1000);
        tv += ACE_OS::time (0);
        int result = this->getq (mb, &tv);
        if (result == -1 && errno == EWOULDBLOCK)
          continue;
        else
          process_message (mb);
      }
 
      ACE_NOTREACHED (return 0);
    }
 
  void handle_alert ();
  void process_message (ACE_Message_Block *mb);
};
// Listing 1
 
void
SignalableTask::process_message (ACE_Message_Block *)
{
}
 
void
SignalableTask::handle_alert ()
{
}
 
// Listing 2 code/ch13
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  SignalableTask handler;
  handler.activate (THR_NEW_LWP | THR_JOINABLE , 5);
 
  ACE_Sig_Handler sh;
  sh.register_handler (SIGUSR1, &handler);
 
  ACE_OS::sleep (1);
 
  ACE_Thread_Manager::instance () ->
    kill_grp (handler.grp_id (), SIGUSR1);
  handler.wait ();
  return 0;
}
// Listing 2
 
#else
#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_OS::puts (ACE_TEXT ("This example requires threads."));
  return 0;
}
 
#endif /* ACE_HAS_THREADS */

[править] Signals2

// $Id: Signals2.cpp 84565 2009-02-23 08:20:39Z johnnyw $
 
#include "ace/config-lite.h"
#if defined (ACE_HAS_THREADS)
 
#include "ace/OS_NS_time.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Task.h"
#include "ace/Log_Msg.h"
#include "ace/Signal.h"
#include "ace/Sig_Handler.h"
 
class SignalableTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
  virtual int handle_signal (int signum,
                             siginfo_t *  = 0,
                             ucontext_t * = 0)
  {
    if (signum == SIGUSR1)
      {
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("(%t) received a %S signal\n"),
                    signum));
        handle_alert();
      }
 
    return 0;
  }
 
  virtual int svc (void)
  {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Starting thread\n")));
 
    while (1)
      {
        ACE_Message_Block* mb = 0;
        ACE_Time_Value tv (0, 1000);
        tv += ACE_OS::time (0);
 
        int result = this->getq(mb, &tv);
 
        if (result == -1 && errno == EWOULDBLOCK)
          continue;
        else
          process_message (mb);
      }
 
    ACE_NOTREACHED (return 0);
  }
 
  void handle_alert ();
  void process_message (ACE_Message_Block *mb);
};
 
void
SignalableTask::process_message (ACE_Message_Block *)
{
  return;
}
 
void
SignalableTask::handle_alert (void)
{
  return;
}
 
// Listing 1 code/ch13
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Main thread\n")));
  SignalableTask handler;
  handler.activate (THR_NEW_LWP | THR_JOINABLE, 5);
 
  ACE_Sig_Handler sh;
  sh.register_handler (SIGUSR1, &handler);
 
  ACE_OS::sleep (1);      // Allow threads to start
 
  for (int i = 0; i < 5; i++)
    ACE_OS::kill (ACE_OS::getpid (), SIGUSR1);
  handler.wait ();
  return 0;
}
// Listing 1
 
#else
#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_OS::puts (ACE_TEXT ("This example requires threads."));
  return 0;
}
 
#endif /* ACE_HAS_THREADS */

[править] Start Hook

// $Id: Start_Hook.cpp 84565 2009-02-23 08:20:39Z johnnyw $
 
#include "ace/Thread_Hook.h"
#include "ace/Task.h"
#include "ace/Log_Msg.h"
 
#include "SecurityContext.h"
 
// Listing 1 code/ch13
class HA_ThreadHook : public ACE_Thread_Hook
{
public:
  virtual ACE_THR_FUNC_RETURN start (ACE_THR_FUNC func, void* arg)
  {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%t) New Thread Spawned\n")));
 
    // Create the context on the thread's own stack.
    ACE_TSS<SecurityContext> secCtx;
    // Special initialization.
    add_sec_context_thr (secCtx);
 
    return (*func) (arg);
  }
 
  void add_sec_context_thr (ACE_TSS<SecurityContext> &secCtx);
};
// Listing 1
 
void
HA_ThreadHook::add_sec_context_thr(ACE_TSS<SecurityContext> &secCtx)
{
  secCtx->user = 0;
}
 
 
class HA_CommandHandler : public ACE_Task_Base
{
public:
  virtual int svc (void)
  {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting up\n")));
 
    // Do something.
 
    return 0;
  }
};
// Listing 2 code/ch13
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  HA_ThreadHook hook;
  ACE_Thread_Hook::thread_hook (&hook);
 
  HA_CommandHandler handler;
  handler.activate ();
  handler.wait();
  return 0;
}
// Listing 2

[править] State

// $Id: State.cpp 91813 2010-09-17 07:52:52Z johnnyw $
 
#include "ace/Task.h"
 
class HA_CommandHandler : public ACE_Task_Base
{
public:
  virtual int svc (void)
  {
    ACE_DEBUG
      ((LM_DEBUG, ACE_TEXT ("(%t) Handler Thread running\n")));
    return 0;
  }
};
 
 
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Main Thread running\n")));
// Listing 1 code/ch13
  HA_CommandHandler handler;
  int result = handler.activate (THR_NEW_LWP |
                                 THR_JOINABLE |
                                 THR_SUSPENDED);
  ACE_ASSERT (result == 0);
 
  ACE_UNUSED_ARG (result);
 
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) The current thread count is %d\n"),
              handler.thr_count ()));
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) The group identifier is %d\n"),
              handler.grp_id ()));
  handler.resume ();
  handler.wait ();
// Listing 1
  return 0;
}