AlcapDAQ  1
old_mevb.cpp
Go to the documentation of this file.
1 /********************************************************************\
2 Name: mevb.c
3 Created by: Pierre-Andre Amaudruz
4 
5 Contents: Main Event builder task.
6 $Log: mevb.cpp,v $
7 Revision 1.1 2003/07/17 18:52:54 fegray
8 Converted eventbuilder to C++.
9 
10 Revision 1.4 2003/05/27 16:26:20 mucap
11 Update to CVS version of MIDAS.
12 
13 Revision 1.3 2003/05/13 19:27:04 mucap
14 Fixes to allow a subset of the crates to participate in an event. Also,
15 this pulls in Pierre's CVS changes to the generic eventbuilder code.
16 
17 Revision 1.9 2002/10/07 17:04:01 pierre
18 fix tr_stop request
19 
20 Revision 1.8 2002/09/28 00:48:33 pierre
21 Add EB_USER_ERROR handling, handFlush()
22 
23 Revision 1.7 2002/09/25 18:37:37 pierre
24 correct: header passing, user field, abort run
25 
26 Revision 1.6 2002/08/29 22:07:47 pierre
27 fix event header, double task, EOR
28 
29 Revision 1.5 2002/07/13 05:45:49 pierre
30 added swap before user function
31 
32 Revision 1.4 2002/06/14 04:59:08 pierre
33 revised for ybos
34 
35 Revision 1.3 2002/05/08 20:51:41 midas
36 Added extra parameter to function db_get_value()
37 
38 Revision 1.2 2002/01/17 23:34:14 pierre
39 doc++ format
40 
41 Revision 1.1.1.1 2002/01/17 19:49:54 pierre
42 Initial Version
43 
44 \********************************************************************/
45 
46 #include <stdio.h>
47 #include "midas.h"
48 #include "mevb.h"
49 #include "msystem.h"
50 #include "ybos.h"
51 
55 
56 DWORD max_event_size = MAX_EVENT_SIZE;
57 
58 HNDLE hDB, hKey, hStatKey;
60 
62 BOOL stopped = TRUE;
63 BOOL wheel = FALSE;
64 INT run_state=0;
68 INT gbl_run=0;
69 
70 INT (*meb_fragment_add)(char *, char *, INT *);
71 INT handFlush(INT);
72 INT source_booking(INT nfrag);
73 INT eb_mfragment_add(char * pdest, char * psrce, INT *size);
74 INT eb_yfragment_add(char * pdest, char * psrce, INT *size);
75 
76 INT eb_begin_of_run(INT, char *, char *);
77 INT eb_end_of_run(INT, char *);
78 INT eb_user(INT, EBUILDER_CHANNEL *, EVENT_HEADER *, void *, INT *);
79 
80 extern "C" {
81  extern INT ybos_event_swap (DWORD * pevt);
82 };
83 
84 /*--------------------------------------------------------------------*/
85 /* eb_mfragment_add_()
86 @memo append one fragment to current destination event.
87 @param fmt Data format
88 @param pdest Destination pointer
89 @param psrce Fragment source pointer
90 @param size Current destination event size (byte)
91 @return EB_SUCCESS
92 */
93 INT eb_mfragment_add(char * pdest, char * psrce, INT *size)
94 {
95  BANK_HEADER *psbh, *pdbh;
96  char *psdata, *pddata;
97  INT bksize;
98 
99  /* Condition for new EVENT the data_size should be ZERO */
100  *size = ((EVENT_HEADER *) pdest)->data_size;
101 
102  /* destination pointer */
103  pddata = pdest + *size + sizeof(EVENT_HEADER);
104 
105  if (*size) {
106  /* NOT the first fragment */
107 
108  /* Swap event source if necessary */
109  psbh = (BANK_HEADER *) (((EVENT_HEADER *)psrce)+1);
110  bk_swap(psbh, FALSE);
111 
112  /* source pointer */
113  psbh = (BANK_HEADER *)(((EVENT_HEADER *)psrce)+1);
114  psdata = (char *) (psbh+1);
115 
116  /* copy all banks without the bank header */
117  bksize = psbh->data_size;
118 
119  /* copy */
120  memcpy(pddata, psdata, bksize);
121 
122  /* update event size */
123  ((EVENT_HEADER *) pdest)->data_size += bksize;
124 
125  /* update bank size */
126  pdbh = (BANK_HEADER *)(((EVENT_HEADER *)pdest)+1);
127  pdbh->data_size += bksize;
128 
129  *size = ((EVENT_HEADER *) pdest)->data_size;
130  }
131  else {
132  /* First event without the event header but with the
133  bank header as the size is zero */
134  *size = ((EVENT_HEADER *) psrce)->data_size;
135 
136  /* Swap event if necessary */
137  psbh = (BANK_HEADER *) (((EVENT_HEADER *)psrce)+1);
138  bk_swap(psbh, FALSE);
139 
140  /* copy first fragment */
141  memcpy (pddata, psbh, *size);
142 
143  /* update destination event size */
144  ((EVENT_HEADER *) pdest)->data_size = *size;
145  }
146  return CM_SUCCESS;
147 }
148 
149 /*--------------------------------------------------------------------*/
150 INT eb_yfragment_add(char * pdest, char * psrce, INT *size)
151 {
152  /* pdest : EVENT_HEADER pointer
153  psrce : EVENT_HEADER pointer
154  Keep pbkh for later incrementation
155  */
156  char *psdata, *pddata;
157  DWORD *pslrl, *pdlrl;
158  INT i4frgsize, i1frgsize, status;
159 
160  /* Condition for new EVENT the data_size should be ZERO */
161  *size = ((EVENT_HEADER *) pdest)->data_size;
162 
163  /* destination pointer skip the header as it has been already
164  composed and the usere may have modified it on purpose (Midas Control) */
165  pddata = pdest + *size + sizeof(EVENT_HEADER);
166 
167  /* the Midas header is present for logger */
168  if (*size)
169  { /* already filled with a fragment */
170 
171  /* source pointer: number of DWORD (lrl included) */
172  pslrl = (DWORD *)(((EVENT_HEADER *)psrce)+1);
173 
174  /* Swap event if necessary */
175  status = ybos_event_swap(pslrl);
176 
177  /* copy done in bytes, do not include LRL */
178  psdata = (char *) (pslrl+1);
179 
180  /* copy size in I*4 (lrl included, remove it) */
181  i4frgsize = (*pslrl);
182  i1frgsize = 4 * i4frgsize;
183 
184  /* append fragment */
185  memcpy(pddata, psdata, i1frgsize);
186 
187  /* update Midas header event size */
188  ((EVENT_HEADER *) pdest)->data_size += i1frgsize;
189 
190  /* update LRL size (I*4) */
191  pdlrl = (DWORD *)(((EVENT_HEADER *)pdest)+1);
192  *pdlrl += i4frgsize;
193 
194  /* Return event size in bytes */
195  *size = ((EVENT_HEADER *) pdest)->data_size;
196  }
197  else
198  { /* new destination event */
199  /* The composed event has already the MIDAS header.
200  which may have been modified by the user in ebuser.c
201  Will be stripped by the logger (YBOS).
202  Copy the first full event ( no EVID suppression )
203  First event (without the event header) */
204 
205  /* source pointer */
206  pslrl = (DWORD *)(((EVENT_HEADER *)psrce)+1);
207 
208  /* Swap event if necessary */
209  status = ybos_event_swap(pslrl);
210 
211  /* size in byte from the source midas header */
212  *size = ((EVENT_HEADER *) psrce)->data_size;
213 
214  /* copy first fragment */
215  memcpy (pddata, (char *) pslrl, *size);
216 
217  /* update destination Midas header event size */
218  ((EVENT_HEADER *) pdest)->data_size += *size;
219 
220  }
221  return CM_SUCCESS;
222 }
223 
224 /*--------------------------------------------------------------------*/
225 INT tr_prestart(INT rn, char *error)
226 {
227  INT fragn, status, size;
228 
230  gbl_run = rn;
231  printf("EBuilder-Starting New Run: %d\n", rn);
232 
233  /* Reset Destination statistics */
234  memset((char *)&ebstat, 0, sizeof(EBUILDER_STATISTICS));
235  db_set_record(hDB, hStatKey, &ebstat, sizeof(EBUILDER_STATISTICS), 0);
236  gbl_bytes_sent = 0;
237  gbl_events_sent = 0;
238 
239  /* Reset local Source statistics */
240  for (fragn=0 ; ; fragn++)
241  {
242  if (ebch[fragn].name[0] == 0)
243  break;
244  memset(&(ebch[fragn].stat), 0, sizeof(EBUILDER_STATISTICS));
245  }
246 
247  /* Update the user_field */
248  size = sizeof(ebset.user_field);
249  db_get_value(hDB, 0, "/Ebuilder/Settings/User Field"
250  , ebset.user_field, &size, TID_STRING, FALSE);
251 
252  /* Call BOR user function */
253  status = eb_begin_of_run(gbl_run, ebset.user_field, error);
254  if (status != EB_SUCCESS) {
255  cm_msg(MERROR, "eb_prestart"
256  , "run start aborted due to eb_begin_of_run (%d)", status);
257  return status;
258  }
259 
260  /* Book all fragment */
261  status = source_booking(fragn);
262  if (status != SUCCESS)
263  return status;
264 
265  /* Mark run start time for local purpose */
266  start_time = ss_millitime();
267 
268  /* local run state */
269  run_state = STATE_RUNNING;
270  stopped = FALSE;
272 
273  /* Reset global trigger mask */
274  cdemask = 0;
275  return CM_SUCCESS;
276 }
277 
278 /*--------------------------------------------------------------------*/
279 INT tr_stop(INT rn, char *error)
280 {
281  printf("\nEBuilder-Stopping Run: %d detected\n", rn);
282 
283  /* local stop */
285 
286  /* local stop time */
287  request_stop_time = ss_millitime();
288  return CM_SUCCESS;
289 }
290 
291 /*--------------------------------------------------------------------*/
292 void free_event_buffer(INT nfrag)
293 {
294  INT i;
295  for (i=0; i<nfrag; i++) {
296  if (ebch[i].pfragment) {
297  free(ebch[i].pfragment);
298  ebch[i].pfragment = NULL;
299  }
300  }
301 }
302 
303 
304 /*--------------------------------------------------------------------*/
306 {
307  int i, size, status;
308  char strout[256];
309 
310  /* Do Hand flush until better way to garantee the input buffer to be empty */
311  if (debug)
312  printf("Hand flushing system buffer... \n");
313  for (i=0;i<nfragment;i++) {
314  if (ebch[i].set.emask) do {
315  size = max_event_size;
316  status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
317  if (debug1) {
318  sprintf(strout
319  ,"booking:Hand flush bm_receive_event[%d] hndle:%d stat:%d Last Ser:%d"
320  , i, ebch[i].hBuf, status
321  , ((EVENT_HEADER *) ebch[i].pfragment)->serial_number);
322  printf("%s\n", strout);
323  }
324  } while (status == BM_SUCCESS);
325  }
326 
327  /* Empty source buffer */
328  status = bm_empty_buffers();
329  if (status != BM_SUCCESS)
330  cm_msg(MERROR, "source_booking", "bm_empty_buffers failure [%d]",status);
331  stopped = TRUE;
332  run_state = STATE_STOPPED;
333  return status;
334 }
335 
336 
337 /*--------------------------------------------------------------------*/
338 INT source_booking(INT nfrag)
339 {
340  INT j, i, status, status1, status2;
341 
342  if(debug) printf("Entering booking\n");
343 
344  /* Book all the source channels */
345  for (i=0; i<nfrag ; i++)
346  {
347  /* Book only the requested event mask */
348  if (ebch[i].set.emask)
349  {
350  /* Connect channel to source buffer */
351  status1 = bm_open_buffer(ebch[i].set.buffer
352  , EVENT_BUFFER_SIZE , &(ebch[i].hBuf));
353 
354  if (debug)
355  printf("bm_open_buffer frag:%d handle:%d stat:%d\n",
356  i, ebch[i].hBuf, status1);
357  /* Register for specified channel event ID and Trigger mask */
358  status2 = bm_request_event(ebch[i].hBuf
359  , ebch[i].set.event_id
360  , ebch[i].set.trigger_mask
361  , GET_ALL, &ebch[i].req_id, NULL);
362  if (debug)
363  printf("bm_request_event frag:%d req_id:%d stat:%d\n",
364  i, ebch[i].req_id, status1);
365  if (((status1 != BM_SUCCESS) && (status1 != BM_CREATED)) ||
366  ((status2 != BM_SUCCESS) && (status2 != BM_CREATED)))
367  {
368  cm_msg(MERROR, "source_booking"
369  , "Open buffer/event request failure [%d %d %d]",
370  i, status1, status2 );
371  return BM_CONFLICT;
372  }
373 
374  /* allocate local source event buffer */
375  if (ebch[i].pfragment)
376  free(ebch[i].pfragment);
377  ebch[i].pfragment = (char *) malloc(max_event_size + sizeof(EVENT_HEADER));
378  if (debug)
379  printf("malloc pevent frag:%d pevent:%p\n", i, ebch[i].pfragment);
380  if (ebch[i].pfragment == NULL)
381  {
382  free_event_buffer(nfrag);
383  cm_msg(MERROR, "source_booking", "Can't allocate space for buffer");
384  return BM_NO_MEMORY;
385  }
386  }
387  }
388 
389  /* Empty source buffer */
390  status = bm_empty_buffers();
391  if (status != BM_SUCCESS) {
392  cm_msg(MERROR, "source_booking", "bm_empty_buffers failure [%d]",status);
393  return status;
394  }
395 
396  if (debug)
397  {
398  printf("bm_empty_buffers stat:%d\n",status);
399  printf("Dest: mask:%x\n", ebset.emask);
400  for (j=0; ; j++)
401  {
402  if (ebch[j].name[0] == 0)
403  break;
404 
405  printf("%d)%s",j , ebch[j].name);
406  printf(" buff:%s", ebch[j].set.buffer);
407  printf(" msk#:%4.4x", ebch[j].set.emask);
408  printf(" ser#:%d", ebch[j].serial);
409  printf(" hbuf:%2d", ebch[j].hBuf);
410  printf(" rqid:%2d", ebch[j].req_id);
411  printf(" opst:%d", status1);
412  printf(" rqst:%d", status2);
413  printf(" evid:%2d", ebch[j].set.event_id);
414  printf(" tmsk:0x%4.4x\n", ebch[j].set.trigger_mask);
415  }
416  }
417 
418  return SUCCESS;
419 }
420 
421 /*--------------------------------------------------------------------*/
422 INT source_unbooking(INT nfrag)
423 {
424  INT i, status;
425 
426  /* Skip unbooking if already done */
427 #if 0
428  if (ebch[0].pfragment == NULL)
429  return EB_SUCCESS;
430 #endif
431 
432  /* unbook all source channels */
433  for (i=nfrag-1; i>=0 ; i--)
434  {
435  bm_empty_buffers();
436 
437  if (ebch[i].set.emask)
438  {
439  /* Remove event ID registration */
440  status = bm_delete_request(ebch[i].req_id);
441  if (debug)
442  printf("unbook: bm_delete_req[%d] req_id:%d stat:%d\n", i, ebch[i].req_id, status);
443 
444  /* Close source buffer */
445  status = bm_close_buffer(ebch[i].hBuf);
446  if (debug)
447  printf("unbook: bm_close_buffer[%d] hndle:%d stat:%d\n", i, ebch[i].hBuf, status);
448  if (status != BM_SUCCESS)
449  {
450  cm_msg(MERROR, "source_unbooking", "Close buffer[%d] stat:", i, status);
451  return status;
452  }
453  }
454  }
455 
456  /* release local event buffer memory */
457  free_event_buffer(nfrag);
458 
459  return EB_SUCCESS;
460 }
461 
462 /*--------------------------------------------------------------------*/
463 /* source_scan()
464 Scan all the fragment source once per call.
465 1) This will retrieve the full midas event not swapped (except the
466 MIDAS_HEADER) for each fragment if possible. The fragment will
467 be stored in the channel event pointer.
468 2a) if after a full nfrag path some frag are still not cellected, it
469 returns with the frag# missing for timeout check.
470 2b) If ALL fragments are present it will check the midas serial#
471 for a full match across all the fragments.
472 3a) If the serial check fails it returns with "event mismatch"
473 and will abort the event builder but not stop the run for now.
474 3b) If the serial check is passed, it will call the user_build function
475 where the destination event is going to be composed.
476 
477 @memo Scan all defined source and build a event if all fragment
478 are present.
479 @param fmt Fragment format type
480 @param nfragment number of fragment to collect
481 @param dest_hBuf Destination buffer handle
482 @param event destination point for built event
483 @return EB_NO_MORE_EVENT, EB_COMPOSE_TIMEOUT
484 if different then SUCCESS (bm_compose, rpc_sent error)
485 */
486 INT source_scan(INT fmt, INT nfragment, HNDLE dest_hBuf, char * dest_event)
487 {
488  static char bars[] = "|/-\\";
489  static int i_bar;
490  static DWORD serial;
491  DWORD *plrl;
492  INT i, j, status, size;
493  INT act_size;
494  BOOL found, event_mismatch;
495  BANK_HEADER *psbh;
496 
497  status = SUCCESS;
498 
499  /* Scan all channels at least once */
500  for(i=0 ; i<nfragment ; i++) {
501  /* Check if current channel needs to be received */
502  if ((ebset.emask & ebch[i].set.emask) & ~cdemask) {
503  /* Get fragment and store it in ebch[i].pfragment */
504  size = max_event_size;
505  status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
506  switch (status) {
507  case BM_SUCCESS : /* event received */
508  /* Mask event */
509  cdemask |= ebch[i].set.emask;
510 
511  /* Keep local serial */
512  ebch[i].serial = ((EVENT_HEADER *) ebch[i].pfragment)->serial_number;
513 
514  /* Swap event depending on data format */
515  switch (fmt) {
516  case FORMAT_YBOS :
517  plrl = (DWORD *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
518  ybos_event_swap (plrl);
519  break;
520  case FORMAT_MIDAS :
521  psbh = (BANK_HEADER *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
522  bk_swap(psbh, FALSE);
523  break;
524  }
525 
526  /* update local source statistics */
527  ebch[i].stat.events_sent++;
528 
529  if (debug1) {
530  printf("SUCC: ch:%d ser:%d Dest_emask:%d cdemask:%x emask:%x sz:%d\n"
531  , i, ebch[i].serial
532  , ebset.emask, cdemask, ebch[i].set.emask, size);
533  }
534  break;
535  case BM_ASYNC_RETURN : /* timeout */
536  ebch[i].timeout++;
537  if (debug1) {
538  printf("ASYNC: ch:%d ser:%d Dest_emask:%d cdemask:%x emask:%x sz:%d\n"
539  , i, ebch[i].serial
540  , ebset.emask, cdemask, ebch[i].set.emask, size);
541  }
542  break;
543  default : /* Error */
544  cm_msg(MERROR, "event_scan", "bm_receive_event error %d", status);
545  return status;
546  break;
547  }
548  } /* ~cdemask => next channel */
549  }
550 
551  /* Check if all fragments have been received */
552  if (cdemask == ebset.emask) { /* All fragment in */
553  /* Check if serial matches */
554  found = event_mismatch = FALSE;
555  /* Mark first serial */
556  for (j=0; j<nfragment; j++) {
557  if (ebch[j].set.emask && !found) {
558  serial = ebch[j].serial;
559  found = TRUE;
560  }
561  else {
562  if (ebch[j].set.emask && (serial != ebch[j].serial)) {
563  /* Event mismatch */
564  event_mismatch = TRUE;
565  }
566  }
567  }
568 
569  if (abort_requested) {
570  cdemask = 0;
571  return EB_SKIP;
572  }
573 
574  /* Global event mismatch */
575  if (event_mismatch) {
576  char str[256];
577  char strsub[128];
578  cdemask = 0;
579  strcpy(str, "event mismatch: ");
580  for (j=0;j<nfragment; j++) {
581  sprintf (strsub, "Ser[%d]:%d ", j, ebch[j].serial);
582  strcat (str, strsub);
583  }
584  if(debug) {
585  printf("%s\n", str);
586  }
587  }
588  else { /* serial number match */
589 
590  /* wheel display */
591  if (wheel && (serial % 1024)==0) {
592  printf("...%c ..Going on %1.0lf\r", bars[i_bar++ % 4], ebstat.events_sent);
593  fflush(stdout);
594  }
595 
596  /* Inform this is a NEW destination event building procedure */
597  memset(dest_event, 0, sizeof(EVENT_HEADER));
598  act_size = 0;
599 
600  /* Fill reserved header space of destination event with
601  final header information */
602  bm_compose_event((EVENT_HEADER *) dest_event
603  , ebset.event_id, ebset.trigger_mask,
604  act_size, ebch[0].serial);
605 
606  /* Pass fragments to user for final check before assembly */
607  status = eb_user(nfragment, ebch, (EVENT_HEADER *) dest_event
608  , (void *) ((EVENT_HEADER *)dest_event+1), &act_size);
609  if (status != SS_SUCCESS)
610  return status;
611 
612  /* Allow bypass of fragment assembly if user wants to do it on its own */
613  if (!ebset.user_build) {
614  for (j=0 ; j<nfragment ; j++) {
615  if (ebch[j].set.emask) {
616  status = meb_fragment_add(dest_event, ebch[j].pfragment, &act_size);
617  if (status != EB_SUCCESS) {
618  cm_msg(MERROR,"source_scan","compose fragment:%d current size:%d (%d)"
619  , j, act_size, status);
620  return EB_ERROR;
621  }
622  }
623  }
624  } /* skip user_build */
625 
626  /* Overall event to be sent */
627  act_size = ((EVENT_HEADER *)dest_event)->data_size + sizeof(EVENT_HEADER);
628 
629  /* Send event and wait for completion */
630  status = rpc_send_event(dest_hBuf, dest_event, act_size, SYNC);
631  if (status != BM_SUCCESS) {
632  if (debug)
633  printf("rpc_send_event returned error %d, event_size %d\n",
634  status, act_size);
635  cm_msg(MERROR,"EBuilder","rpc_send_event returned error %d",status);
636  return EB_ERROR;
637  }
638 
639  /* Keep track of the total byte count */
640  gbl_bytes_sent += act_size;
641 
642  /* update destination event count */
643  ebstat.events_sent++;
644  gbl_events_sent++;
645 
646  /* Reset mask and timeouts */
647  for (i=0;i<nfragment;i++)
648  ebch[i].timeout = 0;
649  cdemask = 0;
650  } /* serial match */
651  return EB_SUCCESS;
652  } /* cdemask == ebset.emask */
653 
654  return status;
655 }
656 
657 /*--------------------------------------------------------------------*/
658 int main(unsigned int argc,char **argv)
659 {
660  static char bars[] = "|\\-/";
661  static int i_bar;
662  char host_name[HOST_NAME_LENGTH], expt_name[HOST_NAME_LENGTH];
663  INT size, status;
664  DWORD nfragment, fragn;
665  char *dest_event;
666  DWORD last_time=0, actual_millitime=0, previous_event_sent=0;
667  DWORD i, j;
668  BOOL daemon=FALSE, flag = TRUE;
669  INT state, fmt;
670  HNDLE hBuf, hSubkey, hEKey, hSetKey, hChKey;
671  EBUILDER(ebuilder_str);
672  EBUILDER_CHANNEL(ebuilder_channel_str);
673  char strout[128];
674  KEY key;
675  /* init structure */
676  memset (&ebch[0], 0, sizeof(ebch));
677 
678  /* set default */
679  cm_get_environment (host_name, sizeof(host_name),
680  expt_name, sizeof(expt_name));
681 
682  /* get parameters */
683  for (i=1 ; i<argc ; i++)
684  {
685  if (argv[i][0] == '-' && argv[i][1] == 'd')
686  debug = TRUE;
687  else if (argv[i][0] == '-' && argv[i][1] == 'D')
688  daemon = TRUE;
689  else if (argv[i][0] == '-' && argv[i][1] == 'w')
690  wheel = TRUE;
691  else if (argv[i][0] == '-')
692  {
693  if (i+1 >= argc || argv[i+1][0] == '-')
694  goto usage;
695  if (strncmp(argv[i],"-e",2) == 0)
696  strcpy(expt_name, argv[++i]);
697  else if (strncmp(argv[i],"-h",2)==0)
698  strcpy(host_name, argv[++i]);
699  }
700  else
701  {
702 usage:
703  printf("usage: mevb [-h <Hostname>] [-e <Experiment>] [-d debug]\n");
704  printf(" -w show wheel -D to start as a daemon\n\n");
705  return 0;
706  }
707  }
708 
709  printf("Program mevb/EBuilder version 3 started\n\n");
710  if (daemon)
711  {
712  printf("Becoming a daemon...\n");
713  ss_daemon_init(FALSE);
714  }
715 
716  /* Connect to experiment */
717  status = cm_connect_experiment(host_name, expt_name, "EBuilder", NULL);
718  if (status != CM_SUCCESS)
719  return 1;
720 
721  /* check if Ebuilder is already running */
722  status = cm_exist("Ebuilder", FALSE);
723  if (status == CM_SUCCESS)
724  {
725  cm_msg(MERROR,"Ebuilder","Ebuilder running already!.\n");
726  cm_disconnect_experiment();
727  return 1;
728  }
729 
730  /* Connect to ODB */
731  cm_get_experiment_database(&hDB, &hKey);
732 
733  /* Setup tree */
734  if (db_find_key(hDB, 0, "EBuilder", &hEKey) != DB_SUCCESS)
735  db_create_record(hDB, 0, "EBuilder", strcomb(ebuilder_str));
736  db_find_key(hDB, 0, "EBuilder", &hEKey);
737 
738  /* EB setting handle */
739  db_find_key(hDB, hEKey, "Settings", &hSetKey);
740  size = sizeof(EBUILDER_SETTINGS);
741  status = db_get_record(hDB, hSetKey, &ebset, &size, 0);
742 
743  /* Get hostname for status page */
744  gethostname(ebset.hostname, sizeof(ebset.hostname));
745  size = sizeof(ebset.hostname);
746  db_set_value(hDB, hSetKey, "hostname", ebset.hostname, size, 1, TID_STRING);
747 
748  /* Get EB statistics */
749  db_find_key(hDB, hEKey, "Statistics", &hStatKey);
750 
751  /* extract format */
752  if (equal_ustring(ebset.format, "YBOS"))
753  fmt = FORMAT_YBOS;
754  else if (equal_ustring(ebset.format, "MIDAS"))
755  fmt = FORMAT_MIDAS;
756  else /* default format is MIDAS */
757  {
758  cm_msg(MERROR,"EBuilder", "Format not permitted");
759  goto error;
760  }
761 
762  /* Check for run condition */
763  size = sizeof(state);
764  db_get_value(hDB,0,"/Runinfo/state", &state, &size, TID_INT, TRUE);
765  if (state != STATE_STOPPED)
766  {
767  cm_msg(MTALK,"EBuilder","Run must be stopped before starting EBuilder");
768  goto error;
769  }
770 
771  /* Scan EB Channels */
772  if (db_find_key(hDB, hEKey, "Channels", &hChKey) != DB_SUCCESS)
773  {
774  db_create_record(hDB, hEKey, "Channels", strcomb(ebuilder_channel_str));
775  db_find_key(hDB, hEKey, "Channels", &hChKey);
776  }
777 
778  for (i=0, j=0, nfragment=0; i<MAX_CHANNELS ; i++)
779  {
780  db_enum_key(hDB, hChKey, i, &hSubkey);
781  if (!hSubkey)
782  break;
783  db_get_key(hDB, hSubkey, &key);
784  if (key.type == TID_KEY)
785  {
786  /* read channel record */
787  sprintf(ebch[j].name, "%s", key.name);
788  status = db_find_key(hDB, hSubkey, "Statistics", &(ebch[j].hStat));
789  status = db_find_key(hDB, hSubkey, "Settings", &hKey);
790  size = sizeof(EBUILDER_SETTINGS_CH);
791  status = db_get_record(hDB, hKey, &(ebch[j].set), &size, 0);
792  j++;
793  nfragment++;
794  }
795  }
796 
797  /* Register transition for reset counters */
798  if (cm_register_transition(TR_START, tr_prestart, 300) != CM_SUCCESS)
799  goto error;
800  if (cm_register_transition(TR_STOP, tr_stop, 700) != CM_SUCCESS)
801  goto error;
802 
803  if (debug)
804  cm_set_watchdog_params(TRUE, 0);
805 
806  /* Destination buffer */
807  status = bm_open_buffer(ebset.buffer, EVENT_BUFFER_SIZE, &hBuf);
808  if(debug)printf("bm_open_buffer dest returns %d\n",status);
809  if (status != BM_SUCCESS && status != BM_CREATED) {
810  printf("Error return from bm_open_buffer\n");
811  goto error;
812  }
813 
814  /* set the buffer write cache size */
815  status = bm_set_cache_size(hBuf, 0, 200000);
816  if(debug)printf("bm_set_cache_size dest returns %d\n",status);
817 
818  /* allocate destination event buffer */
819  dest_event = (char *) malloc(nfragment*(max_event_size + sizeof(EVENT_HEADER)));
820  memset(dest_event, 0, nfragment*(max_event_size + sizeof(EVENT_HEADER)));
821  if (dest_event == NULL) {
822  cm_msg(MERROR,"EBuilder","Not enough memory for event buffer\n");
823  goto error;
824  }
825 
826  /* Set fragment_add function based on the format */
827  if (fmt == FORMAT_MIDAS)
829  else if (fmt == FORMAT_YBOS)
831  else {
832  cm_msg(MERROR,"mevb","Unknown data format :%d", fmt);
833  goto error;
834  }
835 
836  /* Main event loop */
837  do {
838  if (run_state != STATE_RUNNING) {
839  /* skip the source scan and yield */
840  status = cm_yield(500);
841  if (wheel) {
842  printf("...%c Snoring on %1.0lf\r", bars[i_bar++ % 4], ebstat.events_sent);
843  fflush(stdout);
844  }
845  continue;
846  }
847 
848  /* scan source buffer and send event to destination
849  The source_scan() serves one event at the time.
850  The status returns:
851  EB_SUCCESS, BM_ASYNC_RETURN, EB_ERROR
852  In the case of no fragment found(timeout), a watchdog would
853  kick in for a fix amount of time. If timeout occur,
854  the run state is checked and memory channels are freed
855  */
856  status = source_scan(fmt, nfragment, hBuf, dest_event);
857  switch (status) {
858  case BM_ASYNC_RETURN:
859  // No event found for now:
860  // Check for timeout
861  for (fragn=0; fragn<nfragment ;fragn++) {
862  if (ebch[fragn].timeout > TIMEOUT) { /* Timeout */
863  if (stop_requested) { /* Stop */
864  if (debug) printf ("Stop requested on timeout %d\n", status);
865 
866  /* Flush local destination cache */
867  bm_flush_cache(hBuf, SYNC);
868 
869  /* Call user function */
870  eb_end_of_run(gbl_run, strout);
871 
872  /* Cleanup buffers */
873  handFlush(nfragment);
874 
875  /* Detach all source from midas */
876  source_unbooking(nfragment);
877 
878  /* Compose message */
879  stop_time = ss_millitime() - request_stop_time;
880  sprintf(strout,"Run %d Stop on frag#%d; events_sent %1.0lf DT:%d[ms]",
881  gbl_run, fragn, ebstat.events_sent, stop_time);
882 
883  /* Send message */
884  cm_msg(MINFO,"EBuilder","%s",strout);
885 
886  run_state = STATE_STOPPED;
888  break;
889  }
890  else { /* No stop requested */
891  ebch[fragn].timeout = 0;
892  status = cm_yield(100);
893  if (wheel) {
894  printf("...%c Timoing on %1.0lf\r", bars[i_bar++ % 4], ebstat.events_sent);
895  fflush(stdout);
896  }
897  }
898  }
899  //else { /* No timeout */
900  // status = cm_yield(50);
901  //}
902  } /* do loop */
903  break;
904  case EB_ERROR :
905  case EB_USER_ERROR :
907  if (status == EB_USER_ERROR)
908  cm_msg(MTALK,"EBuilder","Error signaled by user code - stopping run...");
909  else
910  cm_msg(MTALK,"EBuilder","Event mismatch - Stopping run...");
911  cdemask = 0;
912  if (cm_transition(TR_STOP, 0, NULL, 0, ASYNC, 0) != CM_SUCCESS) {
913  cm_msg(MERROR, "EBuilder", "Stop Transition request failed");
914  goto error;
915  }
916  break;
917  case EB_SUCCESS :
918  case EB_SKIP :
919  // Normal path if event has been assembled
920  // No yield in this case.
921  break;
922  default:
923  cm_msg(MERROR, "Source_scan", "unexpected return %d", status);
924  status = SS_ABORT;
925  }
926 
927  /* EB job done, update statistics if its time */
928 
929  /* Check if it's time to do statistics job */
930  if ((actual_millitime = ss_millitime()) - last_time > 1000) {
931  /* Force event to appear at the destination if Ebuilder is remote */
932  rpc_flush_event();
933  /* Force event ot appear at the destination if Ebuilder is local */
934  bm_flush_cache(hBuf, ASYNC);
935 
936  /* update all source statistics */
937  for (j=0 ; j<nfragment ; j++) {
938 
939  /* Compute statistics */
940  if ((actual_millitime > start_time) && ebch[j].stat.events_sent) {
941  ebch[j].stat.events_per_sec_ = ebch[j].stat.events_sent
942  / ((actual_millitime-last_time)/1000.0);
943 
944  /* Update ODB channel statistics */
945  db_set_record(hDB, ebch[j].hStat
946  , &(ebch[j].stat)
947  , sizeof(EBUILDER_STATISTICS), 0);
948  }
949  }
950 
951  /* Compute destination statistics */
952  if ((actual_millitime > start_time) && ebstat.events_sent) {
954  / ((actual_millitime-last_time)/1000.0) ;
955 
957  /1024.0/((actual_millitime-last_time)/1000.0);
958 
959  /* update destination statistics */
960  db_set_record(hDB, hStatKey
961  , &ebstat
962  , sizeof(EBUILDER_STATISTICS), 0);
963  }
964 
965  /* Keep track of last ODB update */
966  last_time = ss_millitime();
967 
968  /* Reset local rate counters */
969  gbl_events_sent = 0;
970  gbl_bytes_sent = 0;
971 
972  /* Yield for system messages */
973  status = cm_yield(50);
974  if (wheel && (run_state != STATE_RUNNING)) {
975  printf("...%c Idleing on %1.0lf\r", bars[i_bar++ % 4], ebstat.events_sent);
976  fflush(stdout);
977  }
978  }
979  } while (status != RPC_SHUTDOWN && status != SS_ABORT);
980  if (status == SS_ABORT)
981  goto error;
982  else
983  goto exit;
984 
985 error:
986  cm_msg(MTALK,"EBuilder","Event builder error. Check messages");
987 
988 exit:
989  /* Detach all source from midas */
990  printf("EBuilder-Unbooking\n");
991  source_unbooking(nfragment);
992 
993  /* Free local memory */
994  free_event_buffer(nfragment);
995 
996  /* Clean disconnect from midas */
997  cm_disconnect_experiment();
998  return 0;
999 }