ACE/FAQ/APG/Streams

Материал из Wiki.crossplatform.ru

Перейти к: навигация, поиск

Содержание


An answering machine based on a one-way ACE Stream

/**
 * $Id: Answerer.cpp 94345 2011-07-24 04:29:17Z mesnier_p $
 *
 * Streams Listing 01
 *
 * An answering machine based on a one-way ACE_Stream
 */
 
#include "ace/OS_NS_string.h"
#include "ace/Stream.h"
#include "ace/Message_Block.h"
#include "ace/FILE_IO.h"
 
#include "MessageInfo.h"
#include "Message.h"
#include "BasicTask.h"
#include "EndTask.h"
#include "Util.h"
#include "RecordingDevice.h"
 
// Listing 21 code/ch18
class AnswerIncomingCall : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE ("AnswerIncomingCall::process()");
 
    if (message->recorder ()->answer_call () < 0)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("AnswerIncomingCall")),
                        -1);
      return 0;
  }
};
// Listing 21
 
// Listing 22 code/ch18
class GetCallerId : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE ("GetCallerId::process()");
 
    CallerId *id;
    id = message->recorder ()->retrieve_callerId ();
    if (!id)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("GetCallerId")),
                        -1);
 
    message->caller_id (id);
    return 0;
  }
};
// Listing 22
 
// Listing 23 code/ch18
class PlayOutgoingMessage : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE ("PlayOutgoingMessage::process()");
 
    ACE_FILE_Addr outgoing_message =
      this->get_outgoing_message (message);
 
    int pmrv =
      message->recorder ()->play_message (outgoing_message);
    if (pmrv < 0)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("PlayOutgoingMessage")),
                        -1);
    return 0;
  }
 
  ACE_FILE_Addr get_outgoing_message (Message *)
  {
    // Exclude 23
    return ACE_FILE_Addr (ACE_TEXT ("/tmp/outgoing_message"));
    // Exclude 23
  }
};
// Listing 23
 
// Listing 24 code/ch18
class RecordIncomingMessage : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE ("RecordIncomingMessage::process()");
 
    ACE_FILE_Addr incoming_message =
      this->get_incoming_message_queue ();
 
    MessageType *type =
      message->recorder ()->record_message (incoming_message);
    if (!type)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("RecordIncomingMessage")),
                        -1);
    message->incoming_message (incoming_message, type);
    return 0;
  }
 
  ACE_FILE_Addr get_incoming_message_queue (void)
  {
    // Exclude 24
    return ACE_FILE_Addr (ACE_TEXT ("/tmp/incoming_message"));
    // Exclude 24
  }
};
// Listing 24
 
// Listing 25 code/ch18
class ReleaseDevice : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE ("ReleaseDevice::process()");
    message->recorder ()->release ();
    return 0;
  }
};
// Listing 25
 
// Listing 26 code/ch18
class EncodeMessage : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE ("EncodeMessage::process()");
 
    ACE_FILE_Addr &incoming = message->addr ();
    ACE_FILE_Addr addr = this->get_message_destination (message);
 
    if (message->is_text ())
      Util::convert_to_unicode (incoming, addr);
    else if (message->is_audio ())
      Util::convert_to_mp3 (incoming, addr);
    else if (message->is_video ())
      Util::convert_to_mpeg (incoming, addr);
 
    message->addr (addr);
    return 0;
  }
 
  ACE_FILE_Addr get_message_destination (Message *)
  {
    // Exclude 26
    return ACE_FILE_Addr (ACE_TEXT ("/tmp/encoded_message"));
    // Exclude 26
  }
};
// Listing 26
 
// Listing 27 code/ch18
class SaveMetaData : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE ("SaveMetaData::process()");
 
    ACE_TString path (message->addr ().get_path_name ());
    path += ACE_TEXT (".xml");
 
    ACE_FILE_Connector connector;
    ACE_FILE_IO file;
    ACE_FILE_Addr addr (path.c_str ());
    if (connector.connect (file, addr) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("create meta-data file")),
                        0);
 
    file.truncate (0);
    this->write (file, "<Message>\n");
    // ...
    this->write (file, "</Message>\n");
    file.close ();
    return 0;
  }
 
private:
  //FUZZ: disable check_for_lack_ACE_OS
  int write (ACE_FILE_IO &file, const char *str)
  {
  //FUZZ: enable check_for_lack_ACE_OS
    return file.send (str, ACE_OS::strlen (str));
  }
};
// Listing 27
 
// Listing 28 code/ch18
class NotifySomeone : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE ("NotifySomeone::process()");
 
    // Format an email to tell someone about the
    // newly received message.
    // ...
 
    // Display message information in the logfile
    ACE_DEBUG ((LM_INFO,
                ACE_TEXT ("New message from %s ")
                ACE_TEXT ("received and stored at %s\n"),
                message->caller_id ()->string (),
                message->addr ().get_path_name ()));
    return 0;
  }
};
// Listing 28
 
// Listing 10 code/ch18
class RecordingStream : public ACE_Stream<ACE_MT_SYNCH>
{
public:
  typedef ACE_Stream<ACE_MT_SYNCH> inherited;
  typedef ACE_Module<ACE_MT_SYNCH> Module;
 
  RecordingStream () : inherited()
  { }
// Listing 10
 
  //FUZZ: disable check_for_lack_ACE_OS
  // Listing 1000 code/ch18
  virtual int open (void *arg,
                    Module *head = 0, Module *tail = 0)
  {
  //FUZZ: enable check_for_lack_ACE_OS
    if (tail == 0)
      ACE_NEW_RETURN (tail,
                      Module (ACE_TEXT ("End Module"), new TheEndTask ()),
                      -1);
    this->inherited::open (arg, head, tail);
    // Listing 1000
 
    // Listing 1001 code/ch18
    Module *answerIncomingCallModule;
    ACE_NEW_RETURN (answerIncomingCallModule,
                    Module (ACE_TEXT ("Answer Incoming Call"),
                            new AnswerIncomingCall ()),
                    -1);
 
    // Listing 11 code/ch18
    Module *getCallerIdModule;
    ACE_NEW_RETURN (getCallerIdModule,
                    Module (ACE_TEXT ("Get Caller ID"), new GetCallerId ()),
                    -1);
    // Listing 11
 
    Module *playOGMModule;
    ACE_NEW_RETURN (playOGMModule,
                    Module (ACE_TEXT ("Play Outgoing Message"),
                            new PlayOutgoingMessage ()),
                    -1);
 
    Module *recordModule;
    ACE_NEW_RETURN (recordModule,
                    Module (ACE_TEXT ("Record Incoming Message"),
                            new RecordIncomingMessage ()),
                    -1);
 
    Module *releaseModule;
    ACE_NEW_RETURN (releaseModule,
                    Module (ACE_TEXT ("Release Device"),
                            new ReleaseDevice ()),
                    -1);
 
    Module *conversionModule;
    ACE_NEW_RETURN (conversionModule,
                    Module (ACE_TEXT ("Encode Message"),
                            new EncodeMessage ()),
                    -1);
 
    Module *saveMetaDataModule;
    ACE_NEW_RETURN (saveMetaDataModule,
                    Module (ACE_TEXT ("Save Meta-Data"),
                            new SaveMetaData ()),
                    -1);
 
    Module *notificationModule;
    ACE_NEW_RETURN (notificationModule,
                    Module (ACE_TEXT ("Notify Someone"),
                            new NotifySomeone ()),
                    -1);
    // Listing 1001
 
    // Listing 12 code/ch18
    if (this->push (notificationModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("notificationModule")),
                        -1);
    if (this->push (saveMetaDataModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("saveMetaDataModule")),
                        -1);
    if (this->push (conversionModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("conversionModule")),
                        -1);
    if (this->push (releaseModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("releaseModule")),
                        -1);
    if (this->push (recordModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("recordModule")),
                        -1);
    if (this->push (playOGMModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("playOGMModule")),
                        -1);
    if (this->push (getCallerIdModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("getCallerIdModule")),
                        -1);
    if (this->push (answerIncomingCallModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n")
                         ACE_TEXT ("answerIncomingCallModule")),
                        -1);
    // Listing 12
 
    return 0;
  }
 
  // Listing 13 code/ch18
  int record (RecordingDevice *recorder)
  {
    ACE_Message_Block * mb = 0;
    ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(Message)), -1);
 
    Message *message = (Message *)mb->wr_ptr ();
    mb->wr_ptr (sizeof(Message));
 
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("RecordingStream::record() - ")
                ACE_TEXT ("message->recorder(recorder)\n")));
    message->recorder (recorder);
 
    int rval = this->put (mb);
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("RecordingStream::record() - ")
                ACE_TEXT ("this->put() returns %d\n"),
                rval));
    return rval;
  }
  // Listing 13
};
 
 
// Listing 1 code/ch18
int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  RecordingDevice *recorder =
    RecordingDeviceFactory::instantiate (argc, argv);
  // Listing 1
 
  // Listing 2 code/ch18
  RecordingStream *recording_stream;
  ACE_NEW_RETURN (recording_stream, RecordingStream, -1);
 
  if (recording_stream->open (0) < 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p\n"),
                       ACE_TEXT ("RecordingStream->open()")),
                      0);
  // Listing 2
 
  // Listing 3 code/ch18
  for (;;)
    {
      ACE_DEBUG ((LM_INFO,
                  ACE_TEXT ("Waiting for incoming message\n")));
      RecordingDevice *activeRecorder =
        recorder->wait_for_activity ();
 
      ACE_DEBUG ((LM_INFO,
                  ACE_TEXT ("Initiating recording process\n")));
 
      recording_stream->record (activeRecorder);
    }
  // Listing 3
 
  ACE_NOTREACHED (return 0;)
}

BasicTask.h

/* -*- C++ -*- */
// $Id: BasicTask.h 80826 2008-03-04 14:51:23Z wotte $
 
#ifndef BASIC_TASK_H
#define BASIC_TASK_H
 
#include "ace/Task_T.h"
#include "ace/ace_wchar.h"
 
// Listing 100 code/ch18
class BasicTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
  typedef ACE_Task<ACE_MT_SYNCH> inherited;
 
  BasicTask () : inherited()
  { }
 
  virtual int open (void * = 0)
  {
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("BasicTask::open() starting ")
                ACE_TEXT ("%d threads\n"),
                this->desired_threads ()));
 
    return this->activate (THR_NEW_LWP | THR_JOINABLE,
                           this->desired_threads ());
  }
  // Listing 100
 
  // Listing 101 code/ch18
  int put (ACE_Message_Block *message,
           ACE_Time_Value *timeout)
  {
    return this->putq (message, timeout);
  }
  // Listing 101
 
  // Listing 1020 code/ch18
  virtual int svc (void)
  {
    for (ACE_Message_Block *message = 0; ; )
      {
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("BasicTask::svc() - ")
                    ACE_TEXT ("waiting for work\n" )));
 
        if (this->getq (message) == -1)
          ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                             ACE_TEXT ("getq")),
                            -1);
        // Listing 1020
 
        // Listing 1021 code/ch18
        if (message->msg_type () == ACE_Message_Block::MB_HANGUP)
          {
            if (this->putq (message) == -1)
              {
                ACE_ERROR ((LM_ERROR,
                            ACE_TEXT ("%p\n"),
                            ACE_TEXT ("Task::svc() putq")));
                message->release ();
              }
            break;
          }
        // Listing 1021
 
        // Listing 1022 code/ch18
        Message *recordedMessage =
          (Message *)message->rd_ptr ();
 
        if (this->process (recordedMessage) == -1)
          {
            message->release ();
            ACE_ERROR_RETURN ((LM_ERROR,
                               ACE_TEXT ("%p\n"),
                               ACE_TEXT ("process")),
                              -1);
          }
        // Listing 1022
 
        // Listing 1023 code/ch18
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("BasicTask::svc() - ")
                    ACE_TEXT ("Continue to next stage\n" )));
        if (this->next_step (message) < 0)
          {
            ACE_ERROR ((LM_ERROR,
                        ACE_TEXT ("%p\n"),
                        ACE_TEXT ("put_next failed")));
            message->release ();
            break;
          }
        // Listing 1023
      }
 
    return 0;
  }
 
  // Listing 103 code/ch18
  virtual int close (u_long flags)
  {
    int rval = 0;
 
    if (flags == 1)
      {
        ACE_Message_Block *hangup = new ACE_Message_Block ();
        hangup->msg_type (ACE_Message_Block::MB_HANGUP);
        if (this->putq (hangup) == -1)
          {
            hangup->release ();
            ACE_ERROR_RETURN ((LM_ERROR,
                               ACE_TEXT ("%p\n"),
                               ACE_TEXT ("Task::close() putq")),
                              -1);
          }
 
        rval = this->wait ();
      }
 
    return rval;
  }
  // Listing 103
 
  // Listing 105 code/ch18
protected:
  virtual int next_step (ACE_Message_Block *message_block)
  {
    return this->put_next (message_block);
  }
  // Listing 105
 
  // Listing 104 code/ch18
  virtual int process (Message *message) = 0;
 
  virtual int desired_threads (void)
  {
    return 1;
  }
};
// Listing 104
 
#endif /* BASIC_TASK_H */

Command.h

/* -*- C++ -*- */
// $Id: Command.h 80826 2008-03-04 14:51:23Z wotte $
 
#ifndef COMMAND_H
#define COMMAND_H
 
#include "ace/SString.h"
#include "ace/Message_Block.h"
 
// Listing 01 code/ch18
class Command : public ACE_Data_Block
{
public:
  // Result Values
  enum {
    RESULT_PASS    = 1,
    RESULT_SUCCESS = 0,
    RESULT_FAILURE = -1
  };
 
  // Commands
  enum {
    CMD_UNKNOWN            = -1,
    CMD_ANSWER_CALL        = 10,
    CMD_RETRIEVE_CALLER_ID,
    CMD_PLAY_MESSAGE,
    CMD_RECORD_MESSAGE
  } commands;
 
  int flags_;
  int command_;
 
  void *extra_data_;
 
  int numeric_result_;
  ACE_TString result_;
};
// Listing 01
 
#endif /* COMMAND_H */

CommandModule.cpp

// $Id: CommandModule.cpp 80826 2008-03-04 14:51:23Z wotte $
 
#include "CommandModule.h"
 
// Listing 01 code/ch18
CommandModule::CommandModule (const ACE_TCHAR *module_name,
                              CommandTask *writer,
                              CommandTask *reader,
                              ACE_SOCK_Stream *peer)
  : inherited(module_name, writer, reader, peer)
{ }
// Listing 01
 
// Listing 02 code/ch18
ACE_SOCK_Stream &CommandModule::peer (void)
{
  ACE_SOCK_Stream *peer = (ACE_SOCK_Stream *)this->arg ();
  return *peer;
}
// Listing 02

CommandModule.h

/* -*- C++ -*- */
// $Id: CommandModule.h 80826 2008-03-04 14:51:23Z wotte $
 
#ifndef COMMAND_MODULE_H
#define COMMAND_MODULE_H
 
#include "ace/Module.h"
#include "ace/SOCK_Stream.h"
#include "CommandTask.h"
 
// Listing 01 code/ch18
class CommandModule : public ACE_Module<ACE_MT_SYNCH>
{
public:
  typedef ACE_Module<ACE_MT_SYNCH> inherited;
  typedef ACE_Task<ACE_MT_SYNCH> Task;
 
  CommandModule (const ACE_TCHAR *module_name,
                   CommandTask *writer,
                   CommandTask *reader,
                   ACE_SOCK_Stream *peer);
 
  ACE_SOCK_Stream &peer (void);
};
// Listing 01
 
#endif /* COMMAND_MODULE_H */

CommandStream.cpp

// $Id: CommandStream.cpp 94310 2011-07-09 19:10:06Z schmidt $
 
#include "ace/Log_Msg.h"
#include "ace/OS_Memory.h"
#include "CommandStream.h"
#include "Command.h"
#include "CommandModule.h"
#include "CommandTasks.h"
 
// Gotcha: superclass' open() won't open head/tail modules
// Gotcha!! Must open the stream before pushing modules!
 
// Listing 01 code/ch18
int CommandStream::open (void *arg,
                         ACE_Module<ACE_MT_SYNCH> *head,
                         ACE_Module<ACE_MT_SYNCH> *tail)
{
  ACE_TRACE ("CommandStream::open(peer)");
 
  if (this->inherited::open (arg, head, tail) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                       ACE_TEXT ("Failed to open superclass")),
                      -1);
  // Listing 01
 
  // Listing 02 code/ch18
  CommandModule *answerCallModule;
  ACE_NEW_RETURN (answerCallModule,
                  AnswerCallModule (this->peer_),
                  -1);
 
  CommandModule *retrieveCallerIdModule;
  ACE_NEW_RETURN (retrieveCallerIdModule,
                  RetrieveCallerIdModule (this->peer_),
                  -1);
 
  CommandModule *playMessageModule;
  ACE_NEW_RETURN (playMessageModule,
                  PlayMessageModule (this->peer_),
                  -1);
 
  CommandModule *recordMessageModule;
  ACE_NEW_RETURN (recordMessageModule,
                  RecordMessageModule (this->peer_),
                  -1);
  // Listing 02
 
  // Listing 03 code/ch18
  if (this->push (answerCallModule) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("Failed to push %p\n"),
                       answerCallModule->name()),
                      -1);
 
  if (this->push (retrieveCallerIdModule) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("Failed to push %p\n"),
                       retrieveCallerIdModule->name()),
                      -1);
 
  if (this->push (playMessageModule) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("Failed to push %p\n"),
                       playMessageModule->name()),
                      -1);
 
  if (this->push (recordMessageModule) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("Failed to push %p\n"),
                       recordMessageModule->name()),
                      -1);
  // Listing 03
  return 0;
}
 
// Listing 04 code/ch18
Command *CommandStream::execute (Command *command)
{
  ACE_Message_Block *mb = 0;
  ACE_NEW_RETURN (mb, ACE_Message_Block (command), 0);
  if (this->put (mb) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("Fail on put command %d: %p\n"),
                       command->command_,
                       ACE_TEXT ("")),
                      0);
 
  this->get (mb);
  command = (Command *)mb->data_block ();
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("Command (%d) returns (%d)\n"),
              command->command_,
              command->numeric_result_));
 
  return command;
}
// Listing 04

CommandStream.h

/* -*- C++ -*- */
// $Id: CommandStream.h 80826 2008-03-04 14:51:23Z wotte $
 
#ifndef COMMAND_STREAM_H
#define COMMAND_STREAM_H
 
#include "ace/Module.h"
#include "ace/Stream.h"
#include "ace/SOCK_Stream.h"
#include "ace/Synch_Traits.h"
 
#include "Command.h"
 
// A CommandStream is a bidirectional ACE_Stream implementing a chain
// of commands. A message will move down the stream until a
// CommandModule is capable of processing it. After processing, it
// will move on down the stream to the end. Data received from the
// tail will likewise move up the stream until the downstream's
// partner module is encoutered. The retrieved data will be processed
// and continue on up the stream.
 
// Listing 01 code/ch18
class CommandStream : public ACE_Stream<ACE_MT_SYNCH>
{
public:
  typedef ACE_Stream<ACE_MT_SYNCH> inherited;
 
  CommandStream (ACE_SOCK_Stream *peer)
    : inherited (), peer_(peer) { }
 
  virtual int open (void *arg,
                    ACE_Module<ACE_MT_SYNCH> *head = 0,
                    ACE_Module<ACE_MT_SYNCH> *tail = 0);
 
  Command *execute (Command *command);
 
private:
  CommandStream () { }
 
  ACE_SOCK_Stream *peer_;
};
// Listing 01
 
#endif /* COMMAND_STREAM_H */

CommandTask.cpp

// $Id: CommandTask.cpp 94310 2011-07-09 19:10:06Z schmidt $
 
#include "CommandTask.h"
 
// Listing 01 code/ch18
CommandTask::CommandTask (int command)
  : inherited (), command_(command)
{ }
// Listing 01
 
// Listing 02 code/ch18
int CommandTask::open (void *)
{
  return this->activate ();
}
// Listing 02
 
// Listing 03 code/ch18
int CommandTask::put (ACE_Message_Block *message,
                      ACE_Time_Value *timeout)
{
  return this->putq (message, timeout);
}
// Listing 03
 
// Listing 04 code/ch18
int CommandTask::process (Command *)
{
  ACE_TRACE ("CommandTask::process()");
  return Command::RESULT_FAILURE;
}
// Listing 04
 
// Listing 05 code/ch18
int CommandTask::close (u_long flags)
{
  int rval = 0;
  if (flags == 1)
    {
      ACE_Message_Block *hangup = new ACE_Message_Block;
      hangup->msg_type (ACE_Message_Block::MB_HANGUP);
      if (this->putq (hangup->duplicate ()) == -1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("%p\n"),
                             ACE_TEXT ("Task::close() putq")),
                            -1);
        }
 
      hangup->release ();
      rval = this->wait ();
    }
 
  return rval;
}
// Listing 05
 
// Listing 06 code/ch18
// Listing 061 code/ch18
int CommandTask::svc (void)
{
  ACE_Message_Block *message;
 
  for (;;)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("CommandTask::svc() - ")
                  ACE_TEXT ("%s waiting for work\n"),
                  this->module ()->name ()));
 
      if (this->getq (message) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           ACE_TEXT ("%p\n"),
                           ACE_TEXT ("getq")),
                          -1);
 
      if (message->msg_type () == ACE_Message_Block::MB_HANGUP)
        {
          if (this->putq (message->duplicate ()) == -1)
            {
              ACE_ERROR_RETURN ((LM_ERROR,
                                 ACE_TEXT ("%p\n"),
                                 ACE_TEXT ("Task::svc() putq")),
                                -1);
            }
 
          message->release ();
          break;
        }
      // Listing 061
 
      // Listing 062 code/ch18
      Command *command = (Command *)message->data_block ();
 
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("CommandTask::svc() - ")
                  ACE_TEXT ("%s got work request %d\n"),
                  this->module ()->name (),
                  command->command_));
 
      if (command->command_ != this->command_)
        {
          this->put_next (message->duplicate ());
        }
      // Listing 062
      // Listing 063 code/ch18
      else
        {
          int result = this->process (command);
          ACE_DEBUG ((LM_DEBUG,
                      ACE_TEXT ("CommandTask::svc() - ")
                      ACE_TEXT ("%s work request %d result is %d\n"),
                      this->module ()->name (),
                      command->command_,
                      result));
 
          if (result == Command::RESULT_FAILURE)
            {
              command->numeric_result_ = -1;
            }
          // Listing 063
          // Listing 064 code/ch18
          else if (result == Command::RESULT_PASS)
            {
              this->put_next (message->duplicate ());
            }
          // Listing 064
          // Listing 065 code/ch18
          else // result == Command::RESULT_SUCCESS
            {
              if (this->is_writer ())
                {
                  this->sibling ()->putq
                    (message->duplicate ());
                }
              // Listing 065
              // Listing 066 code/ch18
              else // this->is_reader ()
                {
                  this->put_next (message->duplicate ());
                }
              // Listing 066
            } // result == ...
        }     // command->command_ ? = this->command_
 
      // Listing 067 code/ch18
      message->release ();
    }   // for (;;)
 
  return 0;
}
// Listing 067
// Listing 06

CommandTask.h

/* -*- C++ -*- */
// $Id: CommandTask.h 80826 2008-03-04 14:51:23Z wotte $
 
#ifndef COMMAND_TASK_H
#define COMMAND_TASK_H
 
#include "ace/Task.h"
#include "ace/Module.h"
 
#include "Command.h"
 
// Listing 01 code/ch18
class CommandTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
  typedef ACE_Task<ACE_MT_SYNCH> inherited;
 
  virtual ~CommandTask () { }
 
  virtual int open (void * = 0 );
 
  int put (ACE_Message_Block *message,
           ACE_Time_Value *timeout);
 
  virtual int svc (void);
 
  virtual int close (u_long flags);
 
protected:
  CommandTask (int command);
 
  virtual int process (Command *message);
 
  int command_;
};
// Listing 01
 
 
#endif /* COMMAND_TASK_H */

CommandTasks.cpp

// $Id: CommandTasks.cpp 80826 2008-03-04 14:51:23Z wotte $
 
#include "ace/FILE_Addr.h"
#include "ace/FILE_Connector.h"
#include "ace/FILE_IO.h"
 
#include "Command.h"
#include "CommandTasks.h"
#include "RecordingDevice_Text.h"
 
// Listing 011 code/ch18
AnswerCallModule::AnswerCallModule (ACE_SOCK_Stream *peer)
  : CommandModule (ACE_TEXT ("AnswerCall Module"),
                   new AnswerCallDownstreamTask (),
                   new AnswerCallUpstreamTask (),
                   peer)
{ }
// Listing 011
// Listing 012 code/ch18
AnswerCallDownstreamTask::AnswerCallDownstreamTask (void)
  : CommandTask(Command::CMD_ANSWER_CALL)
{ }
// Listing 012
// Listing 013 code/ch18
int AnswerCallDownstreamTask::process (Command *command)
{
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Answer Call (downstream)\n")));
 
  TextListenerAcceptor *acceptor =
    (TextListenerAcceptor *)command->extra_data_;
 
  CommandModule *module =
    (CommandModule*)this->module ();
 
  command->numeric_result_ =
    acceptor->accept (module->peer ());
 
  acceptor->release ();
  return Command::RESULT_SUCCESS;
}
// Listing 013
// Listing 014 code/ch18
AnswerCallUpstreamTask::AnswerCallUpstreamTask (void)
  : CommandTask(Command::CMD_ANSWER_CALL)
{ }
// Listing 014
// Listing 015 code/ch18
int AnswerCallUpstreamTask::process (Command *)
{
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Answer Call (upstream)\n")));
 
  return Command::RESULT_SUCCESS;
}
// Listing 015
 
// Listing 021 code/ch18
RetrieveCallerIdModule::RetrieveCallerIdModule
  (ACE_SOCK_Stream *peer)
    : CommandModule (ACE_TEXT ("RetrieveCallerId Module"),
                     new RetrieveCallerIdDownstreamTask (),
                     new RetrieveCallerIdUpstreamTask (),
                     peer)
{ }
// Listing 021
// Listing 022 code/ch18
RetrieveCallerIdDownstreamTask::RetrieveCallerIdDownstreamTask
  (void)
    : CommandTask(Command::CMD_RETRIEVE_CALLER_ID)
{ }
 
int RetrieveCallerIdDownstreamTask::process (Command *)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("Retrieving Caller ID data\n")));
 
  return Command::RESULT_SUCCESS;
}
// Listing 022
// Listing 023 code/ch18
RetrieveCallerIdUpstreamTask::RetrieveCallerIdUpstreamTask
  (void)
    : CommandTask(Command::CMD_RETRIEVE_CALLER_ID)
{ }
 
int RetrieveCallerIdUpstreamTask::process (Command *command)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("Returning Caller ID data\n")));
 
  ACE_INET_Addr remote_addr;
 
  CommandModule *module =
    (CommandModule*)this->module ();
 
  module->peer ().get_remote_addr (remote_addr);
  ACE_TCHAR remote_addr_str[256];
  remote_addr.addr_to_string (remote_addr_str, 256);
  command->result_ = ACE_TString (remote_addr_str);
 
  return Command::RESULT_SUCCESS;
}
// Listing 023
 
PlayMessageModule::PlayMessageModule (ACE_SOCK_Stream *peer)
  : CommandModule (ACE_TEXT ("PlayMessage Module"),
                   new PlayMessageDownstreamTask (),
                   new PlayMessageUpstreamTask (),
                   peer)
{ }
 
PlayMessageDownstreamTask::PlayMessageDownstreamTask (void)
  : CommandTask(Command::CMD_PLAY_MESSAGE)
{ }
// Listing 032 code/ch18
int PlayMessageDownstreamTask::process (Command *command)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("Play Outgoing Message\n")));
 
  ACE_FILE_Connector connector;
  ACE_FILE_IO file;
 
  ACE_FILE_Addr *addr =
    (ACE_FILE_Addr *)command->extra_data_;
 
  if (connector.connect (file, *addr) == -1)
    {
      command->numeric_result_ = -1;
    }
  else
    {
      command->numeric_result_ = 0;
 
      CommandModule *module =
        (CommandModule*)this->module ();
 
      char rwbuf[512];
      ssize_t rwbytes;
      while ((rwbytes = file.recv (rwbuf, 512)) > 0)
        {
          module->peer ().send_n (rwbuf, rwbytes);
        }
    }
 
  return Command::RESULT_SUCCESS;
}
// Listing 032
PlayMessageUpstreamTask::PlayMessageUpstreamTask (void)
  : CommandTask(Command::CMD_PLAY_MESSAGE)
{ }
 
int PlayMessageUpstreamTask::process (Command *command)
{
  ACE_FILE_Addr * addr =
    (ACE_FILE_Addr *)command->extra_data_;
 
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("Outgoing message (%s) sent\n"),
              addr->get_path_name ()));
 
  return Command::RESULT_SUCCESS;
}
 
RecordMessageModule::RecordMessageModule (ACE_SOCK_Stream *peer)
  : CommandModule (ACE_TEXT ("RecordMessage Module"),
                   new RecordMessageDownstreamTask (),
                   new RecordMessageUpstreamTask (),
                   peer)
{ }
 
RecordMessageDownstreamTask::RecordMessageDownstreamTask (void)
  : CommandTask(Command::CMD_RECORD_MESSAGE)
{ }
 
int RecordMessageDownstreamTask::process (Command *)
{
  return Command::RESULT_SUCCESS;
}
 
RecordMessageUpstreamTask::RecordMessageUpstreamTask (void)
  : CommandTask(Command::CMD_RECORD_MESSAGE)
{ }
// Listing 033 code/ch18
int RecordMessageUpstreamTask::process (Command *command)
{
  // Collect whatever the peer sends and write into the
  // specified file.
  ACE_FILE_Connector connector;
  ACE_FILE_IO file;
 
  ACE_FILE_Addr *addr =
    (ACE_FILE_Addr *)command->extra_data_;
 
  if (connector.connect (file, *addr) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p\n"),
                       ACE_TEXT ("create file")),
                      Command::RESULT_FAILURE);
  file.truncate (0);
 
  CommandModule *module =
    (CommandModule*)this->module ();
 
  ssize_t total_bytes = 0;
  char rwbuf[512];
  ssize_t rwbytes;
  while ((rwbytes = module->peer ().recv (rwbuf, 512)) > 0)
    {
      total_bytes += file.send_n (rwbuf, rwbytes);
    }
 
  file.close ();
 
  ACE_DEBUG ((LM_INFO,
              ACE_TEXT ("RecordMessageUpstreamTask ")
              ACE_TEXT ("- recorded %d byte message\n"),
              total_bytes));
 
  return Command::RESULT_SUCCESS;
}
// Listing 033

CommandTasks.h

/* -*- C++ -*- */
// $Id: CommandTasks.h 80826 2008-03-04 14:51:23Z wotte $
 
#ifndef COMMAND_TASKS_H
#define COMMAND_TASKS_H
 
#include "ace/SOCK_Stream.h"
 
#include "Command.h"
#include "CommandTask.h"
#include "CommandModule.h"
 
// CommandModule and CommandTask objects that implement the command
// stream functions.
 
// Listing 011 code/ch18
class AnswerCallModule : public CommandModule
{
public:
  AnswerCallModule (ACE_SOCK_Stream * peer);
};
// Listing 011
// Listing 012 code/ch18
class AnswerCallDownstreamTask : public CommandTask
{
public:
  AnswerCallDownstreamTask ();
protected:
  virtual int process (Command *command);
};
// Listing 012
// Listing 013 code/ch18
class AnswerCallUpstreamTask : public CommandTask
{
public:
  AnswerCallUpstreamTask ();
protected:
  virtual int process (Command *command);
};
// Listing 013
 
// Listing 02 code/ch18
class RetrieveCallerIdModule : public CommandModule
{
public:
  RetrieveCallerIdModule (ACE_SOCK_Stream *peer);
};
class RetrieveCallerIdDownstreamTask : public CommandTask
{
public:
  RetrieveCallerIdDownstreamTask ();
protected:
  virtual int process (Command *command);
};
class RetrieveCallerIdUpstreamTask : public CommandTask
{
public:
  RetrieveCallerIdUpstreamTask ();
protected:
  virtual int process (Command *command);
};
// Listing 02
 
// Listing 03 code/ch18
class PlayMessageModule : public CommandModule
{
public:
  PlayMessageModule (ACE_SOCK_Stream *peer);
};
class PlayMessageDownstreamTask : public CommandTask
{
public:
  PlayMessageDownstreamTask ();
protected:
  virtual int process (Command *command);
};
class PlayMessageUpstreamTask : public CommandTask
{
public:
  PlayMessageUpstreamTask ();
protected:
  virtual int process (Command *command);
};
// Listing 03
 
// Listing 04 code/ch18
class RecordMessageModule : public CommandModule
{
public:
  RecordMessageModule (ACE_SOCK_Stream *peer);
};
class RecordMessageDownstreamTask : public CommandTask
{
public:
  RecordMessageDownstreamTask ();
protected:
  virtual int process (Command *command);
};
class RecordMessageUpstreamTask : public CommandTask
{
public:
  RecordMessageUpstreamTask ();
protected:
  virtual int process (Command *command);
};
// Listing 04
 
#endif /* COMMAND_TASKS_H */

EndTask.h

/* -*- C++ -*- */
// $Id: EndTask.h 94310 2011-07-09 19:10:06Z schmidt $
 
#ifndef END_TASK_H
#define END_TASK_H
 
// Listing 1 code/ch18
class TheEndTask : public BasicTask {
protected:
  virtual int process (Message *)
  {
    ACE_TRACE ("EndTask::process()");
    return 0;
  }
 
  virtual int next_step (ACE_Message_Block *mb)
  {
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("TheEndTask::next_step() - ")
                ACE_TEXT ("end of the line.\n")));
    mb->release ();
    return 0;
  }
};
// Listing 1
 
#endif /* END_TASK_H */

Message.h

/* -*- C++ -*- */
// $Id: Message.h 80826 2008-03-04 14:51:23Z wotte $
 
#ifndef MESSAGE_H
#define MESSAGE_H
 
class RecordingDevice;
 
class Message
{
public:
  Message () : device_(0), type_(0), id_(0)
  { }
 
  ~Message ()
  { }
 
  RecordingDevice *recorder (void)
  {
    return this->device_;
  }
 
  void recorder (RecordingDevice *device)
  {
    this->device_ = device;
  }
 
  void type (MessageType *type)
  {
    this->type_ = type;
  }
 
  MessageType *type (void)
  {
    return this->type_;
  }
 
  void caller_id (CallerId *id)
  {
    this->id_ = id;
  }
 
  CallerId *caller_id (void)
  {
    return this->id_;
  }
 
  void addr (ACE_FILE_Addr &addr)
  {
    this->addr_ = addr;
  }
 
  void incoming_message (ACE_FILE_Addr &addr, MessageType *type)
  {
    this->addr_ = addr;
    this->type_ = type;
  }
 
  ACE_FILE_Addr &addr (void)
  {
    return this->addr_;
  }
 
  int is_text (void)
  {
    return this->type_->is_text ();
  }
 
  int is_audio (void)
  {
    return this->type_->is_audio ();
  }
 
  int is_video (void)
  {
    return this->type_->is_video ();
  }
 
private:
  RecordingDevice *device_;
  MessageType *type_;
  CallerId *id_;
  ACE_FILE_Addr addr_;
};
 
class AudioMessage : public Message
{ };
 
class VideoMessage : public Message
{ };
 
#endif /* MESSAGE_H */

MessageInfo.h

/* -*- C++ -*- */
// $Id: MessageInfo.h 94075 2011-05-23 06:57:42Z johnnyw $
 
#ifndef MESSAGE_INFO_H
#define MESSAGE_INFO_H
 
#include "ace/FILE_Addr.h"
#include "ace/SString.h"
 
/* Opaque class that represents a caller's ID */
class CallerId
{
public:
  CallerId () : id_ (ACE_TEXT ("UNKNOWN"))
  { }
 
  CallerId (ACE_TString id) : id_(id)
  { }
 
  const ACE_TCHAR * string(void)
  {
    return this->id_.c_str ();
  }
 
private:
  ACE_TString id_;
};
 
class MessageType
{
public:
  enum {
    // Known video codecs
    FIRST_VIDEO_CODEC = 1,
 
    DIVX,
    // ...
    LAST_VIDEO_CODEC,
 
    // Known audio codecs
    FIRST_AUDIO_CODEC,
 
    MP3,
    RAWPCM,
    // ...
    LAST_AUDIO_CODEC,
 
    // Known text codecs
    FIRST_TEXT_CODEC,
 
    RAWTEXT,
    XML,
 
    // ...
    LAST_TEXT_CODEC,
 
    LAST_CODEC
  };
 
  MessageType (int codec, const ACE_FILE_Addr& addr)
    : codec_(codec), addr_(addr)
  { }
 
  int get_codec (void)
  {
    return this->codec_;
  }
 
  ACE_FILE_Addr &get_addr (void)
  {
    return this->addr_;
  }
 
  int is_video (void)
  {
    return
      this->get_codec () > FIRST_VIDEO_CODEC &&
      this->get_codec () < LAST_VIDEO_CODEC;
  }
 
  int is_audio (void)
  {
    return
      this->get_codec () > FIRST_AUDIO_CODEC &&
      this->get_codec () < LAST_AUDIO_CODEC ;
  }
 
  int is_text (void)
  {
    return
      this->get_codec () > FIRST_TEXT_CODEC &&
      this->get_codec () < LAST_TEXT_CODEC ;
  }
 
private:
  int codec_;
  ACE_FILE_Addr addr_;
};
 
# endif /* MESSAGE_INFO_H */

RecordingDevice.h

/* -*- C++ -*- */
// $Id: RecordingDevice.h 80826 2008-03-04 14:51:23Z wotte $
 
#ifndef RECORDING_DEVICE_H
#define RECORDING_DEVICE_H
 
#include "ace/FILE_Addr.h"
#include "ace/Event_Handler.h"
#include "ace/Log_Msg.h"
#include "ace/Reactor.h"
#include "ace/Semaphore.h"
 
class CallerId;
class MessageType;
 
class RecordingDevice
{
public:
  RecordingDevice ()
  {
    // Initialize the semaphore so that we don't block on the
    // first call to wait_for_activity().
  }
 
  virtual ~RecordingDevice ()
  {
  }
 
  virtual const ACE_TCHAR *get_name (void) const
  {
    return ACE_TEXT ("UNKNOWN");
  }
 
  virtual int init (int, ACE_TCHAR *[])
  {
    return 0;
  }
 
  // Answer the incoming call
  virtual int answer_call (void) = 0;
 
  // Fetch some form of caller identification at the hardware level.
  virtual CallerId *retrieve_callerId (void) = 0;
 
  // Fetch the message at the location specified by 'addr' and play
  // it for the caller.
  virtual int play_message (ACE_FILE_Addr &addr) = 0;
 
  // Record data from our physical device into the file at the
  // specified address. Return the number of bytes recorded.
  virtual MessageType *record_message (ACE_FILE_Addr &addr) = 0;
 
  // Release the RecordingDevice to accept another incoming call
  virtual void release (void)
  {
    this->release_semaphore ();
  }
 
  // Get the handler of the device so that wait_for_activity() can
  // wait for data to arrive.
  virtual ACE_Event_Handler *get_handler (void) const
  {
    return 0;
  }
 
  virtual RecordingDevice *wait_for_activity (void)
  {
    // Block on a semaphore until it says we're ready to do
    // work.
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("Waiting for semaphore\n")));
    this->acquire_semaphore ();
 
    // Use the reactor to wait for activity on our handle
    ACE_Reactor reactor;
 
    ACE_Event_Handler *handler = this->get_handler ();
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("Handler is %@\n"),
                (void *)handler));
 
    reactor.register_handler (this->get_handler (),
                              ACE_Event_Handler::READ_MASK);
 
    reactor.handle_events ();
    // Error-check this...
 
    // Leave the semaphore locked so that we'll block until
    // recording_complete() is invoked.
 
    return this;
  }
 
protected:
  void acquire_semaphore (void)
  {
    this->semaphore_.acquire ();
  }
 
  void release_semaphore (void)
  {
    // Reset the semaphore so that wait_for_activity() will
    // unblock.
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("Releasing semaphore\n")));
    this->semaphore_.release ();
  }
 
private:
  ACE_Semaphore semaphore_;
};
 
#include "RecordingDevice_Text.h"
#include "RecordingDevice_USRVM.h"
#include "RecordingDevice_QC.h"
 
#include "RecordingDeviceFactory.h"
 
#endif /* RECORDING_DEVICE_H */

RecordingDeviceFactory.cpp

// $Id: RecordingDeviceFactory.cpp 91813 2010-09-17 07:52:52Z johnnyw $
 
#include "RecordingDevice.h"
#include "RecordingDeviceFactory.h"
#include "RecordingDevice_Text.h"
 
RecordingDevice *RecordingDeviceFactory::instantiate (int argc,
                                                      ACE_TCHAR *argv[])
{
  RecordingDevice * device = 0;
 
  // Determine the implementation based on the values of argv
  // Exclude 2
  device = new TextListenerAcceptor ();
  // Exclude 2
 
  // Initialize the device with the remaining parameters.
  if (device->init (argc, argv) < 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("RecordingDeviceFactory::instantiate() - ")
                       ACE_TEXT ("%s->init(argc, argv)"),
                       device->get_name()),
                      0);
  return device;
}

RecordingDeviceFactory.h

/* -*- C++ -*- */
// $Id: RecordingDeviceFactory.h 80826 2008-03-04 14:51:23Z wotte $
 
#ifndef RECORDING_DEVICE_FACTORY_H
#define RECORDING_DEVICE_FACTORY_H
 
class RecordingDevice;
 
/*
 * A factory class that creates an appropriate RecordingDevice
 * derivative based on command-line parameters.
 */
class RecordingDeviceFactory
{
public:
 
  // Instantiate the appropriate RecordingDevice implementation
  static RecordingDevice *instantiate (int argc, ACE_TCHAR *argv[]);
};
 
#endif /* RECORDING_DEVICE_FACTORY_H */

RecordingDevice_QC.h

// $Id: RecordingDevice_QC.h 80826 2008-03-04 14:51:23Z wotte $
 
class QuickCam : public RecordingDevice
  {
  };

RecordingDevice_Text.cpp

/*
 * $Id: RecordingDevice_Text.cpp 80826 2008-03-04 14:51:23Z wotte $
 *
 * A RecordingDevice that listens to a socket and collects text.
 */
 
#include "MessageInfo.h"
#include "RecordingDevice.h"
#include "RecordingDevice_Text.h"
#include "Util.h"
 
TextListenerAcceptor::TextListenerAcceptor (void)
  : ACE_Event_Handler(), RecordingDevice()
{ }
 
// ACE_Event_Handler interface
 
int TextListenerAcceptor::open (ACE_INET_Addr &addr)
{
  if (this->acceptor_.open (addr, 1) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p\n"),
                       ACE_TEXT ("acceptor open")),
                      -1);
  return 0;
}
 
ACE_HANDLE TextListenerAcceptor::get_handle (void) const
{
  return this->acceptor_.get_handle ();
}
 
int TextListenerAcceptor::handle_input (ACE_HANDLE)
{
  ACE_DEBUG ((LM_INFO,
              ACE_TEXT ("TextListenerAcceptor - connection request\n" )));
  return 0;
}
 
int TextListenerAcceptor::accept (ACE_SOCK_Stream &peer)
{
  return this->acceptor_.accept (peer);
}
 
// RecordingDevice interface
 
// Open a listening socket on the port specified by argv.
int TextListenerAcceptor::init (int argc, ACE_TCHAR *argv[])
{
  ACE_UNUSED_ARG(argc);
 
  ACE_INET_Addr addr (argv[1]);
 
  if (this->open (addr) < 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p\n"),
                       ACE_TEXT ("TextListener - open")),
                      -1);
  return 0;
}
 
ACE_Event_Handler *TextListenerAcceptor::get_handler (void) const
{
  return (ACE_Event_Handler *)this;
}
 
RecordingDevice *TextListenerAcceptor::wait_for_activity (void)
{
  this->RecordingDevice::wait_for_activity ();
  return new TextListener (this);
}
 
int TextListenerAcceptor::answer_call (void)
{
  return -1;
}
 
CallerId *TextListenerAcceptor::retrieve_callerId (void)
{
  return 0;
}
 
int TextListenerAcceptor::play_message (ACE_FILE_Addr &addr)
{
  ACE_UNUSED_ARG(addr);
  return 0;
}
 
MessageType *TextListenerAcceptor::record_message (ACE_FILE_Addr &addr)
{
  ACE_UNUSED_ARG(addr);
  return 0;
}
 
 
// Listing 01 code/ch18
TextListener::TextListener (TextListenerAcceptor *acceptor)
  : acceptor_(acceptor)
{
  ACE_TRACE ("TextListener ctor");
 
  ACE_NEW (this->command_stream_, CommandStream (&(this->peer_)));
  this->command_stream_->open (0);
}
// Listing 01
 
const ACE_TCHAR *TextListener::get_name (void) const
{
  return ACE_TEXT ("TextListener");
}
 
// Listing 02 code/ch18
int TextListener::answer_call (void)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("TextListener::answer_call()\n")));
 
  Command *c = new Command ();
  c->command_ = Command::CMD_ANSWER_CALL;
  c->extra_data_ = this->acceptor_;
 
  c = this->command_stream_->execute (c);
 
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("TextListener::answer_call() ")
              ACE_TEXT ("result is %d\n"),
              c->numeric_result_));
 
  return c->numeric_result_;
}
// Listing 02
 
// Listing 03 code/ch18
CallerId *TextListener::retrieve_callerId (void)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("TextListener::retrieve_callerId()\n")));
 
  Command *c = new Command ();
  c->command_ = Command::CMD_RETRIEVE_CALLER_ID;
 
  c = this->command_stream_->execute (c);
 
  CallerId *caller_id = new CallerId (c->result_);
  return caller_id;
}
// Listing 03
 
// Listing 04 code/ch18
int TextListener::play_message (ACE_FILE_Addr &addr)
{
  MessageType *type = Util::identify_message (addr);
  if (type->is_text ())
    {
      Command *c = new Command ();
      c->command_ = Command::CMD_PLAY_MESSAGE;
      c->extra_data_ = &addr;
      c = this->command_stream_->execute (c);
      return c->numeric_result_;
    }
 
  ACE_FILE_Addr temp (ACE_TEXT ("/tmp/outgoing_message.text"));
  ACE_FILE_IO *file;
  if (type->is_audio ())
    file = Util::audio_to_text (addr, temp);
  else if (type->is_video ())
    file = Util::video_to_text (addr, temp);
  else
    ACE_ERROR_RETURN
      ((LM_ERROR, ACE_TEXT ("Invalid message type %d\n"),
        type->get_codec ()), -1);
  int rval = this->play_message (temp);
  file->remove ();
  return rval;
}
// Listing 04
 
// Listing 05 code/ch18
MessageType *TextListener::record_message (ACE_FILE_Addr &addr)
{
  Command *c = new Command ();
  c->command_ = Command::CMD_RECORD_MESSAGE;
  c->extra_data_ = &addr;
  c = this->command_stream_->execute (c);
  if (c->numeric_result_ == -1)
    return 0;
 
  return new MessageType (MessageType::RAWTEXT, addr);
}
// Listing 05
 
// Listing 06 code/ch18
void TextListener::release (void)
{
  delete this;
}
// Listing 06

RecordingDevice_Text.h

/* -*- C++ -*- */
/*
 * $Id: RecordingDevice_Text.h 80826 2008-03-04 14:51:23Z wotte $
 *
 * A RecordingDevice that listens to a socket and collects text.
 */
 
#ifndef RECORDING_DEVICE_TEXT_H
#define RECORDING_DEVICE_TEXT_H
 
#include "ace/FILE_IO.h"
#include "ace/FILE_Connector.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Acceptor.h"
 
#include "CommandStream.h"
#include "MessageInfo.h"
#include "RecordingDevice.h"
 
class TextListenerAcceptor :
      public ACE_Event_Handler,
      public RecordingDevice
{
public:
  TextListenerAcceptor ();
 
  // ACE_Event_Handler interface
 
  int open (ACE_INET_Addr &addr);
 
  ACE_HANDLE get_handle (void) const;
 
  int handle_input (ACE_HANDLE);
 
  int accept (ACE_SOCK_Stream &peer);
 
  // RecordingDevice interface
 
  // Open a listening socket on the port specified by argv.
  int init (int argc, ACE_TCHAR *argv[]);
 
  ACE_Event_Handler *get_handler (void) const;
 
  virtual RecordingDevice *wait_for_activity (void);
 
  virtual int answer_call (void);
 
  virtual CallerId *retrieve_callerId (void);
 
  virtual int play_message (ACE_FILE_Addr &addr);
 
  virtual MessageType *record_message (ACE_FILE_Addr &addr);
 
private:
  ACE_SOCK_Acceptor acceptor_;
};
 
// Listing 01 code/ch18
class TextListener : public RecordingDevice
{
public:
  TextListener (TextListenerAcceptor *acceptor);
 
  virtual const ACE_TCHAR *get_name (void) const;
 
  int answer_call (void);
 
  CallerId *retrieve_callerId (void);
 
  int play_message (ACE_FILE_Addr &addr);
 
  MessageType *record_message (ACE_FILE_Addr &addr);
 
  virtual void release (void);
  // Listing 01
  // Listing 02 code/ch18
private:
  CommandStream *command_stream_;
  TextListenerAcceptor *acceptor_;
  ACE_SOCK_Stream peer_;
};
// Listing 02
 
#endif /* RECORDING_DEVICE_TEXT_H */

RecordingDevice_USRVM.h

// $Id: RecordingDevice_USRVM.h 80826 2008-03-04 14:51:23Z wotte $
 
class USRoboticsVoiceModem : public RecordingDevice
  {
  };

Util.h

/* -*- C++ -*- */
// $Id: Util.h 80826 2008-03-04 14:51:23Z wotte $
 
#ifndef UTIL_H
#define UTIL_H
 
#include "ace/FILE_Addr.h"
#include "ace/FILE_Connector.h"
#include "ace/FILE_IO.h"
 
class Util
{
public:
  static MessageType *identify_message (ACE_FILE_Addr &src)
  {
    // Determine the contents of the specified file
    return new MessageType (MessageType::RAWTEXT, src);
  }
 
  static ACE_FILE_IO *audio_to_text (ACE_FILE_Addr &, ACE_FILE_Addr &dest)
  {
    ACE_FILE_Connector connector;
    ACE_FILE_IO *file = 0;
    if (connector.connect (*file, dest) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("create file")),
                        0);
 
    // Convert audio data to printable text
 
    return file;
  }
 
  static ACE_FILE_IO *video_to_text (ACE_FILE_Addr &, ACE_FILE_Addr &dest)
  {
    ACE_FILE_Connector connector;
    ACE_FILE_IO *file = 0;
    if (connector.connect (*file, dest) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("create file")),
                        0);
 
    // Extract audio data from video file and convert to printable text
    return file;
  }
 
  static int convert_to_unicode (ACE_FILE_Addr &src, ACE_FILE_Addr &dest)
  {
    ACE_FILE_Connector connector;
    ACE_FILE_IO input;
    if (connector.connect (input, src) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("read file")),
                        0);
    ACE_FILE_IO output;
    if (connector.connect (output, dest) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("create file")),
                        0);
 
    char rwbuf[512];
    ssize_t rwbytes;
    while ((rwbytes = input.recv (rwbuf, 512)) > 0)
      {
        output.send_n (rwbuf, rwbytes);
      }
 
    input.close ();
    output.close ();
 
    // Convert arbirary text to unicode
    return 0;
  }
 
  static int convert_to_mp3 (ACE_FILE_Addr &, ACE_FILE_Addr &)
  {
    // Convert arbitrary audio data to some standard mp3 format
    return 0;
  }
 
  static int convert_to_mpeg (ACE_FILE_Addr &, ACE_FILE_Addr &)
  {
    // Convert arbitrary vidio data to some standard mpeg format
    return 0;
  }
};
 
#endif /* UTIL_H */