?? xdev_mxdev_mxdevice.c
字號:
/**
 * The MIT License
 *
 * Copyright (c) 2005 - 2007
 *   1. Distributed Systems Group, University of Portsmouth (2005)
 *   2. Aamir Shafi (2005 - 2007)
 *   3. Bryan Carpenter (2005 - 2007)
 *   4. Mark Baker (2005 - 2007)
 */

#include "myriexpress.h"
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "xdev_mxdev_MXDevice.h"
#include "jni.h"
#include "mxdev_const.h"

/***********************************************************
 * Block diagram for the match tag; it is a 64 bit number.
 *  <---16-----><------16------><--1--><--------31------->
 *  ------------------------------------------------------
 *  |  context  |      src      | PRI  |       tag       |
 *  ------------------------------------------------------
 ***********************************************************/

static JavaVM *jvm;
int myRank;
int procs;

/* Cached class references (all held as JNI global references). */
jclass CL_mpjdev_Status;
jclass CL_mpjbuf_Buffer;
jclass CL_xdev_mxdev_MXProcessID;
jclass CL_mpjbuf_NIOBuffer;
jclass CL_mpjbuf_Type;
jclass CL_xdev_ProcessID;
jclass CL_xdev_mxdev_MXRequest;

/* Cached field IDs. */
jfieldID FID_mpjbuf_Buffer_staticBuffer;
jfieldID FID_mpjbuf_Buffer_dynamicBuffer;
jfieldID FID_mpjbuf_Buffer_size;
jfieldID FID_mpjbuf_Buffer_capacity;
jfieldID FID_mpjbuf_NIOBuffer_buffer;
jfieldID FID_mpjbuf_Type_code;
jfieldID FID_xdev_ProcessID_uuid;
jfieldID FID_xdev_mxdev_MXRequest_status;
jfieldID FID_xdev_mxdev_MXRequest_requestStruct;
jfieldID processhandleID;
jfieldID processidID;
jfieldID status_src_ID;
jfieldID status_tag_ID;
jfieldID countInBytesID;

/*
 * Looks up a class and returns a global reference to it.
 *
 * A jclass returned by FindClass is a local reference that stops being
 * valid when the native method returns, so anything cached in a file-scope
 * variable must be promoted with NewGlobalRef. Terminates the process with
 * exit(3) when the class cannot be found or the reference cannot be created.
 */
static jclass mxdev_require_class(JNIEnv *env, const char *name)
{
    jclass local_ref = (*env)->FindClass(env, name);
    jclass global_ref = NULL;

    if (local_ref != NULL) {
        global_ref = (jclass)(*env)->NewGlobalRef(env, local_ref);
        (*env)->DeleteLocalRef(env, local_ref);
    }
    if (global_ref == NULL) {
        fprintf(stderr, "\n Fatal error finding class %s", name);
        exit(3);
    }
    return global_ref;
}

/*
 * Looks up an instance field ID; terminates the process with exit(3)
 * (same message and exit code the original validation used) on failure.
 */
static jfieldID mxdev_require_field(JNIEnv *env, jclass cls,
                                    const char *name, const char *sig)
{
    jfieldID fid = (*env)->GetFieldID(env, cls, name, sig);

    if (fid == NULL) {
        fprintf(stderr, "\n Fatal error getting FIDs");
        exit(3);
    }
    return fid;
}

/*
 * Caching of JNI stuff.
 *
 * Stores the JavaVM pointer, resolves every class and field ID used by the
 * native methods in this file and returns JNI_VERSION_1_4. Exits with
 * status 1 if no JNIEnv can be obtained and with status 3 if any class or
 * field lookup fails.
 */
jint JNI_OnLoad(JavaVM *vm, void *reserved)
{
    JNIEnv *env;

    (void)reserved;
    jvm = vm;

    if (JNI_OK != (*vm)->GetEnv(vm, (void **)&env, JNI_VERSION_1_4)) {
        exit(1);
    }

    CL_mpjbuf_Buffer = mxdev_require_class(env, "mpjbuf/Buffer");
    CL_xdev_mxdev_MXProcessID =
        mxdev_require_class(env, "xdev/mxdev/MXProcessID");
    CL_xdev_mxdev_MXRequest = mxdev_require_class(env, "xdev/mxdev/MXRequest");
    CL_mpjbuf_NIOBuffer = mxdev_require_class(env, "mpjbuf/NIOBuffer");
    CL_mpjbuf_Type = mxdev_require_class(env, "mpjbuf/Type");
    CL_xdev_ProcessID = mxdev_require_class(env, "xdev/ProcessID");
    CL_mpjdev_Status = mxdev_require_class(env, "mpjdev/Status");

    FID_mpjbuf_Buffer_size =
        mxdev_require_field(env, CL_mpjbuf_Buffer, "size", "I");
    FID_mpjbuf_Buffer_capacity =
        mxdev_require_field(env, CL_mpjbuf_Buffer, "capacity", "I");
    FID_mpjbuf_Buffer_staticBuffer =
        mxdev_require_field(env, CL_mpjbuf_Buffer, "staticBuffer",
                            "Lmpjbuf/RawBuffer;");
    FID_mpjbuf_Buffer_dynamicBuffer =
        mxdev_require_field(env, CL_mpjbuf_Buffer, "dynamicBuffer", "[B");
    FID_mpjbuf_NIOBuffer_buffer =
        mxdev_require_field(env, CL_mpjbuf_NIOBuffer, "buffer",
                            "Ljava/nio/ByteBuffer;");
    FID_mpjbuf_Type_code =
        mxdev_require_field(env, CL_mpjbuf_Type, "code", "I");
    FID_xdev_ProcessID_uuid =
        mxdev_require_field(env, CL_xdev_ProcessID, "uuid",
                            "Ljava/util/UUID;");
    processhandleID =
        mxdev_require_field(env, CL_xdev_mxdev_MXProcessID,
                            "processHandle", "J");
    processidID =
        mxdev_require_field(env, CL_xdev_mxdev_MXProcessID, "id", "I");
    status_src_ID =
        mxdev_require_field(env, CL_mpjdev_Status, "source", "I");
    status_tag_ID = mxdev_require_field(env, CL_mpjdev_Status, "tag", "I");
    countInBytesID =
        mxdev_require_field(env, CL_mpjdev_Status, "countInBytes", "I");
    FID_xdev_mxdev_MXRequest_status =
        mxdev_require_field(env, CL_xdev_mxdev_MXRequest, "status",
                            "Lmpjdev/Status;");
    FID_xdev_mxdev_MXRequest_requestStruct =
        mxdev_require_field(env, CL_xdev_mxdev_MXRequest,
                            "requestStruct", "J");

    return JNI_VERSION_1_4;
}
/* Caching of JNI stuff (done). */

mx_endpoint_t local_endpoint;
uint32_t filter = 0xcafebabe;
//extern mx_endpoint_t local_endpoint;
static mx_endpoint_addr_t *peer_endpoints = NULL;

/*
 * Match mask used for the start-up id exchange: every bit of the tag is
 * compared, so only a message whose tag is exactly mxdev_init_tag(rank)
 * is accepted.
 */
static const uint64_t mxdev_init_mask = UINT64_C(0xffffffffffffffff);

/*
 * Tag used for the start-up id exchange: the sender's rank placed in the
 * "src" position of the tag layout (bit 32 upwards), all other bits zero.
 * The widening cast happens before the shift because shifting a 32-bit int
 * by 32 is undefined behaviour.
 */
static uint64_t mxdev_init_tag(int rank)
{
    return ((uint64_t)(uint32_t)rank) << 32;
}

/*
 * Reports msg on stderr and raises java.lang.RuntimeException with the same
 * text, unless a Java exception is already pending (in which case the
 * pending one is left in place).
 */
static void mxdev_throw(JNIEnv *env, const char *msg)
{
    jclass ex;

    fprintf(stderr, "mxdev: %s\n", msg);
    if ((*env)->ExceptionCheck(env)) {
        return;
    }
    ex = (*env)->FindClass(env, "java/lang/RuntimeException");
    if (ex != NULL) {
        (*env)->ThrowNew(env, ex, msg);
        (*env)->DeleteLocalRef(env, ex);
    }
}

/*
 * Resolves each of the first nprocs host names in processNames to a NIC id
 * and connects local_endpoint to it, filling peer_endpoints[i].
 *
 * A failed mx_connect is retried until it succeeds (as the original code
 * did). Returns 0 on success; on failure a Java exception is pending and
 * -1 is returned.
 */
static int mxdev_connect_peers(JNIEnv *env, jobjectArray processNames,
                               jint nprocs)
{
    jsize name_count = (*env)->GetArrayLength(env, processNames);
    jint i;

    if (name_count < nprocs) {
        mxdev_throw(env, "nativeInit: fewer process names than processes");
        return -1;
    }

    for (i = 0; i < nprocs; i++) {
        mx_return_t rc;
        uint64_t nic_id = 0;
        const char *host;
        jstring jhost =
            (jstring)(*env)->GetObjectArrayElement(env, processNames, i);

        if (jhost == NULL) {
            mxdev_throw(env, "nativeInit: null process name");
            return -1;
        }
        host = (*env)->GetStringUTFChars(env, jhost, NULL);
        if (host == NULL) {
            (*env)->DeleteLocalRef(env, jhost);
            mxdev_throw(env, "nativeInit: could not read process name");
            return -1;
        }

        /* The cast drops const for the MX call; the string is only
         * expected to be read by the lookup -- TODO confirm prototype. */
        rc = mx_hostname_to_nic_id((char *)host, &nic_id);
        if (rc == MX_SUCCESS) {
            do {
                rc = mx_connect(local_endpoint, nic_id, 0, filter,
                                MX_INFINITE, &peer_endpoints[i]);
            } while (rc != MX_SUCCESS);
        }

        (*env)->ReleaseStringUTFChars(env, jhost, host);
        (*env)->DeleteLocalRef(env, jhost);

        if (rc != MX_SUCCESS) {
            mxdev_throw(env, "nativeInit: could not resolve peer host name");
            return -1;
        }
    }
    return 0;
}

/*
 * Exchanges (msb, lsb, rank) with every other process and stores a new
 * MXProcessID for each peer i in pids[i].
 *
 * The message consists of three segments whose lengths are the sizes of
 * the variables they point at (two jlongs and one jint). Sends to all
 * peers are posted first, then one receive per peer is posted and waited
 * for, and finally all posted sends are waited for so that the segment
 * memory (locals of this function) is no longer in use on return.
 *
 * Returns 0 on success; on failure a Java exception is pending and -1 is
 * returned.
 */
static int mxdev_exchange_ids(JNIEnv *env, jobjectArray pids, jint rank,
                              jlong msb, jlong lsb, jclass uuid_cls,
                              jmethodID uuid_ctor, jmethodID pid_ctor)
{
    mx_segment_t send_desc[3];
    mx_request_t *send_handle;
    unsigned char *send_posted;
    mx_status_t wait_status;
    uint32_t wait_result;
    mx_return_t rc;
    uint64_t send_tag = mxdev_init_tag(myRank);
    int failed = 0;
    int i;

    send_handle = calloc((size_t)procs, sizeof *send_handle);
    send_posted = calloc((size_t)procs, sizeof *send_posted);
    if (send_handle == NULL || send_posted == NULL) {
        free(send_handle);
        free(send_posted);
        mxdev_throw(env, "nativeInit: out of memory");
        return -1;
    }

    send_desc[0].segment_ptr = &msb;
    send_desc[0].segment_length = (uint32_t)sizeof msb;
    send_desc[1].segment_ptr = &lsb;
    send_desc[1].segment_length = (uint32_t)sizeof lsb;
    send_desc[2].segment_ptr = &rank;
    send_desc[2].segment_length = (uint32_t)sizeof rank;

    for (i = 0; i < procs; i++) {
        if (i == myRank) {
            continue;
        }
        rc = mx_isend(local_endpoint, send_desc, 3, peer_endpoints[i],
                      send_tag, NULL, &send_handle[i]);
        if (rc == MX_SUCCESS) {
            send_posted[i] = 1;
        } else {
            fprintf(stderr, "mxdev: mx_isend to process %d failed (%d)\n",
                    i, (int)rc);
            failed = 1;
        }
    }

    for (i = 0; i < procs; i++) {
        mx_segment_t recv_desc[3];
        mx_request_t recv_handle;
        mx_status_t recv_status;
        uint32_t recv_result = 0;
        jlong peer_msb = 0;
        jlong peer_lsb = 0;
        jint peer_rank = 0;
        jobject uid;
        jobject pid;

        if (i == myRank) {
            continue;
        }

        recv_desc[0].segment_ptr = &peer_msb;
        recv_desc[0].segment_length = (uint32_t)sizeof peer_msb;
        recv_desc[1].segment_ptr = &peer_lsb;
        recv_desc[1].segment_length = (uint32_t)sizeof peer_lsb;
        recv_desc[2].segment_ptr = &peer_rank;
        recv_desc[2].segment_length = (uint32_t)sizeof peer_rank;

        rc = mx_irecv(local_endpoint, recv_desc, 3, mxdev_init_tag(i),
                      mxdev_init_mask, NULL, &recv_handle);
        if (rc == MX_SUCCESS) {
            rc = mx_wait(local_endpoint, &recv_handle, MX_INFINITE,
                         &recv_status, &recv_result);
        }
        if (rc != MX_SUCCESS || !recv_result) {
            fprintf(stderr, "mxdev: receive from process %d failed (%d)\n",
                    i, (int)rc);
            failed = 1;
            continue;
        }

        uid = (*env)->NewObject(env, uuid_cls, uuid_ctor, peer_msb, peer_lsb);
        if (uid == NULL) {
            failed = 1;
            break;
        }
        pid = (*env)->NewObject(env, CL_xdev_mxdev_MXProcessID, pid_ctor, uid);
        (*env)->DeleteLocalRef(env, uid);
        if (pid == NULL) {
            failed = 1;
            break;
        }
        (*env)->SetLongField(env, pid, processhandleID,
                             (jlong)(intptr_t)&peer_endpoints[i]);
        (*env)->SetIntField(env, pid, processidID, peer_rank);
        (*env)->SetObjectArrayElement(env, pids, i, pid);
        /* Drop the per-peer local reference so the local reference table
         * does not grow with the number of processes. */
        (*env)->DeleteLocalRef(env, pid);
        if ((*env)->ExceptionCheck(env)) {
            failed = 1;
            break;
        }
    }

    /* Wait for every send that was actually posted, even after a failure,
     * because the segments point at this function's locals. */
    for (i = 0; i < procs; i++) {
        if (!send_posted[i]) {
            continue;
        }
        mx_wait(local_endpoint, &send_handle[i], MX_INFINITE,
                &wait_status, &wait_result);
    }

    free(send_handle);
    free(send_posted);

    if (failed) {
        mxdev_throw(env, "nativeInit: process id exchange failed");
        return -1;
    }
    return 0;
}

/*
 * Class:     xdev_mxdev_MXDevice
 * Method:    nativeInit
 * Signature: ([Ljava/lang/String;I[Ljava/lang/String;[II[Lxdev/mxdev/MXProcessID;JJ)V
 *
 * Initialises MX, opens the local endpoint, connects to the host of every
 * process named in processNames, exchanges (msb, lsb, rank) with all other
 * processes, and fills pids[i] with a new MXProcessID for each peer. The
 * entry pids[rank] must already exist; its processHandle and id fields are
 * set here.
 *
 * On failure a java.lang.RuntimeException (or the JNI exception that caused
 * the failure) is left pending and the function returns early.
 */
JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeInit
  (JNIEnv *env, jobject jthis, jobjectArray argv, jint rank,
   jobjectArray processNames, jintArray ranks, jint nprocs,
   jobjectArray pids, jlong msb, jlong lsb)
{
    mx_return_t rc;
    jclass CL_java_util_UUID;
    jmethodID uuid_c;
    jmethodID pid_c;
    jobject pid;

    (void)jthis;
    (void)argv;
    (void)ranks;

    if (nprocs <= 0 || rank < 0 || rank >= nprocs) {
        mxdev_throw(env, "nativeInit: invalid rank or process count");
        return;
    }

    rc = mx_init();
    mx_set_error_handler(MX_ERRORS_RETURN);
    if (rc != MX_SUCCESS) {
        /* Not treated as fatal here: opening the endpoint below fails
         * if MX is really unusable. */
        fprintf(stderr, "mxdev: mx_init returned %d\n", (int)rc);
    }

    rc = mx_open_endpoint(MX_ANY_NIC, MX_ANY_ENDPOINT, filter, 0, 0,
                          &local_endpoint);
    if (rc != MX_SUCCESS) {
        mxdev_throw(env, "nativeInit: could not open local endpoint");
        return;
    }

    CL_java_util_UUID = (*env)->FindClass(env, "java/util/UUID");
    if (CL_java_util_UUID == NULL) {
        return; /* exception already pending */
    }
    uuid_c = (*env)->GetMethodID(env, CL_java_util_UUID, "<init>", "(JJ)V");
    if (uuid_c == NULL) {
        return; /* exception already pending */
    }
    pid_c = (*env)->GetMethodID(env, CL_xdev_mxdev_MXProcessID, "<init>",
                                "(Ljava/util/UUID;)V");
    if (pid_c == NULL) {
        return; /* exception already pending */
    }

    myRank = rank;
    procs = nprocs;

    peer_endpoints = malloc((size_t)procs * sizeof *peer_endpoints);
    if (peer_endpoints == NULL) {
        mxdev_throw(env, "nativeInit: out of memory");
        return;
    }

    /* connect loop */
    if (mxdev_connect_peers(env, processNames, nprocs) != 0) {
        free(peer_endpoints);
        peer_endpoints = NULL;
        return;
    }

    /* ids set up */
    if (mxdev_exchange_ids(env, pids, rank, msb, lsb, CL_java_util_UUID,
                           uuid_c, pid_c) != 0) {
        /* peer_endpoints is kept: MXProcessID objects already stored in
         * pids hold addresses into it. */
        return;
    }

    /* a. get the array element for this process */
    pid = (*env)->GetObjectArrayElement(env, pids, myRank);
    if (pid == NULL) {
        mxdev_throw(env, "nativeInit: no MXProcessID for the local rank");
        return;
    }
    /* b. set the endpoint handle for it */
    (*env)->SetLongField(env, pid, processhandleID,
                         (jlong)(intptr_t)&peer_endpoints[myRank]);
    /* c. set the integer id for it */
    (*env)->SetIntField(env, pid, processidID, myRank);
}

/*
 * Class:     xdev_mxdev_MXDevice
 * Method:    nativeSsend
 * Signature: (Lmpjbuf/Buffer;Lxdev/ProcessID;IIII)V
 *
 * Synchronous send of an mpjbuf.Buffer to the process identified by dstID.
 *
 * The static section is sent from the direct ByteBuffer behind
 * buf.staticBuffer with match tag PRI_MATCH(context, myRank, tag); its first
 * eight bytes are overwritten with a header (byte 0 = encoding, bytes 4..7 =
 * dbuf_length, most significant byte first) and sbuf_length + 8 bytes are
 * transmitted. When dbuf_length > 0 the first dbuf_length bytes of
 * buf.dynamicBuffer are sent as a second message with match tag
 * SEC_MATCH(context, myRank, tag). Both sends use mx_issend and are waited
 * for before returning.
 *
 * On failure a java.lang.RuntimeException (or the JNI exception that caused
 * the failure) is left pending.
 */
JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeSsend
  (JNIEnv *env, jobject this, jobject buf, jobject dstID, jint tag,
   jint context, jint sbuf_length, jint dbuf_length)
{
    /* MX accessories for calling mx_issend */
    mx_return_t rc;
    mx_request_t send_handle;
    mx_request_t dbufsend_handle;
    mx_status_t status;
    mx_segment_t buffer_desc[1];
    mx_segment_t dbuf_desc[1];
    uint64_t match_send;
    uint64_t dbuf_tag;
    uint32_t result;
    mx_endpoint_addr_t *dest;

    /* static buffer related declarations */
    char *buffer_address;
    jobject staticBuffer;
    jobject directbuffer;

    /* dynamic buffer related declarations */
    jbyteArray dynamicBuffer = NULL;
    jbyte *dBuffer = NULL;
    int dbuf_posted = 0;

    const char *error = NULL;
    const char encoding = 1;

    (void)this;

    dest = (mx_endpoint_addr_t *)(intptr_t)
        (*env)->GetLongField(env, dstID, processhandleID);
    if (dest == NULL) {
        mxdev_throw(env, "nativeSsend: destination has no endpoint handle");
        return;
    }

    /* get static buffer related stuff */
    staticBuffer =
        (*env)->GetObjectField(env, buf, FID_mpjbuf_Buffer_staticBuffer);
    if (staticBuffer == NULL) {
        mxdev_throw(env, "nativeSsend: buffer has no static section");
        return;
    }
    directbuffer =
        (*env)->GetObjectField(env, staticBuffer, FID_mpjbuf_NIOBuffer_buffer);
    if (directbuffer == NULL) {
        mxdev_throw(env, "nativeSsend: static section has no ByteBuffer");
        return;
    }
    buffer_address = (char *)(*env)->GetDirectBufferAddress(env, directbuffer);
    if (buffer_address == NULL) {
        mxdev_throw(env, "nativeSsend: static section is not a direct buffer");
        return;
    }

    /* get dynamic buffer related stuff */
    if (dbuf_length > 0) {
        dynamicBuffer = (jbyteArray)
            (*env)->GetObjectField(env, buf, FID_mpjbuf_Buffer_dynamicBuffer);
        if (dynamicBuffer == NULL
            || (*env)->GetArrayLength(env, dynamicBuffer) < dbuf_length) {
            mxdev_throw(env, "nativeSsend: dynamic section shorter than "
                             "dbuf_length");
            return;
        }
        dBuffer = (*env)->GetByteArrayElements(env, dynamicBuffer, NULL);
        if (dBuffer == NULL) {
            mxdev_throw(env, "nativeSsend: could not access dynamic section");
            return;
        }
    }

    /* .. write the first eight bytes ..
     *  _________________________
     *  | E | X | X | X | DSIZE |
     *  -------------------------
     */
    buffer_address[0] = encoding;
    buffer_address[4] = (((unsigned int)dbuf_length) >> 24) & 0xFF;
    buffer_address[5] = (((unsigned int)dbuf_length) >> 16) & 0xFF;
    buffer_address[6] = (((unsigned int)dbuf_length) >> 8) & 0xFF;
    buffer_address[7] = ((unsigned int)dbuf_length) & 0xFF;

    /* compose message, sort out tag/context and remote endpoints */
    match_send = PRI_MATCH(context, myRank, tag);
    dbuf_tag = SEC_MATCH(context, myRank, tag);

    buffer_desc[0].segment_ptr = buffer_address;
    buffer_desc[0].segment_length = sbuf_length + 8;

    /* send message */
    rc = mx_issend(local_endpoint, buffer_desc, 1, *dest, match_send, NULL,
                   &send_handle);
    if (rc != MX_SUCCESS) {
        error = "nativeSsend: mx_issend of static section failed";
        goto release;
    }

    if (dbuf_length > 0) {
        dbuf_desc[0].segment_ptr = dBuffer;
        dbuf_desc[0].segment_length = dbuf_length;
        rc = mx_issend(local_endpoint, dbuf_desc, 1, *dest, dbuf_tag, NULL,
                       &dbufsend_handle);
        if (rc == MX_SUCCESS) {
            dbuf_posted = 1;
        } else {
            error = "nativeSsend: mx_issend of dynamic section failed";
        }
    }

    /* Same wait order as before: static section first, then dynamic. */
    rc = mx_wait(local_endpoint, &send_handle, MX_INFINITE, &status, &result);
    if (rc != MX_SUCCESS && error == NULL) {
        error = "nativeSsend: mx_wait on static section failed";
    }
    if (dbuf_posted) {
        rc = mx_wait(local_endpoint, &dbufsend_handle, MX_INFINITE, &status,
                     &result);
        if (rc != MX_SUCCESS && error == NULL) {
            error = "nativeSsend: mx_wait on dynamic section failed";
        }
    }

release:
    if (dBuffer != NULL) {
        /* The array was only read, so nothing needs to be copied back. */
        (*env)->ReleaseByteArrayElements(env, dynamicBuffer, dBuffer,
                                         JNI_ABORT);
    }
    if (error != NULL) {
        mxdev_throw(env, error);
    }
}

/*
 * Class:     xdev_mxdev_MXDevice
 * Method:    nativeSend
 * Signature: (Lmpjbuf/Buffer;Lxdev/ProcessID;IIII)V
 *
 * NOTE(review): the remainder of this definition is not visible in this
 * part of the file; the visible statements are kept exactly as they were.
 */
JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeSend
  (JNIEnv *env, jobject this, jobject buf, jobject dstID, jint tag,
   jint context, jint sbuf_length, jint dbuf_length) {
  //printf(" nativeSend first statement \n"); fflush(stdout);
  /* MX accessories for calling mx_isend */
  mx_return_t rc ;
  mx_request_t send_handle;
  mx_status_t status;
  mx_segment_t buffer_desc[1];
  uint64_t match_send, dbuf_tag ;
  uint32_t result;
  mx_endpoint_addr_t * dest;
  dest = (mx_endpoint_addr_t *) ((*env)->GetLongField(env, dstID,
                                 processhandleID )) ;
  match_send = PRI_MATCH(context, myRank, tag);
  //printf("send_recv U32 <%x> \n",MX_U32(match_send));fflush(stdout);
  //printf("send_recv L32 <%x> \n",MX_L32(match_send));fflush(stdout);
  /* static buffer related declarations */
  char *buffer_address=NULL;
  jobject staticBuffer;
  jbyteArray directbuffer;
  /* dynamic buffer related declarations */
  jboolean isCopy=JNI_TRUE;
  jbyteArray dynamicBuffer ;
  jbyte* dBuffer;
  /* get static buffer related stuff */
  staticBuffer = (*env)->GetObjectField(env,buf,FID_mpjbuf_Buffer_staticBuffer);
  directbuffer = (jbyteArray) (*env)->GetObjectField(env, staticBuffer,
                               FID_mpjbuf_NIOBuffer_buffer);
  buffer_address = (char *)(*env)->GetDirectBufferAddress(env,
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -