Tutorials/i1Net/mgen: mgen_oml.2.patch

File mgen_oml.2.patch, 14.6 KB (added by nilanjan, 6 years ago)
  • include/CWriteOml.h

    diff -Naur mgen.orig/include/CWriteOml.h MGEN_SRC.oml/include/CWriteOml.h
    old new  
     1#ifndef _CWRITEOML_H_
     2#define _CWRITEOML_H_
     3
     4#include <iostream>
     5#include <vector>
     6#include <map>
     7#include <string.h>
     8#include "oml2/omlc.h"
     9
     10#include <boost/lexical_cast.hpp>
     11
     12
     13#define FALSE (unsigned int) 0
     14#define TRUE  (unsigned int) 1
     15
     16
     17class CWriteOml
     18{
     19  private:
     20    unsigned int bReady;
     21    unsigned int _MeasurementPoints;
     22    std::string mHostName;
     23    OmlValueU* _values;
     24
     25    std::map<std::string, std::pair<OmlValueT, OmlValueU*> >            _KTVMap;
     26    std::map<std::string, std::pair<OmlValueT, OmlValueU*> >::iterator  _KTVMapIter;
     27
     28
     29    OmlMPDef* mp_def;
     30    OmlMP* _mp_handle;
     31    std::string _db_filename;
     32    std::string _server_name;
     33
     34    void createMeasurementPoint(OmlMPDef* pOmlMPDef, std::string str, OmlValueT type);
     35
     36  public:
     37    CWriteOml();
     38    ~CWriteOml();
     39
     40    void CWriteOML(std::string db_filename, std::string server_name);
     41    void init(std::string db_filename, std::string server_name);
     42    void start( std::vector< std::pair<std::string, OmlValueT> >& oml_key_list );
     43    void set_key(std::string key_str, void* val_ptr);
     44    void insert();
     45    void stop();
     46
     47    unsigned int isReady() { return bReady; }
     48
     49};
     50
     51#endif
  • include/mgen.h

    diff -Naur mgen.orig/include/mgen.h MGEN_SRC.oml/include/mgen.h
    old new  
    55#include "mgenFlow.h"
    66#include "mgenGlobals.h"
    77#include "mgenMsg.h"
     8#include "CWriteOml.h"
     9
     10
     11#define DBG_OUT(x) std::cerr << "[" << __FILE__ << "] " << #x << " = " << x << std::endl;
    812
    913class MgenController
    1014{
     
    104108class Mgen
    105109{
    106110  public:
     111   CWriteOml oml;
     112
    107113    enum {SCRIPT_LINE_MAX = 8192};  // maximum script line length
    108114   
    109115    Mgen(ProtoTimerMgr&         timerMgr,
  • makefiles/Makefile.common

    diff -Naur mgen.orig/makefiles/Makefile.common MGEN_SRC.oml/makefiles/Makefile.common
    old new  
    4141           $(COMMON)/mgenTransport.cpp $(COMMON)/mgenPattern.cpp \
    4242           $(COMMON)/mgenPayload.cpp \
    4343           $(COMMON)/mgenSequencer.cpp \
     44           $(COMMON)/CWriteOml.cpp \
    4445           $(COMMON)/gpsPub.cpp $(COMMON)/mgenAppSinkTransport.cpp
    4546         
    4647MGEN_OBJ = $(MGEN_SRC:.cpp=.o)
  • makefiles/Makefile.linux

    diff -Naur mgen.orig/makefiles/Makefile.linux MGEN_SRC.oml/makefiles/Makefile.linux
    old new  
    77#
    88SYSTEM_INCLUDES = -I/usr/X11R6/include
    99SYSTEM_LDFLAGS = -L/usr/X11R6/lib
    10 SYSTEM_LIBS = -ldl -lpthread -lpcap
     10SYSTEM_LIBS = -ldl -lpthread -lpcap -loml2 -locomm
    1111
    1212# 2) System specific capabilities
    1313# Must choose appropriate for the following:
  • src/common/CWriteOml.cpp

    diff -Naur mgen.orig/src/common/CWriteOml.cpp MGEN_SRC.oml/src/common/CWriteOml.cpp
    old new  
     1
     2#include "CWriteOml.h"
     3#include <iostream>
     4#include <vector>
     5#include <map>
     6#include <string.h>
     7#include "oml2/omlc.h"
     8
     9//#include <boost/lexical_cast.hpp>
     10
     11
     12CWriteOml::CWriteOml()
     13{
     14}
     15
     16void CWriteOml::CWriteOML(std::string db_filename, std::string server_name)
     17{
     18  bReady = FALSE;
     19  init(db_filename, server_name);
     20}   
     21
     22CWriteOml::~CWriteOml()
     23{
     24  omlc_close();
     25}
     26
     27
     28void CWriteOml::init(std::string db_filename, std::string server_name)
     29{
     30  _db_filename = db_filename;
     31  _server_name = server_name;
     32
     33  std::string fname;
     34  int argc;
     35  const char** argv;
     36  std::vector<char*> arg_vector;
     37
     38  //char chostname[32];
     39  //for (int i = 0; i < 32;++i)
     40  //  chostname[i] = '\0';
     41  //gethostname(chostname, 31);
     42  //mHostName = std::string(chostname);
     43
     44  std::string mode(server_name.c_str());
     45
     46  if (mode == "file")
     47 {
     48   fname = db_filename;//  + "_" +  mHostName;
     49   std::cout << fname << std::endl;
     50   argc = 7;
     51   const char* argv_file[] = {"./spectrum", "--oml-id",(const char*)"et", "--oml-exp-id",db_filename.c_str(), "--oml-file",fname.c_str()};
     52   argv = argv_file;
     53  }
     54  else
     55  {
     56    // Following two lines were being optimized out
     57    //const char* argv_server[] = {"./spectrum", "--oml-id",(const char*)"et", "--oml-domain",db_filename.c_str(), "--oml-collect", server_name.c_str() };
     58    //argv = argv_server;
     59
     60    arg_vector.push_back((char*)"./spectrum");
     61    arg_vector.push_back((char*)"--oml-id");
     62    arg_vector.push_back((char*)"et");
     63    arg_vector.push_back((char*)"--oml-domain");
     64    arg_vector.push_back((char*)db_filename.c_str());
     65    arg_vector.push_back((char*)"--oml-collect");
     66    arg_vector.push_back((char*)server_name.c_str());
     67    argv = (const char**)&arg_vector[0];
     68
     69    argc = arg_vector.size(); // argc = 7;
     70
     71  }
     72
     73  int result = omlc_init ("_mp_", &argc, argv, NULL);
     74  if (result == -1) {
     75    std::cerr << "Could not initialize OML\n";
     76    exit (1);
     77  }
     78
     79}
     80
     81
     82void CWriteOml::start( std::vector<std::pair<std::string, OmlValueT> >& _OmlKeys )
     83{
     84  int result;
     85
     86  _MeasurementPoints = _OmlKeys.size();
     87
     88  mp_def = new OmlMPDef [(sizeof(OmlMPDef) * (_MeasurementPoints + 1) )];
     89
     90  // define measurement points
     91  unsigned int idx;
     92  for (idx = 0; idx < _MeasurementPoints; ++idx)
     93    createMeasurementPoint(&mp_def[idx], _OmlKeys.at(idx).first, (OmlValueT)_OmlKeys.at(idx).second);
     94  createMeasurementPoint(&mp_def[idx],      "NULL",        (OmlValueT)0);
     95
     96  _mp_handle = omlc_add_mp (_db_filename.c_str(), mp_def); // using db_filename as tag name for measurement point
     97
     98  if (_mp_handle == NULL) {
     99    std::cerr << "Error: could not register Measurement Point \"data\"";
     100    exit (1);
     101  }
     102
     103  result = omlc_start();
     104  if (result == -1) {
     105    std::cerr << "Error starting up OML measurement streams\n";
     106    exit (1);
     107  }
     108
     109  // allocate memory measurement points
     110  _values = (OmlValueU*) malloc(sizeof(OmlValueU) * _MeasurementPoints);
     111  memset((void*)_values, 0, sizeof(OmlValueU) * _MeasurementPoints );
     112
     113  // create oml key <==> (type,value) mapping
     114  _KTVMap.clear();
     115  for (unsigned int idx = 0; idx < _MeasurementPoints; ++idx) {
     116    std::pair<OmlValueT, OmlValueU*> TV (_OmlKeys.at(idx).second, (OmlValueU*)&_values[idx] );
     117   
     118    std::pair<std::string, std::pair<OmlValueT, OmlValueU*> > KTV( _OmlKeys.at(idx).first, TV );
     119    _KTVMap.insert( KTV );
     120  }
     121
     122  bReady = TRUE;
     123}
     124
     125
     126void CWriteOml::set_key(std::string key_str, void* val_ptr)
     127{
     128
     129  _KTVMapIter = _KTVMap.find(key_str);
     130  if (_KTVMapIter == _KTVMap.end()) {
     131    std::cerr << key_str << " not found" << std::endl;
     132    return;  // key not found so return and do nothing
     133  }
     134
     135  //key found to look at type are call appropriate oml intrinsic function
     136  std::pair<OmlValueT, OmlValueU*> TV = _KTVMapIter->second;
     137  switch( TV.first ) {
     138  case OML_INT32_VALUE :
     139    omlc_set_int32   ( *(TV.second), (int32_t) (*((int32_t*)val_ptr)));
     140    break;
     141
     142  case OML_UINT32_VALUE :
     143    omlc_set_uint32   ( *(TV.second), (uint32_t) (*((uint32_t*)val_ptr)));
     144    break;
     145
     146  case OML_INT64_VALUE :
     147    omlc_set_int64   ( *(TV.second), (int64_t) (*((int64_t*)val_ptr)));
     148    break;
     149  case OML_DOUBLE_VALUE :
     150    omlc_set_double   ( *(TV.second), (double) (*((double*)val_ptr)));
     151    break;
     152  case OML_STRING_VALUE :
     153    omlc_set_string( *(TV.second), (char*)val_ptr);
     154    break;
     155    // add other cases here
     156  default :
     157    std::cerr << "OML - unrecognizeg type, value: " << TV.first << " , " << TV.second << std::endl;
     158    break;
     159  }
     160
     161  return;
     162}
     163
     164void CWriteOml::createMeasurementPoint(OmlMPDef* pOmlMPDef, std::string str, OmlValueT type)
     165{
     166  char* cptr;
     167  if (str == "NULL") {
     168    pOmlMPDef->name = NULL;
     169    pOmlMPDef->param_types = type;
     170  }
     171  else {
     172    cptr = new char[str.size()+1];
     173    strcpy (cptr, str.c_str());
     174    pOmlMPDef->name = cptr;
     175    pOmlMPDef->param_types = type;
     176  }
     177}
     178
     179void CWriteOml::insert()
     180{
     181  omlc_inject (_mp_handle, _values);
     182}
     183
     184void CWriteOml::stop()
     185{
     186  omlc_close();
     187  free(_values);
     188}
  • src/common/mgenApp.cpp

    diff -Naur mgen.orig/src/common/mgenApp.cpp MGEN_SRC.oml/src/common/mgenApp.cpp
    old new  
    1616#include <unistd.h>
    1717#include <fcntl.h>
    1818#endif // UNIX
     19
     20#include <boost/algorithm/string.hpp>
     21
     22
     23
    1924MgenApp::MgenApp()
    2025  :  mgen(GetTimerMgr(), GetSocketNotifier()),
    2126     control_pipe(ProtoPipe::MESSAGE), control_remote(false),
     
    5661            "     [queue <queueSize>][broadcast {on|off}]\n"
    5762            "     [convert <binaryLog>][debug <debugLevel>]\n"
    5863            "     [gpskey <gpsSharedMemoryLocation>]\n"
    59             "     [boost] [reuse {on|off}]\n");
     64            "     [boost] [reuse {on|off}]\n"
     65        "     [oml <server:port[,omlFile]>]\n");
     66
    6067}  // end MgenApp::Usage()
    6168
    6269
     
    8087    "+gpskey",    // Override default gps shared memory location
    8188    "+logdata",    // log optional data attribute? default ON
    8289    "+loggpsdata", // log gps data? default ON
     90    "+oml",        // oml: send data to this address:port
    8391    NULL
    8492};
    8593
     
    447455      if (!dispatcher.BoostPriority())
    448456    fprintf(stderr,"Unable to boost process priority.\n");
    449457    }
     458    else if (!strncmp("oml", lowerCmd, len))
     459    {
     460      std::string temp( val );
     461      std::vector<std::string> sToken;
     462      boost::split(sToken, temp, boost::is_any_of(",") );
     463
     464#if 1
     465
     466      std::string omlDbFilename("oml_mgen_test");
     467      std::string omlServerName;
     468
     469      omlServerName = sToken.at(0);
     470      if (sToken.size() == 2) {
     471    omlDbFilename = sToken.at(1);
     472      }
     473
     474      mgen.oml.init(omlDbFilename, omlServerName );
     475      std::vector< std::pair<std::string, OmlValueT> > _omlKeys; // this is just used here to hold all identifiers and values in one container
     476                                                                 // before passing to start().
     477
     478      _omlKeys.push_back( std::make_pair("proto",    OML_STRING_VALUE));
     479      _omlKeys.push_back( std::make_pair("flowid",   OML_INT32_VALUE));
     480      _omlKeys.push_back( std::make_pair("seq_num",  OML_INT32_VALUE));
     481      _omlKeys.push_back( std::make_pair("src_addr", OML_STRING_VALUE));
     482      _omlKeys.push_back( std::make_pair("src_port", OML_INT32_VALUE));
     483      _omlKeys.push_back( std::make_pair("dst_addr", OML_STRING_VALUE));
     484      _omlKeys.push_back( std::make_pair("dst_port", OML_INT32_VALUE));
     485      _omlKeys.push_back( std::make_pair("data_len", OML_INT32_VALUE));
     486
     487
     488      //_omlKeys.push_back( std::make_pair("a_double64",    OML_DOUBLE_VALUE) );
     489      //_omlKeys.push_back( std::make_pair("a____int32",   OML_INT32_VALUE));
     490      //_omlKeys.push_back( std::make_pair("a___string",   OML_STRING_VALUE));
     491      //_omlKeys.push_back( std::make_pair("ExecTime",    OML_DOUBLE_VALUE));
     492      mgen.oml.start( _omlKeys );
     493#endif
     494    }
    450495    else if (!strncmp("help", lowerCmd, len))
    451496    {
    452497      fprintf(stderr, "mgen: version %s\n", MGEN_VERSION);
  • src/common/mgenTransport.cpp

    diff -Naur mgen.orig/src/common/mgenTransport.cpp MGEN_SRC.oml/src/common/mgenTransport.cpp
    old new  
    305305    {
    306306    case SEND_EVENT:
    307307      {
    308           if (mgen.GetLogTx())
    309           {
    310               theMsg->LogSendEvent(mgen.GetLogFile(),
    311                                    mgen.GetLogBinary(),
    312                                    mgen.GetLocalTime(),
    313                                    buffer,
    314                                    mgen.GetLogFlush(),
    315                                    theTime);
    316           }
    317           break;
     308
     309#if 1
     310    if ( mgen.oml.isReady() == TRUE ) {
     311      std::string protocol_string ( MgenEvent::GetStringFromProtocol(protocol) );
     312      unsigned int u32_flowid   = theMsg->GetFlowId();
     313      unsigned int u32_seqnum   = theMsg->GetSeqNum();
     314      std::string str_srcaddr   = theMsg->GetSrcAddr().GetHostString();
     315      unsigned int u32_srcport  = theMsg->GetSrcAddr().GetPort();
     316      std::string str_dstaddr   = theMsg->GetDstAddr().GetHostString();
     317      unsigned int u32_dstport  = theMsg->GetDstAddr().GetPort();
     318      unsigned int u32_datalen  = theMsg->GetMgenMsgLen();
     319
     320      mgen.oml.set_key("proto",     (void*)protocol_string.c_str() );
     321      mgen.oml.set_key("flowid",    (void*)&u32_flowid );
     322      mgen.oml.set_key("seq_num",   (void*)&u32_seqnum );
     323      mgen.oml.set_key("src_addr",  (void*)str_srcaddr.c_str() );
     324      mgen.oml.set_key("src_port",  (void*)&u32_srcport );
     325      mgen.oml.set_key("dst_addr",  (void*)str_dstaddr.c_str() );
     326      mgen.oml.set_key("dst_port",  (void*)&u32_dstport );
     327      mgen.oml.set_key("data_len",  (void*)&u32_datalen );
     328      mgen.oml.insert();
     329    }
     330#endif
     331
     332    if (mgen.GetLogTx())
     333        {
     334      theMsg->LogSendEvent(mgen.GetLogFile(),
     335                   mgen.GetLogBinary(),
     336                   mgen.GetLocalTime(),
     337                   buffer,
     338                   mgen.GetLogFlush(),
     339                   theTime);
     340        }
     341    break;
    318342      }
    319343    case RECV_EVENT:
    320344      {
     
    328352                               mgen.GetLogFlush(),
    329353                               theTime);
    330354
     355#if 1
     356      if ( mgen.oml.isReady() == TRUE ) {
     357        std::string protocol_string ( MgenEvent::GetStringFromProtocol(protocol) );
     358        unsigned int u32_flowid   = theMsg->GetFlowId();
     359        unsigned int u32_seqnum   = theMsg->GetSeqNum();
     360        std::string str_srcaddr   = theMsg->GetSrcAddr().GetHostString();
     361        unsigned int u32_srcport  = theMsg->GetSrcAddr().GetPort();
     362        std::string str_dstaddr   = theMsg->GetDstAddr().GetHostString();
     363        unsigned int u32_dstport  = theMsg->GetDstAddr().GetPort();
     364        unsigned int u32_datalen  = theMsg->GetMsgLen();
     365
     366        mgen.oml.set_key("proto",     (void*)protocol_string.c_str() );
     367        mgen.oml.set_key("flowid",    (void*)&u32_flowid );
     368        mgen.oml.set_key("seq_num",   (void*)&u32_seqnum );
     369        mgen.oml.set_key("src_addr",  (void*)str_srcaddr.c_str() );
     370        mgen.oml.set_key("src_port",  (void*)&u32_srcport );
     371        mgen.oml.set_key("dst_addr",  (void*)str_dstaddr.c_str() );
     372        mgen.oml.set_key("dst_port",  (void*)&u32_dstport );
     373        mgen.oml.set_key("data_len",  (void*)&u32_datalen );
     374        mgen.oml.insert();
     375      }
     376#endif
     377
     378
    331379          // Don't we want rapr to get the message regardless of logging??
    332380          // Could this possibly have been broken too? strange... ljt
    333381          if (mgen.GetController())