AlcapDAQ  1
new_mevb.cpp
Go to the documentation of this file.
1 /********************************************************************\
2 Name: mevb.c
3 Created by: Pierre-Andre Amaudruz
4 
5 Contents: Main Event builder task.
6 
7 ** muCap patches **
8 - converted from C to C++: added extern "C" on ybos_swap_event
9 - added protection on emask to avoid touching crates that are disabled
10  in source unbooking and fragment assembly.
11 
12 $Log: mevb.c,v $
13 Revision 1.16 2004/10/07 20:08:34 pierre
14 1.9.5
15 
16 Revision 1.15 2004/10/04 23:55:28 pierre
17 move ebuilder into equipment list
18 
19 Revision 1.14 2004/09/29 17:55:34 pierre
20 fix speed problem
21 
22 Revision 1.13 2004/09/29 16:20:31 pierre
23 change Ebuilder structure
24 
25 Revision 1.12 2004/01/08 08:40:08 midas
26 Implemented standard indentation
27 
28 Revision 1.11 2004/01/08 06:48:26 pierre
29 Doxygen the file
30 
31 Revision 1.10 2003/08/19 23:26:36 pierre
32 fix cm_get_environment arg
33 
34 Revision 1.9 2002/10/07 17:04:01 pierre
35 fix tr_stop request
36 
37 Revision 1.8 2002/09/28 00:48:33 pierre
38 Add EB_USER_ERROR handling, handFlush()
39 
40 Revision 1.7 2002/09/25 18:37:37 pierre
41 correct: header passing, user field, abort run
42 
43 Revision 1.6 2002/08/29 22:07:47 pierre
44 fix event header, double task, EOR
45 
46 Revision 1.5 2002/07/13 05:45:49 pierre
47 added swap before user function
48 
49 Revision 1.4 2002/06/14 04:59:08 pierre
50 revised for ybos
51 
52 Revision 1.3 2002/05/08 20:51:41 midas
53 Added extra parameter to function db_get_value()
54 
55 Revision 1.2 2002/01/17 23:34:14 pierre
56 doc++ format
57 
58 Revision 1.1.1.1 2002/01/17 19:49:54 pierre
59 Initial Version
60 
61 \********************************************************************/
62 
64 /* @file mevb.c
65 The Event builder main file
66 */
67 
68 #include <stdio.h>
69 #include "midas.h"
70 #include "mevb.h"
71 #include "msystem.h"
72 #include "ybos.h"
73 
74 #define SERVER_CACHE_SIZE 100000 /* event cache before buffer */
75 
76 #define ODB_UPDATE_TIME 1000 /* 1 seconds for ODB update */
77 
78 #define DEFAULT_FE_TIMEOUT 60000 /* 60 seconds for watchdog timeout */
79 
82 
83 INT run_state; /* STATE_RUNNING, STATE_STOPPED, STATE_PAUSED */
86 DWORD actual_time; /* current time in seconds since 1970 */
87 DWORD actual_millitime; /* current time in milliseconds */
88 
89 char host_name[HOST_NAME_LENGTH];
90 char expt_name[NAME_LENGTH];
92 char buffer_name[NAME_LENGTH];
94 char *dest_event;
96 BOOL debug = FALSE, debug1 = FALSE;
97 
98 BOOL wheel = FALSE;
99 char bars[] = "|\\-/";
100 int i_bar;
103 
104 INT(*meb_fragment_add) (char *, char *, INT *);
105 INT handFlush(void);
106 INT source_booking(void);
107 INT source_unbooking(void);
108 INT close_buffers(void);
109 INT source_scan(INT fmt, EQUIPMENT_INFO *eq_info);
110 INT eb_mfragment_add(char *pdest, char *psrce, INT * size);
111 INT eb_yfragment_add(char *pdest, char *psrce, INT * size);
112 
113 INT eb_begin_of_run(INT, char *, char *);
114 INT eb_end_of_run(INT, char *);
115 INT eb_user(INT, BOOL mismatch, EBUILDER_CHANNEL *, EVENT_HEADER *, void *, INT *);
116 INT load_fragment(void);
117 INT scan_fragment(void);
118 extern char *frontend_name;
119 extern char *frontend_file_name;
120 extern BOOL frontend_call_loop;
121 
122 extern INT max_event_size;
123 extern INT max_event_size_frag;
124 extern INT event_buffer_size;
125 extern INT display_period;
126 extern INT ebuilder_init(void);
127 extern INT ebuilder_exit(void);
128 extern INT ebuilder_loop(void);
129 
130 extern EQUIPMENT equipment[];
131 extern "C" {
132  extern INT ybos_event_swap(DWORD * pevt);
133 };
134 
135 #define EQUIPMENT_COMMON_STR "\
136 Event ID = WORD : 0\n\
137 Trigger mask = WORD : 0\n\
138 Buffer = STRING : [32] SYSTEM\n\
139 Type = INT : 0\n\
140 Source = INT : 0\n\
141 Format = STRING : [8] FIXED\n\
142 Enabled = BOOL : 0\n\
143 Read on = INT : 0\n\
144 Period = INT : 0\n\
145 Event limit = DOUBLE : 0\n\
146 Num subevents = DWORD : 0\n\
147 Log history = INT : 0\n\
148 Frontend host = STRING : [32] \n\
149 Frontend name = STRING : [32] \n\
150 Frontend file name = STRING : [256] \n\
151 "
152 
153 #define EQUIPMENT_STATISTICS_STR "\
154 Events sent = DOUBLE : 0\n\
155 Events per sec. = DOUBLE : 0\n\
156 kBytes per sec. = DOUBLE : 0\n\
157 "
158 
159 /********************************************************************/
161 {
162  INT index, size, status;
163  char str[256];
164  EQUIPMENT_INFO *eq_info;
165  EQUIPMENT_STATS *eq_stats;
166  HNDLE hKey;
167 
168  /* get current ODB run state */
169  size = sizeof(run_state);
170  run_state = STATE_STOPPED;
171  db_get_value(hDB, 0, "/Runinfo/State", &run_state, &size, TID_INT, TRUE);
172  size = sizeof(run_number);
173  run_number = 1;
174  status = db_get_value(hDB, 0, "/Runinfo/Run number", &run_number, &size, TID_INT, TRUE);
175  assert(status == SUCCESS);
176 
177  /* scan EQUIPMENT table from mevb.C */
178  for (index = 0; equipment[index].name[0]; index++) {
179  eq_info = &equipment[index].info;
180  eq_stats = &equipment[index].stats;
181 
182  if (eq_info->event_id == 0) {
183  printf("\nEvent ID 0 for %s not allowed\n", equipment[index].name);
184  cm_disconnect_experiment();
185  ss_sleep(5000);
186  exit(0);
187  }
188 
189  /* init status */
190  equipment[index].status = EB_SUCCESS;
191 
192  sprintf(str, "/Equipment/%s/Common", equipment[index].name);
193 
194  /* get last event limit from ODB */
195  if (eq_info->eq_type != EQ_SLOW) {
196  db_find_key(hDB, 0, str, &hKey);
197  size = sizeof(double);
198  if (hKey)
199  db_get_value(hDB, hKey, "Event limit", &eq_info->event_limit, &size,
200  TID_DOUBLE, TRUE);
201  }
202 
203  /* Create common subtree */
204  status = db_check_record(hDB, 0, str, EQUIPMENT_COMMON_STR, TRUE);
205  if (status != DB_SUCCESS) {
206  printf("Cannot check equipment record, status = %d\n", status);
207  ss_sleep(3000);
208  }
209  db_find_key(hDB, 0, str, &hKey);
210 
211  if (equal_ustring(eq_info->format, "YBOS"))
212  equipment[index].format = FORMAT_YBOS;
213  else if (equal_ustring(eq_info->format, "FIXED"))
214  equipment[index].format = FORMAT_FIXED;
215  else /* default format is MIDAS */
216  equipment[index].format = FORMAT_MIDAS;
217 
218  gethostname(eq_info->frontend_host, sizeof(eq_info->frontend_host));
219  strcpy(eq_info->frontend_name, full_frontend_name);
220  strcpy(eq_info->frontend_file_name, frontend_file_name);
221 
222  /* set record from equipment[] table in frontend.c */
223  db_set_record(hDB, hKey, eq_info, sizeof(EQUIPMENT_INFO), 0);
224 
225  /* get record once at the start equipment info */
226  size = sizeof(EQUIPMENT_INFO);
227  db_get_record(hDB, hKey, eq_info, &size, 0);
228 
229  /*---- Create just the key , leave it empty ---------------------------------*/
230  sprintf(str, "/Equipment/%s/Variables", equipment[index].name);
231  db_create_key(hDB, 0, str, TID_KEY);
232  db_find_key(hDB, 0, str, &hKey);
233  equipment[index].hkey_variables = hKey;
234 
235  /*---- Create and initialize statistics tree -------------------*/
236  sprintf(str, "/Equipment/%s/Statistics", equipment[index].name);
237 
238  status = db_check_record(hDB, 0, str, EQUIPMENT_STATISTICS_STR, TRUE);
239  if (status != DB_SUCCESS) {
240  printf("Cannot create/check statistics record, error %d\n", status);
241  ss_sleep(3000);
242  }
243 
244  status = db_find_key(hDB, 0, str, &hKey);
245  if (status != DB_SUCCESS) {
246  printf("Cannot find statistics record, error %d\n", status);
247  ss_sleep(3000);
248  }
249 
250  eq_stats->events_sent = 0;
251  eq_stats->events_per_sec = 0;
252  eq_stats->kbytes_per_sec = 0;
253 
254  /* open hot link to statistics tree */
255  status = db_open_record(hDB, hKey, eq_stats, sizeof(EQUIPMENT_STATS)
256  , MODE_WRITE, NULL, NULL);
257  if (status != DB_SUCCESS) {
258  cm_msg(MERROR, "register_equipment",
259  "Cannot open statistics record, error %d. Probably other FE is using it",
260  status);
261  ss_sleep(3000);
262  }
263 
264  /*---- open event buffer ---------------------------------------*/
265  if (eq_info->buffer[0]) {
266  status = bm_open_buffer(eq_info->buffer, EVENT_BUFFER_SIZE,
267  &equipment[index].buffer_handle);
268  if (status != BM_SUCCESS && status != BM_CREATED) {
269  cm_msg(MERROR, "register_equipment",
270  "Cannot open event buffer. Try to reduce EVENT_BUFFER_SIZE in midas.h \
271  and rebuild the system.");
272  return 0;
273  }
274 
275  /* set the default buffer cache size */
276  bm_set_cache_size(equipment[index].buffer_handle, 0, SERVER_CACHE_SIZE);
277  } else {
278  cm_msg(MERROR, "register_equipment", "Destination buffer must be present");
279  ss_sleep(3000);
280  exit(0);
281  }
282  }
283  return SUCCESS;
284 }
285 
286 /********************************************************************/
287 INT load_fragment(void)
288 {
289  INT i, size, type;
290  HNDLE hEqKey, hSubkey;
291  EQUIPMENT_INFO *eq_info;
292  KEY key;
293  char buffer[NAME_LENGTH];
294  char format[8];
295 
296  /* Get equipment pointer, only one eqp for now */
297  eq_info = &equipment[0].info;
298 
299  /* Scan Equipment/Common listing */
300  if (db_find_key(hDB, 0, "Equipment", &hEqKey) != DB_SUCCESS) {
301  cm_msg(MINFO, "load_fragment", "Equipment listing not found");
302  return EB_ERROR;
303  }
304 
305  /* Scan the Equipment list for fragment info collection */
306  for (i = 0, nfragment=0 ; ; i++) {
307  db_enum_key(hDB, hEqKey, i, &hSubkey);
308  if (!hSubkey)
309  break;
310  db_get_key(hDB, hSubkey, &key);
311  if (key.type == TID_KEY) {
312  /* Equipment name */
313  if (debug) printf("Equipment name:%s\n", key.name);
314  /* Check if equipment is EQ_EB */
315  size = sizeof(INT);
316  db_get_value(hDB, hSubkey, "common/type", &type, &size, TID_INT, 0);
317  size = sizeof(buffer);
318  db_get_value(hDB, hSubkey, "common/Buffer", buffer, &size, TID_STRING, 0);
319  size = sizeof(format);
320  db_get_value(hDB, hSubkey, "common/Format", format, &size, TID_STRING, 0);
321  /* Check if equipment match EB requirements */
322  if ((type & EQ_EB)
323  && (strncmp(buffer, buffer_name, strlen(buffer_name)) == 0)
324  && (strncmp(format, eq_info->format, strlen(format)) == 0)) {
325  /* match=> fill internal eb structure */
326  strcpy(ebch[nfragment].format, format);
327  strcpy(ebch[nfragment].buffer, buffer);
328  size = sizeof(WORD);
329  db_get_value(hDB, hSubkey, "common/Trigger Mask", &ebch[nfragment].trigger_mask, &size, TID_WORD, 0);
330  size = sizeof(WORD);
331  db_get_value(hDB, hSubkey, "common/Event ID", &ebch[nfragment].event_id, &size, TID_WORD, 0);
332  nfragment++;
333  }
334  }
335  }
336 
337  printf("Found %d fragment matching EB setting\n", nfragment);
338  /* Point to the Ebuilder settings */
339  /* Set fragment_add function based on the format */
340  if (equipment[0].format == FORMAT_MIDAS)
342  else if (equipment[0].format == FORMAT_YBOS)
344  else {
345  cm_msg(MERROR, "mevb", "Unknown data format :%d", format);
346  return EB_ERROR;
347  }
348 
349  /* allocate destination event buffer */
350  dest_event = (char *) malloc(nfragment * (max_event_size + sizeof(EVENT_HEADER)));
351  memset(dest_event, 0, nfragment * (max_event_size + sizeof(EVENT_HEADER)));
352  if (dest_event == NULL) {
353  cm_msg(MERROR, "EBuilder", "%s: Not enough memory for event buffer", full_frontend_name);
354  return EB_ERROR;
355  }
356  return EB_SUCCESS;
357 }
358 
359 /********************************************************************/
360 INT scan_fragment(void)
361 {
362  INT fragn, status;
363  EQUIPMENT *eq;
364  EQUIPMENT_INFO *eq_info;
365 
366  /* Get equipment pointer, only one eqp for now */
367  eq_info = &equipment[0].info;
368 
369  /* Main event loop */
370  do {
371  switch (run_state) {
372  case STATE_STOPPED:
373  case STATE_PAUSED:
374  /* skip the source scan and yield */
375  status = cm_yield(500);
376  if (wheel) {
377  printf("...%c Snoring\r", bars[i_bar++ % 4]);
378  fflush(stdout);
379  }
380  break;
381  case STATE_RUNNING:
382  status = source_scan(equipment[0].format, eq_info);
383  switch (status) {
384  case BM_ASYNC_RETURN: // No event found for now, Check for timeout
385  for (fragn = 0; fragn < nfragment; fragn++) {
386  if (ebch[fragn].timeout > TIMEOUT) { /* Timeout */
387  if (stop_requested) { /* Stop */
388  if (debug) printf("Stop requested on timeout %d\n", status);
389  status = close_buffers();
390  break;
391  }
392  else { /* No stop requested but timeout */
393  if (wheel) {
394  printf("...%c Timoing on %1.0lf\r", bars[i_bar++ % 4],
395  eq->stats.events_sent);
396  fflush(stdout);
397  status = cm_yield(50);
398  }
399  }
400  }
401  //else { /* No timeout loop back */
402  } // for loop over all fragments
403  break;
404  case EB_ERROR:
405  case EB_USER_ERROR:
407  if (status == EB_USER_ERROR)
408  cm_msg(MTALK, "EBuilder", "%s: Error signaled by user code - stopping run...", full_frontend_name);
409  else
410  cm_msg(MTALK, "EBuilder", "%s: Event mismatch - Stopping run...", full_frontend_name);
411  if (cm_transition(TR_STOP, 0, NULL, 0, ASYNC, 0) != CM_SUCCESS) {
412  cm_msg(MERROR, "EBuilder", "%s: Stop Transition request failed", full_frontend_name);
413  return status;
414  }
415  if (debug) printf("Stop requested on Error %d\n", status);
416  status = close_buffers();
417  return status;
418  break;
419  case EB_SUCCESS:
420  case EB_SKIP:
421  // Normal path if event has been assembled
422  // No yield in this case.
423  break;
424  default:
425  cm_msg(MERROR, "Source_scan", "unexpected return %d", status);
426  status = SS_ABORT;
427  } // switch scan_source
428  break;
429  }
430  /* EB job done, update statistics if its time */
431  /* Check if it's time to do statistics job */
432  if ((actual_millitime = ss_millitime()) - last_time > 1000) {
433  /* Force event to appear at the destination if Ebuilder is remote */
434  rpc_flush_event();
435  /* Force event ot appear at the destination if Ebuilder is local */
436  bm_flush_cache(equipment[0].buffer_handle, ASYNC);
437 
438  status = cm_yield(10);
439 
440  eq = &equipment[0];
441  eq->stats.events_sent += eq->events_sent;
442  eq->stats.events_per_sec =
443  eq->events_sent / ((actual_millitime - last_time) / 1000.0);
444  eq->stats.kbytes_per_sec =
445  eq->bytes_sent / 1024.0 / ((actual_millitime - last_time) /
446  1000.0);
447  eq->bytes_sent = 0;
448  eq->events_sent = 0;
449  /* update destination statistics */
450  db_send_changed_records();
451  /* Keep track of last ODB update */
452  last_time = ss_millitime();
453  }
454  } while (status != RPC_SHUTDOWN && status != SS_ABORT);
455 
456  return status;
457  }
458 
459 /********************************************************************/
460 INT eb_mfragment_add(char *pdest, char *psrce, INT * size)
461 {
462  BANK_HEADER *psbh, *pdbh;
463  char *psdata, *pddata;
464  INT bksize;
465 
466  /* Condition for new EVENT the data_size should be ZERO */
467  *size = ((EVENT_HEADER *) pdest)->data_size;
468 
469  /* destination pointer */
470  pddata = pdest + *size + sizeof(EVENT_HEADER);
471 
472  if (*size) {
473  /* NOT the first fragment */
474 
475  /* Swap event source if necessary */
476  psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
477  bk_swap(psbh, FALSE);
478 
479  /* source pointer */
480  psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
481  psdata = (char *) (psbh + 1);
482 
483  /* copy all banks without the bank header */
484  bksize = psbh->data_size;
485 
486  /* copy */
487  memcpy(pddata, psdata, bksize);
488 
489  /* update event size */
490  ((EVENT_HEADER *) pdest)->data_size += bksize;
491 
492  /* update bank size */
493  pdbh = (BANK_HEADER *) (((EVENT_HEADER *) pdest) + 1);
494  pdbh->data_size += bksize;
495 
496  *size = ((EVENT_HEADER *) pdest)->data_size;
497  } else {
498  /* First event without the event header but with the
499  bank header as the size is zero */
500  *size = ((EVENT_HEADER *) psrce)->data_size;
501 
502  /* Swap event if necessary */
503  psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
504  bk_swap(psbh, FALSE);
505 
506  /* copy first fragment */
507  memcpy(pddata, psbh, *size);
508 
509  /* update destination event size */
510  ((EVENT_HEADER *) pdest)->data_size = *size;
511  }
512  return CM_SUCCESS;
513 }
514 
515 /*--------------------------------------------------------------------*/
516 INT eb_yfragment_add(char *pdest, char *psrce, INT * size)
517 {
518  /* pdest : EVENT_HEADER pointer
519  psrce : EVENT_HEADER pointer
520  Keep pbkh for later incrementation
521  */
522  char *psdata, *pddata;
523  DWORD *pslrl, *pdlrl;
524  INT i4frgsize, i1frgsize, status;
525 
526  /* Condition for new EVENT the data_size should be ZERO */
527  *size = ((EVENT_HEADER *) pdest)->data_size;
528 
529  /* destination pointer skip the header as it has been already
530  composed and the usere may have modified it on purpose (Midas Control) */
531  pddata = pdest + *size + sizeof(EVENT_HEADER);
532 
533  /* the Midas header is present for logger */
534  if (*size) { /* already filled with a fragment */
535 
536  /* source pointer: number of DWORD (lrl included) */
537  pslrl = (DWORD *) (((EVENT_HEADER *) psrce) + 1);
538 
539  /* Swap event if necessary */
540  status = ybos_event_swap(pslrl);
541 
542  /* copy done in bytes, do not include LRL */
543  psdata = (char *) (pslrl + 1);
544 
545  /* copy size in I*4 (lrl included, remove it) */
546  i4frgsize = (*pslrl);
547  i1frgsize = 4 * i4frgsize;
548 
549  /* append fragment */
550  memcpy(pddata, psdata, i1frgsize);
551 
552  /* update Midas header event size */
553  ((EVENT_HEADER *) pdest)->data_size += i1frgsize;
554 
555  /* update LRL size (I*4) */
556  pdlrl = (DWORD *) (((EVENT_HEADER *) pdest) + 1);
557  *pdlrl += i4frgsize;
558 
559  /* Return event size in bytes */
560  *size = ((EVENT_HEADER *) pdest)->data_size;
561  } else { /* new destination event */
562  /* The composed event has already the MIDAS header.
563  which may have been modified by the user in ebuser.c
564  Will be stripped by the logger (YBOS).
565  Copy the first full event ( no EVID suppression )
566  First event (without the event header) */
567 
568  /* source pointer */
569  pslrl = (DWORD *) (((EVENT_HEADER *) psrce) + 1);
570 
571  /* Swap event if necessary */
572  status = ybos_event_swap(pslrl);
573 
574  /* size in byte from the source midas header */
575  *size = ((EVENT_HEADER *) psrce)->data_size;
576 
577  /* copy first fragment */
578  memcpy(pddata, (char *) pslrl, *size);
579 
580  /* update destination Midas header event size */
581  ((EVENT_HEADER *) pdest)->data_size += *size;
582 
583  }
584  return CM_SUCCESS;
585 }
586 
587 /*--------------------------------------------------------------------*/
588 INT tr_start(INT rn, char *error)
589 {
590  EBUILDER(ebuilder_str);
591  INT status, size, i;
592  char str[128];
593  KEY key;
594  HNDLE hKey, hEqkey, hEqFRkey;
595  EQUIPMENT_INFO *eq_info;
596 
597 
598  eq_info = &equipment[0].info;
599 
600  /* Get update eq_info from ODB */
601  sprintf(str, "/Equipment/%s/Common", equipment[0].name);
602  status = db_find_key(hDB, 0, str, &hKey);
603  size = sizeof(EQUIPMENT_INFO);
604  db_get_record(hDB, hKey, eq_info, &size, 0);
605 
606  ebset.nfragment = nfragment;
607 
608  /* reset serial numbers */
609  for (i = 0; equipment[i].name[0]; i++) {
610  equipment[i].serial_number = 1;
611  equipment[i].subevent_number = 0;
612  equipment[i].stats.events_sent = 0;
613  equipment[i].odb_in = equipment[i].odb_out = 0;
614  }
615 
616  /* Get / Set Settings */
617  sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
618  if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
619  status = db_create_record(hDB, 0, str, strcomb(ebuilder_str));
620  }
621 
622  /* Keep Key on Ebuilder/Settings */
623  sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
624  if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
625  cm_msg(MINFO, "load_fragment", "/Equipment/%s/Settings not found", equipment[0].name);
626  }
627 
628  /* Update or Create User_field */
629  size = sizeof(ebset.user_field);
630  status = db_get_value(hDB, hEqkey, "User Field", ebset.user_field, &size, TID_STRING, TRUE);
631 
632  /* Update or Create User_Build */
633  size = sizeof(ebset.user_build);
634  status = db_get_value(hDB, hEqkey, "User Build", &ebset.user_build, &size, TID_BOOL, TRUE);
635 
636  /* update ODB */
637  size = sizeof(INT);
638  status = db_set_value(hDB, hEqkey, "Number of Fragment", &ebset.nfragment, size, 1, TID_INT);
639 
640  /* Create or update the fragment request list */
641  status = db_find_key(hDB, hEqkey, "Fragment Required", &hEqFRkey);
642  status = db_get_key (hDB, hEqFRkey, &key);
643  if (key.num_values != ebset.nfragment) {
644  cm_msg(MINFO, "mevb", "Number of Fragment mismatch ODB:%d - CUR:%d", key.num_values, ebset.nfragment);
645  free (ebset.preqfrag);
646  size = ebset.nfragment*sizeof(BOOL);
647  ebset.preqfrag = malloc(size);
648  for (i=0 ; i<ebset.nfragment ; i++)
649  ebset.preqfrag[i] = TRUE;
650  status = db_set_value(hDB, hEqkey, "Fragment Required", ebset.preqfrag, size, ebset.nfragment, TID_BOOL);
651  } else { // Take from ODBedit
652  size = key.total_size;
653  free (ebset.preqfrag);
654  ebset.preqfrag = malloc(size);
655  status = db_get_data(hDB, hEqkey, ebset.preqfrag, &size, TID_BOOL);
656  }
657  /* Cleanup fragment flags */
658  free (ebset.received);
659  ebset.received = malloc(size);
660  for (i=0 ; i < ebset.nfragment ; i++)
661  ebset.received[i] = FALSE;
662 
663  /* Call BOR user function */
664  status = eb_begin_of_run(run_number, ebset.user_field, error);
665  if (status != EB_SUCCESS) {
666  cm_msg(MERROR, "eb_prestart", "run start aborted due to eb_begin_of_run (%d)",
667  status);
668  return status;
669  }
670 
671  /* Book all fragment */
672  status = source_booking();
673  if (status != SUCCESS)
674  return status;
675 
676  /* local run state */
677  run_state = STATE_RUNNING;
678  run_number = rn;
681  printf("%s-Starting New Run: %d\n", full_frontend_name, rn);
682 
683  /* Reset global trigger mask */
684  return CM_SUCCESS;
685 }
686 
687 /*--------------------------------------------------------------------*/
688 INT tr_stop(INT rn, char *error)
689 {
690  printf("\n%s-Stopping Run: %d detected\n", full_frontend_name, rn);
691 
692  /* local stop */
694 
695  /* local stop time */
696  request_stop_time = ss_millitime();
697  return CM_SUCCESS;
698 }
699 
700 /*--------------------------------------------------------------------*/
701 void free_event_buffer(INT nfrag)
702 {
703  INT i;
704  for (i = 0; i < nfrag; i++) {
705  if (ebch[i].pfragment) {
706  free(ebch[i].pfragment);
707  ebch[i].pfragment = NULL;
708  }
709  }
710 }
711 
712 /*--------------------------------------------------------------------*/
713 INT handFlush()
714 {
715  int i, size, status;
716  char strout[256];
717 
718  /* Do Hand flush until better way to garantee the input buffer to be empty */
719  if (debug)
720  printf("Hand flushing system buffer... \n");
721  for (i = 0; i < nfragment; i++) {
722  do {
723  size = max_event_size;
724  status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
725  if (debug1) {
726  sprintf(strout,
727  "booking:Hand flush bm_receive_event[%d] hndle:%d stat:%d Last Ser:%d",
728  i, ebch[i].hBuf, status,
729  ((EVENT_HEADER *) ebch[i].pfragment)->serial_number);
730  printf("%s\n", strout);
731  }
732  } while (status == BM_SUCCESS);
733  }
734 
735  /* Empty source buffer */
736  status = bm_empty_buffers();
737  if (status != BM_SUCCESS)
738  cm_msg(MERROR, "handFlush", "bm_empty_buffers failure [%d]", status);
739  run_state = STATE_STOPPED;
740  return status;
741 }
742 
743 
744 /*--------------------------------------------------------------------*/
745 INT source_booking()
746 {
747  INT j, i, status, status1, status2;
748 
749  if (debug)
750  printf("Entering booking\n");
751 
752  /* Book all the source channels */
753  for (i = 0; i < nfragment; i++) {
754  /* Book only the requested event mask */
755  if (ebset.preqfrag[i]) {
756  /* Connect channel to source buffer */
757  status1 = bm_open_buffer(ebch[i].buffer, EVENT_BUFFER_SIZE, &(ebch[i].hBuf));
758 
759  if (debug)
760  printf("bm_open_buffer frag:%d buf:%s handle:%d stat:%d\n",
761  i, ebch[i].buffer, ebch[i].hBuf, status1);
762  /* Register for specified channel event ID and Trigger mask */
763  status2 =
764  bm_request_event(ebch[i].hBuf, ebch[i].event_id,
765  ebch[i].trigger_mask, GET_ALL, &ebch[i].req_id, NULL);
766  if (debug)
767  printf("bm_request_event frag:%d id:%d msk:%d req_id:%d stat:%d\n",
768  i, ebch[i].event_id, ebch[i].trigger_mask, ebch[i].req_id, status2);
769  if (((status1 != BM_SUCCESS) && (status1 != BM_CREATED)) ||
770  ((status2 != BM_SUCCESS) && (status2 != BM_CREATED))) {
771  cm_msg(MERROR, "source_booking",
772  "Open buffer/event request failure [%d %d %d]", i, status1, status2);
773  return BM_CONFLICT;
774  }
775 
776  /* allocate local source event buffer */
777  if (ebch[i].pfragment)
778  free(ebch[i].pfragment);
779  ebch[i].pfragment = (char *) malloc(max_event_size + sizeof(EVENT_HEADER));
780  if (debug)
781  printf("malloc pevent frag:%d pevent:%p\n", i, ebch[i].pfragment);
782  if (ebch[i].pfragment == NULL) {
783  free_event_buffer(nfragment);
784  cm_msg(MERROR, "source_booking", "Can't allocate space for buffer");
785  return BM_NO_MEMORY;
786  }
787  }
788  }
789 
790  /* Empty source buffer */
791  status = bm_empty_buffers();
792  if (status != BM_SUCCESS) {
793  cm_msg(MERROR, "source_booking", "bm_empty_buffers failure [%d]", status);
794  return status;
795  }
796 
797  if (debug) {
798  printf("bm_empty_buffers stat:%d\n", status);
799  for (j = 0; j < ebset.nfragment; j++) {
800  printf(" buff:%s", ebch[j].buffer);
801  printf(" ser#:%d", ebch[j].serial);
802  printf(" hbuf:%2d", ebch[j].hBuf);
803  printf(" rqid:%2d", ebch[j].req_id);
804  printf(" opst:%d", status1);
805  printf(" rqst:%d", status2);
806  printf(" evid:%2d", ebch[j].event_id);
807  printf(" tmsk:0x%4.4x\n", ebch[j].trigger_mask);
808  }
809  }
810 
811  return SUCCESS;
812 }
813 
814 /*--------------------------------------------------------------------*/
815 INT source_unbooking()
816 {
817  INT i, status;
818 
819 #if 0
820  /* Skip unbooking if already done */
821  if (ebch[0].pfragment == NULL)
822  return EB_SUCCESS;
823 #endif
824 
825  /* unbook all source channels */
826  for (i = 0; i< nfragment; i++) {
827  bm_empty_buffers();
828 
829  if (ebch[i].set.emask) {
830  /* Remove event ID registration */
831  status = bm_delete_request(ebch[i].req_id);
832  if (debug)
833  printf("unbook: bm_delete_req[%d] req_id:%d stat:%d\n", i, ebch[i].req_id,
834  status);
835 
836  /* Close source buffer */
837  status = bm_close_buffer(ebch[i].hBuf);
838  if (debug)
839  printf("unbook: bm_close_buffer[%d] hndle:%d stat:%d\n", i, ebch[i].hBuf,
840  status);
841  if (status != BM_SUCCESS) {
842  cm_msg(MERROR, "source_unbooking", "Close buffer[%d] stat:", i, status);
843  return status;
844  }
845  }
846  }
847 
848  /* release local event buffer memory */
849  free_event_buffer(nfragment);
850 
851  return EB_SUCCESS;
852 }
853 
854 /*--------------------------------------------------------------------*/
855 INT close_buffers(void)
856 {
857  INT status;
858  char error[256];
859  EQUIPMENT *eq;
860 
861  eq = &equipment[0];
862 
863  /* Flush local destination cache */
864  bm_flush_cache(equipment[0].buffer_handle, SYNC);
865  /* Call user function */
866  eb_end_of_run(run_number, error);
867  /* Cleanup buffers */
868  handFlush();
869  /* Detach all source from midas */
870  status = source_unbooking();
871 
872  /* Compose message */
873  stop_time = ss_millitime() - request_stop_time;
874  sprintf(error, "Run %d Stop after %1.0lf events sent DT:%d[ms]",
875  run_number, eq->stats.events_sent, stop_time);
876  cm_msg(MINFO, "EBuilder", "%s", error);
877 
878  run_state = STATE_STOPPED;
880  return status;
881 }
882 
883 /********************************************************************/
905 INT source_scan(INT fmt, EQUIPMENT_INFO *eq_info)
906 {
907  static char bars[] = "|/-\\";
908  static int i_bar;
909  static DWORD serial;
910  DWORD *plrl;
911  BOOL complete;
912  INT i, j, status, size;
913  INT act_size;
914  BOOL found, event_mismatch;
915  BANK_HEADER *psbh;
916 
917  status = SUCCESS;
918 
919  /* Scan all channels at least once */
920  for (i = 0; i < nfragment; i++) {
921  /* Check if current channel needs to be received */
922  if (ebset.preqfrag[i] && !ebset.received[i]) {
923  /* Get fragment and store it in ebch[i].pfragment */
924  size = max_event_size;
925  status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
926  switch (status) {
927  case BM_SUCCESS: /* event received */
928  /* Mask event */
929  ebset.received[i] = TRUE;
930  /* Keep local serial */
931  ebch[i].serial = ((EVENT_HEADER *) ebch[i].pfragment)->serial_number;
932 
933  /* Swap event depending on data format */
934  switch (fmt) {
935  case FORMAT_YBOS:
936  plrl = (DWORD *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
937  ybos_event_swap(plrl);
938  break;
939  case FORMAT_MIDAS:
940  psbh = (BANK_HEADER *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
941  bk_swap(psbh, FALSE);
942  break;
943  }
944 
945  if (debug1) {
946  printf("SUCC: ch:%d ser:%d rec:%d sz:%d\n", i,
947  ebch[i].serial, ebset.received[i], size);
948  }
949  break;
950  case BM_ASYNC_RETURN: /* timeout */
951  ebch[i].timeout++;
952  if (debug1) {
953  printf("ASYNC: ch:%d ser:%d rec:%d sz:%d\n", i,
954  ebch[i].serial, ebset.received[i], size);
955  }
956  break;
957  default: /* Error */
958  cm_msg(MERROR, "event_scan", "bm_receive_event error %d", status);
959  return status;
960  break;
961  }
962  } /* next channel */
963  }
964 
965  /* Check if all fragments have been received */
966  complete = FALSE;
967  for (i = 0; i < nfragment;i++) {
968  if (ebset.preqfrag[i] && !ebset.received[i])
969  break;
970  }
971  if (i == nfragment) {
972  complete = TRUE;
973  /* Check if serial matches */
974  found = event_mismatch = FALSE;
975  /* Check Serial, mark first serial */
976  for (i = 0; i < nfragment; i++) {
977  if (ebset.preqfrag[i] && ebset.received[i] && !found) {
978  serial = ebch[i].serial;
979  found = TRUE;
980  } else {
981  if (ebset.preqfrag[i] && ebset.received[i] && (serial != ebch[i].serial)) {
982  /* Event mismatch */
983  event_mismatch = TRUE;
984  }
985  }
986  }
987 
988  /* internal action in case of event mismatch */
989  if (event_mismatch && debug) {
990  char str[256];
991  char strsub[128];
992  strcpy(str, "event mismatch: ");
993  for (j = 0; j < nfragment; j++) {
994  sprintf(strsub, "Ser[%d]:%d ", j, ebch[j].serial);
995  strcat(str, strsub);
996  }
997  printf("event serial mismatch %s\n", str);
998  }
999 
1000  /* In any case reset destination buffer */
1001  memset(dest_event, 0, sizeof(EVENT_HEADER));
1002  act_size = 0;
1003 
1004  /* Fill reserved header space of destination event with
1005  final header information */
1006  bm_compose_event((EVENT_HEADER *) dest_event, eq_info->event_id, eq_info->trigger_mask,
1007  act_size, ebch[0].serial);
1008 
1009  /* Pass fragments to user with mismatch flag, for final check before assembly */
1010  status = eb_user(nfragment, event_mismatch, ebch
1011  , (EVENT_HEADER *) dest_event,(void *) ((EVENT_HEADER *) dest_event + 1), &act_size);
1012  if (status != EB_SUCCESS) {
1013  if (status == EB_SKIP) {
1014  /* Reset mask and timeouts as even thave been succesfully send */
1015  for (i = 0; i < nfragment; i++) {
1016  ebch[i].timeout = 0;
1017  ebset.received[i] = FALSE;
1018  }
1019  }
1020  return status; // Event mark as EB_SKIP or EB_ABORT by user
1021  }
1022 
1023  /* Allow bypass of fragment assembly if user did it on its own */
1024  if (!ebset.user_build) {
1025  for (j = 0; j < nfragment; j++) {
1026  if (ebch[j].set.emask) {
1027  status = meb_fragment_add(dest_event, ebch[j].pfragment, &act_size);
1028  if (status != EB_SUCCESS) {
1029  cm_msg(MERROR, "source_scan",
1030  "compose fragment:%d current size:%d (%d)", j, act_size, status);
1031  return EB_ERROR;
1032  }
1033  }
1034  }
1035  }
1036 
1037  /* Overall event to be sent */
1038  act_size = ((EVENT_HEADER *) dest_event)->data_size + sizeof(EVENT_HEADER);
1039 
1040  /* Send event and wait for completion */
1041  status = rpc_send_event(equipment[0].buffer_handle, dest_event, act_size, SYNC);
1042  if (status != BM_SUCCESS) {
1043  if (debug)
1044  printf("rpc_send_event returned error %d, event_size %d\n",
1045  status, act_size);
1046  cm_msg(MERROR, "EBuilder", "%s: rpc_send_event returned error %d", full_frontend_name, status);
1047  return EB_ERROR;
1048  }
1049 
1050  /* Keep track of the total byte count */
1051  equipment[0].bytes_sent += act_size;
1052 
1053  /* update destination event count */
1054  equipment[0].events_sent++;
1055 
1056  /* Reset mask and timeouts as even thave been succesfully send */
1057  for (i = 0; i < nfragment; i++) {
1058  ebch[i].timeout = 0;
1059  ebset.received[i] = FALSE;
1060  }
1061  } // all fragment recieved for this event
1062 
1063  return status;
1064 }
1065 
1066 /*--------------------------------------------------------------------*/
1067 int main(unsigned int argc, char **argv)
1068 {
1069  INT status;
1070  unsigned int i;
1071  BOOL daemon = FALSE;
1072 
1073  /* init structure */
1074  memset(&ebch[0], 0, sizeof(ebch));
1075 
1076  /* set default */
1078 
1079  /* get parameters */
1080  for (i = 1; i < argc; i++) {
1081  if (argv[i][0] == '-' && argv[i][1] == 'd')
1082  debug = TRUE;
1083  else if (argv[i][0] == '-' && argv[i][1] == 'D')
1084  daemon = TRUE;
1085  else if (argv[i][0] == '-' && argv[i][1] == 'w')
1086  wheel = TRUE;
1087  else if (argv[i][0] == '-') {
1088  if (i + 1 >= argc || argv[i + 1][0] == '-')
1089  goto usage;
1090  if (strncmp(argv[i], "-e", 2) == 0)
1091  strcpy(expt_name, argv[++i]);
1092  else if (strncmp(argv[i], "-h", 2) == 0)
1093  strcpy(host_name, argv[++i]);
1094  else if (strncmp(argv[i], "-b", 2) == 0)
1095  strcpy(buffer_name, argv[++i]);
1096  } else {
1097 usage:
1098  printf("usage: mevb [-h <Hostname>] [-e <Experiment>] -b <buffername> [-d debug]\n");
1099  printf(" -w show wheel -D to start as a daemon\n\n");
1100  return 0;
1101  }
1102  }
1103 
1104  printf("Program mevb version 5 started\n\n");
1105  if (daemon) {
1106  printf("Becoming a daemon...\n");
1107  ss_daemon_init(FALSE);
1108  }
1109 
1110  /* Check buffer arg */
1111  if (buffer_name[0] == 0) {
1112  printf("Buffer name must be specified with -b argument\n");
1113  goto exit;
1114  }
1115 
1116  /* Compose frontend name */
1118 
1119  /* Connect to experiment */
1120  status = cm_connect_experiment(host_name, expt_name, full_frontend_name, NULL);
1121  if (status != CM_SUCCESS) {
1122  ss_sleep(5000);
1123  goto exit;
1124  }
1125 
1126  if (debug)
1127  cm_set_watchdog_params(TRUE, 0);
1128 
1129  /* Connect to ODB */
1130  status = cm_get_experiment_database(&hDB, &hKey);
1131  if (status != EB_SUCCESS) {
1132  ss_sleep(5000);
1133  goto exit;
1134  }
1135 
1136  /* check if Ebuilder is already running */
1137  status = cm_exist(full_frontend_name, TRUE);
1138  if (status == CM_SUCCESS) {
1139  cm_msg(MERROR, "Ebuilder", "%s running already!.", full_frontend_name);
1140  cm_disconnect_experiment();
1141  goto exit;
1142  }
1143 
1144  if (ebuilder_init() != SUCCESS) {
1145  cm_disconnect_experiment();
1146  /* let user read message before window might close */
1147  ss_sleep(5000);
1148  goto exit;
1149  }
1150 
1151  /* Register single equipment */
1152  status = register_equipment();
1153  if (status != EB_SUCCESS) {
1154  ss_sleep(5000);
1155  goto exit;
1156  }
1157 
1158  /* Load Fragment info */
1159  status = load_fragment();
1160  if (status != EB_SUCCESS) {
1161  ss_sleep(5000);
1162  goto exit;
1163  }
1164 
1165  /* Register transition for reset counters */
1166  if (cm_register_transition(TR_START, tr_start, 300) != CM_SUCCESS)
1167  return status;
1168  if (cm_register_transition(TR_STOP, tr_stop, 700) != CM_SUCCESS)
1169  goto exit;
1170 
1171  /* Scan fragments... will stay in */
1172  status = scan_fragment();
1173  printf("%s-Out of scan_fragment\n", full_frontend_name);
1174 
1175  /* Detach all source from midas */
1176  printf("%s-Unbooking\n", full_frontend_name);
1177  source_unbooking();
1178 
1179  ebuilder_exit();
1180 
1181 exit:
1182  /* Free local memory */
1184 
1185  /* Clean disconnect from midas */
1186  cm_disconnect_experiment();
1187  return 0;
1188 }