79 #define SERVER_CACHE_SIZE 0
81 #define ODB_UPDATE_TIME 1000
83 #define DEFAULT_FE_TIMEOUT 60000
140 #define EQUIPMENT_COMMON_STR "\
141 Event ID = WORD : 0\n\
142 Trigger mask = WORD : 0\n\
143 Buffer = STRING : [32] SYSTEM\n\
146 Format = STRING : [8] FIXED\n\
147 Enabled = BOOL : 0\n\
150 Event limit = DOUBLE : 0\n\
151 Num subevents = DWORD : 0\n\
152 Log history = INT : 0\n\
153 Frontend host = STRING : [32] \n\
154 Frontend name = STRING : [32] \n\
155 Frontend file name = STRING : [256] \n\
158 #define EQUIPMENT_STATISTICS_STR "\
159 Events sent = DOUBLE : 0\n\
160 Events per sec. = DOUBLE : 0\n\
161 kBytes per sec. = DOUBLE : 0\n\
169 EQUIPMENT_INFO *eq_info;
170 EQUIPMENT_STATS *eq_stats;
179 status = db_get_value(
hDB, 0,
"/Runinfo/Run number", &
run_number, &size, TID_INT,
TRUE);
183 for (index = 0;
equipment[index].name[0]; index++) {
187 if (eq_info->event_id == 0) {
189 cm_disconnect_experiment();
200 if (eq_info->eq_type != EQ_SLOW) {
201 db_find_key(
hDB, 0, str, &hKey);
202 size =
sizeof(double);
204 db_get_value(
hDB, hKey,
"Event limit", &eq_info->event_limit, &size,
210 if (status != DB_SUCCESS) {
211 printf(
"Cannot check equipment record, status = %d\n", status);
214 db_find_key(
hDB, 0, str, &hKey);
216 if (equal_ustring(eq_info->format,
"YBOS"))
218 else if (equal_ustring(eq_info->format,
"FIXED"))
223 gethostname(eq_info->frontend_host,
sizeof(eq_info->frontend_host));
228 db_set_record(
hDB, hKey, eq_info,
sizeof(EQUIPMENT_INFO), 0);
231 size =
sizeof(EQUIPMENT_INFO);
232 db_get_record(
hDB, hKey, eq_info, &size, 0);
236 db_create_key(
hDB, 0, str, TID_KEY);
237 db_find_key(
hDB, 0, str, &hKey);
244 if (status != DB_SUCCESS) {
245 printf(
"Cannot create/check statistics record, error %d\n", status);
249 status = db_find_key(
hDB, 0, str, &hKey);
250 if (status != DB_SUCCESS) {
251 printf(
"Cannot find statistics record, error %d\n", status);
255 eq_stats->events_sent = 0;
256 eq_stats->events_per_sec = 0;
257 eq_stats->kbytes_per_sec = 0;
260 status = db_open_record(
hDB, hKey, eq_stats,
sizeof(EQUIPMENT_STATS)
261 , MODE_WRITE, NULL, NULL);
262 if (status != DB_SUCCESS) {
263 cm_msg(MERROR,
"register_equipment",
264 "Cannot open statistics record, error %d. Probably other FE is using it",
270 if (eq_info->buffer[0]) {
271 status = bm_open_buffer(eq_info->buffer, EVENT_BUFFER_SIZE,
273 if (status != BM_SUCCESS && status != BM_CREATED) {
274 cm_msg(MERROR,
"register_equipment",
275 "Cannot open event buffer. Try to reduce EVENT_BUFFER_SIZE in midas.h \
276 and rebuild the system.");
283 cm_msg(MERROR,
"register_equipment",
"Destination buffer must be present");
297 EQUIPMENT_INFO *eq_info;
299 char buffer[NAME_LENGTH];
306 if (db_find_key(
hDB, 0,
"Equipment", &hEqKey) != DB_SUCCESS) {
307 cm_msg(MINFO,
"load_fragment",
"Equipment listing not found");
313 db_enum_key(
hDB, hEqKey, i, &hSubkey);
316 db_get_key(
hDB, hSubkey, &key);
317 if (key.type == TID_KEY) {
319 if (
debug)
printf(
"Equipment name:%s\n", key.name);
322 db_get_value(
hDB, hSubkey,
"common/type", &type, &size, TID_INT, 0);
323 size =
sizeof(buffer);
324 db_get_value(
hDB, hSubkey,
"common/Buffer", buffer, &size, TID_STRING, 0);
325 size =
sizeof(format);
326 db_get_value(
hDB, hSubkey,
"common/Format", format, &size, TID_STRING, 0);
329 db_get_value(
hDB, hSubkey,
"Settings/Enabled", &enabled, &size, TID_BOOL, 0);
331 if ((type & EQ_EB) && enabled
333 && (strncmp(format, eq_info->format, strlen(format)) == 0)) {
338 db_get_value(
hDB, hSubkey,
"common/Trigger Mask", &ebch[
nfragment].trigger_mask, &size, TID_WORD, 0);
341 printf(
"%s enabled\n",key.name);
352 else if (
equipment[0].format == FORMAT_YBOS)
355 cm_msg(MERROR,
"mevb",
"Unknown data format :%d", equipment[0].format); /* log the numeric format code the preceding branch tested; the local `format` is a string buffer and does not match %d */
363 cm_msg(MERROR,
"EBuilder",
"%s: Not enough memory for event buffer",
full_frontend_name);
374 EQUIPMENT_INFO *eq_info;
385 status = cm_yield(500);
394 case BM_ASYNC_RETURN:
395 for (fragn = 0; fragn <
nfragment; fragn++) {
398 if (
debug)
printf(
"Stop requested on timeout %d\n", status);
405 eq->stats.events_sent);
407 status = cm_yield(50);
418 cm_msg(MTALK,
"EBuilder",
"%s: Error signaled by user code - stopping run...",
full_frontend_name);
420 cm_msg(MTALK,
"EBuilder",
"%s: Event mismatch - Stopping run...",
full_frontend_name);
421 if (cm_transition(TR_STOP, 0, NULL, 0, ASYNC, 0) != CM_SUCCESS) {
426 if (
debug)
printf(
"Stop requested on Error %d\n", status);
436 cm_msg(MERROR,
"Source_scan",
"unexpected return %d", status);
447 bm_flush_cache(
equipment[0].buffer_handle, ASYNC);
449 status = cm_yield(10);
452 eq->stats.events_sent += eq->events_sent;
453 eq->stats.events_per_sec =
455 eq->stats.kbytes_per_sec =
461 db_send_changed_records();
465 }
while (status != RPC_SHUTDOWN && status != SS_ABORT);
473 BANK_HEADER *
psbh, *pdbh;
474 char *psdata, *pddata;
478 *size = ((EVENT_HEADER *) pdest)->data_size;
481 pddata = pdest + *size +
sizeof(EVENT_HEADER);
487 psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
488 bk_swap(psbh,
FALSE);
491 psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
492 psdata = (
char *) (psbh + 1);
495 bksize = psbh->data_size;
498 memcpy(pddata, psdata, bksize);
501 ((EVENT_HEADER *) pdest)->data_size += bksize;
504 pdbh = (BANK_HEADER *) (((EVENT_HEADER *) pdest) + 1);
505 pdbh->data_size += bksize;
507 *size = ((EVENT_HEADER *) pdest)->data_size;
511 *size = ((EVENT_HEADER *) psrce)->data_size;
514 psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
515 bk_swap(psbh,
FALSE);
518 memcpy(pddata, psbh, *size);
521 ((EVENT_HEADER *) pdest)->data_size = *
size;
533 char *psdata, *pddata;
534 DWORD *pslrl, *pdlrl;
535 INT i4frgsize, i1frgsize,
status;
538 *size = ((EVENT_HEADER *) pdest)->data_size;
542 pddata = pdest + *size +
sizeof(EVENT_HEADER);
548 pslrl = (
DWORD *) (((EVENT_HEADER *) psrce) + 1);
554 psdata = (
char *) (pslrl + 1);
557 i4frgsize = (*pslrl);
558 i1frgsize = 4 * i4frgsize;
561 memcpy(pddata, psdata, i1frgsize);
564 ((EVENT_HEADER *) pdest)->data_size += i1frgsize;
567 pdlrl = (
DWORD *) (((EVENT_HEADER *) pdest) + 1);
571 *size = ((EVENT_HEADER *) pdest)->data_size;
580 pslrl = (
DWORD *) (((EVENT_HEADER *) psrce) + 1);
586 *size = ((EVENT_HEADER *) psrce)->data_size;
589 memcpy(pddata, (
char *) pslrl, *size);
592 ((EVENT_HEADER *) pdest)->data_size += *
size;
605 HNDLE
hKey, hEqkey, hEqFRkey;
606 EQUIPMENT_INFO *eq_info;
613 status = db_find_key(
hDB, 0, str, &hKey);
614 size =
sizeof(EQUIPMENT_INFO);
615 db_get_record(
hDB, hKey, eq_info, &size, 0);
629 if (db_find_key(
hDB, 0, str, &hEqkey) != DB_SUCCESS) {
630 status = db_create_record(
hDB, 0, str, strcomb(ebuilder_str));
635 if (db_find_key(
hDB, 0, str, &hEqkey) != DB_SUCCESS) {
636 cm_msg(MINFO,
"load_fragment",
"/Equipment/%s/Settings not found",
equipment[0].name);
641 status = db_get_value(
hDB, hEqkey,
"User Field", ebset.
user_field, &size, TID_STRING,
TRUE);
645 status = db_get_value(
hDB, hEqkey,
"User Build", &ebset.
user_build, &size, TID_BOOL,
TRUE);
649 status = db_set_value(
hDB, hEqkey,
"Number of Fragment", &ebset.
nfragment, size, 1, TID_INT);
652 status = db_find_key(
hDB, hEqkey,
"Fragment Required", &hEqFRkey);
653 status = db_get_key (
hDB, hEqFRkey, &key);
655 cm_msg(MINFO,
"mevb",
"Number of Fragment mismatch ODB:%d - CUR:%d", key.num_values, ebset.
nfragment);
658 ebset.
preqfrag = (BOOL *) malloc(size);
661 status = db_set_value(
hDB, hEqkey,
"Fragment Required", ebset.
preqfrag, size, ebset.
nfragment, TID_BOOL);
663 size = key.total_size;
665 ebset.
preqfrag = (BOOL *) malloc(size);
666 status = db_get_data(
hDB, hEqFRkey, ebset.
preqfrag, &size, TID_BOOL);
670 ebset.
received = (BOOL *) malloc(size);
677 cm_msg(MERROR,
"eb_prestart",
"run start aborted due to eb_begin_of_run (%d)",
715 for (i = 0; i < nfrag; i++) {
716 if (ebch[i].pfragment) {
717 free(ebch[i].pfragment);
731 printf(
"Hand flushing system buffer... \n");
735 status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
738 "booking:Hand flush bm_receive_event[%d] hndle:%d stat:%d Last Ser:%d",
739 i, ebch[i].hBuf, status,
740 ((EVENT_HEADER *) ebch[i].pfragment)->serial_number);
743 }
while (status == BM_SUCCESS);
747 status = bm_empty_buffers();
748 if (status != BM_SUCCESS)
749 cm_msg(MERROR,
"handFlush",
"bm_empty_buffers failure [%d]", status);
758 INT j,
i,
status, status1, status2;
761 printf(
"Entering booking\n");
768 status1 = bm_open_buffer(ebch[i].buffer, EVENT_BUFFER_SIZE, &(ebch[i].hBuf));
771 printf(
"bm_open_buffer frag:%d buf:%s handle:%d stat:%d\n",
772 i, ebch[i].buffer, ebch[i].hBuf, status1);
775 bm_request_event(ebch[i].hBuf, ebch[i].
event_id,
776 ebch[i].trigger_mask, GET_ALL, &ebch[i].req_id, NULL);
778 printf(
"bm_request_event frag:%d id:%d msk:%d req_id:%d stat:%d\n",
779 i, ebch[i].event_id, ebch[i].trigger_mask, ebch[i].req_id, status2);
780 if (((status1 != BM_SUCCESS) && (status1 != BM_CREATED)) ||
781 ((status2 != BM_SUCCESS) && (status2 != BM_CREATED))) {
782 cm_msg(MERROR,
"source_booking",
783 "Open buffer/event request failure [%d %d %d]", i, status1, status2);
788 if (ebch[i].pfragment)
789 free(ebch[i].pfragment);
792 printf(
"malloc pevent frag:%d pevent:%p\n", i, ebch[i].pfragment);
793 if (ebch[i].pfragment == NULL) {
795 cm_msg(MERROR,
"source_booking",
"Can't allocate space for buffer");
802 status = bm_empty_buffers();
803 if (status != BM_SUCCESS) {
804 cm_msg(MERROR,
"source_booking",
"bm_empty_buffers failure [%d]", status);
809 printf(
"bm_empty_buffers stat:%d\n", status);
811 printf(
" buff:%s", ebch[j].buffer);
812 printf(
" ser#:%d", ebch[j].serial);
813 printf(
" hbuf:%2d", ebch[j].hBuf);
814 printf(
" rqid:%2d", ebch[j].req_id);
815 printf(
" opst:%d", status1);
816 printf(
" rqst:%d", status2);
818 printf(
" tmsk:0x%4.4x\n", ebch[j].trigger_mask);
832 if (ebch[0].pfragment == NULL)
840 if (ebch[i].trigger_mask) {
842 status = bm_delete_request(ebch[i].req_id);
844 printf(
"unbook: bm_delete_req[%d] req_id:%d stat:%d\n", i, ebch[i].req_id,
848 status = bm_close_buffer(ebch[i].hBuf);
850 printf(
"unbook: bm_close_buffer[%d] hndle:%d stat:%d\n", i, ebch[i].hBuf,
852 if (status != BM_SUCCESS) {
853 cm_msg(MERROR,
"source_unbooking",
"Close buffer[%d] stat:%d", i, status); /* print the bm_close_buffer status too; the format previously had no conversion for it */
875 bm_flush_cache(
equipment[0].buffer_handle, SYNC);
885 sprintf(error,
"Run %d Stop after %1.0lf events sent DT:%d[ms]",
887 cm_msg(MINFO,
"EBuilder",
"%s", error);
930 got_a_fragment =
FALSE;
938 status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
944 ebch[
i].
serial = ((EVENT_HEADER *) ebch[i].pfragment)->serial_number;
945 got_a_fragment =
TRUE;
954 psbh = (BANK_HEADER *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
955 bk_swap(psbh,
FALSE);
960 printf(
"SUCC: ch:%d ser:%d rec:%d sz:%d\n", i,
961 ebch[i].serial, ebset.
received[i], size);
964 case BM_ASYNC_RETURN:
972 cm_msg(MERROR,
"event_scan",
"bm_receive_event error %d", status);
986 if (i == nfragment) {
989 found = event_mismatch =
FALSE;
998 event_mismatch =
TRUE;
1004 if (event_mismatch &&
debug) {
1007 strcpy(str,
"event mismatch: ");
1009 sprintf(strsub,
"Ser[%d]:%d ", j, ebch[j].serial);
1010 strcat(str, strsub);
1012 printf(
"event serial mismatch %s\n", str);
1021 bm_compose_event((EVENT_HEADER *)
dest_event, eq_info->event_id, eq_info->trigger_mask,
1022 act_size, ebch[0].
serial);
1025 status =
eb_user(nfragment, event_mismatch, ebch
1039 act_size = ((EVENT_HEADER *)
dest_event)->data_size +
sizeof(EVENT_HEADER);
1044 if (ebch[j].trigger_mask) {
1047 cm_msg(MERROR,
"source_scan",
1048 "compose fragment:%d current size:%d (%d)", j, act_size, status);
1056 act_size = ((EVENT_HEADER *)
dest_event)->data_size +
sizeof(EVENT_HEADER);
1060 if (status != BM_SUCCESS) {
1062 printf(
"rpc_send_event returned error %d, event_size %d\n",
1064 cm_msg(MERROR,
"EBuilder",
"%s: rpc_send_event returned error %d",
full_frontend_name, status);
1092 int main(
unsigned int argc,
char **argv)
1096 BOOL daemon =
FALSE;
1099 memset(&ebch[0], 0,
sizeof(ebch));
1105 for (i = 1; i < argc; i++) {
1106 if (argv[i][0] ==
'-' && argv[i][1] ==
'd') {
1110 else if (argv[i][0] ==
'-' && argv[i][1] ==
'D')
1112 else if (argv[i][0] ==
'-' && argv[i][1] ==
'w')
1114 else if (argv[i][0] ==
'-') {
1115 if (i + 1 >= argc || argv[i + 1][0] ==
'-')
1117 if (strncmp(argv[i],
"-e", 2) == 0)
1119 else if (strncmp(argv[i],
"-h", 2) == 0)
1121 else if (strncmp(argv[i],
"-b", 2) == 0)
1125 printf(
"usage: mevb [-h <Hostname>] [-e <Experiment>] -b <buffername> [-d debug]\n");
1126 printf(
" -w show wheel -D to start as a daemon\n\n");
1131 printf(
"Program mevb version 5 started\n\n");
1133 printf(
"Becoming a daemon...\n");
1134 ss_daemon_init(
FALSE);
1139 printf(
"Buffer name must be specified with -b argument\n");
1148 if (status != CM_SUCCESS) {
1154 cm_set_watchdog_params(
TRUE, 0);
1157 status = cm_get_experiment_database(&
hDB, &
hKey);
1165 if (status == CM_SUCCESS) {
1167 cm_disconnect_experiment();
1172 cm_disconnect_experiment();
1193 if (cm_register_transition(TR_START,
tr_start, 300) != CM_SUCCESS)
1195 if (cm_register_transition(TR_STOP,
tr_stop, 700) != CM_SUCCESS)
1213 cm_disconnect_experiment();