Skip to content

Commit 352e9f4

Browse files
authored
DecrementToEnqueue (#153)
* DecrementToEnqueue * added decrement and enqueue example * add multi-pe test * add counter reset and message free * provide default initialCount and newCount * fix args in examples
1 parent ecdb65d commit 352e9f4

File tree

7 files changed

+268
-0
lines changed

7 files changed

+268
-0
lines changed

include/converse.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,23 @@ void CmiLock(CmiNodeLock lock);
752752
void CmiUnlock(CmiNodeLock lock);
753753
int CmiTryLock(CmiNodeLock lock);
754754

755+
//decrementToEnqueue
756+
typedef struct DecrementToEnqueueMsg{
757+
unsigned int *counter;
758+
void *msg;
759+
} DecrementToEnqueueMsg;
760+
761+
void CmiDecrementCounter(DecrementToEnqueueMsg *dteMsg);
762+
#ifdef __cplusplus
763+
//default set to 10
764+
DecrementToEnqueueMsg *CmiCreateDecrementToEnqueue(void *msg, unsigned int initialCount = 10);
765+
void CmiResetCounter(DecrementToEnqueueMsg *dteMsg, unsigned int newCount = 10);
766+
#else
767+
DecrementToEnqueueMsg *CmiCreateDecrementToEnqueue(void *msg, unsigned int initialCount);
768+
void CmiResetCounter(DecrementToEnqueueMsg *dteMsg, unsigned int newCount);
769+
#endif
770+
void CmiFreeDecrementToEnqueue(DecrementToEnqueueMsg *dteMsg);
771+
755772
// error checking
756773

757774
// do we want asserts to be defaulted to be on or off(right now it is on)

src/convcore.cpp

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,71 @@ int CmiRegisterHandlerEx(CmiHandlerEx h, void *userPtr) {
664664
return handlerVector->size() - 1;
665665
}
666666

667+
//decrement to enqueue
668+
669+
DecrementToEnqueueMsg *CmiCreateDecrementToEnqueue(void *msg, unsigned int initialCount){
670+
if(initialCount == 0){
671+
CmiAbort("CmiCreateDecrementToEnqueue: initialCount cannot be zero\n");
672+
}
673+
DecrementToEnqueueMsg *dteMsg = static_cast<DecrementToEnqueueMsg *>(malloc(sizeof(DecrementToEnqueueMsg)));
674+
dteMsg->counter = static_cast<unsigned int *>(malloc(sizeof(unsigned int)));
675+
*(dteMsg->counter) = initialCount;
676+
dteMsg->msg = msg;
677+
return dteMsg;
678+
}
679+
680+
void CmiDecrementCounter(DecrementToEnqueueMsg *dteMsg){
681+
if(dteMsg == nullptr){
682+
CmiAbort("CmiDecrementCounter: dteMsg is nullptr\n");
683+
}
684+
if(dteMsg->counter == nullptr){
685+
// In concurrent scenarios the counter pointer may have been freed by
686+
// another thread (race between decrements). Instead of aborting, be
687+
// tolerant and return silently since the counter has already been
688+
// consumed.
689+
return;
690+
}
691+
unsigned int oldValue = __atomic_fetch_sub(dteMsg->counter, 1, __ATOMIC_SEQ_CST);
692+
if(oldValue == 0){
693+
CmiAbort("CmiDecrementCounter: counter already zero, cannot decrement further\n");
694+
}
695+
if(oldValue == 1){
696+
//get dest PE from message header
697+
CmiMessageHeader *header = static_cast<CmiMessageHeader *>(dteMsg->msg);
698+
int destPE = header->destPE;
699+
CmiAssertMsg(
700+
destPE >= 0 && destPE < Cmi_npes,
701+
"CmiDecrementCounter: destPE out of range"
702+
);
703+
// enqueue the message without freeing
704+
CmiSyncSend(destPE, header->messageSize, dteMsg->msg);
705+
}
706+
}
707+
708+
void CmiResetCounter(DecrementToEnqueueMsg *dteMsg, unsigned int newCount){
709+
if(dteMsg == nullptr){
710+
CmiAbort("CmiResetCounter: dteMsg is nullptr\n");
711+
}
712+
if(dteMsg->counter == nullptr){
713+
CmiAbort("CmiResetCounter: counter is nullptr\n");
714+
}
715+
if(newCount == 0){
716+
CmiAbort("CmiResetCounter: newCount cannot be zero\n");
717+
}
718+
__atomic_store_n(dteMsg->counter, newCount, __ATOMIC_SEQ_CST);
719+
}
720+
721+
void CmiFreeDecrementToEnqueue(DecrementToEnqueueMsg *dteMsg){
722+
if(dteMsg == nullptr){
723+
CmiAbort("CmiFreeDecrementToEnqueue: dteMsg is nullptr\n");
724+
}
725+
if(dteMsg->counter == nullptr){
726+
CmiAbort("CmiFreeDecrementToEnqueue: counter is nullptr\n");
727+
}
728+
free(dteMsg->counter);
729+
free(dteMsg);
730+
}
731+
667732
void CmiNodeBarrier(void) {
668733
static Barrier nodeBarrier(CmiMyNodeSize());
669734
int64_t ticket = nodeBarrier.arrive();

tests/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ add_subdirectory(broadcast)
22
add_subdirectory(group)
33
add_subdirectory(within-node-bcast)
44
add_subdirectory(conds)
5+
add_subdirectory(decrement_enqueue)
6+
add_subdirectory(decrement_bcast)
57
add_subdirectory(idle)
68
add_subdirectory(kNeighbors)
79
add_subdirectory(orig-converse/pingpong)
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
add_reconverse_executable(decrement_bcast decrement_bcast.cpp)
2+
# run with 4 PEs by default to exercise multiple PEs; override in ctest if desired
3+
add_test(NAME decrement_bcast COMMAND decrement_bcast +pe 4)
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Multi-PE reconverse test for CmiCreateDecrementToEnqueue / CmiDecrementCounter
2+
// Intended to run with an arbitrary number of PEs.
3+
// Assumption: the counter is created on PE 0 with initialCount = 4 * CmiNumPes()
4+
// (the user requested "4*CmiMyPe()" but that would be zero on PE 0; to make the
5+
// test meaningful across arbitrary PEs we initialize to 4 * number of PEs so
6+
// each PE can send 4 messages to PE 0).
7+
8+
#include "converse.h"
9+
#include <string.h>
10+
11+
// Global pointer to the DecrementToEnqueueMsg created on PE 0. Other PEs do not
12+
// need direct access to its fields; PE 0 will own and use this pointer when
13+
// decrementing. Making it global simplifies the broadcast message.
14+
static DecrementToEnqueueMsg *g_dte = NULL;
15+
static int g_decInvH = -1;
16+
17+
// Handler called when final message is delivered (counter reached zero).
18+
void exit_handler(void *msg) {
19+
CmiPrintf("Exit handler: counter reached zero on PE %d\n", CmiMyPe());
20+
CmiFreeDecrementToEnqueue(g_dte);
21+
CmiExit(0);
22+
}
23+
24+
// Handler that will be invoked on PE0 for each incoming decrement-invoker
25+
// message. It calls CmiDecrementCounter on the global DTE.
26+
void decrement_invoker(void *msg) {
27+
(void)msg; // incoming message payload is not used
28+
29+
//CmiPrintf("decrement_invoker: PE %d decrementing counter\n", CmiMyPe());
30+
31+
// Call the decrement operation on the global DTE
32+
CmiDecrementCounter(g_dte);
33+
34+
// Free the incoming message
35+
CmiFree(msg);
36+
}
37+
38+
// Handler invoked on every PE when the broadcast arrives. Each PE will send 4
39+
// small messages to PE 0; those messages will trigger decrement_invoker on PE0.
40+
void broadcast_handler(void *msg) {
41+
// The broadcast carries no extra payload for this test; simply send 4
42+
// messages to PE 0. PE 0 owns the global DTE (g_dte) and will perform the
43+
// decrements when it receives these messages.
44+
(void)msg; // unused
45+
46+
int dest = 0;
47+
48+
// Send 4 messages to PE 0. The messages themselves carry no useful payload
49+
// other than the header and are used only to trigger the decrement handler
50+
// on PE 0.
51+
//CmiPrintf("broadcast_handler: PE %d sending 4 decrement messages to PE 0\n", CmiMyPe());
52+
for (int i = 0; i < 4; ++i) {
53+
int sendSize = (int)sizeof(CmiMessageHeader);
54+
char *smsg = (char *)CmiAlloc(sendSize);
55+
memset(smsg, 0, sendSize);
56+
57+
// set handler to the pre-registered decrement_invoker
58+
CmiSetHandler(smsg, g_decInvH);
59+
CmiMessageHeader *sendHdr = (CmiMessageHeader *)smsg;
60+
sendHdr->destPE = dest;
61+
sendHdr->messageSize = sendSize;
62+
63+
// send to PE 0 and free the buffer if the send copies it; use SyncSendAndFree
64+
CmiSyncSendAndFree(dest, sendSize, smsg);
65+
}
66+
}
67+
68+
// Start function: PE 0 creates the DecrementToEnqueueMsg with initialCount =
69+
// 4 * CmiNumPes(), then broadcasts a message carrying the dte pointer. All
70+
// PEs will receive the broadcast and send 4 messages to PE 0 which trigger
71+
// decrements.
72+
void test_start(int argc, char **argv) {
73+
(void)argc; (void)argv;
74+
int exitH = CmiRegisterHandler((CmiHandler)exit_handler);
75+
int bcastH = CmiRegisterHandler((CmiHandler)broadcast_handler);
76+
77+
// register the decrement invoker once and store its handler globally so
78+
// broadcast_handler can reuse it when composing outgoing messages.
79+
g_decInvH = CmiRegisterHandler((CmiHandler)decrement_invoker);
80+
81+
int numPes = CmiNumPes();
82+
int initial = 4 * numPes; // 4 messages per PE
83+
84+
if (CmiMyPe() == 0) {
85+
// create final message that will be sent when counter reaches zero
86+
int finalSize = (int)sizeof(CmiMessageHeader);
87+
void *finalMsg = CmiAlloc(finalSize);
88+
memset(finalMsg, 0, finalSize);
89+
CmiSetHandler(finalMsg, exitH);
90+
CmiMessageHeader *fhdr = (CmiMessageHeader *)finalMsg;
91+
fhdr->destPE = 0;
92+
fhdr->messageSize = finalSize;
93+
94+
g_dte = CmiCreateDecrementToEnqueue(finalMsg, (unsigned int)initial);
95+
// Ensure stores to g_dte and its internals are visible to other PEs/threads.
96+
//CmiMemoryWriteFence();
97+
//CmiPrintf("[PE %d] created g_dte=%p, counter=%p (initial=%d)\n", CmiMyPe(), (void*)g_dte, (void*)(g_dte?g_dte->counter:NULL), initial);
98+
}
99+
100+
// Ensure that PE 0 has finished creating g_dte before anyone reacts to the
101+
// broadcast. This prevents races where receivers send messages that reach
102+
// PE 0 before g_dte is initialized, which would cause CmiDecrementCounter to
103+
// see a null counter.
104+
CmiNodeAllBarrier();
105+
//CmiPrintf("[PE %d] passed node barrier\n", CmiMyPe());
106+
107+
// Build a small broadcast message. Receivers will consult the global g_dte
108+
// (which is valid on PE 0) and send messages to PE 0 to trigger decrements.
109+
int bsize = (int)sizeof(CmiMessageHeader);
110+
void *bmsg = CmiAlloc(bsize);
111+
memset(bmsg, 0, bsize);
112+
CmiMessageHeader *bhdr = (CmiMessageHeader *)bmsg;
113+
bhdr->messageSize = bsize;
114+
CmiSetHandler(bmsg, bcastH);
115+
116+
// Broadcast to all PEs
117+
if (CmiMyPe() == 0) CmiSyncBroadcastAllAndFree(bsize, bmsg);
118+
119+
// Return from start; scheduler will process incoming messages and the exit
120+
// handler will terminate when the counter reaches zero.
121+
}
122+
123+
int main(int argc, char **argv) {
124+
ConverseInit(argc, argv, test_start, 0, 0);
125+
return 0;
126+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
add_reconverse_executable(decrement_enqueue decrement_enqueue.cpp)
2+
add_test(NAME decrement_enqueue COMMAND decrement_enqueue +pe 1)
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Simple reconverse test for CmiCreateDecrementToEnqueue / CmiDecrementCounter
2+
// Intended to run with 1 PE on 1 process.
3+
4+
#include "converse.h"
5+
#include <string.h>
6+
7+
DecrementToEnqueueMsg *dte;
8+
9+
// Handler invoked when the decrement counter reaches zero. Exits the program.
10+
void exit_handler(void *msg) {
11+
// free DecrementToEnqueueMsg
12+
CmiFreeDecrementToEnqueue(dte);
13+
CmiPrintf("Exit handler called, exiting...\n");
14+
CmiExit(0);
15+
}
16+
17+
// Start function invoked by ConverseInit.
18+
void test_start(int argc, char **argv) {
19+
CmiPrintf("Starting decrement_enqueue test...\n");
20+
// Only PE 0 will create and drive the counter per the test spec.
21+
if (CmiMyPe() != 0) return;
22+
23+
int handler = CmiRegisterHandler((CmiHandler)exit_handler);
24+
25+
// Create a minimal message consisting only of the message header.
26+
int msgSize = (int)sizeof(CmiMessageHeader);
27+
void *msg = CmiAlloc(msgSize);
28+
memset(msg, 0, msgSize);
29+
30+
// Set handler, destination and size on the header so CmiDecrementCounter
31+
// can inspect them when it enqueues the message.
32+
CmiSetHandler(msg, handler);
33+
CmiMessageHeader *hdr = (CmiMessageHeader *)msg;
34+
hdr->destPE = (CmiUInt4)CmiMyPe();
35+
hdr->messageSize = msgSize;
36+
37+
// Create the decrement-to-enqueue helper with initial count 16.
38+
dte = CmiCreateDecrementToEnqueue(msg, 16u);
39+
40+
// Decrement 16 times; on the 16th call the message will be sent and the
41+
// registered handler will call CmiExit.
42+
for (int i = 0; i < 16; ++i) {
43+
CmiDecrementCounter(dte);
44+
}
45+
46+
// Return from start; scheduler will run and the exit handler will stop it.
47+
}
48+
49+
int main(int argc, char **argv) {
50+
// Start the Converse runtime with our test_start function.
51+
ConverseInit(argc, argv, test_start, 0, 0);
52+
return 0;
53+
}

0 commit comments

Comments
 (0)