cMsg Messaging System  5.2
All Classes Namespaces Files Functions Variables Friends
cMsgWrapper.cc
Go to the documentation of this file.
1 // to do
2 
3 // subscribe must lock list until done ?
4 
5 
6 
7 /*---------------------------------------------------------------------------*
8 * Copyright (c) 2005 Southeastern Universities Research Association, *
9 * Thomas Jefferson National Accelerator Facility *
10 * *
11 * This software was developed under a United States Government license *
12 * described in the NOTICE file included as part of this distribution. *
13 * *
14 * E.Wolin, 25-Feb-2005, Jefferson Lab *
15 * *
16 * Authors: Elliott Wolin *
17 * wolin@jlab.org Jefferson Lab, MS-6B *
18 * Phone: (757) 269-7365 12000 Jefferson Ave. *
19 * Fax: (757) 269-5519 Newport News, VA 23606 *
20 * *
21 *----------------------------------------------------------------------------*/
22 
23 
24 #include <cMsg.hxx>
25 #include <cMsgPrivate.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <unistd.h>
29 #include <pthread.h>
30 #include <vector>
31 #include <sstream>
32 
33 using namespace std;
34 using namespace cmsg;
35 
36 
37 
38 //-----------------------------------------------------------------------------
39 // local data and static functions
40 //-----------------------------------------------------------------------------
41 
42 
43 namespace cmsg {
44 
45 
47 typedef struct {
48  cMsgCallback *cb;
49  void *userArg;
50 } dispatcherStruct;
51 
52 
54 typedef struct {
55  void *domainId;
56  void *handle;
57  string subject;
58  string type;
59  dispatcherStruct *d;
60 } subscrStruct;
61 
62 
64 static vector<subscrStruct*> subscrVec;
65 
66 
68 static pthread_mutex_t subscrMutex = PTHREAD_MUTEX_INITIALIZER;
69 
70 
71 //-----------------------------------------------------------------------------
72 
73 
75 static void callbackDispatcher(void *msg, void *userArg) {
76  dispatcherStruct *ds = (dispatcherStruct*)userArg;
77  ds->cb->callback(new cMsgMessage(msg),ds->userArg);
78 }
79 
80 
81 //-----------------------------------------------------------------------------
82 
83 
95  /*
96 static bool subscriptionExists(void *domainId, const string &subject, const string &type,
97  cMsgCallback *cb, void *userArg) {
98 
99  bool itExists = false;
100 
101 
102  // search list for matching subscription
103  pthread_mutex_lock(&subscrMutex);
104  for(unsigned int i=0; i<subscrVec.size(); i++) {
105  if( (subscrVec[i]->domainId == domainId) &&
106  (subscrVec[i]->subject == subject) &&
107  (subscrVec[i]->type == type) &&
108  (subscrVec[i]->d->cb == cb) &&
109  (subscrVec[i]->d->userArg == userArg)
110  ) {
111  itExists=true;
112  break;
113  }
114  }
115  pthread_mutex_unlock(&subscrMutex);
116 
117 
118  // done searching
119  return(itExists);
120 }
121  */
122 
123 //-----------------------------------------------------------------------------
124 
125 
135 static void addSubscription(void *domainId, const string &subject, const string &type,
136  dispatcherStruct *d, void *handle) {
137 
138  subscrStruct *s = new subscrStruct();
139 
140  s->domainId=domainId;
141  s->subject=subject;
142  s->type=type;
143  s->d=d;
144  s->handle=handle;
145 
146  pthread_mutex_lock(&subscrMutex);
147  subscrVec.push_back(s);
148  pthread_mutex_unlock(&subscrMutex);
149 
150  return;
151 }
152 
153 
154 //-----------------------------------------------------------------------------
155 
156 
165 static bool deleteSubscription(void *domainId, void *handle) {
166 
167  bool deleted = false;
168 
169  pthread_mutex_lock(&subscrMutex);
170  vector<subscrStruct*>::iterator iter;
171  for(iter=subscrVec.begin(); iter!=subscrVec.end(); iter++) {
172  if(((*iter)->domainId==domainId)&&((*iter)->handle==handle)) {
173  delete((*iter)->d);
174  delete(*iter);
175  subscrVec.erase(iter);
176  deleted=true;
177  break;
178  }
179  }
180  pthread_mutex_unlock(&subscrMutex);
181 
182  return(deleted);
183 }
184 
185 
186 //-----------------------------------------------------------------------------
187 
188 
194 static void deleteSubscriptions(void *domainId) {
195 
196  pthread_mutex_lock(&subscrMutex);
197  vector<subscrStruct*>::iterator iter;
198  for(iter=subscrVec.begin(); iter!=subscrVec.end(); iter++) {
199  if((*iter)->domainId==domainId) {
200  delete((*iter)->d);
201  delete(*iter);
202  subscrVec.erase(iter);
203  }
204  }
205  pthread_mutex_unlock(&subscrMutex);
206 
207  return;
208 }
209 
210 
211 //-----------------------------------------------------------------------------
212 
213 } // namespace cmsg
214 
215 
216 
217 //-----------------------------------------------------------------------------
218 // cMsgException methods
219 //-----------------------------------------------------------------------------
220 
221 
225 cMsgException::cMsgException(void) : descr(""), returnCode(0) {}
226 
227 
228 //-----------------------------------------------------------------------------
229 
230 
236 cMsgException::cMsgException(const string &c) : descr(c), returnCode(0) {}
237 
238 
239 //-----------------------------------------------------------------------------
240 
241 
248 cMsgException::cMsgException(const string &c, int code) : descr(c), returnCode(code) {}
249 
250 
251 //-----------------------------------------------------------------------------
252 
253 
259 cMsgException::cMsgException(const cMsgException &e) : descr(e.descr), returnCode(e.returnCode) {}
260 
261 
262 //-----------------------------------------------------------------------------
263 
264 
269 }
270 
271 
272 //-----------------------------------------------------------------------------
273 
274 
280 string cMsgException::toString(void) const throw() {
281  stringstream ss;
282  ss << "?cMsgException returnCode = " << returnCode << " descr = " << descr << ends;
283  return(ss.str());
284 }
285 
286 
287 //-----------------------------------------------------------------------------
288 
289 
295 const char *cMsgException::what(void) const throw() {
296  return(toString().c_str());
297 }
298 
299 
300 //-----------------------------------------------------------------------------
301 // cMsgMessage methods
302 //-----------------------------------------------------------------------------
303 
304 
310 
311  myMsgPointer=cMsgCreateMessage();
312  if(myMsgPointer==NULL) {
313  throw(cMsgException("?cMsgMessage constructor...unable to create message",CMSG_ERROR));
314  }
315 }
316 
317 
318 //-----------------------------------------------------------------------------
319 
320 
328 
329  myMsgPointer=cMsgCopyMessage(msg.myMsgPointer);
330  if(myMsgPointer==NULL) {
331  throw(cMsgException("?cMsgMessage copy constructor...unable to create message",CMSG_ERROR));
332  }
333 }
334 
335 
336 //-----------------------------------------------------------------------------
337 
338 
345 cMsgMessage::cMsgMessage(void *msgPointer) throw(cMsgException) {
346 
347  myMsgPointer=msgPointer;
348  if(myMsgPointer==NULL) {
349  throw(cMsgException("?cMsgMessage pointer constructor...unable to create message",CMSG_ERROR));
350  }
351 }
352 
353 
354 //-----------------------------------------------------------------------------
355 
356 
361  if(myMsgPointer!=NULL)cMsgFreeMessage(&myMsgPointer);
362 }
363 
364 
365 //-----------------------------------------------------------------------------
366 
367 
374 string cMsgMessage::getSubject(void) const throw(cMsgException) {
375 
376  const char *s;
377 
378  int stat;
379  if((stat=cMsgGetSubject(myMsgPointer,&s))!=CMSG_OK) {
380  throw(cMsgException(cMsgPerror(stat),stat));
381  }
382 
383  if(s==NULL) {
384  return("null");
385  } else {
386  return(string(s));
387  }
388 }
389 
390 
391 //-----------------------------------------------------------------------------
392 
393 
400 void cMsgMessage::setSubject(const string &subject) throw(cMsgException) {
401 
402  int stat;
403  if((stat=cMsgSetSubject(myMsgPointer,subject.c_str()))!=CMSG_OK) {
404  throw(cMsgException(cMsgPerror(stat),stat));
405  }
406 }
407 
408 
409 //-----------------------------------------------------------------------------
410 
411 
418 string cMsgMessage::getType(void) const throw(cMsgException) {
419 
420  const char *s;
421 
422  int stat;
423  if((stat=cMsgGetType(myMsgPointer,&s))!=CMSG_OK) {
424  throw(cMsgException(cMsgPerror(stat),stat));
425  }
426 
427  if(s==NULL) {
428  return("null");
429  } else {
430  return(string(s));
431  }
432 }
433 
434 
435 //-----------------------------------------------------------------------------
436 
437 
444 void cMsgMessage::setType(const string &type) throw(cMsgException) {
445 
446  int stat;
447  if((stat=cMsgSetType(myMsgPointer,type.c_str()))!=CMSG_OK) {
448  throw(cMsgException(cMsgPerror(stat),stat));
449  }
450 }
451 
452 
453 //-----------------------------------------------------------------------------
454 
455 
462 string cMsgMessage::getText(void) const throw(cMsgException) {
463 
464  const char *s;
465 
466  int stat;
467  if((stat=cMsgGetText(myMsgPointer,&s))!=CMSG_OK) {
468  throw(cMsgException(cMsgPerror(stat),stat));
469  }
470 
471  if(s==NULL) {
472  return("null");
473  } else {
474  return(string(s));
475  }
476 }
477 
478 
479 //-----------------------------------------------------------------------------
480 
481 
488 void cMsgMessage::setText(const string &text) throw(cMsgException) {
489 
490  int stat;
491  if((stat=cMsgSetText(myMsgPointer,text.c_str()))!=CMSG_OK) {
492  throw(cMsgException(cMsgPerror(stat),stat));
493  }
494 }
495 
496 
497 //-----------------------------------------------------------------------------
498 
499 
507 
508  int stat;
509  if((stat=cMsgSetByteArrayLength(myMsgPointer,length)!=CMSG_OK)) {
510  throw(cMsgException(cMsgPerror(stat),stat));
511  }
512 }
513 
514 
515 //-----------------------------------------------------------------------------
516 
517 
523  cMsgResetByteArrayLength(myMsgPointer);
524 }
525 
526 
527 //-----------------------------------------------------------------------------
528 
529 
536 
537  int i;
538  cMsgGetByteArrayLength(myMsgPointer,&i);
539  return(i);
540 }
541 
542 
543 //-----------------------------------------------------------------------------
544 
545 
552  int i;
553  cMsgGetByteArrayLengthFull(myMsgPointer,&i);
554  return(i);
555 }
556 
557 
558 //-----------------------------------------------------------------------------
559 
560 
568 
569  int stat;
570  if((stat=cMsgSetByteArrayOffset(myMsgPointer,offset)!=CMSG_OK)) {
571  throw(cMsgException(cMsgPerror(stat),stat));
572  }
573 }
574 
575 
576 //-----------------------------------------------------------------------------
577 
578 
585 
586  int i;
587  cMsgGetByteArrayOffset(myMsgPointer,&i);
588  return(i);
589 }
590 
591 
592 //-----------------------------------------------------------------------------
593 
594 
602 void cMsgMessage::setByteArray(char *array, int length) throw(cMsgException) {
603 
604  int stat;
605  if((stat=cMsgSetByteArray(myMsgPointer,array, length)!=CMSG_OK)) {
606  throw(cMsgException(cMsgPerror(stat),stat));
607  }
608 }
609 
610 
611 //-----------------------------------------------------------------------------
612 
613 
621 void cMsgMessage::setByteArrayNoCopy(char* array, int length) throw(cMsgException) {
622 
623  int stat;
624  if((stat=cMsgSetByteArrayNoCopy(myMsgPointer,array,length)!=CMSG_OK)) {
625  throw(cMsgException(cMsgPerror(stat),stat));
626  }
627 }
628 
629 
630 //-----------------------------------------------------------------------------
631 
632 
639 
640  char *p;
641  cMsgGetByteArray(myMsgPointer,&p);
642  return(p);
643 }
644 
645 
646 //-----------------------------------------------------------------------------
647 
648 
660 
661  int endian;
662  cMsgGetByteArrayEndian(myMsgPointer,&endian);
663  return(endian);
664 }
665 
666 
667 //-----------------------------------------------------------------------------
668 
669 
685  int stat;
686 
687  if((stat=cMsgSetByteArrayEndian(myMsgPointer,endian))!=CMSG_OK) {
688  throw(cMsgException(cMsgPerror(stat),stat));
689  }
690 
691  return;
692 }
693 
694 
695 //-----------------------------------------------------------------------------
696 
697 
704 bool cMsgMessage::needToSwap(void) const throw(cMsgException) {
705 
706  int flag,stat;
707 
708  if((stat=cMsgNeedToSwap(myMsgPointer,&flag))!=CMSG_OK) {
709  throw(cMsgException(cMsgPerror(stat),stat));
710  }
711 
712  return(flag==1);
713 }
714 
715 
716 //-----------------------------------------------------------------------------
717 
718 
725 int cMsgMessage::getUserInt(void) const throw(cMsgException) {
726 
727  int i;
728 
729  int stat;
730  if((stat=cMsgGetUserInt(myMsgPointer,&i))!=CMSG_OK) {
731  throw(cMsgException(cMsgPerror(stat),stat));
732  }
733  return(i);
734 }
735 
736 
737 //-----------------------------------------------------------------------------
738 
739 
747 
748  int stat;
749  if((stat=cMsgSetUserInt(myMsgPointer,i))!=CMSG_OK) {
750  throw(cMsgException(cMsgPerror(stat),stat));
751  }
752 }
753 
754 
755 //-----------------------------------------------------------------------------
756 
757 
764 struct timespec cMsgMessage::getUserTime(void) const throw(cMsgException) {
765 
766  struct timespec t;
767 
768  int stat;
769  if((stat=cMsgGetUserTime(myMsgPointer,&t))!=CMSG_OK) {
770  throw(cMsgException(cMsgPerror(stat),stat));
771  }
772  return(t);
773 }
774 
775 
776 //-----------------------------------------------------------------------------
777 
778 
784 void cMsgMessage::setUserTime(const struct timespec &userTime) throw(cMsgException) {
785 
786  int stat;
787  if((stat=cMsgSetUserTime(myMsgPointer, &userTime))!=CMSG_OK) {
788  throw(cMsgException(cMsgPerror(stat),stat));
789  }
790 }
791 
792 
793 //-----------------------------------------------------------------------------
794 
795 
801 int cMsgMessage::getVersion(void) const throw(cMsgException) {
802 
803  int version;
804 
805  int stat;
806  if((stat=cMsgGetVersion(myMsgPointer, &version))!=CMSG_OK) {
807  throw(cMsgException(cMsgPerror(stat),stat));
808  }
809  return(version);
810 }
811 
812 
813 //-----------------------------------------------------------------------------
814 
815 
822 
823  void *newPointer = cMsgCopyMessage(myMsgPointer);
824  return(new cMsgMessage(newPointer));
825 }
826 
827 
828 //-----------------------------------------------------------------------------
829 
830 
837 string cMsgMessage::getDomain(void) const throw(cMsgException) {
838 
839  const char *s;
840 
841  int stat;
842  if((stat=cMsgGetDomain(myMsgPointer,&s))!=CMSG_OK) {
843  throw(cMsgException(cMsgPerror(stat),stat));
844  };
845 
846  if(s==NULL) {
847  return("null");
848  } else {
849  return(string(s));
850  }
851 }
852 
853 
854 //-----------------------------------------------------------------------------
855 
856 
863 string cMsgMessage::getReceiver(void) const throw(cMsgException) {
864 
865  const char *s;
866 
867  int stat;
868  if((stat=cMsgGetReceiver(myMsgPointer,&s))!=CMSG_OK) {
869  throw(cMsgException(cMsgPerror(stat),stat));
870  };
871 
872  if(s==NULL) {
873  return("null");
874  } else {
875  return(string(s));
876  }
877 }
878 
879 
880 //-----------------------------------------------------------------------------
881 
882 
889 string cMsgMessage::getReceiverHost(void) const throw(cMsgException) {
890 
891  const char *s;
892 
893  int stat;
894  if((stat=cMsgGetReceiverHost(myMsgPointer,&s))!=CMSG_OK) {
895  throw(cMsgException(cMsgPerror(stat),stat));
896  };
897 
898  if(s==NULL) {
899  return("null");
900  } else {
901  return(string(s));
902  }
903 }
904 
905 
906 //-----------------------------------------------------------------------------
907 
908 
915 string cMsgMessage::getSender(void) const throw(cMsgException) {
916 
917  const char *s;
918 
919  int stat;
920  if((stat=cMsgGetSender(myMsgPointer,&s))!=CMSG_OK) {
921  throw(cMsgException(cMsgPerror(stat),stat));
922  };
923 
924  if(s==NULL) {
925  return("null");
926  } else {
927  return(string(s));
928  }
929 }
930 
931 
932 //-----------------------------------------------------------------------------
933 
934 
941 string cMsgMessage::getSenderHost(void) const throw(cMsgException) {
942 
943  const char *s;
944 
945  int stat;
946  if((stat=cMsgGetSenderHost(myMsgPointer,&s))!=CMSG_OK) {
947  throw(cMsgException(cMsgPerror(stat),stat));
948  };
949 
950  if(s==NULL) {
951  return("null");
952  } else {
953  return(string(s));
954  }
955 }
956 
957 
958 //-----------------------------------------------------------------------------
959 
960 
967 struct timespec cMsgMessage::getReceiverTime(void) const throw(cMsgException) {
968 
969  struct timespec t;
970 
971  int stat;
972  if((stat=cMsgGetReceiverTime(myMsgPointer,&t))!=CMSG_OK) {
973  throw(cMsgException(cMsgPerror(stat),stat));
974  }
975  return(t);
976 }
977 
978 
979 //-----------------------------------------------------------------------------
980 
981 
988 struct timespec cMsgMessage::getSenderTime(void) const throw(cMsgException) {
989 
990  struct timespec t;
991 
992  int stat;
993  if((stat=cMsgGetSenderTime(myMsgPointer,&t))!=CMSG_OK) {
994  throw(cMsgException(cMsgPerror(stat),stat));
995  }
996  return(t);
997 }
998 
999 
1000 //-----------------------------------------------------------------------------
1001 
1002 
1008 bool cMsgMessage::isGetRequest(void) const throw(cMsgException) {
1009 
1010  int b;
1011 
1012  int stat;
1013  if((stat=cMsgGetGetRequest(myMsgPointer,&b))!=CMSG_OK) {
1014  throw(cMsgException(cMsgPerror(stat),stat));
1015  }
1016  return(b);
1017 }
1018 
1019 
1020 //-----------------------------------------------------------------------------
1021 
1022 
1029 
1030  int b;
1031 
1032  int stat;
1033  if((stat=cMsgGetGetResponse(myMsgPointer,&b))!=CMSG_OK) {
1034  throw(cMsgException(cMsgPerror(stat),stat));
1035  }
1036  return(b);
1037 }
1038 
1039 
1040 //-----------------------------------------------------------------------------
1041 
1042 
1050 
1051  int b;
1052 
1053  int stat;
1054  if((stat=cMsgGetNullGetResponse(myMsgPointer,&b))!=CMSG_OK) {
1055  throw(cMsgException(cMsgPerror(stat),stat));
1056  }
1057  return(b);
1058 }
1059 
1060 
1061 //-----------------------------------------------------------------------------
1062 
1063 
1071 
1072  cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
1073  cMsgMessage_t *m = (cMsgMessage_t*)msg.myMsgPointer;
1074 
1075  t->sysMsgId = m->sysMsgId;
1076  t->senderToken = m->senderToken;
1077  t->info = CMSG_IS_GET_RESPONSE | CMSG_IS_NULL_GET_RESPONSE;
1078 }
1079 
1080 
1081 //-----------------------------------------------------------------------------
1082 
1083 
1091 
1092  cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
1093  cMsgMessage_t *m = (cMsgMessage_t*)msg->myMsgPointer;
1094 
1095  t->sysMsgId = m->sysMsgId;
1096  t->senderToken = m->senderToken;
1097  t->info = CMSG_IS_GET_RESPONSE | CMSG_IS_NULL_GET_RESPONSE;
1098 }
1099 
1100 
1101 //-----------------------------------------------------------------------------
1102 
1103 
1111 
1112  cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
1113  cMsgMessage_t *m = (cMsgMessage_t*)msg.myMsgPointer;
1114 
1115  t->sysMsgId = m->sysMsgId;
1116  t->senderToken = m->senderToken;
1117  t->info = CMSG_IS_GET_RESPONSE;
1118 }
1119 
1120 
1121 //-----------------------------------------------------------------------------
1122 
1123 
1131 
1132  cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
1133  cMsgMessage_t *m = (cMsgMessage_t*)msg->myMsgPointer;
1134 
1135  t->sysMsgId = m->sysMsgId;
1136  t->senderToken = m->senderToken;
1137  t->info = CMSG_IS_GET_RESPONSE;
1138 }
1139 
1140 
1141 //-----------------------------------------------------------------------------
1142 
1143 
1151 
1152  void *newMsgPointer;
1153  if((newMsgPointer=cMsgCreateNullResponseMessage(myMsgPointer))==NULL) {
1154  throw(cMsgException("?cMsgMessage::nullResponse...unable to create message",CMSG_ERROR));
1155  }
1156 
1157  return(new cMsgMessage(newMsgPointer));
1158 }
1159 
1160 
1161 //-----------------------------------------------------------------------------
1162 
1163 
1171 
1172  void *newMsgPointer;
1173  if((newMsgPointer=cMsgCreateResponseMessage(myMsgPointer))==NULL) {
1174  throw(cMsgException("?cMsgMessage::response...unable to create message",CMSG_ERROR));
1175  }
1176 
1177  return(new cMsgMessage(newMsgPointer));
1178 }
1179 
1180 
1181 //-----------------------------------------------------------------------------
1182 
1183 
1191 
1192  int stat;
1193  if((stat=cMsgSetGetResponse(myMsgPointer,b))!=CMSG_OK) {
1194  throw(cMsgException(cMsgPerror(stat),stat));
1195  }
1196 }
1197 
1198 
1199 //-----------------------------------------------------------------------------
1200 
1201 
1209 
1210  int stat;
1211  if((stat=cMsgSetNullGetResponse(myMsgPointer,b))!=CMSG_OK) {
1212  throw(cMsgException(cMsgPerror(stat),stat));
1213  }
1214 }
1215 
1216 
1217 //-----------------------------------------------------------------------------
1218 
1219 
1226 string cMsgMessage::toString(void) const throw(cMsgException) {
1227 
1228  char *cs;
1229 
1230  int stat;
1231  if((stat=cMsgToString(myMsgPointer,&cs))!=CMSG_OK) {
1232  throw(cMsgException(cMsgPerror(stat),stat));
1233  }
1234 
1235  string s(cs);
1236  free(cs);
1237  return(s);
1238 
1239 }
1240 
1241 
1242 //-----------------------------------------------------------------------------
1243 // message context accessor functions
1244 //-----------------------------------------------------------------------------
1245 
1246 
1254 
1255  const char *s;
1256 
1257  int stat;
1258  if((stat=cMsgGetSubscriptionDomain(myMsgPointer,&s))!=CMSG_OK) {
1259  throw(cMsgException(cMsgPerror(stat),stat));
1260  };
1261 
1262  if(s==NULL) {
1263  return("null");
1264  } else {
1265  return(string(s));
1266  }
1267 }
1268 
1269 
1270 //-----------------------------------------------------------------------------
1271 
1272 
1280 
1281  const char *s;
1282 
1283  int stat;
1284  if((stat=cMsgGetSubscriptionSubject(myMsgPointer,&s))!=CMSG_OK) {
1285  throw(cMsgException(cMsgPerror(stat),stat));
1286  };
1287 
1288  if(s==NULL) {
1289  return("null");
1290  } else {
1291  return(string(s));
1292  }
1293 }
1294 
1295 
1296 //-----------------------------------------------------------------------------
1297 
1298 
1306 
1307  const char *s;
1308 
1309  int stat;
1310  if((stat=cMsgGetSubscriptionType(myMsgPointer,&s))!=CMSG_OK) {
1311  throw(cMsgException(cMsgPerror(stat),stat));
1312  };
1313 
1314  if(s==NULL) {
1315  return("null");
1316  } else {
1317  return(string(s));
1318  }
1319 }
1320 
1321 
1322 //-----------------------------------------------------------------------------
1323 
1324 
1332 
1333  const char *s;
1334 
1335  int stat;
1336  if((stat=cMsgGetSubscriptionUDL(myMsgPointer,&s))!=CMSG_OK) {
1337  throw(cMsgException(cMsgPerror(stat),stat));
1338  };
1339 
1340  if(s==NULL) {
1341  return("null");
1342  } else {
1343  return(string(s));
1344  }
1345 }
1346 
1347 
1348 //-----------------------------------------------------------------------------
1349 
1350 
1358 
1359  int i;
1360 
1361  int stat;
1362  if((stat=cMsgGetSubscriptionCueSize(myMsgPointer,&i))!=CMSG_OK) {
1363  throw(cMsgException(cMsgPerror(stat),stat));
1364  }
1365  return(i);
1366 }
1367 
1368 
1369 //-----------------------------------------------------------------------------
1370 
1371 
1379 
1380  int i;
1381 
1382  int stat;
1383  if((stat=cMsgGetReliableSend(myMsgPointer,&i))!=CMSG_OK) {
1384  throw(cMsgException(cMsgPerror(stat),stat));
1385  }
1386  return(i == 0 ? false : true);
1387 }
1388 
1389 
1390 //-----------------------------------------------------------------------------
1391 
1392 
1400 
1401  int i = b ? 1 : 0;
1402 
1403  int stat;
1404  if((stat=cMsgSetReliableSend(myMsgPointer,i))!=CMSG_OK) {
1405  throw(cMsgException(cMsgPerror(stat),stat));
1406  }
1407 }
1408 
1409 
1410 //-----------------------------------------------------------------------------
1411 // cMsgSubscriptionConfig methods
1412 //-----------------------------------------------------------------------------
1413 
1414 
1419  config = cMsgSubscribeConfigCreate();
1420 }
1421 
1422 
1423 //-----------------------------------------------------------------------------
1424 
1425 
1430  cMsgSubscribeConfigDestroy(config);
1431 }
1432 
1433 
1434 //-----------------------------------------------------------------------------
1435 
1436 
1443  int size;
1444  cMsgSubscribeGetMaxCueSize(config,&size);
1445  return(size);
1446 }
1447 
1448 
1449 //-----------------------------------------------------------------------------
1450 
1451 
1458  cMsgSubscribeSetMaxCueSize(config,size);
1459 }
1460 
1461 
1462 //-----------------------------------------------------------------------------
1463 
1464 
1471  int size;
1472  cMsgSubscribeGetSkipSize(config,&size);
1473  return(size);
1474 }
1475 
1476 
1477 //-----------------------------------------------------------------------------
1478 
1479 
1486  cMsgSubscribeSetSkipSize(config,size);
1487 }
1488 
1489 
1490 //-----------------------------------------------------------------------------
1491 
1492 
1499  int maySkip;
1500  cMsgSubscribeGetMaySkip(config,&maySkip);
1501  return((maySkip==0)?false:true);
1502 }
1503 
1504 
1505 //-----------------------------------------------------------------------------
1506 
1507 
1514  cMsgSubscribeSetMaySkip(config,(maySkip)?1:0);
1515 }
1516 
1517 
1518 //-----------------------------------------------------------------------------
1519 
1520 
1527  int maySerialize;
1528  cMsgSubscribeGetMustSerialize(config,&maySerialize);
1529  return((maySerialize==0)?false:true);
1530 }
1531 
1532 
1533 //-----------------------------------------------------------------------------
1534 
1535 
1542  cMsgSubscribeSetMustSerialize(config,(mustSerialize)?1:0);
1543 }
1544 
1545 
1546 //-----------------------------------------------------------------------------
1547 
1548 
1555  int max;
1556  cMsgSubscribeGetMaxThreads(config,&max);
1557  return(max);
1558 }
1559 
1560 
1561 //-----------------------------------------------------------------------------
1562 
1563 
1570  cMsgSubscribeSetMaxThreads(config,max);
1571 }
1572 
1573 
1574 //-----------------------------------------------------------------------------
1575 
1576 
1583  int mpt;
1584  cMsgSubscribeGetMessagesPerThread(config,&mpt);
1585  return(mpt);
1586 }
1587 
1588 
1589 //-----------------------------------------------------------------------------
1590 
1591 
1598  cMsgSubscribeSetMessagesPerThread(config,mpt);
1599 }
1600 
1601 
1602 //-----------------------------------------------------------------------------
1603 
1604 
1611  size_t size;
1612  cMsgSubscribeGetStackSize(config,&size);
1613  return(size);
1614 }
1615 
1616 
1617 //-----------------------------------------------------------------------------
1618 
1619 
1626  cMsgSubscribeSetStackSize(config,size);
1627 }
1628 
1629 
1630 //-----------------------------------------------------------------------------
1631 // cMsg methods
1632 //-----------------------------------------------------------------------------
1633 
1634 
1642 cMsg::cMsg(const string &UDL, const string &name, const string &descr)
1643  : myUDL(UDL), myName(name), myDescr(descr), initialized(false) {
1644 }
1645 
1646 
1647 //-----------------------------------------------------------------------------
1648 
1649 
1655  cMsg::disconnect();
1656 }
1657 
1658 
1659 //-----------------------------------------------------------------------------
1660 
1661 
1673 void cMsg::connect(void) throw(cMsgException) {
1674  int stat;
1675 
1676  // if we're already initialized (called connect once), we want to reconnect
1677  if(initialized) {
1678  if((stat=cMsgReconnect(myDomainId))!=CMSG_OK) {
1679  throw(cMsgException(cMsgPerror(stat),stat));
1680  }
1681  return;
1682  }
1683 
1684 
1685  if((stat=cMsgConnect(myUDL.c_str(),myName.c_str(),myDescr.c_str(),&myDomainId))!=CMSG_OK) {
1686  throw(cMsgException(cMsgPerror(stat),stat));
1687  }
1688  initialized=true;
1689 }
1690 
1691 
1692 //-----------------------------------------------------------------------------
1693 
1694 
1699 void cMsg::disconnect(void) throw(cMsgException) {
1700 
1701  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1702 
1703  cMsgDisconnect(&myDomainId);
1704 
1705  deleteSubscriptions(myDomainId);
1706 }
1707 
1708 
1709 //-----------------------------------------------------------------------------
1710 
1711 
1719 
1720  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1721 
1722 
1723  int stat;
1724  if((stat=cMsgSend(myDomainId,msg.myMsgPointer))!=CMSG_OK) {
1725  throw(cMsgException(cMsgPerror(stat),stat));
1726  }
1727 }
1728 
1729 
1730 //-----------------------------------------------------------------------------
1731 
1732 
1740  cMsg::send(*msg);
1741 }
1742 
1743 
1744 //-----------------------------------------------------------------------------
1745 
1746 
1754 int cMsg::syncSend(cMsgMessage &msg, const struct timespec *timeout) throw(cMsgException) {
1755 
1756  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1757 
1758 
1759  int response;
1760 
1761  int stat;
1762  if((stat=cMsgSyncSend(myDomainId,msg.myMsgPointer,timeout,&response))!=CMSG_OK) {
1763  throw(cMsgException(cMsgPerror(stat),stat));
1764  }
1765  return(response);
1766 }
1767 
1768 
1769 //-----------------------------------------------------------------------------
1770 
1771 
1779 int cMsg::syncSend(cMsgMessage *msg, const struct timespec *timeout) throw(cMsgException) {
1780  return(cMsg::syncSend(*msg, timeout));
1781 }
1782 
1783 
1784 //-----------------------------------------------------------------------------
1785 
1786 
1800 void *cMsg::subscribe(const string &subject, const string &type, cMsgCallback *cb, void *userArg,
1801  const cMsgSubscriptionConfig *cfg) throw(cMsgException) {
1802 
1803  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1804 
1805 
1806  int stat;
1807  void *handle;
1808 
1809 
1810  // create and fill dispatcher struct
1811  dispatcherStruct *d = new dispatcherStruct();
1812  d->cb=cb;
1813  d->userArg=userArg;
1814 
1815 
1816  // subscribe and get handle
1817  stat=cMsgSubscribe(myDomainId,
1818  (subject.size()<=0)?NULL:subject.c_str(),
1819  (type.size()<=0)?NULL:type.c_str(),
1820  callbackDispatcher,
1821  (void*)d,
1822  (cfg==NULL)?NULL:(cfg->config),
1823  &handle);
1824 
1825 
1826  // check if subscription accepted
1827  if(stat!=CMSG_OK) {
1828  delete(d);
1829  throw(cMsgException(cMsgPerror(stat),stat));
1830  }
1831 
1832 
1833  // add this subscription to internal list
1834  addSubscription(myDomainId,subject,type,d,handle);
1835 
1836 
1837  return(handle);
1838 }
1839 
1840 
1841 //-----------------------------------------------------------------------------
1842 
1843 
1857 void *cMsg::subscribe(const string &subject, const string &type, cMsgCallback &cb, void *userArg,
1858  const cMsgSubscriptionConfig *cfg) throw(cMsgException) {
1859  return(cMsg::subscribe(subject, type, &cb, userArg, cfg));
1860 }
1861 
1862 
1863 //-----------------------------------------------------------------------------
1864 
1865 
1872 void cMsg::unsubscribe(void *handle) throw(cMsgException) {
1873 
1874  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1875 
1876 
1877  int stat;
1878 
1879 
1880  // remove subscription from internal list
1881  if(!deleteSubscription(myDomainId,handle)) {
1882  throw(cMsgException(cMsgPerror(CMSG_BAD_ARGUMENT),CMSG_BAD_ARGUMENT));
1883  }
1884 
1885 
1886  // unsubscribe
1887  if((stat=cMsgUnSubscribe(myDomainId,handle))!=CMSG_OK) {
1888  throw(cMsgException(cMsgPerror(stat),stat));
1889  }
1890 
1891 }
1892 
1893 
1894 //-----------------------------------------------------------------------------
1895 
1896 
1903 void cMsg::subscriptionPause(void *handle) throw(cMsgException) {
1904 
1905  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1906 
1907  int stat;
1908 
1909  // pause
1910  if((stat=cMsgSubscriptionPause(myDomainId,handle))!=CMSG_OK) {
1911  throw(cMsgException(cMsgPerror(stat),stat));
1912  }
1913 
1914 }
1915 
1916 
1917 //-----------------------------------------------------------------------------
1918 
1919 
1926 void cMsg::subscriptionResume(void *handle) throw(cMsgException) {
1927 
1928  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1929 
1930  int stat;
1931 
1932  // resume
1933  if((stat=cMsgSubscriptionResume(myDomainId,handle))!=CMSG_OK) {
1934  throw(cMsgException(cMsgPerror(stat),stat));
1935  }
1936 
1937 }
1938 
1939 
1940 //-----------------------------------------------------------------------------
1941 
1942 
1949 void cMsg::subscriptionQueueClear(void *handle) throw(cMsgException) {
1950 
1951  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1952 
1953  int stat;
1954 
1955  // clear Q
1956  if((stat=cMsgSubscriptionQueueClear(myDomainId,handle))!=CMSG_OK) {
1957  throw(cMsgException(cMsgPerror(stat),stat));
1958  }
1959 
1960 }
1961 
1962 
1963 //-----------------------------------------------------------------------------
1964 
1965 
1974 
1975  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1976 
1977  int stat, count;
1978 
1979  // get Q count
1980  if((stat=cMsgSubscriptionQueueCount(myDomainId,handle,&count))!=CMSG_OK) {
1981  throw(cMsgException(cMsgPerror(stat),stat));
1982  }
1983  return count;
1984 
1985 }
1986 
1987 
1988 //-----------------------------------------------------------------------------
1989 
1990 
1999 
2000  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2001 
2002  int stat, count;
2003 
2004  // total # of messages callback received
2005  if((stat=cMsgSubscriptionMessagesTotal(myDomainId,handle,&count))!=CMSG_OK) {
2006  throw(cMsgException(cMsgPerror(stat),stat));
2007  }
2008  return count;
2009 
2010 }
2011 
2012 
2013 //-----------------------------------------------------------------------------
2014 
2015 
2024 
2025  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2026 
2027  int stat, val;
2028 
2029  // Q is full?
2030  if((stat=cMsgSubscriptionQueueIsFull(myDomainId,handle,&val))!=CMSG_OK) {
2031  throw(cMsgException(cMsgPerror(stat),stat));
2032  }
2033  return (val == 0 ? false : true);
2034 
2035 }
2036 
2037 
2038 //-----------------------------------------------------------------------------
2039 
2040 
2050 cMsgMessage *cMsg::sendAndGet(cMsgMessage &sendMsg, const struct timespec *timeout) throw(cMsgException) {
2051 
2052  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2053 
2054 
2055  void *replyPtr;
2056 
2057  int stat;
2058  if((stat=cMsgSendAndGet(myDomainId,sendMsg.myMsgPointer,timeout,&replyPtr))!=CMSG_OK) {
2059  throw(cMsgException(cMsgPerror(stat),stat));
2060  }
2061 
2062  return(new cMsgMessage(replyPtr));
2063 }
2064 
2065 
2066 //-----------------------------------------------------------------------------
2067 
2068 
2078 cMsgMessage *cMsg::sendAndGet(cMsgMessage *sendMsg, const struct timespec *timeout) throw(cMsgException) {
2079 
2080  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2081 
2082 
2083  void *replyPtr;
2084 
2085  int stat;
2086  if((stat=cMsgSendAndGet(myDomainId,sendMsg->myMsgPointer,timeout,&replyPtr))!=CMSG_OK) {
2087  throw(cMsgException(cMsgPerror(stat),stat));
2088  }
2089 
2090  return(new cMsgMessage(replyPtr));
2091 }
2092 
2093 //-----------------------------------------------------------------------------
2094 
2095 
2106 cMsgMessage *cMsg::subscribeAndGet(const string &subject, const string &type, const struct timespec *timeout) throw(cMsgException) {
2107 
2108  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2109 
2110 
2111  void *replyPtr;
2112 
2113  int stat;
2114  if((stat=cMsgSubscribeAndGet(myDomainId,subject.c_str(),type.c_str(),timeout,&replyPtr))!=CMSG_OK) {
2115  throw(cMsgException(cMsgPerror(stat),stat));
2116  }
2117 
2118  return(new cMsgMessage(replyPtr));
2119 }
2120 
2121 
2122 //-----------------------------------------------------------------------------
2123 
2124 
2131 void cMsg::flush(const struct timespec *timeout) throw(cMsgException) {
2132 
2133  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2134 
2135 
2136  int stat;
2137  if((stat=cMsgFlush(myDomainId, timeout))!=CMSG_OK) {
2138  throw(cMsgException(cMsgPerror(stat),stat));
2139  }
2140 }
2141 
2142 
2143 //-----------------------------------------------------------------------------
2144 
2145 
2150 void cMsg::start(void) throw(cMsgException) {
2151 
2152  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2153 
2154 
2155  int stat;
2156  if((stat=cMsgReceiveStart(myDomainId))!=CMSG_OK) {
2157  throw(cMsgException(cMsgPerror(stat),stat));
2158  }
2159 }
2160 
2161 
2162 //-----------------------------------------------------------------------------
2163 
2164 
2169 void cMsg::stop(void) throw(cMsgException) {
2170 
2171  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2172 
2173 
2174  int stat;
2175  if((stat=cMsgReceiveStop(myDomainId))!=CMSG_OK) {
2176  throw(cMsgException(cMsgPerror(stat),stat));
2177  }
2178 }
2179 
2180 
2181 //-----------------------------------------------------------------------------
2182 
2183 
2189 string cMsg::getDescription(void) const {
2190  return(myDescr);
2191 }
2192 
2193 
2194 //-----------------------------------------------------------------------------
2195 
2196 
2202 string cMsg::getName(void) const {
2203  return(myName);
2204 }
2205 
2206 
2207 //-----------------------------------------------------------------------------
2208 
2209 
2215 string cMsg::getUDL(void) const {
2216  return(myUDL);
2217 }
2218 
2219 
2220 //-----------------------------------------------------------------------------
2221 
2222 
2230 void cMsg::setUDL(const string &udl) throw(cMsgException) {
2231 
2232  int stat;
2233  if((stat=cMsgSetUDL(myDomainId,udl.c_str()))!=CMSG_OK) {
2234  throw(cMsgException(cMsgPerror(stat),stat));
2235  }
2236 }
2237 
2238 
2239 //-----------------------------------------------------------------------------
2240 
2241 
2248 string cMsg::getCurrentUDL(void) const throw(cMsgException) {
2249  const char *s;
2250 
2251  int stat;
2252  if((stat=cMsgGetCurrentUDL(myDomainId,&s))!=CMSG_OK) {
2253  throw(cMsgException(cMsgPerror(stat),stat));
2254  };
2255 
2256  if(s==NULL) {
2257  return("null");
2258  } else {
2259  return(string(s));
2260  }
2261 }
2262 
2263 
2264 //-----------------------------------------------------------------------------
2265 
2266 
2273 bool cMsg::isConnected(void) const throw(cMsgException) {
2274 
2275  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2276 
2277 
2278  int stat,connected;
2279  if((stat=cMsgGetConnectState(myDomainId,&connected))!=CMSG_OK) {
2280  throw(cMsgException(cMsgPerror(stat),stat));
2281  }
2282  return(connected==1);
2283 }
2284 
2285 
2286 //-----------------------------------------------------------------------------
2287 
2288 
2295 bool cMsg::isReceiving(void) const throw(cMsgException) {
2296 
2297  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2298 
2299 
2300  int stat,receiving;
2301  if((stat=cMsgGetReceiveState(myDomainId,&receiving))!=CMSG_OK) {
2302  throw(cMsgException(cMsgPerror(stat),stat));
2303  }
2304  return(receiving==1);
2305 }
2306 
2307 
2308 //-----------------------------------------------------------------------------
2309 
2310 
2318 void cMsg::setShutdownHandler(cMsgShutdownHandler *handler, void* userArg) throw(cMsgException) {
2319 
2320  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2321 
2322 
2323  int stat;
2324  if((stat=cMsgSetShutdownHandler(myDomainId,handler,userArg))!=CMSG_OK) {
2325  throw(cMsgException(cMsgPerror(stat),stat));
2326  }
2327 }
2328 
2329 
2330 //-----------------------------------------------------------------------------
2331 
2332 
2340 void cMsg::shutdownClients(const string &client, int flag) throw(cMsgException) {
2341 
2342  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2343 
2344 
2345  int stat;
2346  if((stat=cMsgShutdownClients(myDomainId,client.c_str(),flag))!=CMSG_OK) {
2347  throw(cMsgException(cMsgPerror(stat),stat));
2348  }
2349 }
2350 
2351 
2352 //-----------------------------------------------------------------------------
2353 
2354 
2362 void cMsg::shutdownServers(const string &server, int flag) throw(cMsgException) {
2363 
2364  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2365 
2366 
2367  int stat;
2368  if((stat=cMsgShutdownServers(myDomainId,server.c_str(),flag))!=CMSG_OK) {
2369  throw(cMsgException(cMsgPerror(stat),stat));
2370  }
2371 }
2372 
2373 
2374 //-----------------------------------------------------------------------------
2375 
2376 
2384 cMsgMessage *cMsg::monitor(const string &monString) throw(cMsgException) {
2385 
2386  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2387 
2388  void *m = cMsgCreateMessage();
2389  if(m==NULL)throw(cMsgException("?cMsgMessage constructor...unable to create message",CMSG_ERROR));
2390 
2391  int stat;
2392  if((stat=cMsgMonitor(myDomainId,monString.c_str(),&m))!=CMSG_OK) {
2393  cMsgFreeMessage(&m);
2394  throw(cMsgException(cMsgPerror(stat),stat));
2395  }
2396 
2397  return(new cMsgMessage(m));
2398 }
2399 
2400 
2401 //-----------------------------------------------------------------------------
2402 
2403 
2410  void cMsg::setMonitoringString(const string &monString) throw(cMsgException) {
2411 
2412  if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2413 
2414  int stat;
2415  if((stat=cMsgMonitor(myDomainId,monString.c_str(),NULL))!=CMSG_OK)
2416  throw(cMsgException(cMsgPerror(stat),stat));
2417 
2418  return;
2419 }
2420 
2421 
2422 //-----------------------------------------------------------------------------
2423 //-----------------------------------------------------------------------------
Interface defines callback method.
Definition: cMsg.hxx:315
Exception includes description and return code.
Definition: cMsg.hxx:53
virtual const char * what(void) const
Gets char* representation of exception.
Definition: cMsgWrapper.cc:295
virtual ~cMsgException(void)
Destructor does nothing.
Definition: cMsgWrapper.cc:268
virtual string toString(void) const
Gets string representation of exception.
Definition: cMsgWrapper.cc:280
cMsgException(void)
Empty constructor.
Definition: cMsgWrapper.cc:225
virtual cMsgMessage * subscribeAndGet(const string &subject, const string &type, const struct timespec *timeout=NULL)
Subscribes to subject/type, returns one matching message, then unsubscribes.
virtual bool isConnected(void) const
True if connected.
virtual int subscriptionMessagesTotal(void *handle)
Return the total number of messages passed to the given subscription's callback.
virtual void stop(void)
Disables delivery of messages to callbacks.
virtual void * subscribe(const string &subject, const string &type, cMsgCallback *cb, void *userArg, const cMsgSubscriptionConfig *cfg=NULL)
Subscribes to subject,type and specifies callback, userArg.
virtual void setShutdownHandler(cMsgShutdownHandler *handler, void *userArg)
Sets shutdown handler.
virtual void start(void)
Enables delivery of messages to callbacks.
virtual void unsubscribe(void *handle)
Unsubscribes.
virtual bool subscriptionQueueIsFull(void *handle)
Returns whether the given subscription callback's queue is full (true) or not.
virtual int syncSend(cMsgMessage &msg, const struct timespec *timeout=NULL)
Synchronously sends message.
virtual void send(cMsgMessage &msg)
Sends message.
virtual void shutdownServers(const string &server, int flag)
Shuts down a server.
virtual void flush(const struct timespec *timeout=NULL)
Flushes outgoing message queues.
virtual void subscriptionQueueClear(void *handle)
Clear all messages from the given subscription callback's queue.
virtual int subscriptionQueueCount(void *handle)
Return the number of messages currently in the given subscription callback's queue.
virtual string getName(void) const
Gets connection name.
virtual bool isReceiving(void) const
True if receiving messages.
virtual void subscriptionPause(void *handle)
Pause delivery of messages to the given subscription's callback.
virtual string getCurrentUDL(void) const
Gets UDL of the current connection, "null" if no connection.
virtual void subscriptionResume(void *handle)
Resume delivery of messages to the given subscription's callback if paused.
virtual void shutdownClients(const string &client, int flag)
Shuts down a client.
virtual void setMonitoringString(const string &monString)
Sets monitoring string.
virtual void disconnect(void)
Disconnects from cMsg system.
virtual void connect()
Connects to cMsg system.
virtual string getUDL(void) const
Gets connection UDL.
virtual cMsgMessage * monitor(const string &monString)
Returns domain-dependent monitoring information.
cMsg(const string &UDL, const string &name, const string &descr)
Constructor for cMsg system object.
virtual string getDescription(void) const
Gets connection description.
virtual void setUDL(const string &udl)
Sets the connection UDL.
virtual cMsgMessage * sendAndGet(cMsgMessage &sendMsg, const struct timespec *timeout=NULL)
Sends message and gets reply.
virtual ~cMsg(void)
Destructor disconnects from cMsg system.
Class for wrapping cMsg message.
Definition: cMsg.hxx:79
virtual cMsgMessage * nullResponse(void) const
Creates a null response message.
cMsgMessage(void)
Default constructor creates message.
Definition: cMsgWrapper.cc:309
virtual int getByteArrayOffset(void)
Gets offset in byte array.
Definition: cMsgWrapper.cc:584
virtual void setUserInt(int i)
Sets message user int.
Definition: cMsgWrapper.cc:746
virtual void setReliableSend(bool b)
Sets message reliable send flag.
virtual string getSenderHost(void) const
Gets message sender host.
Definition: cMsgWrapper.cc:941
virtual cMsgMessage * response(void) const
Creates a response message.
virtual string getType(void) const
Gets message type.
Definition: cMsgWrapper.cc:418
virtual void setType(const string &type)
Sets message type.
Definition: cMsgWrapper.cc:444
virtual string getSubscriptionUDL() const
Gets subscription UDL.
virtual void setSubject(const string &subject)
Sets message subject.
Definition: cMsgWrapper.cc:400
virtual string getReceiverHost(void) const
Gets message receiver host.
Definition: cMsgWrapper.cc:889
virtual void resetByteArrayLength()
Sets message region-of-interest byte array length to the full length of the array.
Definition: cMsgWrapper.cc:522
virtual string getSubscriptionDomain() const
Gets subscription domain.
virtual void makeResponse(const cMsgMessage &msg)
Makes a message a response message.
virtual bool getReliableSend(void) const
True if message sent via reliable send.
virtual void setByteArrayOffset(int offset)
Specifies offset in byte array.
Definition: cMsgWrapper.cc:567
virtual string getReceiver(void) const
Gets message receiver.
Definition: cMsgWrapper.cc:863
virtual bool needToSwap(void) const
True if need to swap byte array.
Definition: cMsgWrapper.cc:704
virtual string getSubscriptionSubject() const
Gets subscription subject.
virtual void setByteArrayEndian(int endian)
Sets endian-ness of message byte array.
Definition: cMsgWrapper.cc:684
virtual int getVersion(void) const
Gets cMsg version.
Definition: cMsgWrapper.cc:801
virtual int getByteArrayLength(void)
Gets message region-of-interest byte array length.
Definition: cMsgWrapper.cc:535
virtual string getSubject(void) const
Gets message subject.
Definition: cMsgWrapper.cc:374
virtual void setGetResponse(bool b)
Makes message a get response message.
virtual char * getByteArray(void)
Gets byte array.
Definition: cMsgWrapper.cc:638
virtual void setUserTime(const struct timespec &userTime)
Sets message user time.
Definition: cMsgWrapper.cc:784
virtual string toString(void) const
Gets xml representation of message.
virtual string getDomain(void) const
Gets message domain.
Definition: cMsgWrapper.cc:837
virtual void setText(const string &text)
Sets message text.
Definition: cMsgWrapper.cc:488
virtual int getUserInt(void) const
Gets message user int.
Definition: cMsgWrapper.cc:725
virtual bool isGetResponse(void) const
True if message is a get response.
virtual string getSubscriptionType() const
Gets subscription type.
virtual void setNullGetResponse(bool b)
Makes message a null response message.
void * myMsgPointer
Pointer to C message structure.
Definition: cMsg.hxx:304
virtual void makeNullResponse(const cMsgMessage &msg)
Makes a message a null response message.
virtual ~cMsgMessage(void)
Destructor frees C message pointer struct.
Definition: cMsgWrapper.cc:360
virtual string getSender(void) const
Gets message sender.
Definition: cMsgWrapper.cc:915
virtual int getByteArrayLengthFull(void)
Gets message full byte array length.
Definition: cMsgWrapper.cc:551
virtual void setByteArray(char *array, int length)
Specifies byte array by copying it.
Definition: cMsgWrapper.cc:602
virtual void setByteArrayNoCopy(char *array, int length)
Specifies the byte array by only copying the pointer to the array.
Definition: cMsgWrapper.cc:621
virtual bool isNullGetResponse(void) const
True if message is a NULL get response.
virtual bool isGetRequest(void) const
True if message is a get request.
virtual void setByteArrayLength(int length)
Sets message region-of-interest byte array length.
Definition: cMsgWrapper.cc:506
virtual int getByteArrayEndian(void)
Gets endian-ness of message byte array.
Definition: cMsgWrapper.cc:659
virtual string getText(void) const
Gets message text.
Definition: cMsgWrapper.cc:462
virtual int getSubscriptionCueSize(void) const
Gets current subscription cue size.
virtual cMsgMessage * copy(void) const
Copies a message.
Definition: cMsgWrapper.cc:821
Manages subscriptions configurations.
Definition: cMsg.hxx:330
virtual void setMessagesPerThread(int mpt)
Sets max messages per thread.
virtual size_t getStackSize(void) const
Gets message stack size.
virtual void setMustSerialize(bool mustSerialize)
Sets must serialize flag.
virtual void setMaxThreads(int max)
Sets max callback threads.
cMsgSubscribeConfig * config
Pointer to subscription config struct.
Definition: cMsg.hxx:352
virtual void setMaySkip(bool maySkip)
Sets message skip permission.
virtual void setStackSize(size_t size)
Sets message stack size.
virtual int getMaxThreads(void) const
Gets max callback threads.
virtual int getMessagesPerThread(void) const
Gets max messages per thread.
virtual ~cMsgSubscriptionConfig(void)
Deletes subscription config.
virtual void setSkipSize(int size)
Sets skip size.
virtual int getMaxCueSize(void) const
Gets max cue size.
cMsgSubscriptionConfig(void)
Constructor creates empty subscription config.
virtual int getSkipSize(void) const
Gets skip size.
virtual bool getMustSerialize(void) const
Gets must serialize flag.
virtual bool getMaySkip(void) const
True if may skip messages upon overflow.
virtual void setMaxCueSize(int size)
Sets max cue size.
All cMsg symbols reside in the cmsg namespace.
Definition: cMsg.hxx:41