74 #define SERVER_CACHE_SIZE 100000
76 #define ODB_UPDATE_TIME 1000
78 #define DEFAULT_FE_TIMEOUT 60000
135 #define EQUIPMENT_COMMON_STR "\
136 Event ID = WORD : 0\n\
137 Trigger mask = WORD : 0\n\
138 Buffer = STRING : [32] SYSTEM\n\
141 Format = STRING : [8] FIXED\n\
142 Enabled = BOOL : 0\n\
145 Event limit = DOUBLE : 0\n\
146 Num subevents = DWORD : 0\n\
147 Log history = INT : 0\n\
148 Frontend host = STRING : [32] \n\
149 Frontend name = STRING : [32] \n\
150 Frontend file name = STRING : [256] \n\
153 #define EQUIPMENT_STATISTICS_STR "\
154 Events sent = DOUBLE : 0\n\
155 Events per sec. = DOUBLE : 0\n\
156 kBytes per sec. = DOUBLE : 0\n\
164 EQUIPMENT_INFO *eq_info;
165 EQUIPMENT_STATS *eq_stats;
174 status = db_get_value(
hDB, 0,
"/Runinfo/Run number", &
run_number, &size, TID_INT,
TRUE);
178 for (index = 0;
equipment[index].name[0]; index++) {
182 if (eq_info->event_id == 0) {
184 cm_disconnect_experiment();
195 if (eq_info->eq_type != EQ_SLOW) {
196 db_find_key(
hDB, 0, str, &hKey);
197 size =
sizeof(double);
199 db_get_value(
hDB, hKey,
"Event limit", &eq_info->event_limit, &size,
205 if (status != DB_SUCCESS) {
206 printf(
"Cannot check equipment record, status = %d\n", status);
209 db_find_key(
hDB, 0, str, &hKey);
211 if (equal_ustring(eq_info->format,
"YBOS"))
213 else if (equal_ustring(eq_info->format,
"FIXED"))
218 gethostname(eq_info->frontend_host,
sizeof(eq_info->frontend_host));
223 db_set_record(
hDB, hKey, eq_info,
sizeof(EQUIPMENT_INFO), 0);
226 size =
sizeof(EQUIPMENT_INFO);
227 db_get_record(
hDB, hKey, eq_info, &size, 0);
231 db_create_key(
hDB, 0, str, TID_KEY);
232 db_find_key(
hDB, 0, str, &hKey);
239 if (status != DB_SUCCESS) {
240 printf(
"Cannot create/check statistics record, error %d\n", status);
244 status = db_find_key(
hDB, 0, str, &hKey);
245 if (status != DB_SUCCESS) {
246 printf(
"Cannot find statistics record, error %d\n", status);
250 eq_stats->events_sent = 0;
251 eq_stats->events_per_sec = 0;
252 eq_stats->kbytes_per_sec = 0;
255 status = db_open_record(
hDB, hKey, eq_stats,
sizeof(EQUIPMENT_STATS)
256 , MODE_WRITE, NULL, NULL);
257 if (status != DB_SUCCESS) {
258 cm_msg(MERROR,
"register_equipment",
259 "Cannot open statistics record, error %d. Probably other FE is using it",
265 if (eq_info->buffer[0]) {
266 status = bm_open_buffer(eq_info->buffer, EVENT_BUFFER_SIZE,
268 if (status != BM_SUCCESS && status != BM_CREATED) {
269 cm_msg(MERROR,
"register_equipment",
270 "Cannot open event buffer. Try to reduce EVENT_BUFFER_SIZE in midas.h \
271 and rebuild the system.");
278 cm_msg(MERROR,
"register_equipment",
"Destination buffer must be present");
291 EQUIPMENT_INFO *eq_info;
293 char buffer[NAME_LENGTH];
300 if (db_find_key(
hDB, 0,
"Equipment", &hEqKey) != DB_SUCCESS) {
301 cm_msg(MINFO,
"load_fragment",
"Equipment listing not found");
307 db_enum_key(
hDB, hEqKey, i, &hSubkey);
310 db_get_key(
hDB, hSubkey, &key);
311 if (key.type == TID_KEY) {
313 if (
debug)
printf(
"Equipment name:%s\n", key.name);
316 db_get_value(
hDB, hSubkey,
"common/type", &type, &size, TID_INT, 0);
317 size =
sizeof(buffer);
318 db_get_value(
hDB, hSubkey,
"common/Buffer", buffer, &size, TID_STRING, 0);
319 size =
sizeof(format);
320 db_get_value(
hDB, hSubkey,
"common/Format", format, &size, TID_STRING, 0);
324 && (strncmp(format, eq_info->format, strlen(format)) == 0)) {
329 db_get_value(
hDB, hSubkey,
"common/Trigger Mask", &ebch[
nfragment].trigger_mask, &size, TID_WORD, 0);
342 else if (
equipment[0].format == FORMAT_YBOS)
345 cm_msg(MERROR,
"mevb",
"Unknown data format :%d", format);
353 cm_msg(MERROR,
"EBuilder",
"%s: Not enough memory for event buffer",
full_frontend_name);
364 EQUIPMENT_INFO *eq_info;
375 status = cm_yield(500);
384 case BM_ASYNC_RETURN:
385 for (fragn = 0; fragn <
nfragment; fragn++) {
388 if (
debug)
printf(
"Stop requested on timeout %d\n", status);
395 eq->stats.events_sent);
397 status = cm_yield(50);
408 cm_msg(MTALK,
"EBuilder",
"%s: Error signaled by user code - stopping run...",
full_frontend_name);
410 cm_msg(MTALK,
"EBuilder",
"%s: Event mismatch - Stopping run...",
full_frontend_name);
411 if (cm_transition(TR_STOP, 0, NULL, 0, ASYNC, 0) != CM_SUCCESS) {
415 if (
debug)
printf(
"Stop requested on Error %d\n", status);
425 cm_msg(MERROR,
"Source_scan",
"unexpected return %d", status);
436 bm_flush_cache(
equipment[0].buffer_handle, ASYNC);
438 status = cm_yield(10);
441 eq->stats.events_sent += eq->events_sent;
442 eq->stats.events_per_sec =
444 eq->stats.kbytes_per_sec =
450 db_send_changed_records();
454 }
while (status != RPC_SHUTDOWN && status != SS_ABORT);
462 BANK_HEADER *
psbh, *pdbh;
463 char *psdata, *pddata;
467 *size = ((EVENT_HEADER *) pdest)->data_size;
470 pddata = pdest + *size +
sizeof(EVENT_HEADER);
476 psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
477 bk_swap(psbh,
FALSE);
480 psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
481 psdata = (
char *) (psbh + 1);
484 bksize = psbh->data_size;
487 memcpy(pddata, psdata, bksize);
490 ((EVENT_HEADER *) pdest)->data_size += bksize;
493 pdbh = (BANK_HEADER *) (((EVENT_HEADER *) pdest) + 1);
494 pdbh->data_size += bksize;
496 *size = ((EVENT_HEADER *) pdest)->data_size;
500 *size = ((EVENT_HEADER *) psrce)->data_size;
503 psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
504 bk_swap(psbh,
FALSE);
507 memcpy(pddata, psbh, *size);
510 ((EVENT_HEADER *) pdest)->data_size = *
size;
522 char *psdata, *pddata;
523 DWORD *pslrl, *pdlrl;
524 INT i4frgsize, i1frgsize,
status;
527 *size = ((EVENT_HEADER *) pdest)->data_size;
531 pddata = pdest + *size +
sizeof(EVENT_HEADER);
537 pslrl = (
DWORD *) (((EVENT_HEADER *) psrce) + 1);
543 psdata = (
char *) (pslrl + 1);
546 i4frgsize = (*pslrl);
547 i1frgsize = 4 * i4frgsize;
550 memcpy(pddata, psdata, i1frgsize);
553 ((EVENT_HEADER *) pdest)->data_size += i1frgsize;
556 pdlrl = (
DWORD *) (((EVENT_HEADER *) pdest) + 1);
560 *size = ((EVENT_HEADER *) pdest)->data_size;
569 pslrl = (
DWORD *) (((EVENT_HEADER *) psrce) + 1);
575 *size = ((EVENT_HEADER *) psrce)->data_size;
578 memcpy(pddata, (
char *) pslrl, *size);
581 ((EVENT_HEADER *) pdest)->data_size += *
size;
594 HNDLE
hKey, hEqkey, hEqFRkey;
595 EQUIPMENT_INFO *eq_info;
602 status = db_find_key(
hDB, 0, str, &hKey);
603 size =
sizeof(EQUIPMENT_INFO);
604 db_get_record(
hDB, hKey, eq_info, &size, 0);
618 if (db_find_key(
hDB, 0, str, &hEqkey) != DB_SUCCESS) {
619 status = db_create_record(
hDB, 0, str, strcomb(ebuilder_str));
624 if (db_find_key(
hDB, 0, str, &hEqkey) != DB_SUCCESS) {
625 cm_msg(MINFO,
"load_fragment",
"/Equipment/%s/Settings not found",
equipment[0].name);
630 status = db_get_value(
hDB, hEqkey,
"User Field", ebset.
user_field, &size, TID_STRING,
TRUE);
634 status = db_get_value(
hDB, hEqkey,
"User Build", &ebset.
user_build, &size, TID_BOOL,
TRUE);
638 status = db_set_value(
hDB, hEqkey,
"Number of Fragment", &ebset.
nfragment, size, 1, TID_INT);
641 status = db_find_key(
hDB, hEqkey,
"Fragment Required", &hEqFRkey);
642 status = db_get_key (
hDB, hEqFRkey, &key);
644 cm_msg(MINFO,
"mevb",
"Number of Fragment mismatch ODB:%d - CUR:%d", key.num_values, ebset.
nfragment);
650 status = db_set_value(
hDB, hEqkey,
"Fragment Required", ebset.
preqfrag, size, ebset.
nfragment, TID_BOOL);
652 size = key.total_size;
655 status = db_get_data(
hDB, hEqkey, ebset.
preqfrag, &size, TID_BOOL);
666 cm_msg(MERROR,
"eb_prestart",
"run start aborted due to eb_begin_of_run (%d)",
704 for (i = 0; i < nfrag; i++) {
705 if (ebch[i].pfragment) {
706 free(ebch[i].pfragment);
720 printf(
"Hand flushing system buffer... \n");
724 status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
727 "booking:Hand flush bm_receive_event[%d] hndle:%d stat:%d Last Ser:%d",
728 i, ebch[i].hBuf, status,
729 ((EVENT_HEADER *) ebch[i].pfragment)->serial_number);
732 }
while (status == BM_SUCCESS);
736 status = bm_empty_buffers();
737 if (status != BM_SUCCESS)
738 cm_msg(MERROR,
"handFlush",
"bm_empty_buffers failure [%d]", status);
747 INT j,
i,
status, status1, status2;
750 printf(
"Entering booking\n");
757 status1 = bm_open_buffer(ebch[i].buffer, EVENT_BUFFER_SIZE, &(ebch[i].hBuf));
760 printf(
"bm_open_buffer frag:%d buf:%s handle:%d stat:%d\n",
761 i, ebch[i].buffer, ebch[i].hBuf, status1);
764 bm_request_event(ebch[i].hBuf, ebch[i].
event_id,
765 ebch[i].trigger_mask, GET_ALL, &ebch[i].req_id, NULL);
767 printf(
"bm_request_event frag:%d id:%d msk:%d req_id:%d stat:%d\n",
768 i, ebch[i].
event_id, ebch[i].trigger_mask, ebch[i].req_id, status2);
769 if (((status1 != BM_SUCCESS) && (status1 != BM_CREATED)) ||
770 ((status2 != BM_SUCCESS) && (status2 != BM_CREATED))) {
771 cm_msg(MERROR,
"source_booking",
772 "Open buffer/event request failure [%d %d %d]", i, status1, status2);
777 if (ebch[i].pfragment)
778 free(ebch[i].pfragment);
781 printf(
"malloc pevent frag:%d pevent:%p\n", i, ebch[i].pfragment);
782 if (ebch[i].pfragment == NULL) {
784 cm_msg(MERROR,
"source_booking",
"Can't allocate space for buffer");
791 status = bm_empty_buffers();
792 if (status != BM_SUCCESS) {
793 cm_msg(MERROR,
"source_booking",
"bm_empty_buffers failure [%d]", status);
798 printf(
"bm_empty_buffers stat:%d\n", status);
800 printf(
" buff:%s", ebch[j].buffer);
801 printf(
" ser#:%d", ebch[j].serial);
802 printf(
" hbuf:%2d", ebch[j].hBuf);
803 printf(
" rqid:%2d", ebch[j].req_id);
804 printf(
" opst:%d", status1);
805 printf(
" rqst:%d", status2);
807 printf(
" tmsk:0x%4.4x\n", ebch[j].trigger_mask);
821 if (ebch[0].pfragment == NULL)
829 if (ebch[i].set.emask) {
831 status = bm_delete_request(ebch[i].req_id);
833 printf(
"unbook: bm_delete_req[%d] req_id:%d stat:%d\n", i, ebch[i].req_id,
837 status = bm_close_buffer(ebch[i].hBuf);
839 printf(
"unbook: bm_close_buffer[%d] hndle:%d stat:%d\n", i, ebch[i].hBuf,
841 if (status != BM_SUCCESS) {
842 cm_msg(MERROR,
"source_unbooking",
"Close buffer[%d] stat:", i, status);
864 bm_flush_cache(
equipment[0].buffer_handle, SYNC);
874 sprintf(error,
"Run %d Stop after %1.0lf events sent DT:%d[ms]",
876 cm_msg(MINFO,
"EBuilder",
"%s", error);
907 static char bars[] =
"|/-\\";
925 status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
931 ebch[
i].
serial = ((EVENT_HEADER *) ebch[i].pfragment)->serial_number;
940 psbh = (BANK_HEADER *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
941 bk_swap(psbh,
FALSE);
946 printf(
"SUCC: ch:%d ser:%d rec:%d sz:%d\n", i,
947 ebch[i].serial, ebset.
received[i], size);
950 case BM_ASYNC_RETURN:
953 printf(
"ASYNC: ch:%d ser:%d rec:%d sz:%d\n", i,
954 ebch[i].serial, ebset.
received[i], size);
958 cm_msg(MERROR,
"event_scan",
"bm_receive_event error %d", status);
971 if (i == nfragment) {
974 found = event_mismatch =
FALSE;
983 event_mismatch =
TRUE;
989 if (event_mismatch &&
debug) {
992 strcpy(str,
"event mismatch: ");
994 sprintf(strsub,
"Ser[%d]:%d ", j, ebch[j].serial);
997 printf(
"event serial mismatch %s\n", str);
1006 bm_compose_event((EVENT_HEADER *)
dest_event, eq_info->event_id, eq_info->trigger_mask,
1007 act_size, ebch[0].
serial);
1010 status =
eb_user(nfragment, event_mismatch, ebch
1026 if (ebch[j].set.emask) {
1029 cm_msg(MERROR,
"source_scan",
1030 "compose fragment:%d current size:%d (%d)", j, act_size, status);
1038 act_size = ((EVENT_HEADER *)
dest_event)->data_size +
sizeof(EVENT_HEADER);
1042 if (status != BM_SUCCESS) {
1044 printf(
"rpc_send_event returned error %d, event_size %d\n",
1046 cm_msg(MERROR,
"EBuilder",
"%s: rpc_send_event returned error %d",
full_frontend_name, status);
1067 int main(
unsigned int argc,
char **argv)
1071 BOOL daemon =
FALSE;
1074 memset(&ebch[0], 0,
sizeof(ebch));
1080 for (i = 1; i < argc; i++) {
1081 if (argv[i][0] ==
'-' && argv[i][1] ==
'd')
1083 else if (argv[i][0] ==
'-' && argv[i][1] ==
'D')
1085 else if (argv[i][0] ==
'-' && argv[i][1] ==
'w')
1087 else if (argv[i][0] ==
'-') {
1088 if (i + 1 >= argc || argv[i + 1][0] ==
'-')
1090 if (strncmp(argv[i],
"-e", 2) == 0)
1092 else if (strncmp(argv[i],
"-h", 2) == 0)
1094 else if (strncmp(argv[i],
"-b", 2) == 0)
1098 printf(
"usage: mevb [-h <Hostname>] [-e <Experiment>] -b <buffername> [-d debug]\n");
1099 printf(
" -w show wheel -D to start as a daemon\n\n");
1104 printf(
"Program mevb version 5 started\n\n");
1106 printf(
"Becoming a daemon...\n");
1107 ss_daemon_init(
FALSE);
1112 printf(
"Buffer name must be specified with -b argument\n");
1121 if (status != CM_SUCCESS) {
1127 cm_set_watchdog_params(
TRUE, 0);
1130 status = cm_get_experiment_database(&
hDB, &
hKey);
1138 if (status == CM_SUCCESS) {
1140 cm_disconnect_experiment();
1145 cm_disconnect_experiment();
1166 if (cm_register_transition(TR_START,
tr_start, 300) != CM_SUCCESS)
1168 if (cm_register_transition(TR_STOP,
tr_stop, 700) != CM_SUCCESS)
1186 cm_disconnect_experiment();