AlcapDAQ  1
mevb.cpp
Go to the documentation of this file.
1 /********************************************************************\
2 Name: mevb.c
3 Created by: Pierre-Andre Amaudruz
4 
5 Contents: Main Event builder task.
6 
7 ** muCap patches **
8 - converted from C to C++: added extern "C" on ybos_swap_event and added
9  pointer type casts from malloc().
10 - added protection on emask to avoid touching crates that are disabled
11  in source unbooking and fragment assembly.
12 
13 $Log: mevb.cpp,v $
14 Revision 1.2 2005/10/31 14:11:37 mucap
15 (Fred)
16 Update to new major version of MIDAS event builder.
17 
18 Revision 1.16 2004/10/07 20:08:34 pierre
19 1.9.5
20 
21 Revision 1.15 2004/10/04 23:55:28 pierre
22 move ebuilder into equipment list
23 
24 Revision 1.14 2004/09/29 17:55:34 pierre
25 fix speed problem
26 
27 Revision 1.13 2004/09/29 16:20:31 pierre
28 change Ebuilder structure
29 
30 Revision 1.12 2004/01/08 08:40:08 midas
31 Implemented standard indentation
32 
33 Revision 1.11 2004/01/08 06:48:26 pierre
34 Doxygen the file
35 
36 Revision 1.10 2003/08/19 23:26:36 pierre
37 fix cm_get_environment arg
38 
39 Revision 1.9 2002/10/07 17:04:01 pierre
40 fix tr_stop request
41 
42 Revision 1.8 2002/09/28 00:48:33 pierre
43 Add EB_USER_ERROR handling, handFlush()
44 
45 Revision 1.7 2002/09/25 18:37:37 pierre
46 correct: header passing, user field, abort run
47 
48 Revision 1.6 2002/08/29 22:07:47 pierre
49 fix event header, double task, EOR
50 
51 Revision 1.5 2002/07/13 05:45:49 pierre
52 added swap before user function
53 
54 Revision 1.4 2002/06/14 04:59:08 pierre
55 revised for ybos
56 
57 Revision 1.3 2002/05/08 20:51:41 midas
58 Added extra parameter to function db_get_value()
59 
60 Revision 1.2 2002/01/17 23:34:14 pierre
61 doc++ format
62 
63 Revision 1.1.1.1 2002/01/17 19:49:54 pierre
64 Initial Version
65 
66 \********************************************************************/
67 
69 /* @file mevb.c
70 The Event builder main file
71 */
72 
73 #include <stdio.h>
74 #include "midas.h"
75 #include "mevb.h"
76 #include "msystem.h"
77 #include "ybos.h"
78 
79 #define SERVER_CACHE_SIZE 0 /* event cache before buffer */
80 
81 #define ODB_UPDATE_TIME 1000 /* 1 seconds for ODB update */
82 
83 #define DEFAULT_FE_TIMEOUT 60000 /* 60 seconds for watchdog timeout */
84 
87 
88 INT run_state; /* STATE_RUNNING, STATE_STOPPED, STATE_PAUSED */
91 DWORD actual_time; /* current time in seconds since 1970 */
92 DWORD actual_millitime; /* current time in milliseconds */
93 
94 char host_name[HOST_NAME_LENGTH];
95 char expt_name[NAME_LENGTH];
97 char buffer_name[NAME_LENGTH];
99 char *dest_event;
102 
103 BOOL wheel = FALSE;
104 char bars[] = "|\\-/";
105 int i_bar;
108 
109 INT(*meb_fragment_add) (char *, char *, INT *);
110 INT handFlush(void);
111 INT source_booking(void);
112 INT source_unbooking(void);
113 INT close_buffers(void);
114 INT source_scan(INT fmt, EQUIPMENT_INFO *eq_info);
115 INT eb_mfragment_add(char *pdest, char *psrce, INT * size);
116 INT eb_yfragment_add(char *pdest, char *psrce, INT * size);
117 
118 INT eb_begin_of_run(INT, char *, char *);
119 INT eb_end_of_run(INT, char *);
120 INT eb_user(INT, BOOL mismatch, EBUILDER_CHANNEL *, EVENT_HEADER *, void *, INT *);
121 INT load_fragment(void);
122 INT scan_fragment(void);
123 extern char *frontend_name;
124 extern char *frontend_file_name;
125 extern BOOL frontend_call_loop;
126 
127 extern INT max_event_size;
128 extern INT max_event_size_frag;
129 extern INT event_buffer_size;
130 extern INT display_period;
131 extern INT ebuilder_init(void);
132 extern INT ebuilder_exit(void);
133 extern INT ebuilder_loop(void);
134 
135 extern EQUIPMENT equipment[];
136 extern "C" {
137  extern INT ybos_event_swap(DWORD * pevt);
138 };
139 
140 #define EQUIPMENT_COMMON_STR "\
141 Event ID = WORD : 0\n\
142 Trigger mask = WORD : 0\n\
143 Buffer = STRING : [32] SYSTEM\n\
144 Type = INT : 0\n\
145 Source = INT : 0\n\
146 Format = STRING : [8] FIXED\n\
147 Enabled = BOOL : 0\n\
148 Read on = INT : 0\n\
149 Period = INT : 0\n\
150 Event limit = DOUBLE : 0\n\
151 Num subevents = DWORD : 0\n\
152 Log history = INT : 0\n\
153 Frontend host = STRING : [32] \n\
154 Frontend name = STRING : [32] \n\
155 Frontend file name = STRING : [256] \n\
156 "
157 
158 #define EQUIPMENT_STATISTICS_STR "\
159 Events sent = DOUBLE : 0\n\
160 Events per sec. = DOUBLE : 0\n\
161 kBytes per sec. = DOUBLE : 0\n\
162 "
163 
164 /********************************************************************/
166 {
167  INT index, size, status;
168  char str[256];
169  EQUIPMENT_INFO *eq_info;
170  EQUIPMENT_STATS *eq_stats;
171  HNDLE hKey;
172 
173  /* get current ODB run state */
174  size = sizeof(run_state);
175  run_state = STATE_STOPPED;
176  db_get_value(hDB, 0, "/Runinfo/State", &run_state, &size, TID_INT, TRUE);
177  size = sizeof(run_number);
178  run_number = 1;
179  status = db_get_value(hDB, 0, "/Runinfo/Run number", &run_number, &size, TID_INT, TRUE);
180  assert(status == SUCCESS);
181 
182  /* scan EQUIPMENT table from mevb.C */
183  for (index = 0; equipment[index].name[0]; index++) {
184  eq_info = &equipment[index].info;
185  eq_stats = &equipment[index].stats;
186 
187  if (eq_info->event_id == 0) {
188  printf("\nEvent ID 0 for %s not allowed\n", equipment[index].name);
189  cm_disconnect_experiment();
190  ss_sleep(5000);
191  exit(0);
192  }
193 
194  /* init status */
195  equipment[index].status = EB_SUCCESS;
196 
197  sprintf(str, "/Equipment/%s/Common", equipment[index].name);
198 
199  /* get last event limit from ODB */
200  if (eq_info->eq_type != EQ_SLOW) {
201  db_find_key(hDB, 0, str, &hKey);
202  size = sizeof(double);
203  if (hKey)
204  db_get_value(hDB, hKey, "Event limit", &eq_info->event_limit, &size,
205  TID_DOUBLE, TRUE);
206  }
207 
208  /* Create common subtree */
209  status = db_check_record(hDB, 0, str, EQUIPMENT_COMMON_STR, TRUE);
210  if (status != DB_SUCCESS) {
211  printf("Cannot check equipment record, status = %d\n", status);
212  ss_sleep(3000);
213  }
214  db_find_key(hDB, 0, str, &hKey);
215 
216  if (equal_ustring(eq_info->format, "YBOS"))
217  equipment[index].format = FORMAT_YBOS;
218  else if (equal_ustring(eq_info->format, "FIXED"))
219  equipment[index].format = FORMAT_FIXED;
220  else /* default format is MIDAS */
221  equipment[index].format = FORMAT_MIDAS;
222 
223  gethostname(eq_info->frontend_host, sizeof(eq_info->frontend_host));
224  strcpy(eq_info->frontend_name, full_frontend_name);
225  strcpy(eq_info->frontend_file_name, frontend_file_name);
226 
227  /* set record from equipment[] table in frontend.c */
228  db_set_record(hDB, hKey, eq_info, sizeof(EQUIPMENT_INFO), 0);
229 
230  /* get record once at the start equipment info */
231  size = sizeof(EQUIPMENT_INFO);
232  db_get_record(hDB, hKey, eq_info, &size, 0);
233 
234  /*---- Create just the key , leave it empty ---------------------------------*/
235  sprintf(str, "/Equipment/%s/Variables", equipment[index].name);
236  db_create_key(hDB, 0, str, TID_KEY);
237  db_find_key(hDB, 0, str, &hKey);
238  equipment[index].hkey_variables = hKey;
239 
240  /*---- Create and initialize statistics tree -------------------*/
241  sprintf(str, "/Equipment/%s/Statistics", equipment[index].name);
242 
243  status = db_check_record(hDB, 0, str, EQUIPMENT_STATISTICS_STR, TRUE);
244  if (status != DB_SUCCESS) {
245  printf("Cannot create/check statistics record, error %d\n", status);
246  ss_sleep(3000);
247  }
248 
249  status = db_find_key(hDB, 0, str, &hKey);
250  if (status != DB_SUCCESS) {
251  printf("Cannot find statistics record, error %d\n", status);
252  ss_sleep(3000);
253  }
254 
255  eq_stats->events_sent = 0;
256  eq_stats->events_per_sec = 0;
257  eq_stats->kbytes_per_sec = 0;
258 
259  /* open hot link to statistics tree */
260  status = db_open_record(hDB, hKey, eq_stats, sizeof(EQUIPMENT_STATS)
261  , MODE_WRITE, NULL, NULL);
262  if (status != DB_SUCCESS) {
263  cm_msg(MERROR, "register_equipment",
264  "Cannot open statistics record, error %d. Probably other FE is using it",
265  status);
266  ss_sleep(3000);
267  }
268 
269  /*---- open event buffer ---------------------------------------*/
270  if (eq_info->buffer[0]) {
271  status = bm_open_buffer(eq_info->buffer, EVENT_BUFFER_SIZE,
272  &equipment[index].buffer_handle);
273  if (status != BM_SUCCESS && status != BM_CREATED) {
274  cm_msg(MERROR, "register_equipment",
275  "Cannot open event buffer. Try to reduce EVENT_BUFFER_SIZE in midas.h \
276  and rebuild the system.");
277  return 0;
278  }
279 
280  /* set the default buffer cache size */
281  bm_set_cache_size(equipment[index].buffer_handle, 0, SERVER_CACHE_SIZE);
282  } else {
283  cm_msg(MERROR, "register_equipment", "Destination buffer must be present");
284  ss_sleep(3000);
285  exit(0);
286  }
287  }
288  return SUCCESS;
289 }
290 
291 /********************************************************************/
292 INT load_fragment(void)
293 {
294  INT i, size, type;
295  BOOL enabled;
296  HNDLE hEqKey, hSubkey;
297  EQUIPMENT_INFO *eq_info;
298  KEY key;
299  char buffer[NAME_LENGTH];
300  char format[8];
301 
302  /* Get equipment pointer, only one eqp for now */
303  eq_info = &equipment[0].info;
304 
305  /* Scan Equipment/Common listing */
306  if (db_find_key(hDB, 0, "Equipment", &hEqKey) != DB_SUCCESS) {
307  cm_msg(MINFO, "load_fragment", "Equipment listing not found");
308  return EB_ERROR;
309  }
310 
311  /* Scan the Equipment list for fragment info collection */
312  for (i = 0, nfragment=0 ; ; i++) {
313  db_enum_key(hDB, hEqKey, i, &hSubkey);
314  if (!hSubkey)
315  break;
316  db_get_key(hDB, hSubkey, &key);
317  if (key.type == TID_KEY) {
318  /* Equipment name */
319  if (debug) printf("Equipment name:%s\n", key.name);
320  /* Check if equipment is EQ_EB */
321  size = sizeof(INT);
322  db_get_value(hDB, hSubkey, "common/type", &type, &size, TID_INT, 0);
323  size = sizeof(buffer);
324  db_get_value(hDB, hSubkey, "common/Buffer", buffer, &size, TID_STRING, 0);
325  size = sizeof(format);
326  db_get_value(hDB, hSubkey, "common/Format", format, &size, TID_STRING, 0);
327  size = sizeof(BOOL);
328  enabled = FALSE;
329  db_get_value(hDB, hSubkey, "Settings/Enabled", &enabled, &size, TID_BOOL, 0);
330  /* Check if equipment match EB requirements */
331  if ((type & EQ_EB) && enabled
332  && (strncmp(buffer, buffer_name, strlen(buffer_name)) == 0)
333  && (strncmp(format, eq_info->format, strlen(format)) == 0)) {
334  /* match=> fill internal eb structure */
335  strcpy(ebch[nfragment].format, format);
336  strcpy(ebch[nfragment].buffer, buffer);
337  size = sizeof(WORD);
338  db_get_value(hDB, hSubkey, "common/Trigger Mask", &ebch[nfragment].trigger_mask, &size, TID_WORD, 0);
339  size = sizeof(WORD);
340  db_get_value(hDB, hSubkey, "common/Event ID", &ebch[nfragment].event_id, &size, TID_WORD, 0);
341  printf("%s enabled\n",key.name);
342  nfragment++;
343  }
344  }
345  }
346 
347  printf("Found %d fragment matching EB setting\n", nfragment);
348  /* Point to the Ebuilder settings */
349  /* Set fragment_add function based on the format */
350  if (equipment[0].format == FORMAT_MIDAS)
352  else if (equipment[0].format == FORMAT_YBOS)
354  else {
355  cm_msg(MERROR, "mevb", "Unknown data format :%d", format);
356  return EB_ERROR;
357  }
358 
359  /* allocate destination event buffer */
360  dest_event = (char *) malloc(nfragment * (max_event_size + sizeof(EVENT_HEADER)));
361  memset(dest_event, 0, nfragment * (max_event_size + sizeof(EVENT_HEADER)));
362  if (dest_event == NULL) {
363  cm_msg(MERROR, "EBuilder", "%s: Not enough memory for event buffer", full_frontend_name);
364  return EB_ERROR;
365  }
366  return EB_SUCCESS;
367 }
368 
369 /********************************************************************/
370 INT scan_fragment(void)
371 {
372  INT fragn, status;
373  EQUIPMENT *eq;
374  EQUIPMENT_INFO *eq_info;
375 
376  /* Get equipment pointer, only one eqp for now */
377  eq_info = &equipment[0].info;
378 
379  /* Main event loop */
380  do {
381  switch (run_state) {
382  case STATE_STOPPED:
383  case STATE_PAUSED:
384  /* skip the source scan and yield */
385  status = cm_yield(500);
386  if (wheel) {
387  printf("...%c Snoring\r", bars[i_bar++ % 4]);
388  fflush(stdout);
389  }
390  break;
391  case STATE_RUNNING:
392  status = source_scan(equipment[0].format, eq_info);
393  switch (status) {
394  case BM_ASYNC_RETURN: // No event found for now, Check for timeout
395  for (fragn = 0; fragn < nfragment; fragn++) {
396  if (ebch[fragn].timeout > TIMEOUT) { /* Timeout */
397  if (stop_requested) { /* Stop */
398  if (debug) printf("Stop requested on timeout %d\n", status);
399  status = close_buffers();
400  break;
401  }
402  else { /* No stop requested but timeout */
403  if (wheel) {
404  printf("...%c Timoing on %1.0lf\r", bars[i_bar++ % 4],
405  eq->stats.events_sent);
406  fflush(stdout);
407  status = cm_yield(50);
408  }
409  }
410  }
411  //else { /* No timeout loop back */
412  } // for loop over all fragments
413  break;
414  case EB_ERROR:
415  case EB_USER_ERROR:
417  if (status == EB_USER_ERROR)
418  cm_msg(MTALK, "EBuilder", "%s: Error signaled by user code - stopping run...", full_frontend_name);
419  else
420  cm_msg(MTALK, "EBuilder", "%s: Event mismatch - Stopping run...", full_frontend_name);
421  if (cm_transition(TR_STOP, 0, NULL, 0, ASYNC, 0) != CM_SUCCESS) {
422  cm_msg(MERROR, "EBuilder", "%s: Stop Transition request failed", full_frontend_name);
423  status = close_buffers();
424  return status;
425  }
426  if (debug) printf("Stop requested on Error %d\n", status);
427  status = close_buffers();
428  return status;
429  break;
430  case EB_SUCCESS:
431  case EB_SKIP:
432  // Normal path if event has been assembled
433  // No yield in this case.
434  break;
435  default:
436  cm_msg(MERROR, "Source_scan", "unexpected return %d", status);
437  status = SS_ABORT;
438  } // switch scan_source
439  break;
440  }
441  /* EB job done, update statistics if its time */
442  /* Check if it's time to do statistics job */
443  if ((actual_millitime = ss_millitime()) - last_time > 1000) {
444  /* Force event to appear at the destination if Ebuilder is remote */
445  rpc_flush_event();
446  /* Force event ot appear at the destination if Ebuilder is local */
447  bm_flush_cache(equipment[0].buffer_handle, ASYNC);
448 
449  status = cm_yield(10);
450 
451  eq = &equipment[0];
452  eq->stats.events_sent += eq->events_sent;
453  eq->stats.events_per_sec =
454  eq->events_sent / ((actual_millitime - last_time) / 1000.0);
455  eq->stats.kbytes_per_sec =
456  eq->bytes_sent / 1024.0 / ((actual_millitime - last_time) /
457  1000.0);
458  eq->bytes_sent = 0;
459  eq->events_sent = 0;
460  /* update destination statistics */
461  db_send_changed_records();
462  /* Keep track of last ODB update */
463  last_time = ss_millitime();
464  }
465  } while (status != RPC_SHUTDOWN && status != SS_ABORT);
466 
467  return status;
468  }
469 
470 /********************************************************************/
471 INT eb_mfragment_add(char *pdest, char *psrce, INT * size)
472 {
473  BANK_HEADER *psbh, *pdbh;
474  char *psdata, *pddata;
475  INT bksize;
476 
477  /* Condition for new EVENT the data_size should be ZERO */
478  *size = ((EVENT_HEADER *) pdest)->data_size;
479 
480  /* destination pointer */
481  pddata = pdest + *size + sizeof(EVENT_HEADER);
482 
483  if (*size) {
484  /* NOT the first fragment */
485 
486  /* Swap event source if necessary */
487  psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
488  bk_swap(psbh, FALSE);
489 
490  /* source pointer */
491  psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
492  psdata = (char *) (psbh + 1);
493 
494  /* copy all banks without the bank header */
495  bksize = psbh->data_size;
496 
497  /* copy */
498  memcpy(pddata, psdata, bksize);
499 
500  /* update event size */
501  ((EVENT_HEADER *) pdest)->data_size += bksize;
502 
503  /* update bank size */
504  pdbh = (BANK_HEADER *) (((EVENT_HEADER *) pdest) + 1);
505  pdbh->data_size += bksize;
506 
507  *size = ((EVENT_HEADER *) pdest)->data_size;
508  } else {
509  /* First event without the event header but with the
510  bank header as the size is zero */
511  *size = ((EVENT_HEADER *) psrce)->data_size;
512 
513  /* Swap event if necessary */
514  psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
515  bk_swap(psbh, FALSE);
516 
517  /* copy first fragment */
518  memcpy(pddata, psbh, *size);
519 
520  /* update destination event size */
521  ((EVENT_HEADER *) pdest)->data_size = *size;
522  }
523  return CM_SUCCESS;
524 }
525 
526 /*--------------------------------------------------------------------*/
527 INT eb_yfragment_add(char *pdest, char *psrce, INT * size)
528 {
529  /* pdest : EVENT_HEADER pointer
530  psrce : EVENT_HEADER pointer
531  Keep pbkh for later incrementation
532  */
533  char *psdata, *pddata;
534  DWORD *pslrl, *pdlrl;
535  INT i4frgsize, i1frgsize, status;
536 
537  /* Condition for new EVENT the data_size should be ZERO */
538  *size = ((EVENT_HEADER *) pdest)->data_size;
539 
540  /* destination pointer skip the header as it has been already
541  composed and the usere may have modified it on purpose (Midas Control) */
542  pddata = pdest + *size + sizeof(EVENT_HEADER);
543 
544  /* the Midas header is present for logger */
545  if (*size) { /* already filled with a fragment */
546 
547  /* source pointer: number of DWORD (lrl included) */
548  pslrl = (DWORD *) (((EVENT_HEADER *) psrce) + 1);
549 
550  /* Swap event if necessary */
551  status = ybos_event_swap(pslrl);
552 
553  /* copy done in bytes, do not include LRL */
554  psdata = (char *) (pslrl + 1);
555 
556  /* copy size in I*4 (lrl included, remove it) */
557  i4frgsize = (*pslrl);
558  i1frgsize = 4 * i4frgsize;
559 
560  /* append fragment */
561  memcpy(pddata, psdata, i1frgsize);
562 
563  /* update Midas header event size */
564  ((EVENT_HEADER *) pdest)->data_size += i1frgsize;
565 
566  /* update LRL size (I*4) */
567  pdlrl = (DWORD *) (((EVENT_HEADER *) pdest) + 1);
568  *pdlrl += i4frgsize;
569 
570  /* Return event size in bytes */
571  *size = ((EVENT_HEADER *) pdest)->data_size;
572  } else { /* new destination event */
573  /* The composed event has already the MIDAS header.
574  which may have been modified by the user in ebuser.c
575  Will be stripped by the logger (YBOS).
576  Copy the first full event ( no EVID suppression )
577  First event (without the event header) */
578 
579  /* source pointer */
580  pslrl = (DWORD *) (((EVENT_HEADER *) psrce) + 1);
581 
582  /* Swap event if necessary */
583  status = ybos_event_swap(pslrl);
584 
585  /* size in byte from the source midas header */
586  *size = ((EVENT_HEADER *) psrce)->data_size;
587 
588  /* copy first fragment */
589  memcpy(pddata, (char *) pslrl, *size);
590 
591  /* update destination Midas header event size */
592  ((EVENT_HEADER *) pdest)->data_size += *size;
593 
594  }
595  return CM_SUCCESS;
596 }
597 
598 /*--------------------------------------------------------------------*/
599 INT tr_start(INT rn, char *error)
600 {
601  EBUILDER(ebuilder_str);
602  INT status, size, i;
603  char str[128];
604  KEY key;
605  HNDLE hKey, hEqkey, hEqFRkey;
606  EQUIPMENT_INFO *eq_info;
607 
608 
609  eq_info = &equipment[0].info;
610 
611  /* Get update eq_info from ODB */
612  sprintf(str, "/Equipment/%s/Common", equipment[0].name);
613  status = db_find_key(hDB, 0, str, &hKey);
614  size = sizeof(EQUIPMENT_INFO);
615  db_get_record(hDB, hKey, eq_info, &size, 0);
616 
617  ebset.nfragment = nfragment;
618 
619  /* reset serial numbers */
620  for (i = 0; equipment[i].name[0]; i++) {
621  equipment[i].serial_number = 1;
622  equipment[i].subevent_number = 0;
623  equipment[i].stats.events_sent = 0;
624  equipment[i].odb_in = equipment[i].odb_out = 0;
625  }
626 
627  /* Get / Set Settings */
628  sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
629  if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
630  status = db_create_record(hDB, 0, str, strcomb(ebuilder_str));
631  }
632 
633  /* Keep Key on Ebuilder/Settings */
634  sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
635  if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
636  cm_msg(MINFO, "load_fragment", "/Equipment/%s/Settings not found", equipment[0].name);
637  }
638 
639  /* Update or Create User_field */
640  size = sizeof(ebset.user_field);
641  status = db_get_value(hDB, hEqkey, "User Field", ebset.user_field, &size, TID_STRING, TRUE);
642 
643  /* Update or Create User_Build */
644  size = sizeof(ebset.user_build);
645  status = db_get_value(hDB, hEqkey, "User Build", &ebset.user_build, &size, TID_BOOL, TRUE);
646 
647  /* update ODB */
648  size = sizeof(INT);
649  status = db_set_value(hDB, hEqkey, "Number of Fragment", &ebset.nfragment, size, 1, TID_INT);
650 
651  /* Create or update the fragment request list */
652  status = db_find_key(hDB, hEqkey, "Fragment Required", &hEqFRkey);
653  status = db_get_key (hDB, hEqFRkey, &key);
654  if (key.num_values != ebset.nfragment) {
655  cm_msg(MINFO, "mevb", "Number of Fragment mismatch ODB:%d - CUR:%d", key.num_values, ebset.nfragment);
656  free (ebset.preqfrag);
657  size = ebset.nfragment*sizeof(BOOL);
658  ebset.preqfrag = (BOOL *) malloc(size);
659  for (i=0 ; i<ebset.nfragment ; i++)
660  ebset.preqfrag[i] = TRUE;
661  status = db_set_value(hDB, hEqkey, "Fragment Required", ebset.preqfrag, size, ebset.nfragment, TID_BOOL);
662  } else { // Take from ODBedit
663  size = key.total_size;
664  free (ebset.preqfrag);
665  ebset.preqfrag = (BOOL *) malloc(size);
666  status = db_get_data(hDB, hEqFRkey, ebset.preqfrag, &size, TID_BOOL);
667  }
668  /* Cleanup fragment flags */
669  free (ebset.received);
670  ebset.received = (BOOL *) malloc(size);
671  for (i=0 ; i < ebset.nfragment ; i++)
672  ebset.received[i] = FALSE;
673 
674  /* Call BOR user function */
675  status = eb_begin_of_run(run_number, ebset.user_field, error);
676  if (status != EB_SUCCESS) {
677  cm_msg(MERROR, "eb_prestart", "run start aborted due to eb_begin_of_run (%d)",
678  status);
679  return status;
680  }
681 
682  /* Book all fragment */
683  status = source_booking();
684  if (status != SUCCESS)
685  return status;
686 
687  /* local run state */
688  run_state = STATE_RUNNING;
689  run_number = rn;
692  printf("%s-Starting New Run: %d\n", full_frontend_name, rn);
693 
694  /* Reset global trigger mask */
695  return CM_SUCCESS;
696 }
697 
698 /*--------------------------------------------------------------------*/
699 INT tr_stop(INT rn, char *error)
700 {
701  printf("\n%s-Stopping Run: %d detected\n", full_frontend_name, rn);
702 
703  /* local stop */
705 
706  /* local stop time */
707  request_stop_time = ss_millitime();
708  return CM_SUCCESS;
709 }
710 
711 /*--------------------------------------------------------------------*/
712 void free_event_buffer(INT nfrag)
713 {
714  INT i;
715  for (i = 0; i < nfrag; i++) {
716  if (ebch[i].pfragment) {
717  free(ebch[i].pfragment);
718  ebch[i].pfragment = NULL;
719  }
720  }
721 }
722 
723 /*--------------------------------------------------------------------*/
725 {
726  int i, size, status;
727  char strout[256];
728 
729  /* Do Hand flush until better way to garantee the input buffer to be empty */
730  if (debug)
731  printf("Hand flushing system buffer... \n");
732  for (i = 0; i < nfragment; i++) {
733  do {
734  size = max_event_size;
735  status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
736  if (debug1) {
737  sprintf(strout,
738  "booking:Hand flush bm_receive_event[%d] hndle:%d stat:%d Last Ser:%d",
739  i, ebch[i].hBuf, status,
740  ((EVENT_HEADER *) ebch[i].pfragment)->serial_number);
741  printf("%s\n", strout);
742  }
743  } while (status == BM_SUCCESS);
744  }
745 
746  /* Empty source buffer */
747  status = bm_empty_buffers();
748  if (status != BM_SUCCESS)
749  cm_msg(MERROR, "handFlush", "bm_empty_buffers failure [%d]", status);
750  run_state = STATE_STOPPED;
751  return status;
752 }
753 
754 
755 /*--------------------------------------------------------------------*/
757 {
758  INT j, i, status, status1, status2;
759 
760  if (debug)
761  printf("Entering booking\n");
762 
763  /* Book all the source channels */
764  for (i = 0; i < nfragment; i++) {
765  /* Book only the requested event mask */
766  if (ebset.preqfrag[i]) {
767  /* Connect channel to source buffer */
768  status1 = bm_open_buffer(ebch[i].buffer, EVENT_BUFFER_SIZE, &(ebch[i].hBuf));
769 
770  if (debug)
771  printf("bm_open_buffer frag:%d buf:%s handle:%d stat:%d\n",
772  i, ebch[i].buffer, ebch[i].hBuf, status1);
773  /* Register for specified channel event ID and Trigger mask */
774  status2 =
775  bm_request_event(ebch[i].hBuf, ebch[i].event_id,
776  ebch[i].trigger_mask, GET_ALL, &ebch[i].req_id, NULL);
777  if (debug)
778  printf("bm_request_event frag:%d id:%d msk:%d req_id:%d stat:%d\n",
779  i, ebch[i].event_id, ebch[i].trigger_mask, ebch[i].req_id, status2);
780  if (((status1 != BM_SUCCESS) && (status1 != BM_CREATED)) ||
781  ((status2 != BM_SUCCESS) && (status2 != BM_CREATED))) {
782  cm_msg(MERROR, "source_booking",
783  "Open buffer/event request failure [%d %d %d]", i, status1, status2);
784  return BM_CONFLICT;
785  }
786 
787  /* allocate local source event buffer */
788  if (ebch[i].pfragment)
789  free(ebch[i].pfragment);
790  ebch[i].pfragment = (char *) malloc(max_event_size + sizeof(EVENT_HEADER));
791  if (debug)
792  printf("malloc pevent frag:%d pevent:%p\n", i, ebch[i].pfragment);
793  if (ebch[i].pfragment == NULL) {
794  free_event_buffer(nfragment);
795  cm_msg(MERROR, "source_booking", "Can't allocate space for buffer");
796  return BM_NO_MEMORY;
797  }
798  }
799  }
800 
801  /* Empty source buffer */
802  status = bm_empty_buffers();
803  if (status != BM_SUCCESS) {
804  cm_msg(MERROR, "source_booking", "bm_empty_buffers failure [%d]", status);
805  return status;
806  }
807 
808  if (debug) {
809  printf("bm_empty_buffers stat:%d\n", status);
810  for (j = 0; j < ebset.nfragment; j++) {
811  printf(" buff:%s", ebch[j].buffer);
812  printf(" ser#:%d", ebch[j].serial);
813  printf(" hbuf:%2d", ebch[j].hBuf);
814  printf(" rqid:%2d", ebch[j].req_id);
815  printf(" opst:%d", status1);
816  printf(" rqst:%d", status2);
817  printf(" evid:%2d", ebch[j].event_id);
818  printf(" tmsk:0x%4.4x\n", ebch[j].trigger_mask);
819  }
820  }
821 
822  return SUCCESS;
823 }
824 
825 /*--------------------------------------------------------------------*/
827 {
828  INT i, status;
829 
830 #if 0
831  /* Skip unbooking if already done */
832  if (ebch[0].pfragment == NULL)
833  return EB_SUCCESS;
834 #endif
835 
836  /* unbook all source channels */
837  for (i = 0; i< nfragment; i++) {
838  bm_empty_buffers();
839 
840  if (ebch[i].trigger_mask) {
841  /* Remove event ID registration */
842  status = bm_delete_request(ebch[i].req_id);
843  if (debug)
844  printf("unbook: bm_delete_req[%d] req_id:%d stat:%d\n", i, ebch[i].req_id,
845  status);
846 
847  /* Close source buffer */
848  status = bm_close_buffer(ebch[i].hBuf);
849  if (debug)
850  printf("unbook: bm_close_buffer[%d] hndle:%d stat:%d\n", i, ebch[i].hBuf,
851  status);
852  if (status != BM_SUCCESS) {
853  cm_msg(MERROR, "source_unbooking", "Close buffer[%d] stat:", i, status);
854  return status;
855  }
856  }
857  }
858 
859  /* release local event buffer memory */
860  free_event_buffer(nfragment);
861 
862  return EB_SUCCESS;
863 }
864 
865 /*--------------------------------------------------------------------*/
866 INT close_buffers(void)
867 {
868  INT status;
869  char error[256];
870  EQUIPMENT *eq;
871 
872  eq = &equipment[0];
873 
874  /* Flush local destination cache */
875  bm_flush_cache(equipment[0].buffer_handle, SYNC);
876  /* Call user function */
877  eb_end_of_run(run_number, error);
878  /* Cleanup buffers */
879  handFlush();
880  /* Detach all source from midas */
881  status = source_unbooking();
882 
883  /* Compose message */
884  stop_time = ss_millitime() - request_stop_time;
885  sprintf(error, "Run %d Stop after %1.0lf events sent DT:%d[ms]",
886  run_number, eq->stats.events_sent, stop_time);
887  cm_msg(MINFO, "EBuilder", "%s", error);
888 
889  run_state = STATE_STOPPED;
891  return status;
892 }
893 
894 /********************************************************************/
916 INT source_scan(INT fmt, EQUIPMENT_INFO *eq_info)
917 {
918 // static char bars[] = "|/-\\";
919 // static int i_bar;
920  static DWORD serial;
921  DWORD *plrl;
922  BOOL complete;
923  INT i, j, status, size;
924  INT act_size;
925  BOOL found, event_mismatch;
926  BANK_HEADER *psbh;
927  BOOL got_a_fragment;
928 
929  status = SUCCESS;
930  got_a_fragment = FALSE;
931 
932  /* Scan all channels at least once */
933  for (i = 0; i < nfragment; i++) {
934  /* Check if current channel needs to be received */
935  if (ebset.preqfrag[i] && !ebset.received[i]) {
936  /* Get fragment and store it in ebch[i].pfragment */
937  size = max_event_size;
938  status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
939  switch (status) {
940  case BM_SUCCESS: /* event received */
941  /* Mask event */
942  ebset.received[i] = TRUE;
943  /* Keep local serial */
944  ebch[i].serial = ((EVENT_HEADER *) ebch[i].pfragment)->serial_number;
945  got_a_fragment = TRUE;
946 
947  /* Swap event depending on data format */
948  switch (fmt) {
949  case FORMAT_YBOS:
950  plrl = (DWORD *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
951  ybos_event_swap(plrl);
952  break;
953  case FORMAT_MIDAS:
954  psbh = (BANK_HEADER *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
955  bk_swap(psbh, FALSE);
956  break;
957  }
958 
959  if (debug1) {
960  printf("SUCC: ch:%d ser:%d rec:%d sz:%d\n", i,
961  ebch[i].serial, ebset.received[i], size);
962  }
963  break;
964  case BM_ASYNC_RETURN: /* timeout */
965  ebch[i].timeout++;
966  if (debug1) {
967 // printf("ASYNC: ch:%d ser:%d rec:%d sz:%d\n", i,
968 // ebch[i].serial, ebset.received[i], size);
969  }
970  break;
971  default: /* Error */
972  cm_msg(MERROR, "event_scan", "bm_receive_event error %d", status);
973  return status;
974  break;
975  }
976  } /* next channel */
977  }
978 
979 
980  /* Check if all fragments have been received */
981  complete = FALSE;
982  for (i = 0; i < nfragment;i++) {
983  if (ebset.preqfrag[i] && !ebset.received[i])
984  break;
985  }
986  if (i == nfragment) {
987  complete = TRUE;
988  /* Check if serial matches */
989  found = event_mismatch = FALSE;
990  /* Check Serial, mark first serial */
991  for (i = 0; i < nfragment; i++) {
992  if (ebset.preqfrag[i] && ebset.received[i] && !found) {
993  serial = ebch[i].serial;
994  found = TRUE;
995  } else {
996  if (ebset.preqfrag[i] && ebset.received[i] && (serial != ebch[i].serial)) {
997  /* Event mismatch */
998  event_mismatch = TRUE;
999  }
1000  }
1001  }
1002 
1003  /* internal action in case of event mismatch */
1004  if (event_mismatch && debug) {
1005  char str[256];
1006  char strsub[128];
1007  strcpy(str, "event mismatch: ");
1008  for (j = 0; j < nfragment; j++) {
1009  sprintf(strsub, "Ser[%d]:%d ", j, ebch[j].serial);
1010  strcat(str, strsub);
1011  }
1012  printf("event serial mismatch %s\n", str);
1013  }
1014 
1015  /* In any case reset destination buffer */
1016  memset(dest_event, 0, sizeof(EVENT_HEADER));
1017  act_size = 0;
1018 
1019  /* Fill reserved header space of destination event with
1020  final header information */
1021  bm_compose_event((EVENT_HEADER *) dest_event, eq_info->event_id, eq_info->trigger_mask,
1022  act_size, ebch[0].serial);
1023 
1024  /* Pass fragments to user with mismatch flag, for final check before assembly */
1025  status = eb_user(nfragment, event_mismatch, ebch
1026  , (EVENT_HEADER *) dest_event,(void *) ((EVENT_HEADER *) dest_event + 1), &act_size);
1027  if (status != EB_SUCCESS) {
1028  if (status == EB_SKIP) {
1029  /* Reset mask and timeouts as even thave been succesfully send */
1030  for (i = 0; i < nfragment; i++) {
1031  ebch[i].timeout = 0;
1032  ebset.received[i] = FALSE;
1033  }
1034  }
1035  return status; // Event mark as EB_SKIP or EB_ABORT by user
1036  }
1037 
1038  /* Overall event to be sent */
1039  act_size = ((EVENT_HEADER *) dest_event)->data_size + sizeof(EVENT_HEADER);
1040 
1041  /* Allow bypass of fragment assembly if user did it on its own */
1042  if (!ebset.user_build) {
1043  for (j = 0; j < nfragment; j++) {
1044  if (ebch[j].trigger_mask) {
1045  status = meb_fragment_add(dest_event, ebch[j].pfragment, &act_size);
1046  if (status != EB_SUCCESS) {
1047  cm_msg(MERROR, "source_scan",
1048  "compose fragment:%d current size:%d (%d)", j, act_size, status);
1049  return EB_ERROR;
1050  }
1051  }
1052  }
1053  }
1054 
1055  /* Overall event to be sent */
1056  act_size = ((EVENT_HEADER *) dest_event)->data_size + sizeof(EVENT_HEADER);
1057 
1058  /* Send event and wait for completion */
1059  status = rpc_send_event(equipment[0].buffer_handle, dest_event, act_size, SYNC);
1060  if (status != BM_SUCCESS) {
1061  if (debug)
1062  printf("rpc_send_event returned error %d, event_size %d\n",
1063  status, act_size);
1064  cm_msg(MERROR, "EBuilder", "%s: rpc_send_event returned error %d", full_frontend_name, status);
1065  return EB_ERROR;
1066  }
1067 
1068  /* Keep track of the total byte count */
1069  equipment[0].bytes_sent += act_size;
1070 
1071  /* update destination event count */
1072  equipment[0].events_sent++;
1073 
1074  /* Reset mask and timeouts as even thave been succesfully send */
1075  for (i = 0; i < nfragment; i++) {
1076  ebch[i].timeout = 0;
1077  ebset.received[i] = FALSE;
1078  }
1079  } // all fragment recieved for this event
1080 
1081  // If we did not receive any fragments, then take a break and let the
1082  // CPU get some other work done. The "stop_requested" condition protects
1083  // against a race between the end of one run and the start of the next.
1084  if(!got_a_fragment && !stop_requested) {
1085  cm_yield(10);
1086  }
1087 
1088  return status;
1089 }
1090 
1091 /*--------------------------------------------------------------------*/
1092 int main(unsigned int argc, char **argv)
1093 {
1094  INT status;
1095  unsigned int i;
1096  BOOL daemon = FALSE;
1097 
1098  /* init structure */
1099  memset(&ebch[0], 0, sizeof(ebch));
1100 
1101  /* set default */
1103 
1104  /* get parameters */
1105  for (i = 1; i < argc; i++) {
1106  if (argv[i][0] == '-' && argv[i][1] == 'd') {
1107  debug = TRUE;
1108  debug1 = TRUE;
1109  }
1110  else if (argv[i][0] == '-' && argv[i][1] == 'D')
1111  daemon = TRUE;
1112  else if (argv[i][0] == '-' && argv[i][1] == 'w')
1113  wheel = TRUE;
1114  else if (argv[i][0] == '-') {
1115  if (i + 1 >= argc || argv[i + 1][0] == '-')
1116  goto usage;
1117  if (strncmp(argv[i], "-e", 2) == 0)
1118  strcpy(expt_name, argv[++i]);
1119  else if (strncmp(argv[i], "-h", 2) == 0)
1120  strcpy(host_name, argv[++i]);
1121  else if (strncmp(argv[i], "-b", 2) == 0)
1122  strcpy(buffer_name, argv[++i]);
1123  } else {
1124 usage:
1125  printf("usage: mevb [-h <Hostname>] [-e <Experiment>] -b <buffername> [-d debug]\n");
1126  printf(" -w show wheel -D to start as a daemon\n\n");
1127  return 0;
1128  }
1129  }
1130 
1131  printf("Program mevb version 5 started\n\n");
1132  if (daemon) {
1133  printf("Becoming a daemon...\n");
1134  ss_daemon_init(FALSE);
1135  }
1136 
1137  /* Check buffer arg */
1138  if (buffer_name[0] == 0) {
1139  printf("Buffer name must be specified with -b argument\n");
1140  goto exit;
1141  }
1142 
1143  /* Compose frontend name */
1145 
1146  /* Connect to experiment */
1147  status = cm_connect_experiment(host_name, expt_name, full_frontend_name, NULL);
1148  if (status != CM_SUCCESS) {
1149  ss_sleep(5000);
1150  goto exit;
1151  }
1152 
1153  if (debug)
1154  cm_set_watchdog_params(TRUE, 0);
1155 
1156  /* Connect to ODB */
1157  status = cm_get_experiment_database(&hDB, &hKey);
1158  if (status != EB_SUCCESS) {
1159  ss_sleep(5000);
1160  goto exit;
1161  }
1162 
1163  /* check if Ebuilder is already running */
1164  status = cm_exist(full_frontend_name, TRUE);
1165  if (status == CM_SUCCESS) {
1166  cm_msg(MERROR, "Ebuilder", "%s running already!.", full_frontend_name);
1167  cm_disconnect_experiment();
1168  goto exit;
1169  }
1170 
1171  if (ebuilder_init() != SUCCESS) {
1172  cm_disconnect_experiment();
1173  /* let user read message before window might close */
1174  ss_sleep(5000);
1175  goto exit;
1176  }
1177 
1178  /* Register single equipment */
1179  status = register_equipment();
1180  if (status != EB_SUCCESS) {
1181  ss_sleep(5000);
1182  goto exit;
1183  }
1184 
1185  /* Load Fragment info */
1186  status = load_fragment();
1187  if (status != EB_SUCCESS) {
1188  ss_sleep(5000);
1189  goto exit;
1190  }
1191 
1192  /* Register transition for reset counters */
1193  if (cm_register_transition(TR_START, tr_start, 300) != CM_SUCCESS)
1194  return status;
1195  if (cm_register_transition(TR_STOP, tr_stop, 700) != CM_SUCCESS)
1196  goto exit;
1197 
1198  /* Scan fragments... will stay in */
1199  status = scan_fragment();
1200  printf("%s-Out of scan_fragment\n", full_frontend_name);
1201 
1202  /* Detach all source from midas */
1203  printf("%s-Unbooking\n", full_frontend_name);
1204  source_unbooking();
1205 
1206  ebuilder_exit();
1207 
1208 exit:
1209  /* Free local memory */
1211 
1212  /* Clean disconnect from midas */
1213  cm_disconnect_experiment();
1214  return 0;
1215 }