/* Time-stamp: <2009-07-06 21:48:36 simonmar> Variables and functions specific to GranSim the parallelism simulator for GPH. */ //@node GranSim specific code, , , //@section GranSim specific code /* Macros for dealing with the new and improved GA field for simulating parallel execution. Based on @CONCURRENT@ package. The GA field now contains a mask, where the n-th bit stands for the n-th processor, where this data can be found. In case of multiple copies, several bits are set. The total number of processors is bounded by @MAX_PROC@, which should be <= the length of a word in bits. -- HWL */ //@menu //* Includes:: //* Prototypes and externs:: //* Constants and Variables:: //* Initialisation:: //* Global Address Operations:: //* Global Event Queue:: //* Spark queue functions:: //* Scheduling functions:: //* Thread Queue routines:: //* GranSim functions:: //* GranSimLight routines:: //* Code for Fetching Nodes:: //* Idle PEs:: //* Routines directly called from Haskell world:: //* Emiting profiling info for GrAnSim:: //* Dumping routines:: //* Index:: //@end menu //@node Includes, Prototypes and externs, GranSim specific code, GranSim specific code //@subsection Includes #if defined(GRAN) #include "Rts.h" #include "RtsFlags.h" #include "RtsUtils.h" #include "StgMiscClosures.h" #include "StgTypes.h" #include "Storage.h" // for recordMutable #include "Schedule.h" #include "SchedAPI.h" // for pushClosure #include "GranSimRts.h" #include "GranSim.h" #include "ParallelRts.h" #include "ParallelDebug.h" #include "Sparks.h" //@node Prototypes and externs, Constants and Variables, Includes, GranSim specific code //@subsection Prototypes and externs /* Prototypes */ static inline PEs ga_to_proc(StgWord); static inline rtsBool any_idle(void); static inline nat idlers(void); PEs where_is(StgClosure *node); static rtsBool stealSomething(PEs proc, rtsBool steal_spark, rtsBool steal_thread); static rtsBool stealSpark(PEs proc); static rtsBool stealThread(PEs proc); static rtsBool stealSparkMagic(PEs proc); static rtsBool stealThreadMagic(PEs proc); /* subsumed by stealSomething static void stealThread(PEs proc); static void stealSpark(PEs proc); */ static rtsTime sparkStealTime(void); static nat natRandom(nat from, nat to); static PEs findRandomPE(PEs proc); static void sortPEsByTime (PEs proc, PEs *pes_by_time, nat *firstp, nat *np); void GetRoots(void); #endif /* GRAN */ //@node Constants and Variables, Initialisation, Prototypes and externs, GranSim specific code //@subsection Constants and Variables #if defined(GRAN) || defined(PAR) /* See GranSim.h for the definition of the enum gran_event_types */ char *gran_event_names[] = { "START", "START(Q)", "STEALING", "STOLEN", "STOLEN(Q)", "FETCH", "REPLY", "BLOCK", "RESUME", "RESUME(Q)", "SCHEDULE", "DESCHEDULE", "END", "SPARK", "SPARKAT", "USED", "PRUNED", "EXPORTED", "ACQUIRED", "ALLOC", "TERMINATE", "SYSTEM_START", "SYSTEM_END", /* only for debugging */ "??" }; #endif #if defined(GRAN) /* whole file */ char *proc_status_names[] = { "Idle", "Sparking", "Starting", "Fetching", "Fishing", "Busy", "UnknownProcStatus" }; /* For internal use (event statistics) only */ char *event_names[] = { "ContinueThread", "StartThread", "ResumeThread", "MoveSpark", "MoveThread", "FindWork", "FetchNode", "FetchReply", "GlobalBlock", "UnblockThread" }; //@cindex CurrentProc PEs CurrentProc = 0; /* ToDo: Create a structure for the processor status and put all the arrays below into it. -- HWL */ //@cindex CurrentTime /* One clock for each PE */ rtsTime CurrentTime[MAX_PROC]; /* Useful to restrict communication; cf fishing model in GUM */ nat OutstandingFetches[MAX_PROC], OutstandingFishes[MAX_PROC]; /* Status of each PE (new since but independent of GranSim Light) */ rtsProcStatus procStatus[MAX_PROC]; # if defined(GRAN) && defined(GRAN_CHECK) /* To check if the RTS ever tries to run a thread that should be blocked because of fetching remote data */ StgTSO *BlockedOnFetch[MAX_PROC]; # define FETCH_MASK_TSO 0x08000000 /* only bits 0, 1, 2 should be used */ # endif nat SparksAvail = 0; /* How many sparks are available */ nat SurplusThreads = 0; /* How many excess threads are there */ /* Do we need to reschedule following a fetch? */ rtsBool NeedToReSchedule = rtsFalse, IgnoreEvents = rtsFalse, IgnoreYields = rtsFalse; rtsTime TimeOfNextEvent, TimeOfLastEvent, EndOfTimeSlice; /* checked from the threaded world! */ //@cindex spark queue /* GranSim: a globally visible array of spark queues */ rtsSparkQ pending_sparks_hds[MAX_PROC]; rtsSparkQ pending_sparks_tls[MAX_PROC]; nat sparksIgnored = 0, sparksCreated = 0; GlobalGranStats globalGranStats; nat gran_arith_cost, gran_branch_cost, gran_load_cost, gran_store_cost, gran_float_cost; /* Old comment from 0.29. ToDo: Check and update -- HWL The following variables control the behaviour of GrAnSim. In general, there is one RTS option for enabling each of these features. In getting the desired setup of GranSim the following questions have to be answered: \begin{itemize} \item {\em Which scheduling algorithm} to use (@RtsFlags.GranFlags.DoFairSchedule@)? Currently only unfair scheduling is supported. \item What to do when remote data is fetched (@RtsFlags.GranFlags.DoAsyncFetch@)? Either block and wait for the data or reschedule and do some other work. Thus, if this variable is true, asynchronous communication is modelled. Block on fetch mainly makes sense for incremental fetching. There is also a simplified fetch variant available (@RtsFlags.GranFlags.SimplifiedFetch@). This variant does not use events to model communication. It is faster but the results will be less accurate. \item How aggressive to be in getting work after a reschedule on fetch (@RtsFlags.GranFlags.FetchStrategy@)? This is determined by the so-called {\em fetching strategy\/}. Currently, there are four possibilities: \begin{enumerate} \item Only run a runnable thread. \item Turn a spark into a thread, if necessary. \item Steal a remote spark, if necessary. \item Steal a runnable thread from another processor, if necessary. \end{itemize} The variable @RtsFlags.GranFlags.FetchStrategy@ determines how far to go in this list when rescheduling on a fetch. \item Should sparks or threads be stolen first when looking for work (@RtsFlags.GranFlags.DoStealThreadsFirst@)? The default is to steal sparks first (much cheaper). \item Should the RTS use a lazy thread creation scheme (@RtsFlags.GranFlags.DoAlwaysCreateThreads@)? By default yes i.e.\ sparks are only turned into threads when work is needed. Also note, that sparks can be discarded by the RTS (this is done in the case of an overflow of the spark pool). Setting @RtsFlags.GranFlags.DoAlwaysCreateThreads@ to @True@ forces the creation of threads at the next possibility (i.e.\ when new work is demanded the next time). \item Should data be fetched closure-by-closure or in packets (@RtsFlags.GranFlags.DoBulkFetching@)? The default strategy is a GRIP-like incremental (i.e.\ closure-by-closure) strategy. This makes sense in a low-latency setting but is bad in a high-latency system. Setting @RtsFlags.GranFlags.DoBulkFetching@ to @True@ enables bulk (packet) fetching. Other parameters determine the size of the packets (@pack_buffer_size@) and the number of thunks that should be put into one packet (@RtsFlags.GranFlags.ThunksToPack@). \item If there is no other possibility to find work, should runnable threads be moved to an idle processor (@RtsFlags.GranFlags.DoThreadMigration@)? In any case, the RTS tried to get sparks (either local or remote ones) first. Thread migration is very expensive, since a whole TSO has to be transferred and probably data locality becomes worse in the process. Note, that the closure, which will be evaluated next by that TSO is not transferred together with the TSO (that might block another thread). \item Should the RTS distinguish between sparks created by local nodes and stolen sparks (@RtsFlags.GranFlags.PreferSparksOfLocalNodes@)? The idea is to improve data locality by preferring sparks of local nodes (it is more likely that the data for those sparks is already on the local processor). However, such a distinction also imposes an overhead on the spark queue management, and typically a large number of sparks are generated during execution. By default this variable is set to @False@. \item Should the RTS use granularity control mechanisms? The idea of a granularity control mechanism is to make use of granularity information provided via annotation of the @par@ construct in order to prefer bigger threads when either turning a spark into a thread or when choosing the next thread to schedule. Currently, three such mechanisms are implemented: \begin{itemize} \item Cut-off: The granularity information is interpreted as a priority. If a threshold priority is given to the RTS, then only those sparks with a higher priority than the threshold are actually created. Other sparks are immediately discarded. This is similar to a usual cut-off mechanism often used in parallel programs, where parallelism is only created if the input data is lage enough. With this option, the choice is hidden in the RTS and only the threshold value has to be provided as a parameter to the runtime system. \item Priority Sparking: This mechanism keeps priorities for sparks and chooses the spark with the highest priority when turning a spark into a thread. After that the priority information is discarded. The overhead of this mechanism comes from maintaining a sorted spark queue. \item Priority Scheduling: This mechanism keeps the granularity information for threads, to. Thus, on each reschedule the largest thread is chosen. This mechanism has a higher overhead, as the thread queue is sorted, too. \end{itemize} \end{itemize} */ //@node Initialisation, Global Address Operations, Constants and Variables, GranSim specific code //@subsection Initialisation void init_gr_stats (void) { memset(&globalGranStats, '\0', sizeof(GlobalGranStats)); #if 0 /* event stats */ globalGranStats.noOfEvents = 0; for (i=0; i not unique else unique = rtsTrue; // found 1st instance ASSERT(unique); // otherwise returned from within loop return (unique); } //@cindex any_idle static inline rtsBool any_idle(void) { /* any (map (\ i -> procStatus[i] == Idle)) [0,..,MAX_PROC] */ PEs i; rtsBool any_idle; for(i=0, any_idle=rtsFalse; !any_idle && ievttype]++; } entry = EventHd; IF_GRAN_DEBUG(event_trace, print_event(entry)); EventHd = EventHd->next; return(entry); } /* When getting the time of the next event we ignore CONTINUETHREAD events: we don't want to be interrupted before the end of the current time slice unless there is something important to handle. */ //@cindex get_time_of_next_event rtsTime get_time_of_next_event(void) { rtsEventQ event = EventHd; while (event != NULL && event->evttype==ContinueThread) { event = event->next; } if(event == NULL) return ((rtsTime) 0); else return (event->time); } /* ToDo: replace malloc/free with a free list */ //@cindex insert_event void insert_event(newentry) rtsEvent *newentry; { rtsEventType evttype = newentry->evttype; rtsEvent *event, **prev; /* if(evttype >= CONTINUETHREAD1) evttype = CONTINUETHREAD; */ /* Search the queue and insert at the right point: FINDWORK before everything, CONTINUETHREAD after everything. This ensures that we find any available work after all threads have executed the current cycle. This level of detail would normally be irrelevant, but matters for ridiculously low latencies... */ /* Changed the ordering: Now FINDWORK comes after everything but CONTINUETHREAD. This makes sure that a MOVESPARK comes before a FINDWORK. This is important when a GranSimSparkAt happens and DoAlwaysCreateThreads is turned on. Also important if a GC occurs when trying to build a new thread (see much_spark) -- HWL 02/96 */ if(EventHd == NULL) EventHd = newentry; else { for (event = EventHd, prev=(rtsEvent**)&EventHd; event != NULL; prev = (rtsEvent**)&(event->next), event = event->next) { switch (evttype) { case FindWork: if ( event->time < newentry->time || ( (event->time == newentry->time) && (event->evttype != ContinueThread) ) ) continue; else break; case ContinueThread: if ( event->time <= newentry->time ) continue; else break; default: if ( event->time < newentry->time || ((event->time == newentry->time) && (event->evttype == newentry->evttype)) ) continue; else break; } /* Insert newentry here (i.e. before event) */ *prev = newentry; newentry->next = event; break; } if (event == NULL) *prev = newentry; } } //@cindex new_event void new_event(proc,creator,time,evttype,tso,node,spark) PEs proc, creator; rtsTime time; rtsEventType evttype; StgTSO *tso; StgClosure *node; rtsSpark *spark; { rtsEvent *newentry = (rtsEvent *) stgMallocBytes(sizeof(rtsEvent), "new_event"); newentry->proc = proc; newentry->creator = creator; newentry->time = time; newentry->evttype = evttype; newentry->tso = tso; newentry->node = node; newentry->spark = spark; newentry->gc_info = 0; newentry->next = NULL; insert_event(newentry); IF_DEBUG(gran, fprintf(stderr, "GRAN: new_event: \n"); print_event(newentry)); } //@cindex prepend_event void prepend_event(event) /* put event at beginning of EventQueue */ rtsEvent *event; { /* only used for GC! */ event->next = EventHd; EventHd = event; } //@cindex grab_event rtsEventQ grab_event(void) /* undo prepend_event i.e. get the event */ { /* at the head of EventQ but don't free anything */ rtsEventQ event = EventHd; if (EventHd == NULL) { barf("No next event (in grab_event). This may be caused by a circular data dependency in the program."); } EventHd = EventHd->next; return (event); } //@cindex traverse_eventq_for_gc void traverse_eventq_for_gc(void) { rtsEventQ event = EventHd; StgWord bufsize; StgClosure *closurep; StgTSO *tsop; StgPtr buffer, bufptr; PEs proc, creator; /* Traverse eventq and replace every FETCHREPLY by a FETCHNODE for the orig closure (root of packed graph). This means that a graph, which is between processors at the time of GC is fetched again at the time when it would have arrived, had there been no GC. Slightly inaccurate but safe for GC. This is only needed for GUM style fetchng. -- HWL */ if (!RtsFlags.GranFlags.DoBulkFetching) return; for(event = EventHd; event!=NULL; event=event->next) { if (event->evttype==FetchReply) { buffer = stgCast(StgPtr,event->node); ASSERT(buffer[PACK_FLAG_LOCN]==MAGIC_PACK_FLAG); /* It's a pack buffer */ bufsize = buffer[PACK_SIZE_LOCN]; closurep = stgCast(StgClosure*,buffer[PACK_HDR_SIZE]); tsop = stgCast(StgTSO*,buffer[PACK_TSO_LOCN]); proc = event->proc; creator = event->creator; /* similar to unpacking */ for (bufptr=buffer+PACK_HDR_SIZE; bufptr<(buffer+bufsize); bufptr++) { // if ( (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_SPEC_RBH_TYPE) || // (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_GEN_RBH_TYPE) ) { if ( GET_INFO(stgCast(StgClosure*,bufptr)) ) { convertFromRBH(stgCast(StgClosure *,bufptr)); } } free(buffer); event->evttype = FetchNode; event->proc = creator; event->creator = proc; event->node = closurep; event->tso = tsop; event->gc_info = 0; } } } void markEventQueue(void) { StgClosure *MarkRoot(StgClosure *root); // prototype rtsEventQ event = EventHd; nat len; /* iterate over eventq and register relevant fields in event as roots */ for(event = EventHd, len = 0; event!=NULL; event=event->next, len++) { switch (event->evttype) { case ContinueThread: event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso); break; case StartThread: event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso); event->node = (StgClosure *)MarkRoot((StgClosure *)event->node); break; case ResumeThread: event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso); event->node = (StgClosure *)MarkRoot((StgClosure *)event->node); break; case MoveSpark: event->spark->node = (StgClosure *)MarkRoot((StgClosure *)event->spark->node); break; case MoveThread: event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso); break; case FindWork: break; case FetchNode: event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso); event->node = (StgClosure *)MarkRoot((StgClosure *)event->node); break; case FetchReply: event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso); if (RtsFlags.GranFlags.DoBulkFetching) // ToDo: traverse_eventw_for_gc if GUM-Fetching!!! HWL belch("ghuH: packets in BulkFetching not marked as roots; mayb be fatal"); else event->node = (StgClosure *)MarkRoot((StgClosure *)event->node); break; case GlobalBlock: event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso); event->node = (StgClosure *)MarkRoot((StgClosure *)event->node); break; case UnblockThread: event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso); event->node = (StgClosure *)MarkRoot((StgClosure *)event->node); break; default: barf("markEventQueue: trying to mark unknown event @ %p", event); }} IF_DEBUG(gc, belch("GC: markEventQueue: %d events in queue", len)); } /* Prune all ContinueThread events related to tso or node in the eventq. Currently used if a thread leaves STG land with ThreadBlocked status, i.e. it blocked on a closure and has been put on its blocking queue. It will be reawakended via a call to awakenBlockedQueue. Until then no event effecting this tso should appear in the eventq. A bit of a hack, because ideally we shouldn't generate such spurious ContinueThread events in the first place. */ //@cindex prune_eventq void prune_eventq(tso, node) StgTSO *tso; StgClosure *node; { rtsEventQ prev = (rtsEventQ)NULL, event = EventHd; /* node unused for now */ ASSERT(node==NULL); /* tso must be valid, then */ ASSERT(tso!=END_TSO_QUEUE); while (event != NULL) { if (event->evttype==ContinueThread && (event->tso==tso)) { IF_GRAN_DEBUG(event_trace, // ToDo: use another debug flag belch("prune_eventq: pruning ContinueThread event for TSO %d (%p) on PE %d @ %lx (%p)", event->tso->id, event->tso, event->proc, event->time, event)); if (prev==(rtsEventQ)NULL) { // beginning of eventq EventHd = event->next; free(event); event = EventHd; } else { prev->next = event->next; free(event); event = prev->next; } } else { // no pruning necessary; go to next event prev = event; event = event->next; } } } //@cindex print_event void print_event(event) rtsEvent *event; { char str_tso[16], str_node[16]; StgThreadID tso_id; if (event->tso==END_TSO_QUEUE) { strcpy(str_tso, "______"); tso_id = 0; } else { sprintf(str_tso, "%p", event->tso); tso_id = (event->tso==NULL) ? 0 : event->tso->id; } if (event->node==(StgClosure*)NULL) { strcpy(str_node, "______"); } else { sprintf(str_node, "%p", event->node); } // HWL: shouldn't be necessary; ToDo: nuke //str_tso[6]='\0'; //str_node[6]='\0'; if (event==NULL) fprintf(stderr,"Evt: NIL\n"); else fprintf(stderr, "Evt: %s (%u), PE %u [%u], Time %lu, TSO %d (%s), Node %s\n", //"Evt: %s (%u), PE %u [%u], Time %u, TSO %s (%#l), Node %s\n", event_names[event->evttype], event->evttype, event->proc, event->creator, event->time, tso_id, str_tso, str_node /*, event->spark, event->next */ ); } //@cindex print_eventq void print_eventq(hd) rtsEvent *hd; { rtsEvent *x; fprintf(stderr,"Event Queue with root at %p:\n", hd); for (x=hd; x!=NULL; x=x->next) { print_event(x); } } /* Spark queue functions are now all in Sparks.c!! */ //@node Scheduling functions, Thread Queue routines, Spark queue functions, GranSim specific code //@subsection Scheduling functions /* These functions are variants of thread initialisation and therefore related to initThread and friends in Schedule.c. However, they are specific to a GranSim setup in storing more info in the TSO's statistics buffer and sorting the thread queues etc. */ /* A large portion of startThread deals with maintaining a sorted thread queue, which is needed for the Priority Sparking option. Without that complication the code boils down to FIFO handling. */ //@cindex insertThread void insertThread(tso, proc) StgTSO* tso; PEs proc; { StgTSO *prev = NULL, *next = NULL; nat count = 0; rtsBool found = rtsFalse; ASSERT(CurrentProc==proc); ASSERT(!is_on_queue(tso,proc)); /* Idle proc: put the thread on the run queue same for pri spark and basic version */ if (run_queue_hds[proc] == END_TSO_QUEUE) { /* too strong! ASSERT((CurrentProc==MainProc && CurrentTime[MainProc]==0 && procStatus[MainProc]==Idle) || procStatus[proc]==Starting); */ run_queue_hds[proc] = run_queue_tls[proc] = tso; CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime; /* new_event of ContinueThread has been moved to do_the_startthread */ /* too strong! ASSERT(procStatus[proc]==Idle || procStatus[proc]==Fishing || procStatus[proc]==Starting); procStatus[proc] = Busy; */ return; } if (RtsFlags.GranFlags.Light) GranSimLight_insertThread(tso, proc); /* Only for Pri Scheduling: find place where to insert tso into queue */ if (RtsFlags.GranFlags.DoPriorityScheduling && tso->gran.pri!=0) /* {add_to_spark_queue}vo' jInIHta'; Qu' wa'DIch yIleghQo' */ for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count=0; (next != END_TSO_QUEUE) && !(found = tso->gran.pri >= next->gran.pri); prev = next, next = next->link, count++) { ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) && (prev==(StgTSO*)NULL || prev->link==next)); } ASSERT(!found || next != END_TSO_QUEUE); ASSERT(procStatus[proc]!=Idle); if (found) { /* found can only be rtsTrue if pri scheduling enabled */ ASSERT(RtsFlags.GranFlags.DoPriorityScheduling); if (RtsFlags.GranFlags.GranSimStats.Global) globalGranStats.non_end_add_threads++; /* Add tso to ThreadQueue between prev and next */ tso->link = next; if ( next == (StgTSO*)END_TSO_QUEUE ) { run_queue_tl = tso; } else { /* no back link for TSO chain */ } if ( prev == (StgTSO*)END_TSO_QUEUE ) { /* Never add TSO as first elem of thread queue; the first */ /* element should be the one that is currently running -- HWL */ IF_DEBUG(gran, belch("GRAN: Qagh: NewThread (w/ PriorityScheduling): Trying to add TSO %p (PRI=%d) as first elem of threadQ (%p) on proc %u (@ %u)\n", tso, tso->gran.pri, run_queue_hd, proc, CurrentTime[proc])); } else { prev->link = tso; } } else { /* !found */ /* or not pri sparking! */ /* Add TSO to the end of the thread queue on that processor */ run_queue_tls[proc]->link = tso; run_queue_tls[proc] = tso; } ASSERT(RtsFlags.GranFlags.DoPriorityScheduling || count==0); CurrentTime[proc] += count * RtsFlags.GranFlags.Costs.pri_sched_overhead + RtsFlags.GranFlags.Costs.threadqueuetime; /* ToDo: check if this is still needed -- HWL if (RtsFlags.GranFlags.DoThreadMigration) ++SurplusThreads; if (RtsFlags.GranFlags.GranSimStats.Full && !(( event_type == GR_START || event_type == GR_STARTQ) && RtsFlags.GranFlags.labelling) ) DumpRawGranEvent(proc, creator, event_type+1, tso, node, tso->gran.sparkname, spark_queue_len(proc)); */ # if defined(GRAN_CHECK) /* Check if thread queue is sorted. Only for testing, really! HWL */ if ( RtsFlags.GranFlags.DoPriorityScheduling && (RtsFlags.GranFlags.Debug.sortedQ) ) { rtsBool sorted = rtsTrue; StgTSO *prev, *next; if (run_queue_hds[proc]==END_TSO_QUEUE || run_queue_hds[proc]->link==END_TSO_QUEUE) { /* just 1 elem => ok */ } else { /* Qu' wa'DIch yIleghQo' (ignore first elem)! */ for (prev = run_queue_hds[proc]->link, next = prev->link; (next != END_TSO_QUEUE) ; prev = next, next = prev->link) { ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) && (prev==(StgTSO*)NULL || prev->link==next)); sorted = sorted && (prev->gran.pri >= next->gran.pri); } } if (!sorted) { fprintf(stderr,"Qagh: THREADQ on PE %d is not sorted:\n", CurrentProc); G_THREADQ(run_queue_hd,0x1); } } # endif } /* insertThread, which is only used for GranSim Light, is similar to startThread in that it adds a TSO to a thread queue. However, it assumes that the thread queue is sorted by local clocks and it inserts the TSO at the right place in the queue. Don't create any event, just insert. */ //@cindex GranSimLight_insertThread rtsBool GranSimLight_insertThread(tso, proc) StgTSO* tso; PEs proc; { StgTSO *prev, *next; nat count = 0; rtsBool found = rtsFalse; ASSERT(RtsFlags.GranFlags.Light); /* In GrAnSim-Light we always have an idle `virtual' proc. The semantics of the one-and-only thread queue is different here: all threads in the queue are running (each on its own virtual processor); the queue is only needed internally in the simulator to interleave the reductions of the different processors. The one-and-only thread queue is sorted by the local clocks of the TSOs. */ ASSERT(run_queue_hds[proc] != END_TSO_QUEUE); ASSERT(tso->link == END_TSO_QUEUE); /* If only one thread in queue so far we emit DESCHEDULE in debug mode */ if (RtsFlags.GranFlags.GranSimStats.Full && (RtsFlags.GranFlags.Debug.checkLight) && (run_queue_hd->link == END_TSO_QUEUE)) { DumpRawGranEvent(proc, proc, GR_DESCHEDULE, run_queue_hds[proc], (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(proc)); // ToDo: check spar_queue_len // resched = rtsTrue; } /* this routine should only be used in a GrAnSim Light setup */ /* && CurrentProc must be 0 in GrAnSim Light setup */ ASSERT(RtsFlags.GranFlags.Light && CurrentProc==0); /* Idle proc; same for pri spark and basic version */ if (run_queue_hd==END_TSO_QUEUE) { run_queue_hd = run_queue_tl = tso; /* MAKE_BUSY(CurrentProc); */ return rtsTrue; } for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count = 0; (next != END_TSO_QUEUE) && !(found = (tso->gran.clock < next->gran.clock)); prev = next, next = next->link, count++) { ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) && (prev==(StgTSO*)NULL || prev->link==next)); } /* found can only be rtsTrue if pri sparking enabled */ if (found) { /* Add tso to ThreadQueue between prev and next */ tso->link = next; if ( next == END_TSO_QUEUE ) { run_queue_tls[proc] = tso; } else { /* no back link for TSO chain */ } if ( prev == END_TSO_QUEUE ) { run_queue_hds[proc] = tso; } else { prev->link = tso; } } else { /* !found */ /* or not pri sparking! */ /* Add TSO to the end of the thread queue on that processor */ run_queue_tls[proc]->link = tso; run_queue_tls[proc] = tso; } if ( prev == END_TSO_QUEUE ) { /* new head of queue */ new_event(proc, proc, CurrentTime[proc], ContinueThread, tso, (StgClosure*)NULL, (rtsSpark*)NULL); } /* if (RtsFlags.GranFlags.GranSimStats.Full && !(( event_type == GR_START || event_type == GR_STARTQ) && RtsFlags.GranFlags.labelling) ) DumpRawGranEvent(proc, creator, gr_evttype, tso, node, tso->gran.sparkname, spark_queue_len(proc)); */ return rtsTrue; } /* endThread is responsible for general clean-up after the thread tso has finished. This includes emitting statistics into the profile etc. */ void endThread(StgTSO *tso, PEs proc) { ASSERT(procStatus[proc]==Busy); // coming straight out of STG land ASSERT(tso->what_next==ThreadComplete); // ToDo: prune ContinueThreads for this TSO from event queue DumpEndEvent(proc, tso, rtsFalse /* not mandatory */); /* if this was the last thread on this PE then make it Idle */ if (run_queue_hds[proc]==END_TSO_QUEUE) { procStatus[CurrentProc] = Idle; } } //@node Thread Queue routines, GranSim functions, Scheduling functions, GranSim specific code //@subsection Thread Queue routines /* Check whether given tso resides on the run queue of the current processor. Only used for debugging. */ //@cindex is_on_queue rtsBool is_on_queue (StgTSO *tso, PEs proc) { StgTSO *t; rtsBool found; for (t=run_queue_hds[proc], found=rtsFalse; t!=END_TSO_QUEUE && !(found = t==tso); t=t->link) /* nothing */ ; return found; } /* This routine is only used for keeping a statistics of thread queue lengths to evaluate the impact of priority scheduling. -- HWL {spark_queue_len}vo' jInIHta' */ //@cindex thread_queue_len nat thread_queue_len(PEs proc) { StgTSO *prev, *next; nat len; for (len = 0, prev = END_TSO_QUEUE, next = run_queue_hds[proc]; next != END_TSO_QUEUE; len++, prev = next, next = prev->link) {} return (len); } //@node GranSim functions, GranSimLight routines, Thread Queue routines, GranSim specific code //@subsection GranSim functions /* ----------------------------------------------------------------- */ /* The main event handling functions; called from Schedule.c (schedule) */ /* ----------------------------------------------------------------- */ //@cindex do_the_globalblock void do_the_globalblock(rtsEvent* event) { PEs proc = event->proc; /* proc that requested node */ StgTSO *tso = event->tso; /* tso that requested node */ StgClosure *node = event->node; /* requested, remote node */ IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the GlobalBlock\n")); /* There should be no GLOBALBLOCKs in GrAnSim Light setup */ ASSERT(!RtsFlags.GranFlags.Light); /* GlobalBlock events only valid with GUM fetching */ ASSERT(RtsFlags.GranFlags.DoBulkFetching); IF_GRAN_DEBUG(bq, // globalBlock, if (IS_LOCAL_TO(PROCS(node),proc)) { belch("## Qagh: GlobalBlock: Blocking TSO %d (%p) on LOCAL node %p (PE %d).\n", tso->id, tso, node, proc); }); /* CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.munpacktime; */ if ( blockFetch(tso,proc,node) != 0 ) return; /* node has become local by now */ #if 0 ToDo: check whether anything has to be done at all after blockFetch -- HWL if (!RtsFlags.GranFlags.DoAsyncFetch) { /* head of queue is next thread */ StgTSO* tso = run_queue_hds[proc]; /* awaken next thread */ if (tso != (StgTSO*)NULL) { new_event(proc, proc, CurrentTime[proc], ContinueThread, tso, (StgClosure*)NULL, (rtsSpark*)NULL); CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime; if (RtsFlags.GranFlags.GranSimStats.Full) DumpRawGranEvent(proc, CurrentProc, GR_SCHEDULE, tso, (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc)); // ToDo: check sparkname and spar_queue_len procStatus[proc] = Busy; /* might have been fetching */ } else { procStatus[proc] = Idle; /* no work on proc now */ } } else { /* RtsFlags.GranFlags.DoAsyncFetch i.e. block-on-fetch */ /* other thread is already running */ /* 'oH 'utbe' 'e' vIHar ; I think that's not needed -- HWL new_event(proc,proc,CurrentTime[proc], CONTINUETHREAD,EVENT_TSO(event), (RtsFlags.GranFlags.DoBulkFetching ? closure : EVENT_NODE(event)),NULL); */ } #endif } //@cindex do_the_unblock void do_the_unblock(rtsEvent* event) { PEs proc = event->proc, /* proc that requested node */ creator = event->creator; /* proc that requested node */ StgTSO* tso = event->tso; /* tso that requested node */ StgClosure* node = event->node; /* requested, remote node */ IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the UnBlock\n")) /* There should be no UNBLOCKs in GrAnSim Light setup */ ASSERT(!RtsFlags.GranFlags.Light); /* UnblockThread means either FetchReply has arrived or a blocking queue has been awakened; ToDo: check with assertions ASSERT(procStatus[proc]==Fetching || IS_BLACK_HOLE(event->node)); */ if (!RtsFlags.GranFlags.DoAsyncFetch) { /* block-on-fetch */ /* We count block-on-fetch as normal block time */ tso->gran.blocktime += CurrentTime[proc] - tso->gran.blockedat; /* Dumping now done when processing the event No costs for contextswitch or thread queueing in this case if (RtsFlags.GranFlags.GranSimStats.Full) DumpRawGranEvent(proc, CurrentProc, GR_RESUME, tso, (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc)); */ /* Maybe do this in FetchReply already if (procStatus[proc]==Fetching) procStatus[proc] = Busy; */ /* new_event(proc, proc, CurrentTime[proc], ContinueThread, tso, node, (rtsSpark*)NULL); */ } else { /* Asynchr comm causes additional costs here: */ /* Bring the TSO from the blocked queue into the threadq */ } /* In all cases, the UnblockThread causes a ResumeThread to be scheduled */ new_event(proc, proc, CurrentTime[proc]+RtsFlags.GranFlags.Costs.threadqueuetime, ResumeThread, tso, node, (rtsSpark*)NULL); } //@cindex do_the_fetchnode void do_the_fetchnode(rtsEvent* event) { PEs proc = event->proc, /* proc that holds the requested node */ creator = event->creator; /* proc that requested node */ StgTSO* tso = event->tso; StgClosure* node = event->node; /* requested, remote node */ rtsFetchReturnCode rc; ASSERT(CurrentProc==proc); /* There should be no FETCHNODEs in GrAnSim Light setup */ ASSERT(!RtsFlags.GranFlags.Light); IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchNode\n")); CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime; /* ToDo: check whether this is the right place for dumping the event */ if (RtsFlags.GranFlags.GranSimStats.Full) DumpRawGranEvent(creator, proc, GR_FETCH, tso, node, (StgInt)0, 0); do { rc = handleFetchRequest(node, proc, creator, tso); if (rc == OutOfHeap) { /* trigger GC */ # if defined(GRAN_CHECK) && defined(GRAN) if (RtsFlags.GcFlags.giveStats) fprintf(RtsFlags.GcFlags.statsFile,"***** veQ boSwI' PackNearbyGraph(node %p, tso %p (%d))\n", node, tso, tso->id); # endif barf("//// do_the_fetchnode: out of heap after handleFetchRequest; ToDo: call GarbageCollect()"); prepend_event(event); GarbageCollect(GetRoots, rtsFalse); // HWL: ToDo: check whether a ContinueThread has to be issued // HWL old: ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse); # if 0 && defined(GRAN_CHECK) && defined(GRAN) if (RtsFlags.GcFlags.giveStats) { fprintf(RtsFlags.GcFlags.statsFile,"***** SAVE_Hp=%p, SAVE_HpLim=%p, PACK_HEAP_REQUIRED=%d\n", Hp, HpLim, 0) ; // PACK_HEAP_REQUIRED); ??? fprintf(stderr,"***** No. of packets so far: %d (total size: %d)\n", globalGranStats.tot_packets, globalGranStats.tot_packet_size); } # endif event = grab_event(); // Hp -= PACK_HEAP_REQUIRED; // ??? /* GC knows that events are special and follows the pointer i.e. */ /* events are valid even if they moved. An EXIT is triggered */ /* if there is not enough heap after GC. */ } } while (rc == OutOfHeap); } //@cindex do_the_fetchreply void do_the_fetchreply(rtsEvent* event) { PEs proc = event->proc, /* proc that requested node */ creator = event->creator; /* proc that holds the requested node */ StgTSO* tso = event->tso; StgClosure* node = event->node; /* requested, remote node */ StgClosure* closure=(StgClosure*)NULL; ASSERT(CurrentProc==proc); ASSERT(RtsFlags.GranFlags.DoAsyncFetch || procStatus[proc]==Fetching); IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchReply\n")); /* There should be no FETCHREPLYs in GrAnSim Light setup */ ASSERT(!RtsFlags.GranFlags.Light); /* assign message unpack costs *before* dumping the event */ CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime; /* ToDo: check whether this is the right place for dumping the event */ if (RtsFlags.GranFlags.GranSimStats.Full) DumpRawGranEvent(proc, creator, GR_REPLY, tso, node, tso->gran.sparkname, spark_queue_len(proc)); /* THIS SHOULD NEVER HAPPEN If tso is in the BQ of node this means that it actually entered the remote closure, due to a missing GranSimFetch at the beginning of the entry code; therefore, this is actually a faked fetch, triggered from within GranSimBlock; since tso is both in the EVQ and the BQ for node, we have to take it out of the BQ first before we can handle the FetchReply; ToDo: special cases in awakenBlockedQueue, since the BQ magically moved. */ if (tso->block_info.closure!=(StgClosure*)NULL) { IF_GRAN_DEBUG(bq, belch("## ghuH: TSO %d (%p) in FetchReply is blocked on node %p (shouldn't happen AFAIK)", tso->id, tso, node)); // unlink_from_bq(tso, node); } if (RtsFlags.GranFlags.DoBulkFetching) { /* bulk (packet) fetching */ rtsPackBuffer *buffer = (rtsPackBuffer*)node; nat size = buffer->size; /* NB: Fetch misses can't occur with GUM fetching, as */ /* updatable closure are turned into RBHs and therefore locked */ /* for other processors that try to grab them. */ closure = UnpackGraph(buffer); CurrentTime[proc] += size * RtsFlags.GranFlags.Costs.munpacktime; } else // incremental fetching /* Copy or move node to CurrentProc */ if (fetchNode(node, creator, proc)) { /* Fetch has failed i.e. node has been grabbed by another PE */ PEs p = where_is(node); rtsTime fetchtime; if (RtsFlags.GranFlags.GranSimStats.Global) globalGranStats.fetch_misses++; IF_GRAN_DEBUG(thunkStealing, belch("== Qu'vatlh! fetch miss @ %u: node %p is at proc %u (rather than proc %u)\n", CurrentTime[proc],node,p,creator)); CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime; /* Count fetch again !? */ ++(tso->gran.fetchcount); tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime; fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) + RtsFlags.GranFlags.Costs.latency; /* Chase the grabbed node */ new_event(p, proc, fetchtime, FetchNode, tso, node, (rtsSpark*)NULL); # if 0 && defined(GRAN_CHECK) && defined(GRAN) /* Just for testing */ IF_GRAN_DEBUG(blockOnFetch, BlockedOnFetch[CurrentProc] = tso;) /*-rtsTrue;-*/ IF_GRAN_DEBUG(blockOnFetch_sanity, tso->type |= FETCH_MASK_TSO;) # endif CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime; return; /* NB: no REPLy has been processed; tso still sleeping */ } /* -- Qapla'! Fetch has been successful; node is here, now */ ++(event->tso->gran.fetchcount); event->tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime; /* this is now done at the beginning of this routine if (RtsFlags.GranFlags.GranSimStats.Full) DumpRawGranEvent(proc,event->creator, GR_REPLY, event->tso, (RtsFlags.GranFlags.DoBulkFetching ? closure : event->node), tso->gran.sparkname, spark_queue_len(proc)); */ ASSERT(OutstandingFetches[proc] > 0); --OutstandingFetches[proc]; new_event(proc, proc, CurrentTime[proc], ResumeThread, event->tso, (RtsFlags.GranFlags.DoBulkFetching ? closure : event->node), (rtsSpark*)NULL); } //@cindex do_the_movethread void do_the_movethread(rtsEvent* event) { PEs proc = event->proc, /* proc that requested node */ creator = event->creator; /* proc that holds the requested node */ StgTSO* tso = event->tso; IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveThread\n")); ASSERT(CurrentProc==proc); /* There should be no MOVETHREADs in GrAnSim Light setup */ ASSERT(!RtsFlags.GranFlags.Light); /* MOVETHREAD events should never occur without -bM */ ASSERT(RtsFlags.GranFlags.DoThreadMigration); /* Bitmask of moved thread should be 0 */ ASSERT(PROCS(tso)==0); ASSERT(procStatus[proc] == Fishing || RtsFlags.GranFlags.DoAsyncFetch); ASSERT(OutstandingFishes[proc]>0); /* ToDo: exact costs for unpacking the whole TSO */ CurrentTime[proc] += 5l * RtsFlags.GranFlags.Costs.munpacktime; /* ToDo: check whether this is the right place for dumping the event */ if (RtsFlags.GranFlags.GranSimStats.Full) DumpRawGranEvent(proc, creator, GR_STOLEN, tso, (StgClosure*)NULL, (StgInt)0, 0); // ToDo: check cost functions --OutstandingFishes[proc]; SET_GRAN_HDR(tso, ThisPE); // adjust the bitmask for the TSO insertThread(tso, proc); if (procStatus[proc]==Fishing) procStatus[proc] = Idle; if (RtsFlags.GranFlags.GranSimStats.Global) globalGranStats.tot_TSOs_migrated++; } //@cindex do_the_movespark void do_the_movespark(rtsEvent* event) { PEs proc = event->proc, /* proc that requested spark */ creator = event->creator; /* proc that holds the requested spark */ StgTSO* tso = event->tso; rtsSparkQ spark = event->spark; IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveSpark\n")) ASSERT(CurrentProc==proc); ASSERT(spark!=NULL); ASSERT(procStatus[proc] == Fishing || RtsFlags.GranFlags.DoAsyncFetch); ASSERT(OutstandingFishes[proc]>0); CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime; /* record movement of spark only if spark profiling is turned on */ if (RtsFlags.GranFlags.GranSimStats.Sparks) DumpRawGranEvent(proc, creator, SP_ACQUIRED, tso, spark->node, spark->name, spark_queue_len(proc)); /* global statistics */ if ( RtsFlags.GranFlags.GranSimStats.Global && !closure_SHOULD_SPARK(spark->node)) globalGranStats.withered_sparks++; /* Not adding the spark to the spark queue would be the right */ /* thing here, but it also would be cheating, as this info can't be */ /* available in a real system. -- HWL */ --OutstandingFishes[proc]; add_to_spark_queue(spark); IF_GRAN_DEBUG(randomSteal, // ToDo: spark-distribution flag print_sparkq_stats()); /* Should we treat stolen sparks specially? Currently, we don't. */ if (procStatus[proc]==Fishing) procStatus[proc] = Idle; /* add_to_spark_queue will increase the time of the current proc. */ /* If proc was fishing, it is Idle now with the new spark in its spark pool. This means that the next time handleIdlePEs is called, a local FindWork will be created on this PE to turn the spark into a thread. Of course another PE might steal the spark in the meantime (that's why we are using events rather than inlining all the operations in the first place). */ } /* In the Constellation class version of GranSim the semantics of StarThread events has changed. Now, StartThread has to perform 3 basic operations: - create a new thread (previously this was done in ActivateSpark); - insert the thread into the run queue of the current processor - generate a new event for actually running the new thread Note that the insertThread is called via createThread. */ //@cindex do_the_startthread void do_the_startthread(rtsEvent *event) { PEs proc = event->proc; /* proc that requested node */ StgTSO *tso = event->tso; /* tso that requested node */ StgClosure *node = event->node; /* requested, remote node */ rtsSpark *spark = event->spark; GranEventType gr_evttype; ASSERT(CurrentProc==proc); ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0); ASSERT(event->evttype == ResumeThread || event->evttype == StartThread); /* if this was called via StartThread: */ ASSERT(event->evttype!=StartThread || tso == END_TSO_QUEUE); // not yet created // ToDo: check: ASSERT(event->evttype!=StartThread || procStatus[proc]==Starting); /* if this was called via ResumeThread: */ ASSERT(event->evttype!=ResumeThread || RtsFlags.GranFlags.DoAsyncFetch ||!is_on_queue(tso,proc)); /* startThread may have been called from the main event handler upon finding either a ResumeThread or a StartThread event; set the gr_evttype (needed for writing to .gr file) accordingly */ // gr_evttype = (event->evttype == ResumeThread) ? GR_RESUME : GR_START; if ( event->evttype == StartThread ) { GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ? GR_START : GR_STARTQ; tso = createThread(BLOCK_SIZE_W, spark->gran_info);// implicit insertThread! pushClosure(tso, node); // ToDo: fwd info on local/global spark to thread -- HWL // tso->gran.exported = spark->exported; // tso->gran.locked = !spark->global; tso->gran.sparkname = spark->name; ASSERT(CurrentProc==proc); if (RtsFlags.GranFlags.GranSimStats.Full) DumpGranEvent(gr_evttype,tso); CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime; } else { // event->evttype == ResumeThread GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ? GR_RESUME : GR_RESUMEQ; insertThread(tso, proc); ASSERT(CurrentProc==proc); if (RtsFlags.GranFlags.GranSimStats.Full) DumpGranEvent(gr_evttype,tso); } ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE); // non-empty run queue procStatus[proc] = Busy; /* make sure that this thread is actually run */ new_event(proc, proc, CurrentTime[proc], ContinueThread, tso, node, (rtsSpark*)NULL); /* A wee bit of statistics gathering */ if (RtsFlags.GranFlags.GranSimStats.Global) { globalGranStats.tot_add_threads++; globalGranStats.tot_tq_len += thread_queue_len(CurrentProc); } } //@cindex do_the_findwork void do_the_findwork(rtsEvent* event) { PEs proc = event->proc, /* proc to search for work */ creator = event->creator; /* proc that requested work */ rtsSparkQ spark = event->spark; /* ToDo: check that this size is safe -- HWL */ #if 0 ToDo: check available heap nat req_heap = sizeofW(StgTSO) + MIN_STACK_WORDS; // add this? -- HWL:RtsFlags.ConcFlags.stkChunkSize; #endif IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the Findwork\n")); /* If GUM style fishing is enabled, the contents of the spark field says what to steal (spark(1) or thread(2)); */ ASSERT(!(RtsFlags.GranFlags.Fishing && event->spark==(rtsSpark*)0)); /* Make sure that we have enough heap for creating a new thread. This is a conservative estimate of the required heap. This eliminates special checks for GC around NewThread within ActivateSpark. */ #if 0 ToDo: check available heap if (Hp + req_heap > HpLim ) { IF_DEBUG(gc, belch("GC: Doing GC from within Findwork handling (that's bloody dangerous if you ask me)");) GarbageCollect(GetRoots); // ReallyPerformThreadGC(req_heap, rtsFalse); old -- HWL Hp -= req_heap; if (procStatus[CurrentProc]==Sparking) procStatus[CurrentProc]=Idle; return; } #endif if ( RtsFlags.GranFlags.DoAlwaysCreateThreads || RtsFlags.GranFlags.Fishing || ((procStatus[proc]==Idle || procStatus[proc]==Sparking) && (RtsFlags.GranFlags.FetchStrategy >= 2 || OutstandingFetches[proc] == 0)) ) { rtsBool found; rtsSparkQ prev, spark; /* ToDo: check */ ASSERT(procStatus[proc]==Sparking || RtsFlags.GranFlags.DoAlwaysCreateThreads || RtsFlags.GranFlags.Fishing); /* SImmoHwI' yInej! Search spark queue! */ /* gimme_spark (event, &found, &spark); */ findLocalSpark(event, &found, &spark); if (!found) { /* pagh vumwI' */ /* If no spark has been found this can mean 2 things: 1/ The FindWork was a fish (i.e. a message sent by another PE) and the spark pool of the receiver is empty --> the fish has to be forwarded to another PE 2/ The FindWork was local to this PE (i.e. no communication; in this case creator==proc) and the spark pool of the PE is not empty contains only sparks of closures that should not be sparked (note: if the spark pool were empty, handleIdlePEs wouldn't have generated a FindWork in the first place) --> the PE has to be made idle to trigger stealing sparks the next time handleIdlePEs is performed */ ASSERT(pending_sparks_hds[proc]==(rtsSpark*)NULL); if (creator==proc) { /* local FindWork */ if (procStatus[proc]==Busy) { belch("ghuH: PE %d in Busy state while processing local FindWork (spark pool is empty!) @ %lx", proc, CurrentTime[proc]); procStatus[proc] = Idle; } } else { /* global FindWork i.e. a Fish */ ASSERT(RtsFlags.GranFlags.Fishing); /* actually this generates another request from the originating PE */ ASSERT(OutstandingFishes[creator]>0); OutstandingFishes[creator]--; /* ToDo: assign costs for sending fish to proc not to creator */ stealSpark(creator); /* might steal from same PE; ToDo: fix */ ASSERT(RtsFlags.GranFlags.maxFishes!=1 || procStatus[creator] == Fishing); /* any assertions on state of proc possible here? */ } } else { /* DaH chu' Qu' yIchen! Now create new work! */ IF_GRAN_DEBUG(findWork, belch("+- munching spark %p; creating thread for node %p", spark, spark->node)); activateSpark (event, spark); ASSERT(spark != (rtsSpark*)NULL); spark = delete_from_sparkq (spark, proc, rtsTrue); } IF_GRAN_DEBUG(findWork, belch("+- Contents of spark queues at the end of FindWork @ %lx", CurrentTime[proc]); print_sparkq_stats()); /* ToDo: check ; not valid if GC occurs in ActivateSpark */ ASSERT(!found || /* forward fish or */ (proc!=creator || /* local spark or */ (proc==creator && procStatus[proc]==Starting)) || //(!found && procStatus[proc]==Idle) || RtsFlags.GranFlags.DoAlwaysCreateThreads); } else { IF_GRAN_DEBUG(findWork, belch("+- RTS refuses to findWork on PE %d @ %lx", proc, CurrentTime[proc]); belch(" procStatus[%d]=%s, fetch strategy=%d, outstanding fetches[%d]=%d", proc, proc_status_names[procStatus[proc]], RtsFlags.GranFlags.FetchStrategy, proc, OutstandingFetches[proc])); } } //@node GranSimLight routines, Code for Fetching Nodes, GranSim functions, GranSim specific code //@subsection GranSimLight routines /* This code is called from the central scheduler after having rgabbed a new event and is only needed for GranSim-Light. It mainly adjusts the ActiveTSO so that all costs that have to be assigned from within the scheduler are assigned to the right TSO. The choice of ActiveTSO depends on the type of event that has been found. */ void GranSimLight_enter_system(event, ActiveTSOp) rtsEvent *event; StgTSO **ActiveTSOp; { StgTSO *ActiveTSO = *ActiveTSOp; ASSERT (RtsFlags.GranFlags.Light); /* Restore local clock of the virtual processor attached to CurrentTSO. All costs will be associated to the `virt. proc' on which the tso is living. */ if (ActiveTSO != NULL) { /* already in system area */ ActiveTSO->gran.clock = CurrentTime[CurrentProc]; if (RtsFlags.GranFlags.DoFairSchedule) { if (RtsFlags.GranFlags.GranSimStats.Full && RtsFlags.GranFlags.Debug.checkLight) DumpGranEvent(GR_SYSTEM_END,ActiveTSO); } } switch (event->evttype) { case ContinueThread: case FindWork: /* inaccurate this way */ ActiveTSO = run_queue_hd; break; case ResumeThread: case StartThread: case MoveSpark: /* has tso of virt proc in tso field of event */ ActiveTSO = event->tso; break; default: barf("Illegal event type %s (%d) in GrAnSim Light setup\n", event_names[event->evttype],event->evttype); } CurrentTime[CurrentProc] = ActiveTSO->gran.clock; if (RtsFlags.GranFlags.DoFairSchedule) { if (RtsFlags.GranFlags.GranSimStats.Full && RtsFlags.GranFlags.Debug.checkLight) DumpGranEvent(GR_SYSTEM_START,ActiveTSO); } } void GranSimLight_leave_system(event, ActiveTSOp) rtsEvent *event; StgTSO **ActiveTSOp; { StgTSO *ActiveTSO = *ActiveTSOp; ASSERT(RtsFlags.GranFlags.Light); /* Save time of `virt. proc' which was active since last getevent and restore time of `virt. proc' where CurrentTSO is living on. */ if(RtsFlags.GranFlags.DoFairSchedule) { if (RtsFlags.GranFlags.GranSimStats.Full && RtsFlags.GranFlags.Debug.checkLight) // ToDo: clean up flags DumpGranEvent(GR_SYSTEM_END,ActiveTSO); } ActiveTSO->gran.clock = CurrentTime[CurrentProc]; ActiveTSO = (StgTSO*)NULL; CurrentTime[CurrentProc] = CurrentTSO->gran.clock; if (RtsFlags.GranFlags.DoFairSchedule /* && resched */ ) { // resched = rtsFalse; if (RtsFlags.GranFlags.GranSimStats.Full && RtsFlags.GranFlags.Debug.checkLight) DumpGranEvent(GR_SCHEDULE,run_queue_hd); } /* if (TSO_LINK(ThreadQueueHd)!=PrelBase_Z91Z93_closure && (TimeOfNextEvent == 0 || TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000header.gran.procs |= PE_NUMBER(to); /* Copy node */ else node->header.gran.procs = PE_NUMBER(to); /* Move node */ return Ok; } /* Process a fetch request. Cost of sending a packet of size n = C + P*n where C = packet construction constant, P = cost of packing one word into a packet [Should also account for multiple packets]. */ //@cindex handleFetchRequest rtsFetchReturnCode handleFetchRequest(node,to,from,tso) StgClosure* node; // the node which is requested PEs to, from; // fetch request: from -> to StgTSO* tso; // the tso which needs the node { ASSERT(!RtsFlags.GranFlags.Light); /* ToDo: check assertion */ ASSERT(OutstandingFetches[from]>0); /* probably wrong place; */ ASSERT(CurrentProc==to); if (IS_LOCAL_TO(PROCS(node), from)) /* Somebody else moved node already => */ { /* start tso */ IF_GRAN_DEBUG(thunkStealing, fprintf(stderr,"ghuH: handleFetchRequest entered with local node %p (%s) (PE %d)\n", node, info_type(node), from)); if (RtsFlags.GranFlags.DoBulkFetching) { nat size; rtsPackBuffer *graph; /* Create a 1-node-buffer and schedule a FETCHREPLY now */ graph = PackOneNode(node, tso, &size); new_event(from, to, CurrentTime[to], FetchReply, tso, (StgClosure *)graph, (rtsSpark*)NULL); } else { new_event(from, to, CurrentTime[to], FetchReply, tso, node, (rtsSpark*)NULL); } IF_GRAN_DEBUG(thunkStealing, belch("== majQa'! closure %p is local on PE %d already (this is a good thing)", node, from)); return (NodeIsLocal); } else if (IS_LOCAL_TO(PROCS(node), to) ) /* Is node still here? */ { if (RtsFlags.GranFlags.DoBulkFetching) { /* {GUM}vo' ngoqvam vInIHta' */ nat size; /* (code from GUM) */ StgClosure* graph; if (IS_BLACK_HOLE(node)) { /* block on BH or RBH */ new_event(from, to, CurrentTime[to], GlobalBlock, tso, node, (rtsSpark*)NULL); /* Note: blockFetch is done when handling GLOBALBLOCK event; make sure the TSO stays out of the run queue */ /* When this thread is reawoken it does the usual: it tries to enter the updated node and issues a fetch if it's remote. It has forgotten that it has sent a fetch already (i.e. a FETCHNODE is swallowed by a BH, leaving the thread in a BQ) */ --OutstandingFetches[from]; IF_GRAN_DEBUG(thunkStealing, belch("== majQa'! closure %p on PE %d is a BH (demander=PE %d); faking a FMBQ", node, to, from)); if (RtsFlags.GranFlags.GranSimStats.Global) { globalGranStats.tot_FMBQs++; } return (NodeIsBH); } /* The tso requesting the node is blocked and cannot be on a run queue */ ASSERT(!is_on_queue(tso, from)); // ToDo: check whether graph is ever used as an rtsPackBuffer!! if ((graph = (StgClosure *)PackNearbyGraph(node, tso, &size, 0)) == NULL) return (OutOfHeap); /* out of heap */ /* Actual moving/copying of node is done on arrival; see FETCHREPLY */ /* Send a reply to the originator */ /* ToDo: Replace that by software costs for doing graph packing! */ CurrentTime[to] += size * RtsFlags.GranFlags.Costs.mpacktime; new_event(from, to, CurrentTime[to]+RtsFlags.GranFlags.Costs.latency, FetchReply, tso, (StgClosure *)graph, (rtsSpark*)NULL); CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime; return (Ok); } else { /* incremental (single closure) fetching */ /* Actual moving/copying of node is done on arrival; see FETCHREPLY */ /* Send a reply to the originator */ CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime; new_event(from, to, CurrentTime[to]+RtsFlags.GranFlags.Costs.latency, FetchReply, tso, node, (rtsSpark*)NULL); CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime; return (Ok); } } else /* Qu'vatlh! node has been grabbed by another proc => forward */ { PEs node_loc = where_is(node); rtsTime fetchtime; IF_GRAN_DEBUG(thunkStealing, belch("== Qu'vatlh! node %p has been grabbed by PE %d from PE %d (demander=%d) @ %d\n", node,node_loc,to,from,CurrentTime[to])); if (RtsFlags.GranFlags.GranSimStats.Global) { globalGranStats.fetch_misses++; } /* Prepare FORWARD message to proc p_new */ CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime; fetchtime = stg_max(CurrentTime[to], CurrentTime[node_loc]) + RtsFlags.GranFlags.Costs.latency; new_event(node_loc, from, fetchtime, FetchNode, tso, node, (rtsSpark*)NULL); CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime; return (NodeHasMoved); } } /* blockFetch blocks a BlockedFetch node on some kind of black hole. Taken from gum/HLComms.lc. [find a better place for that ?] -- HWL {\bf Note:} In GranSim we don't have @FETCHME@ nodes and therefore don't create @FMBQ@'s (FetchMe blocking queues) to cope with global blocking. Instead, non-local TSO are put into the BQ in the same way as local TSOs. However, we have to check if a TSO is local or global in order to account for the latencies involved and for keeping track of the number of fetches that are really going on. */ //@cindex blockFetch rtsFetchReturnCode blockFetch(tso, proc, bh) StgTSO* tso; /* TSO which gets blocked */ PEs proc; /* PE where that tso was running */ StgClosure* bh; /* closure to block on (BH, RBH, BQ) */ { StgInfoTable *info; IF_GRAN_DEBUG(bq, fprintf(stderr,"## blockFetch: blocking TSO %p (%d)[PE %d] on node %p (%s) [PE %d]. No graph is packed!\n", tso, tso->id, proc, bh, info_type(bh), where_is(bh))); if (!IS_BLACK_HOLE(bh)) { /* catches BHs and RBHs */ IF_GRAN_DEBUG(bq, fprintf(stderr,"## blockFetch: node %p (%s) is not a BH => awakening TSO %p (%d) [PE %u]\n", bh, info_type(bh), tso, tso->id, proc)); /* No BH anymore => immediately unblock tso */ new_event(proc, proc, CurrentTime[proc], UnblockThread, tso, bh, (rtsSpark*)NULL); /* Is this always a REPLY to a FETCH in the profile ? */ if (RtsFlags.GranFlags.GranSimStats.Full) DumpRawGranEvent(proc, proc, GR_REPLY, tso, bh, (StgInt)0, 0); return (NodeIsNoBH); } /* DaH {BQ}Daq Qu' Suq 'e' wISov! Now we know that we have to put the tso into the BQ. 2 cases: If block-on-fetch, tso is at head of threadq => => take it out of threadq and into BQ If reschedule-on-fetch, tso is only pointed to be event => just put it into BQ ngoq ngo'!! if (!RtsFlags.GranFlags.DoAsyncFetch) { GranSimBlock(tso, proc, bh); } else { if (RtsFlags.GranFlags.GranSimStats.Full) DumpRawGranEvent(proc, where_is(bh), GR_BLOCK, tso, bh, (StgInt)0, 0); ++(tso->gran.blockcount); tso->gran.blockedat = CurrentTime[proc]; } */ /* after scheduling the GlobalBlock event the TSO is not put into the run queue again; it is only pointed to via the event we are processing now; in GranSim 4.xx there is no difference between synchr and asynchr comm here */ ASSERT(!is_on_queue(tso, proc)); ASSERT(tso->link == END_TSO_QUEUE); GranSimBlock(tso, proc, bh); /* GranSim statistics gathering */ /* Now, put tso into BQ (similar to blocking entry codes) */ info = get_itbl(bh); switch (info -> type) { case RBH: case BLACKHOLE: case CAF_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here case SE_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here case SE_CAF_BLACKHOLE:// ToDo: check whether this is a possibly ITBL here /* basically an inlined version of BLACKHOLE_entry -- HWL */ /* Change the BLACKHOLE into a BLACKHOLE_BQ */ ((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info; /* Put ourselves on the blocking queue for this black hole */ // tso->link=END_TSO_QUEUE; not necessary; see assertion above ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)tso; tso->block_info.closure = bh; recordMutable((StgMutClosure *)bh); break; case BLACKHOLE_BQ: /* basically an inlined version of BLACKHOLE_BQ_entry -- HWL */ tso->link = (StgTSO *) (((StgBlockingQueue*)bh)->blocking_queue); ((StgBlockingQueue*)bh)->blocking_queue = (StgBlockingQueueElement *)tso; recordMutable((StgMutClosure *)bh); # if 0 && defined(GC_MUT_REQUIRED) ToDo: check whether recordMutable is necessary -- HWL /* * If we modify a black hole in the old generation, we have to make * sure it goes on the mutables list */ if (bh <= StorageMgrInfo.OldLim) { MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables; StorageMgrInfo.OldMutables = bh; } else MUT_LINK(bh) = MUT_NOT_LINKED; # endif break; case FETCH_ME_BQ: barf("Qagh: FMBQ closure (%p) found in GrAnSim (TSO=%p (%d))\n", bh, tso, tso->id); default: { G_PRINT_NODE(bh); barf("Qagh: thought %p was a black hole (IP %p (%s))", bh, info, info_type(bh)); } } return (Ok); } //@node Idle PEs, Routines directly called from Haskell world, Code for Fetching Nodes, GranSim specific code //@subsection Idle PEs /* Export work to idle PEs. This function is called from @ReSchedule@ before dispatching on the current event. @HandleIdlePEs@ iterates over all PEs, trying to get work for idle PEs. Note, that this is a simplification compared to GUM's fishing model. We try to compensate for that by making the cost for stealing work dependent on the number of idle processors and thereby on the probability with which a randomly sent fish would find work. */ //@cindex handleIdlePEs void handleIdlePEs(void) { PEs p; IF_DEBUG(gran, fprintf(stderr, "GRAN: handling Idle PEs\n")) /* Should never be entered in GrAnSim Light setup */ ASSERT(!RtsFlags.GranFlags.Light); /* Could check whether there are idle PEs if it's a cheap check */ for (p = 0; p < RtsFlags.GranFlags.proc; p++) if (procStatus[p]==Idle) /* && IS_SPARKING(p) && IS_STARTING(p) */ /* First look for local work i.e. examine local spark pool! */ if (pending_sparks_hds[p]!=(rtsSpark *)NULL) { new_event(p, p, CurrentTime[p], FindWork, (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL); procStatus[p] = Sparking; } else if ((RtsFlags.GranFlags.maxFishes==0 || OutstandingFishes[p]= 4 || OutstandingFetches[p] == 0)) { if (SurplusThreads > 0l) /* Steal a thread */ stealThread(p); if (procStatus[p]!=Idle) break; } if (SparksAvail > 0 && (RtsFlags.GranFlags.FetchStrategy >= 3 || OutstandingFetches[p] == 0)) /* Steal a spark */ stealSpark(p); if (SurplusThreads > 0 && (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0)) /* Steal a thread */ stealThread(p); } } /* Steal a spark and schedule moving it to proc. We want to look at PEs in clock order -- most retarded first. Currently sparks are only stolen from the @ADVISORY_POOL@ never from the @REQUIRED_POOL@. Eventually, this should be changed to first steal from the former then from the latter. We model a sort of fishing mechanism by counting the number of sparks and threads we are currently stealing. */ /* Return a random nat value in the intervall [from, to) */ static nat natRandom(from, to) nat from, to; { nat r, d; ASSERT(from<=to); d = to - from; /* random returns a value in [0, RAND_MAX] */ r = (nat) ((float)from + ((float)random()*(float)d)/(float)RAND_MAX); r = (r==to) ? from : r; ASSERT(from<=r && (r CurrentTime[pes_by_time[j]]) { rtsTime temp = pes_by_time[i]; pes_by_time[i] = pes_by_time[j]; pes_by_time[j] = temp; } /* Choose random processor to steal spark from; first look at processors */ /* that are earlier than the current one (i.e. proc) */ for(first=0; (first < n) && (CurrentTime[pes_by_time[first]] <= CurrentTime[proc]); ++first) /* nothing */ ; /* if the assertion below is true we can get rid of first */ /* ASSERT(first==n); */ /* ToDo: check if first is really needed; find cleaner solution */ *firstp = first; *np = n; } /* Steal a spark (piece of work) from any processor and bring it to proc. */ //@cindex stealSpark static rtsBool stealSpark(PEs proc) { stealSomething(proc, rtsTrue, rtsFalse); } /* Steal a thread from any processor and bring it to proc i.e. thread migration */ //@cindex stealThread static rtsBool stealThread(PEs proc) { stealSomething(proc, rtsFalse, rtsTrue); } /* Steal a spark or a thread and schedule moving it to proc. */ //@cindex stealSomething static rtsBool stealSomething(proc, steal_spark, steal_thread) PEs proc; // PE that needs work (stealer) rtsBool steal_spark, steal_thread; // should a spark and/or thread be stolen { PEs p; rtsTime fish_arrival_time; rtsSpark *spark, *prev, *next; rtsBool stolen = rtsFalse; ASSERT(steal_spark || steal_thread); /* Should never be entered in GrAnSim Light setup */ ASSERT(!RtsFlags.GranFlags.Light); ASSERT(!steal_thread || RtsFlags.GranFlags.DoThreadMigration); if (!RtsFlags.GranFlags.Fishing) { // ToDo: check if stealing threads is prefered over stealing sparks if (steal_spark) { if (stealSparkMagic(proc)) return rtsTrue; else // no spark found if (steal_thread) return stealThreadMagic(proc); else // no thread found return rtsFalse; } else { // ASSERT(steal_thread); return stealThreadMagic(proc); } barf("stealSomething: never reached"); } /* The rest of this function does GUM style fishing */ p = findRandomPE(proc); /* find a random PE other than proc */ /* Message packing costs for sending a Fish; qeq jabbI'ID */ CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime; /* use another GranEvent for requesting a thread? */ if (steal_spark && RtsFlags.GranFlags.GranSimStats.Sparks) DumpRawGranEvent(p, proc, SP_REQUESTED, (StgTSO*)NULL, (StgClosure *)NULL, (StgInt)0, 0); /* time of the fish arrival on the remote PE */ fish_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency; /* Phps use an own Fish event for that? */ /* The contents of the spark component is a HACK: 1 means give me a spark; 2 means give me a thread 0 means give me nothing (this should never happen) */ new_event(p, proc, fish_arrival_time, FindWork, (StgTSO*)NULL, (StgClosure*)NULL, (steal_spark ? (rtsSpark*)1 : steal_thread ? (rtsSpark*)2 : (rtsSpark*)0)); ++OutstandingFishes[proc]; /* only with Async fetching? */ if (procStatus[proc]==Idle) procStatus[proc]=Fishing; /* time needed to clean up buffers etc after sending a message */ CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime; /* If GUM style fishing stealing always succeeds because it only consists of sending out a fish; of course, when the fish may return empty-handed! */ return rtsTrue; } /* This version of stealing a spark makes use of the global info on all spark pools etc which is not available in a real parallel system. This could be extended to test e.g. the impact of perfect load information. */ //@cindex stealSparkMagic static rtsBool stealSparkMagic(proc) PEs proc; { PEs p=0, i=0, j=0, n=0, first, upb; rtsSpark *spark=NULL, *next; PEs pes_by_time[MAX_PROC]; rtsBool stolen = rtsFalse; rtsTime stealtime; /* Should never be entered in GrAnSim Light setup */ ASSERT(!RtsFlags.GranFlags.Light); sortPEsByTime(proc, pes_by_time, &first, &n); while (!stolen && n>0) { upb = (first==0) ? n : first; i = natRandom(0,upb); /* choose a random eligible PE */ p = pes_by_time[i]; IF_GRAN_DEBUG(randomSteal, belch("^^ stealSparkMagic (random_steal, not fishing): stealing spark from PE %d (current proc is %d)", p, proc)); ASSERT(pending_sparks_hds[p]!=(rtsSpark *)NULL); /* non-empty spark pool */ /* Now go through rtsSparkQ and steal the first eligible spark */ spark = pending_sparks_hds[p]; while (!stolen && spark != (rtsSpark*)NULL) { /* NB: no prev pointer is needed here because all sparks that are not chosen are pruned */ if ((procStatus[p]==Idle || procStatus[p]==Sparking || procStatus[p] == Fishing) && spark->next==(rtsSpark*)NULL) { /* Be social! Don't steal the only spark of an idle processor not {spark} neH yInIH !! */ break; /* next PE */ } else if (closure_SHOULD_SPARK(spark->node)) { /* Don't Steal local sparks; ToDo: optionally prefer local over global sparks if (!spark->global) { prev=spark; continue; next spark } */ /* found a spark! */ /* Prepare message for sending spark */ CurrentTime[p] += RtsFlags.GranFlags.Costs.mpacktime; if (RtsFlags.GranFlags.GranSimStats.Sparks) DumpRawGranEvent(p, (PEs)0, SP_EXPORTED, (StgTSO*)NULL, spark->node, spark->name, spark_queue_len(p)); stealtime = (CurrentTime[p] > CurrentTime[proc] ? CurrentTime[p] : CurrentTime[proc]) + sparkStealTime(); new_event(proc, p /* CurrentProc */, stealtime, MoveSpark, (StgTSO*)NULL, spark->node, spark); stolen = rtsTrue; ++OutstandingFishes[proc]; /* no. of sparks currently on the fly */ if (procStatus[proc]==Idle) procStatus[proc] = Fishing; ++(spark->global); /* record that this is a global spark */ ASSERT(SparksAvail>0); --SparksAvail; /* on-the-fly sparks are not available */ next = delete_from_sparkq(spark, p, rtsFalse); // don't dispose! CurrentTime[p] += RtsFlags.GranFlags.Costs.mtidytime; } else /* !(closure_SHOULD_SPARK(SPARK_NODE(spark))) */ { IF_GRAN_DEBUG(checkSparkQ, belch("^^ pruning spark %p (node %p) in stealSparkMagic", spark, spark->node)); /* if the spark points to a node that should not be sparked, prune the spark queue at this point */ if (RtsFlags.GranFlags.GranSimStats.Sparks) DumpRawGranEvent(p, (PEs)0, SP_PRUNED, (StgTSO*)NULL, spark->node, spark->name, spark_queue_len(p)); if (RtsFlags.GranFlags.GranSimStats.Global) globalGranStats.pruned_sparks++; ASSERT(SparksAvail>0); --SparksAvail; spark = delete_from_sparkq(spark, p, rtsTrue); } /* unlink spark (may have been freed!) from sparkq; if (prev == NULL) // spark was head of spark queue pending_sparks_hds[p] = spark->next; else prev->next = spark->next; if (spark->next == NULL) pending_sparks_tls[p] = prev; else next->prev = prev; */ } /* while ... iterating over sparkq */ /* ToDo: assert that PE p still has work left after stealing the spark */ if (!stolen && (n>0)) { /* nothing stealable from proc p :( */ ASSERT(pes_by_time[i]==p); /* remove p from the list (at pos i) */ for (j=i; j+10) && (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]); first--) /* nothing */ ; } } /* while ... iterating over PEs in pes_by_time */ IF_GRAN_DEBUG(randomSteal, if (stolen) belch("^^ stealSparkMagic: spark %p (node=%p) stolen by PE %d from PE %d (SparksAvail=%d; idlers=%d)", spark, spark->node, proc, p, SparksAvail, idlers()); else belch("^^ stealSparkMagic: nothing stolen by PE %d (sparkq len after pruning=%d)(SparksAvail=%d; idlers=%d)", proc, SparksAvail, idlers())); if (RtsFlags.GranFlags.GranSimStats.Global && stolen && (i!=0)) { /* only for statistics */ globalGranStats.rs_sp_count++; globalGranStats.ntimes_total += n; globalGranStats.fl_total += first; globalGranStats.no_of_steals++; } return stolen; } /* The old stealThread code, which makes use of global info and does not send out fishes. NB: most of this is the same as in stealSparkMagic; only the pieces specific to processing thread queues are different; long live polymorphism! */ //@cindex stealThreadMagic static rtsBool stealThreadMagic(proc) PEs proc; { PEs p=0, i=0, j=0, n=0, first, upb; StgTSO *tso=END_TSO_QUEUE; PEs pes_by_time[MAX_PROC]; rtsBool stolen = rtsFalse; rtsTime stealtime; /* Should never be entered in GrAnSim Light setup */ ASSERT(!RtsFlags.GranFlags.Light); sortPEsByTime(proc, pes_by_time, &first, &n); while (!stolen && n>0) { upb = (first==0) ? n : first; i = natRandom(0,upb); /* choose a random eligible PE */ p = pes_by_time[i]; IF_GRAN_DEBUG(randomSteal, belch("^^ stealThreadMagic (random_steal, not fishing): stealing thread from PE %d (current proc is %d)", p, proc)); /* Steal the first exportable thread in the runnable queue but never steal the first in the queue for social reasons; not Qu' wa'DIch yInIH !! */ /* Would be better to search through queue and have options which of the threads to pick when stealing */ if (run_queue_hds[p] == END_TSO_QUEUE) { IF_GRAN_DEBUG(randomSteal, belch("^^ stealThreadMagic: No thread to steal from PE %d (stealer=PE %d)", p, proc)); } else { tso = run_queue_hds[p]->link; /* tso is *2nd* thread in thread queue */ /* Found one */ stolen = rtsTrue; /* update links in queue */ run_queue_hds[p]->link = tso->link; if (run_queue_tls[p] == tso) run_queue_tls[p] = run_queue_hds[p]; /* ToDo: Turn magic constants into params */ CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mpacktime; stealtime = (CurrentTime[p] > CurrentTime[proc] ? CurrentTime[p] : CurrentTime[proc]) + sparkStealTime() + 4l * RtsFlags.GranFlags.Costs.additional_latency + 5l * RtsFlags.GranFlags.Costs.munpacktime; /* Move the thread; set bitmask to 0 while TSO is `on-the-fly' */ SET_GRAN_HDR(tso,Nowhere /* PE_NUMBER(proc) */); /* Move from one queue to another */ new_event(proc, p, stealtime, MoveThread, tso, (StgClosure*)NULL, (rtsSpark*)NULL); /* MAKE_BUSY(proc); not yet; only when thread is in threadq */ ++OutstandingFishes[proc]; if (procStatus[proc]) procStatus[proc] = Fishing; --SurplusThreads; if(RtsFlags.GranFlags.GranSimStats.Full) DumpRawGranEvent(p, proc, GR_STEALING, tso, (StgClosure*)NULL, (StgInt)0, 0); /* costs for tidying up buffer after having sent it */ CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mtidytime; } /* ToDo: assert that PE p still has work left after stealing the spark */ if (!stolen && (n>0)) { /* nothing stealable from proc p :( */ ASSERT(pes_by_time[i]==p); /* remove p from the list (at pos i) */ for (j=i; j+10) && (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]); first--) /* nothing */ ; } } /* while ... iterating over PEs in pes_by_time */ IF_GRAN_DEBUG(randomSteal, if (stolen) belch("^^ stealThreadMagic: stolen TSO %d (%p) by PE %d from PE %d (SparksAvail=%d; idlers=%d)", tso->id, tso, proc, p, SparksAvail, idlers()); else belch("stealThreadMagic: nothing stolen by PE %d (SparksAvail=%d; idlers=%d)", proc, SparksAvail, idlers())); if (RtsFlags.GranFlags.GranSimStats.Global && stolen && (i!=0)) { /* only for statistics */ /* ToDo: more statistics on avg thread queue lenght etc */ globalGranStats.rs_t_count++; globalGranStats.no_of_migrates++; } return stolen; } //@cindex sparkStealTime static rtsTime sparkStealTime(void) { double fishdelay, sparkdelay, latencydelay; fishdelay = (double)RtsFlags.GranFlags.proc/2; sparkdelay = fishdelay - ((fishdelay-1.0)/(double)(RtsFlags.GranFlags.proc-1))*((double)idlers()); latencydelay = sparkdelay*((double)RtsFlags.GranFlags.Costs.latency); return((rtsTime)latencydelay); } //@node Routines directly called from Haskell world, Emiting profiling info for GrAnSim, Idle PEs, GranSim specific code //@subsection Routines directly called from Haskell world /* The @GranSim...@ routines in here are directly called via macros from the threaded world. First some auxiliary routines. */ /* Take the current thread off the thread queue and thereby activate the next thread. It's assumed that the next ReSchedule after this uses NEW_THREAD as param. This fct is called from GranSimBlock and GranSimFetch */ //@cindex ActivateNextThread void ActivateNextThread (proc) PEs proc; { StgTSO *t; /* This routine is entered either via GranSimFetch or via GranSimBlock. It has to prepare the CurrentTSO for being blocked and update the run queue and other statistics on PE proc. The actual enqueuing to the blocking queue (if coming from GranSimBlock) is done in the entry code of the BLACKHOLE and BLACKHOLE_BQ closures (see StgMiscClosures.hc). */ /* ToDo: add assertions here!! */ //ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE); // Only necessary if the running thread is at front of the queue // run_queue_hds[proc] = run_queue_hds[proc]->link; ASSERT(CurrentProc==proc); ASSERT(!is_on_queue(CurrentTSO,proc)); if (run_queue_hds[proc]==END_TSO_QUEUE) { /* NB: this routine is only entered with asynchr comm (see assertion) */ procStatus[proc] = Idle; } else { /* ToDo: check cost assignment */ CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime; if (RtsFlags.GranFlags.GranSimStats.Full && (!RtsFlags.GranFlags.Light || RtsFlags.GranFlags.Debug.checkLight)) /* right flag !?? ^^^ */ DumpRawGranEvent(proc, 0, GR_SCHEDULE, run_queue_hds[proc], (StgClosure*)NULL, (StgInt)0, 0); } } /* The following GranSim fcts are stg-called from the threaded world. */ /* Called from HP_CHK and friends (see StgMacros.h) */ //@cindex GranSimAllocate void GranSimAllocate(n) StgInt n; { CurrentTSO->gran.allocs += n; ++(CurrentTSO->gran.basicblocks); if (RtsFlags.GranFlags.GranSimStats.Heap) { DumpRawGranEvent(CurrentProc, 0, GR_ALLOC, CurrentTSO, (StgClosure*)NULL, (StgInt)0, n); } CurrentTSO->gran.exectime += RtsFlags.GranFlags.Costs.heapalloc_cost; CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.heapalloc_cost; } /* Subtract the values added above, if a heap check fails and so has to be redone. */ //@cindex GranSimUnallocate void GranSimUnallocate(n) StgInt n; { CurrentTSO->gran.allocs -= n; --(CurrentTSO->gran.basicblocks); CurrentTSO->gran.exectime -= RtsFlags.GranFlags.Costs.heapalloc_cost; CurrentTime[CurrentProc] -= RtsFlags.GranFlags.Costs.heapalloc_cost; } /* NB: We now inline this code via GRAN_EXEC rather than calling this fct */ //@cindex GranSimExec void GranSimExec(ariths,branches,loads,stores,floats) StgWord ariths,branches,loads,stores,floats; { StgWord cost = RtsFlags.GranFlags.Costs.arith_cost*ariths + RtsFlags.GranFlags.Costs.branch_cost*branches + RtsFlags.GranFlags.Costs.load_cost * loads + RtsFlags.GranFlags.Costs.store_cost*stores + RtsFlags.GranFlags.Costs.float_cost*floats; CurrentTSO->gran.exectime += cost; CurrentTime[CurrentProc] += cost; } /* Fetch the node if it isn't local -- result indicates whether fetch has been done. This is GRIP-style single item fetching. */ //@cindex GranSimFetch StgInt GranSimFetch(node /* , liveness_mask */ ) StgClosure *node; /* StgInt liveness_mask; */ { /* reset the return value (to be checked within STG land) */ NeedToReSchedule = rtsFalse; if (RtsFlags.GranFlags.Light) { /* Always reschedule in GrAnSim-Light to prevent one TSO from running off too far new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc], ContinueThread,CurrentTSO,node,NULL); */ return(0); } /* Faking an RBH closure: If the bitmask of the closure is 0 then this node is a fake RBH; */ if (node->header.gran.procs == Nowhere) { IF_GRAN_DEBUG(bq, belch("## Found fake RBH (node %p); delaying TSO %d (%p)", node, CurrentTSO->id, CurrentTSO)); new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+10000, ContinueThread, CurrentTSO, node, (rtsSpark*)NULL); /* Rescheduling (GranSim internal) is necessary */ NeedToReSchedule = rtsTrue; return(1); } /* Note: once a node has been fetched, this test will be passed */ if (!IS_LOCAL_TO(PROCS(node),CurrentProc)) { PEs p = where_is(node); rtsTime fetchtime; IF_GRAN_DEBUG(thunkStealing, if (p==CurrentProc) belch("GranSimFetch: Trying to fetch from own processor%u\n", p);); CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime; /* NB: Fetch is counted on arrival (FetchReply) */ fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) + RtsFlags.GranFlags.Costs.latency; new_event(p, CurrentProc, fetchtime, FetchNode, CurrentTSO, node, (rtsSpark*)NULL); if (fetchtimegran.blockedat = CurrentTime[CurrentProc]; ++OutstandingFetches[CurrentProc]; if (RtsFlags.GranFlags.DoAsyncFetch) /* if asynchr comm is turned on, activate the next thread in the q */ ActivateNextThread(CurrentProc); else procStatus[CurrentProc] = Fetching; #if 0 /* ToDo: nuke the entire if (anything special for fair schedule?) */ if (RtsFlags.GranFlags.DoAsyncFetch) { /* Remove CurrentTSO from the queue -- assumes head of queue == CurrentTSO */ if(!RtsFlags.GranFlags.DoFairSchedule) { /* now done in do_the_fetchnode if (RtsFlags.GranFlags.GranSimStats.Full) DumpRawGranEvent(CurrentProc, p, GR_FETCH, CurrentTSO, node, (StgInt)0, 0); */ ActivateNextThread(CurrentProc); # if 0 && defined(GRAN_CHECK) if (RtsFlags.GranFlags.Debug.blockOnFetch_sanity) { if (TSO_TYPE(CurrentTSO) & FETCH_MASK_TSO) { fprintf(stderr,"FetchNode: TSO 0x%x has fetch-mask set @ %d\n", CurrentTSO,CurrentTime[CurrentProc]); stg_exit(EXIT_FAILURE); } else { TSO_TYPE(CurrentTSO) |= FETCH_MASK_TSO; } } # endif CurrentTSO->link = END_TSO_QUEUE; /* CurrentTSO = END_TSO_QUEUE; */ /* CurrentTSO is pointed to by the FetchNode event; it is on no run queue any more */ } else { /* fair scheduling currently not supported -- HWL */ barf("Asynchr communication is not yet compatible with fair scheduling\n"); } } else { /* !RtsFlags.GranFlags.DoAsyncFetch */ procStatus[CurrentProc] = Fetching; // ToDo: BlockedOnFetch; /* now done in do_the_fetchnode if (RtsFlags.GranFlags.GranSimStats.Full) DumpRawGranEvent(CurrentProc, p, GR_FETCH, CurrentTSO, node, (StgInt)0, 0); */ IF_GRAN_DEBUG(blockOnFetch, BlockedOnFetch[CurrentProc] = CurrentTSO;); /*- rtsTrue; -*/ } #endif /* 0 */ CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime; /* Rescheduling (GranSim internal) is necessary */ NeedToReSchedule = rtsTrue; return(1); } return(0); } //@cindex GranSimSpark void GranSimSpark(local,node) StgInt local; StgClosure *node; { /* ++SparksAvail; Nope; do that in add_to_spark_queue */ if (RtsFlags.GranFlags.GranSimStats.Sparks) DumpRawGranEvent(CurrentProc, (PEs)0, SP_SPARK, END_TSO_QUEUE, node, (StgInt)0, spark_queue_len(CurrentProc)-1); /* Force the PE to take notice of the spark */ if(RtsFlags.GranFlags.DoAlwaysCreateThreads) { new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc], FindWork, END_TSO_QUEUE, (StgClosure*)NULL, (rtsSpark*)NULL); if (CurrentTime[CurrentProc]gran.localsparks; else ++CurrentTSO->gran.globalsparks; } //@cindex GranSimSparkAt void GranSimSparkAt(spark,where,identifier) rtsSpark *spark; StgClosure *where; /* This should be a node; alternatively could be a GA */ StgInt identifier; { PEs p = where_is(where); GranSimSparkAtAbs(spark,p,identifier); } //@cindex GranSimSparkAtAbs void GranSimSparkAtAbs(spark,proc,identifier) rtsSpark *spark; PEs proc; StgInt identifier; { rtsTime exporttime; if (spark == (rtsSpark *)NULL) /* Note: Granularity control might have */ return; /* turned a spark into a NULL. */ /* ++SparksAvail; Nope; do that in add_to_spark_queue */ if(RtsFlags.GranFlags.GranSimStats.Sparks) DumpRawGranEvent(proc,0,SP_SPARKAT, END_TSO_QUEUE, spark->node, (StgInt)0, spark_queue_len(proc)); if (proc!=CurrentProc) { CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime; exporttime = (CurrentTime[proc] > CurrentTime[CurrentProc]? CurrentTime[proc]: CurrentTime[CurrentProc]) + RtsFlags.GranFlags.Costs.latency; } else { exporttime = CurrentTime[CurrentProc]; } if ( RtsFlags.GranFlags.Light ) /* Need CurrentTSO in event field to associate costs with creating spark even in a GrAnSim Light setup */ new_event(proc, CurrentProc, exporttime, MoveSpark, CurrentTSO, spark->node, spark); else new_event(proc, CurrentProc, exporttime, MoveSpark, (StgTSO*)NULL, spark->node, spark); /* Bit of a hack to treat placed sparks the same as stolen sparks */ ++OutstandingFishes[proc]; /* Force the PE to take notice of the spark (FINDWORK is put after a MoveSpark into the sparkq!) */ if (RtsFlags.GranFlags.DoAlwaysCreateThreads) { new_event(CurrentProc,CurrentProc,exporttime+1, FindWork, (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL); } if (exporttimegran.globalsparks; } else { ++CurrentTSO->gran.localsparks; } } /* This function handles local and global blocking. It's called either from threaded code (RBH_entry, BH_entry etc) or from blockFetch when trying to fetch an BH or RBH */ //@cindex GranSimBlock void GranSimBlock(tso, proc, node) StgTSO *tso; PEs proc; StgClosure *node; { PEs node_proc = where_is(node), tso_proc = where_is((StgClosure *)tso); ASSERT(tso_proc==CurrentProc); // ASSERT(node_proc==CurrentProc); IF_GRAN_DEBUG(bq, if (node_proc!=CurrentProc) belch("## ghuH: TSO %d (%lx) [PE %d] blocks on non-local node %p [PE %d] (no simulation of FETCHMEs)", tso->id, tso, tso_proc, node, node_proc)); ASSERT(tso->link==END_TSO_QUEUE); ASSERT(!is_on_queue(tso,proc)); // tso must not be on run queue already! //ASSERT(tso==run_queue_hds[proc]); IF_DEBUG(gran, belch("GRAN: TSO %d (%p) [PE %d] blocks on closure %p @ %lx", tso->id, tso, proc, node, CurrentTime[proc])); /* THIS SHOULD NEVER HAPPEN! If tso tries to block on a remote node (i.e. node_proc!=CurrentProc) we have missed a GranSimFetch before entering this closure; we hack around it for now, faking a FetchNode; because GranSimBlock is entered via a BLACKHOLE(_BQ) closure, tso will be blocked on this closure until the FetchReply occurs. ngoq Dogh! if (node_proc!=CurrentProc) { StgInt ret; ret = GranSimFetch(node); IF_GRAN_DEBUG(bq, if (ret) belch(".. GranSimBlock: faking a FetchNode of node %p from %d to %d", node, node_proc, CurrentProc);); return; } */ if (RtsFlags.GranFlags.GranSimStats.Full) DumpRawGranEvent(proc,node_proc,GR_BLOCK,tso,node,(StgInt)0,0); ++(tso->gran.blockcount); /* Distinction between local and global block is made in blockFetch */ tso->gran.blockedat = CurrentTime[proc]; CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime; ActivateNextThread(proc); /* tso->link = END_TSO_QUEUE; not really necessary; only for testing */ } #endif /* GRAN */ //@node Index, , Dumping routines, GranSim specific code //@subsection Index //@index //* ActivateNextThread:: @cindex\s-+ActivateNextThread //* CurrentProc:: @cindex\s-+CurrentProc //* CurrentTime:: @cindex\s-+CurrentTime //* GranSimAllocate:: @cindex\s-+GranSimAllocate //* GranSimBlock:: @cindex\s-+GranSimBlock //* GranSimExec:: @cindex\s-+GranSimExec //* GranSimFetch:: @cindex\s-+GranSimFetch //* GranSimLight_insertThread:: @cindex\s-+GranSimLight_insertThread //* GranSimSpark:: @cindex\s-+GranSimSpark //* GranSimSparkAt:: @cindex\s-+GranSimSparkAt //* GranSimSparkAtAbs:: @cindex\s-+GranSimSparkAtAbs //* GranSimUnallocate:: @cindex\s-+GranSimUnallocate //* any_idle:: @cindex\s-+any_idle //* blockFetch:: @cindex\s-+blockFetch //* do_the_fetchnode:: @cindex\s-+do_the_fetchnode //* do_the_fetchreply:: @cindex\s-+do_the_fetchreply //* do_the_findwork:: @cindex\s-+do_the_findwork //* do_the_globalblock:: @cindex\s-+do_the_globalblock //* do_the_movespark:: @cindex\s-+do_the_movespark //* do_the_movethread:: @cindex\s-+do_the_movethread //* do_the_startthread:: @cindex\s-+do_the_startthread //* do_the_unblock:: @cindex\s-+do_the_unblock //* fetchNode:: @cindex\s-+fetchNode //* ga_to_proc:: @cindex\s-+ga_to_proc //* get_next_event:: @cindex\s-+get_next_event //* get_time_of_next_event:: @cindex\s-+get_time_of_next_event //* grab_event:: @cindex\s-+grab_event //* handleFetchRequest:: @cindex\s-+handleFetchRequest //* handleIdlePEs:: @cindex\s-+handleIdlePEs //* idlers:: @cindex\s-+idlers //* insertThread:: @cindex\s-+insertThread //* insert_event:: @cindex\s-+insert_event //* is_on_queue:: @cindex\s-+is_on_queue //* is_unique:: @cindex\s-+is_unique //* new_event:: @cindex\s-+new_event //* prepend_event:: @cindex\s-+prepend_event //* print_event:: @cindex\s-+print_event //* print_eventq:: @cindex\s-+print_eventq //* prune_eventq :: @cindex\s-+prune_eventq //* spark queue:: @cindex\s-+spark queue //* sparkStealTime:: @cindex\s-+sparkStealTime //* stealSomething:: @cindex\s-+stealSomething //* stealSpark:: @cindex\s-+stealSpark //* stealSparkMagic:: @cindex\s-+stealSparkMagic //* stealThread:: @cindex\s-+stealThread //* stealThreadMagic:: @cindex\s-+stealThreadMagic //* thread_queue_len:: @cindex\s-+thread_queue_len //* traverse_eventq_for_gc:: @cindex\s-+traverse_eventq_for_gc //* where_is:: @cindex\s-+where_is //@end index