The DejaVU Framework -- hush 3.0
[.] Papers Tutorials Examples Manuals Interfaces Sources Packages Resources ?

source: wrapper.c hush-3.0b4/auxiliary/net/thread


[.] - [up] [top] - index README make include source scripts
  // File: wrapper.c                        -*- C++ -*-
  // Author: Jacco van Ossenbruggen <jrvosse@cs.vu.nl>
  // Description:        see wrapper.h
  
  include <hush/source.h>
  
  include <net/thread/wrapper.h>
  
  include <errno.h>
  include <fcntl.h>
  include <signal.h>
  include <stdio.h>
  include <stdlib.h>
  include <string.h>
  include <stropts.h>
  include <time.h>
  include <unistd.h>
  include <wait.h>
  include <sys/types.h>
  
  include <hush/kit.h>
  include <hush/event.h>
  include <hush/assert.h>
  
  //#include <config.h>
  
  wrapper::wrapper(): 
  pid_(-1),                 
  status_(-1),         
  path_(NULL),         
  argv_(NULL),
  in(NULL),
  out(NULL),
  err(NULL),
  running_(0),
  delay_out_(1)
  {
      pipe_in_ [0] = -1; pipe_in_ [1] = -1;
      pipe_out_[0] = -1; pipe_out_[1] = -1;
      pipe_err_[0] = -1; pipe_err_[1] = -1;   
      err_buf[0] = 0;
      out_buf[0] = 0;
      
      delay_out();
  }
  
  wrapper::~wrapper() {
      if (1) {
          int w = waitpid(pid_, &status_, WNOHANG);
          if (w == 0) {
              close_pipes();
              kill(pid_, SIGKILL);
              wait(&w);                // Acknowlege death of child
          } else if (w == pid_ && status_ != 0){
              cerr << "Wrapped process " << path_ << " already stopped";
              cerr << " with status (oct) " << oct << status_ << endl;;
          }
          else if (w != pid_) {
              perror("wrapper");
              cerr << " wrapped proc: " << path_ << " probably died " << w << endl;
          }
      }
      delete in;
      delete out;
      delete err;   
      in = NULL; out = err = NULL;
      if (path_ )  delete[] path_; path_ = NULL;
      if (argv_ ) {
          int argC=0;
          while(argv_[argC]) delete[] argv_[argC++]; 
          delete[] argv_; argv_ = NULL;
      }
     
  }
  
  int wrapper::set_up_pipes()
  {
      // create pipes
      if (pipe(pipe_in_) || pipe(pipe_out_) || pipe(pipe_err_)) {
          cerr << "wrapper failed to create pipes to " << endl;;        
          perror(path_); return 0;
      }
      if (!delay_out(delay_out_)) {
          cerr << "Could not set delay/nodelay output mode for " << endl;
          cerr << path_ << endl;
          return 0;
      }
      
      if (in) delete in;                 in  = new ofstream(pipe_in_[1]);
      if (out) delete out;         out = new ifstream(pipe_out_[0]);
      if (err) delete err;           err = new ifstream(pipe_err_[0]);
      
      if ((!*in) || (!*out) || (!*err)) {
          cerr << "wrapper: Error in one of the three pipes" << endl;
          cerr << "of wrapper " << path_ << endl;
          return 0;
      } else {
          require(thekit());
          thekit()->bind(pipe_out_[0], this);
          thekit()->bind(pipe_err_[0], this);
          return 1;
      }
  }
  
  int wrapper::run(const char* const path, char* const * argV) 
  {
      save_args(path, argV);
      if (!set_up_pipes()) return 0;
      pid_ = fork();                // Fork to create process
      switch(pid_) {
        case -1:                        // error
          perror("wrapper: fork failed");
          cerr << "could not run " << path_ << endl;
          return 0;
        case  0:                // child
      {        close_pipes();        // Close pipe ends intended for parent
          
          int r1 = dup2(pipe_in_[0] ,0);
          int r2 = dup2(pipe_out_[1],1);
          int r3 = dup2(pipe_err_[1],2); 
          if ((r1 == -1) || (r2 == -1) || (r3 == -1)) {
              cerr << "wrapper child: "
                  "error in one of the three pipes..." << endl;
              cerr << "could not run " << path_ << endl;
              exit(-1);
          }
          
          if (-1 == execvp(path,argV)) {
              cerr << "wrapper: could not execvp: " << path_ << " ";
              for (int i=1; argV[i] != 0; i++) cerr << argV[i] << " ";
              perror(NULL); 
              _exit(-1);
          }
      }
        default:
          if (close(pipe_in_[0]) || 
              close(pipe_out_[1])|| 
              close(pipe_err_[1])) {
              cerr << "wrapper failed to close childs pipes" << endl;
              perror("path"); return 0;
          }; 
          int w=0;
          if ((w = waitpid(pid_, &status_, WNOHANG))) {
              perror(path_);
              close_pipes();
              return 0;
          }
          else {
              running_ = 1;
              return 1;
          }
      }
  }
  
  void wrapper::close_pipes() {
      require(thekit());
      thekit()->unbind(pipe_out_[0]);
      thekit()->unbind(pipe_err_[0]);
      if (close (pipe_in_[1]) || close (pipe_out_[0]) || close(pipe_err_[0])) {
          cerr << "Closing pipes failed" << endl;
          perror(path_);
      } else {
          out->close(); in->close(); err->close();        
      }
      pipe_in_[1] = pipe_out_[0] = pipe_err_[0] = -1;
  }
  
  int wrapper::running() const {
      int r = 0;
      if (!running_) r = 0;
      else if (waitpid(pid_, (int*) &status_, WNOHANG) == 0) r = 1;
      else r = 0;
      return (r);
  }
  
  int wrapper::sendsignal(int sig) {
      if (!running_) return 0;
      else {
          kill(pid_, sig);                                // send sig
          int w = waitpid(pid_, &status_, WNOHANG);        // proc still running?
          if (w == pid_) running_ = 0;                // ... No        
          else if (w != 0) {                                 // ... No and error
              cerr << "wrapper: waitpid returns " << w;
              cerr << " for pid : " << pid_ << endl;
              perror(path_);
              running_ = 0;
              return 0;
          }
          return 1;
      }
  }
  
  void wrapper::save_args(const char* const path, char* const * argV){
      if (path_ != NULL)  delete[] path_; path_ = NULL;
      if (argv_ != NULL) {
          int argC=0;
          while(argv_[argC] != (char*) 0) delete[] argv_[argC++]; 
          delete[] argv_;
          argv_ = NULL;
      }
      if (path != NULL) { 
          path_ = new char[1+strlen(path)]; 
          strcpy(path_, path); 
      }
      if (argV != NULL) { 
          int argC=0;
          while(argV[argC] != (char*)0) argC++; 
          argv_ = new char*[argC+1];
          argC=0;
          while(argV[argC] != (char*)0) {
              argv_[argC] = new char[1+strlen(argV[argC])] ;
              strcpy(argv_[argC], argV[argC]);
              argC++;
          }
          argv_[argC] = (char*)0;
      }
  }
  
  int wrapper::delay_out(int d) { 
      if (pipe_out_[0] == -1 || pipe_err_[0] == -1) {
          delay_out_ = d; 
          return 1;
      } else {
          delay_out_ = d; 
          int out_flags = fcntl(pipe_out_[0], F_GETFL, 0);
          int err_flags = fcntl(pipe_err_[0], F_GETFL, 0);
          if (out_flags == -1 || err_flags == -1) return 0;
          else {
              out_flags = (d ? (out_flags & ~O_NONBLOCK) : 
                           (out_flags | O_NONBLOCK));
              err_flags = (d ? (err_flags & ~O_NONBLOCK) : 
                           (err_flags | O_NONBLOCK));
              return (
                      fcntl(pipe_out_[0], F_SETFL, out_flags) != -1 &&
                      fcntl(pipe_err_[0], F_SETFL, err_flags) != -1
                      );
          }
      }
  }
  
  void wrapper::async_error(const char* const buf, int nbytes) {  
      int l=strlen(err_buf);
      for(int i = 0; i<nbytes; i++) {
          if(buf[i] != '\n' && buf[i] != 0) err_buf[l++] = buf[i];
          else {
              err_buf[l]=0;
              error_handler(err_buf);
              l = 0;
          }
      }
      err_buf[l]=0;
  }
  
  void wrapper::async_output(const char* const buf, int nbytes) {
      int l=strlen(out_buf);
      for(int i = 0; i<nbytes; i++) {
          if(buf[i] != '\n' && buf[i] != 0) out_buf[l++] = buf[i];
          else {
              out_buf[l]=0;
              output_handler(out_buf);
              l = 0;
          }
      }
      out_buf[l]=0;
  }
  
  int wrapper::delay_out() const { return delay_out_ ;}
  int wrapper::rerun(){ close_pipes(); return run(path_, argv_); }
  
  void wrapper::output_handler(const char* const s) {
          cout << "wrapper output_handler(\"" << s << "\")" << endl;
  }
  
  void wrapper::error_handler(const char* const s) {
          cout << "wrapper error_handler(\"" << s << "\")" << endl;
  }
  
  int wrapper::operator()() {
          const int fd = _event->fd();
          require(fd == pipe_out_[0] || fd == pipe_err_[0]);
  
          char buf[BUFSIZ]; int nbytes=0;
          while((nbytes=read(fd, buf, sizeof(buf))) > 0)
                  fd == pipe_out_[0] ? async_output(buf, nbytes):  async_error(buf,nbytes);
          return 0;
  }
  

[.] Papers Tutorials Examples Manuals Interfaces Sources Packages Resources ?
Hush Online Technology
hush@cs.vu.nl
09/09/98