/* ----------------------------------------------------------------------------- * * (c) The GHC Team, 2008-2009 * * Support for fast binary event logging. * * ---------------------------------------------------------------------------*/ #include "PosixSource.h" #include "Rts.h" #ifdef TRACING #include "Trace.h" #include "Capability.h" #include "RtsUtils.h" #include "Stats.h" #include "EventLog.h" #include #include static char *event_log_filename = NULL; // File for logging events FILE *event_log_file = NULL; #define EVENT_LOG_SIZE 2 * (1024 * 1024) // 2MB static int flushCount; // Struct for record keeping of buffer to store event types and events. typedef struct _EventsBuf { StgInt8 *begin; StgInt8 *pos; StgInt8 *marker; StgWord64 size; EventCapNo capno; // which capability this buffer belongs to, or -1 } EventsBuf; EventsBuf *capEventBuf; // one EventsBuf for each Capability EventsBuf eventBuf; // an EventsBuf not associated with any Capability #ifdef THREADED_RTS Mutex eventBufMutex; // protected by this mutex #endif char *EventDesc[] = { [EVENT_CREATE_THREAD] = "Create thread", [EVENT_RUN_THREAD] = "Run thread", [EVENT_STOP_THREAD] = "Stop thread", [EVENT_THREAD_RUNNABLE] = "Thread runnable", [EVENT_MIGRATE_THREAD] = "Migrate thread", [EVENT_RUN_SPARK] = "Run spark", [EVENT_STEAL_SPARK] = "Steal spark", [EVENT_SHUTDOWN] = "Shutdown", [EVENT_THREAD_WAKEUP] = "Wakeup thread", [EVENT_GC_START] = "Starting GC", [EVENT_GC_END] = "Finished GC", [EVENT_REQUEST_SEQ_GC] = "Request sequential GC", [EVENT_REQUEST_PAR_GC] = "Request parallel GC", [EVENT_CREATE_SPARK_THREAD] = "Create spark thread", [EVENT_LOG_MSG] = "Log message", [EVENT_USER_MSG] = "User message", [EVENT_STARTUP] = "Startup", [EVENT_GC_IDLE] = "GC idle", [EVENT_GC_WORK] = "GC working", [EVENT_GC_DONE] = "GC done", [EVENT_BLOCK_MARKER] = "Block marker" }; // Event type. typedef struct _EventType { EventTypeNum etNum; // Event Type number. nat size; // size of the payload in bytes char *desc; // Description } EventType; EventType eventTypes[NUM_EVENT_TAGS]; static void initEventsBuf(EventsBuf* eb, StgWord64 size, EventCapNo capno); static void resetEventsBuf(EventsBuf* eb); static void printAndClearEventBuf (EventsBuf *eventsBuf); static void postEventType(EventsBuf *eb, EventType *et); static void postLogMsg(EventsBuf *eb, EventTypeNum type, char *msg, va_list ap); static void postBlockMarker(EventsBuf *eb); static void closeBlockMarker(EventsBuf *ebuf); static StgBool hasRoomForEvent(EventsBuf *eb, EventTypeNum eNum); static StgBool hasRoomForVariableEvent(EventsBuf *eb, nat payload_bytes); static inline void postWord8(EventsBuf *eb, StgWord8 i) { *(eb->pos++) = i; } static inline void postWord16(EventsBuf *eb, StgWord16 i) { postWord8(eb, (StgWord8)(i >> 8)); postWord8(eb, (StgWord8)i); } static inline void postWord32(EventsBuf *eb, StgWord32 i) { postWord16(eb, (StgWord16)(i >> 16)); postWord16(eb, (StgWord16)i); } static inline void postWord64(EventsBuf *eb, StgWord64 i) { postWord32(eb, (StgWord32)(i >> 32)); postWord32(eb, (StgWord32)i); } static inline void postBuf(EventsBuf *eb, StgWord8 *buf, nat size) { memcpy(eb->pos, buf, size); eb->pos += size; } static inline StgWord64 time_ns(void) { return stat_getElapsedTime() * (1000000000LL/TICKS_PER_SECOND); } static inline void postEventTypeNum(EventsBuf *eb, EventTypeNum etNum) { postWord16(eb, etNum); } static inline void postTimestamp(EventsBuf *eb) { postWord64(eb, time_ns()); } static inline void postThreadID(EventsBuf *eb, EventThreadID id) { postWord32(eb,id); } static inline void postCapNo(EventsBuf *eb, EventCapNo no) { postWord16(eb,no); } static inline void postPayloadSize(EventsBuf *eb, EventPayloadSize size) { postWord16(eb,size); } static inline void postEventHeader(EventsBuf *eb, EventTypeNum type) { postEventTypeNum(eb, type); postTimestamp(eb); } static inline void postInt8(EventsBuf *eb, StgInt8 i) { postWord8(eb, (StgWord8)i); } static inline void postInt16(EventsBuf *eb, StgInt16 i) { postWord16(eb, (StgWord16)i); } static inline void postInt32(EventsBuf *eb, StgInt32 i) { postWord32(eb, (StgWord32)i); } static inline void postInt64(EventsBuf *eb, StgInt64 i) { postWord64(eb, (StgWord64)i); } void initEventLogging(void) { StgWord8 t, c; nat n_caps; event_log_filename = stgMallocBytes(strlen(prog_name) + 10, "initEventLogging"); if (sizeof(EventDesc) / sizeof(char*) != NUM_EVENT_TAGS) { barf("EventDesc array has the wrong number of elements"); } sprintf(event_log_filename, "%s.eventlog", prog_name); /* Open event log file for writing. */ if ((event_log_file = fopen(event_log_filename, "wb")) == NULL) { sysErrorBelch("initEventLogging: can't open %s", event_log_filename); stg_exit(EXIT_FAILURE); } /* * Allocate buffer(s) to store events. * Create buffer large enough for the header begin marker, all event * types, and header end marker to prevent checking if buffer has room * for each of these steps, and remove the need to flush the buffer to * disk during initialization. * * Use a single buffer to store the header with event types, then flush * the buffer so all buffers are empty for writing events. */ #ifdef THREADED_RTS // XXX n_capabilities hasn't been initislised yet n_caps = RtsFlags.ParFlags.nNodes; #else n_caps = 1; #endif capEventBuf = stgMallocBytes(n_caps * sizeof(EventsBuf),"initEventLogging"); for (c = 0; c < n_caps; ++c) { // Init buffer for events. initEventsBuf(&capEventBuf[c], EVENT_LOG_SIZE, c); } initEventsBuf(&eventBuf, EVENT_LOG_SIZE, (EventCapNo)(-1)); // Write in buffer: the header begin marker. postInt32(&eventBuf, EVENT_HEADER_BEGIN); // Mark beginning of event types in the header. postInt32(&eventBuf, EVENT_HET_BEGIN); for (t = 0; t < NUM_EVENT_TAGS; ++t) { eventTypes[t].etNum = t; eventTypes[t].desc = EventDesc[t]; switch (t) { case EVENT_CREATE_THREAD: // (cap, thread) case EVENT_RUN_THREAD: // (cap, thread) case EVENT_THREAD_RUNNABLE: // (cap, thread) case EVENT_RUN_SPARK: // (cap, thread) case EVENT_CREATE_SPARK_THREAD: // (cap, spark_thread) eventTypes[t].size = sizeof(EventThreadID); break; case EVENT_MIGRATE_THREAD: // (cap, thread, new_cap) case EVENT_STEAL_SPARK: // (cap, thread, victim_cap) case EVENT_THREAD_WAKEUP: // (cap, thread, other_cap) eventTypes[t].size = sizeof(EventThreadID) + sizeof(EventCapNo); break; case EVENT_STOP_THREAD: // (cap, thread, status) eventTypes[t].size = sizeof(EventThreadID) + sizeof(StgWord16); break; case EVENT_SHUTDOWN: // (cap) case EVENT_REQUEST_SEQ_GC: // (cap) case EVENT_REQUEST_PAR_GC: // (cap) case EVENT_GC_START: // (cap) case EVENT_GC_END: // (cap) case EVENT_STARTUP: case EVENT_GC_IDLE: case EVENT_GC_WORK: case EVENT_GC_DONE: eventTypes[t].size = 0; break; case EVENT_LOG_MSG: // (msg) case EVENT_USER_MSG: // (msg) eventTypes[t].size = 0xffff; break; case EVENT_BLOCK_MARKER: eventTypes[t].size = sizeof(StgWord32) + sizeof(EventTimestamp) + sizeof(EventCapNo); break; default: continue; /* ignore deprecated events */ } // Write in buffer: the start event type. postEventType(&eventBuf, &eventTypes[t]); } // Mark end of event types in the header. postInt32(&eventBuf, EVENT_HET_END); // Write in buffer: the header end marker. postInt32(&eventBuf, EVENT_HEADER_END); // Prepare event buffer for events (data). postInt32(&eventBuf, EVENT_DATA_BEGIN); // Post a STARTUP event with the number of capabilities postEventHeader(&eventBuf, EVENT_STARTUP); postCapNo(&eventBuf, n_caps); // Flush capEventBuf with header. /* * Flush header and data begin marker to the file, thus preparing the * file to have events written to it. */ printAndClearEventBuf(&eventBuf); for (c = 0; c < n_caps; ++c) { postBlockMarker(&capEventBuf[c]); } #ifdef THREADED_RTS initMutex(&eventBufMutex); #endif } void endEventLogging(void) { nat c; // Flush all events remaining in the buffers. for (c = 0; c < n_capabilities; ++c) { printAndClearEventBuf(&capEventBuf[c]); } printAndClearEventBuf(&eventBuf); resetEventsBuf(&eventBuf); // we don't want the block marker // Mark end of events (data). postEventTypeNum(&eventBuf, EVENT_DATA_END); // Flush the end of data marker. printAndClearEventBuf(&eventBuf); if (event_log_file != NULL) { fclose(event_log_file); } } void freeEventLogging(void) { StgWord8 c; // Free events buffer. for (c = 0; c < n_capabilities; ++c) { if (capEventBuf[c].begin != NULL) stgFree(capEventBuf[c].begin); } if (capEventBuf != NULL) { stgFree(capEventBuf); } if (event_log_filename != NULL) { stgFree(event_log_filename); } } /* * Post an event message to the capability's eventlog buffer. * If the buffer is full, prints out the buffer and clears it. */ void postSchedEvent (Capability *cap, EventTypeNum tag, StgThreadID thread, StgWord64 other) { EventsBuf *eb; eb = &capEventBuf[cap->no]; if (!hasRoomForEvent(eb, tag)) { // Flush event buffer to make room for new event. printAndClearEventBuf(eb); } postEventHeader(eb, tag); switch (tag) { case EVENT_CREATE_THREAD: // (cap, thread) case EVENT_RUN_THREAD: // (cap, thread) case EVENT_THREAD_RUNNABLE: // (cap, thread) case EVENT_RUN_SPARK: // (cap, thread) { postThreadID(eb,thread); break; } case EVENT_CREATE_SPARK_THREAD: // (cap, spark_thread) { postThreadID(eb,other /* spark_thread */); break; } case EVENT_MIGRATE_THREAD: // (cap, thread, new_cap) case EVENT_STEAL_SPARK: // (cap, thread, victim_cap) case EVENT_THREAD_WAKEUP: // (cap, thread, other_cap) { postThreadID(eb,thread); postCapNo(eb,other /* new_cap | victim_cap | other_cap */); break; } case EVENT_STOP_THREAD: // (cap, thread, status) { postThreadID(eb,thread); postWord16(eb,other /* status */); break; } case EVENT_SHUTDOWN: // (cap) case EVENT_REQUEST_SEQ_GC: // (cap) case EVENT_REQUEST_PAR_GC: // (cap) case EVENT_GC_START: // (cap) case EVENT_GC_END: // (cap) { break; } default: barf("postEvent: unknown event tag %d", tag); } } void postEvent (Capability *cap, EventTypeNum tag) { EventsBuf *eb; eb = &capEventBuf[cap->no]; if (!hasRoomForEvent(eb, tag)) { // Flush event buffer to make room for new event. printAndClearEventBuf(eb); } postEventHeader(eb, tag); } #define BUF 512 void postLogMsg(EventsBuf *eb, EventTypeNum type, char *msg, va_list ap) { char buf[BUF]; nat size; size = vsnprintf(buf,BUF,msg,ap); if (size > BUF) { buf[BUF-1] = '\0'; size = BUF; } if (!hasRoomForVariableEvent(eb, size)) { // Flush event buffer to make room for new event. printAndClearEventBuf(eb); } postEventHeader(eb, type); postPayloadSize(eb, size); postBuf(eb,(StgWord8*)buf,size); } void postMsg(char *msg, va_list ap) { ACQUIRE_LOCK(&eventBufMutex); postLogMsg(&eventBuf, EVENT_LOG_MSG, msg, ap); RELEASE_LOCK(&eventBufMutex); } void postCapMsg(Capability *cap, char *msg, va_list ap) { postLogMsg(&capEventBuf[cap->no], EVENT_LOG_MSG, msg, ap); } void postUserMsg(Capability *cap, char *msg, va_list ap) { postLogMsg(&capEventBuf[cap->no], EVENT_USER_MSG, msg, ap); } void closeBlockMarker (EventsBuf *ebuf) { StgInt8* save_pos; if (ebuf->marker) { // (type:16, time:64, size:32, end_time:64) save_pos = ebuf->pos; ebuf->pos = ebuf->marker + sizeof(EventTypeNum) + sizeof(EventTimestamp); postWord32(ebuf, save_pos - ebuf->marker); postTimestamp(ebuf); ebuf->pos = save_pos; ebuf->marker = NULL; } } void postBlockMarker (EventsBuf *eb) { if (!hasRoomForEvent(eb, EVENT_BLOCK_MARKER)) { printAndClearEventBuf(eb); } closeBlockMarker(eb); eb->marker = eb->pos; postEventHeader(eb, EVENT_BLOCK_MARKER); postWord32(eb,0); // these get filled in later by closeBlockMarker(); postWord64(eb,0); postCapNo(eb, eb->capno); } void printAndClearEventBuf (EventsBuf *ebuf) { StgWord64 numBytes = 0, written = 0; closeBlockMarker(ebuf); if (ebuf->begin != NULL && ebuf->pos != ebuf->begin) { numBytes = ebuf->pos - ebuf->begin; written = fwrite(ebuf->begin, 1, numBytes, event_log_file); if (written != numBytes) { debugBelch( "printAndClearEventLog: fwrite() failed, written=%" FMT_Word64 " doesn't match numBytes=%" FMT_Word64, written, numBytes); return; } resetEventsBuf(ebuf); flushCount++; postBlockMarker(ebuf); } } void initEventsBuf(EventsBuf* eb, StgWord64 size, EventCapNo capno) { eb->begin = eb->pos = stgMallocBytes(size, "initEventsBuf"); eb->size = size; eb->marker = NULL; eb->capno = capno; } void resetEventsBuf(EventsBuf* eb) { eb->pos = eb->begin; eb->marker = NULL; } StgBool hasRoomForEvent(EventsBuf *eb, EventTypeNum eNum) { nat size; size = sizeof(EventTypeNum) + sizeof(EventTimestamp) + eventTypes[eNum].size; if (eb->pos + size > eb->begin + eb->size) { return 0; // Not enough space. } else { return 1; // Buf has enough space for the event. } } StgBool hasRoomForVariableEvent(EventsBuf *eb, nat payload_bytes) { nat size; size = sizeof(EventTypeNum) + sizeof(EventTimestamp) + sizeof(EventPayloadSize) + payload_bytes; if (eb->pos + size > eb->begin + eb->size) { return 0; // Not enough space. } else { return 1; // Buf has enough space for the event. } } void postEventType(EventsBuf *eb, EventType *et) { StgWord8 d; nat desclen; postInt32(eb, EVENT_ET_BEGIN); postEventTypeNum(eb, et->etNum); postWord16(eb, (StgWord16)et->size); desclen = strlen(et->desc); postWord32(eb, desclen); for (d = 0; d < desclen; ++d) { postInt8(eb, (StgInt8)et->desc[d]); } postWord32(eb, 0); // no extensions yet postInt32(eb, EVENT_ET_END); } #endif /* TRACING */