?? xdev_mxdev_mxdevice.c
字號(hào):
(jobject)directbuffer); //printf("1 \n");fflush(stdout); /* get dynamic buffer related stuff */ dynamicBuffer = (jbyteArray) (*env)->GetObjectField(env,buf, FID_mpjbuf_Buffer_dynamicBuffer); if(dbuf_length > 0) { dBuffer = (*env)->GetByteArrayElements(env, dynamicBuffer, &isCopy); } //.. write the first eight bytes .. // _________________________ // | E | X | X | X | DSIZE | // ------------------------- char encoding = 1; buffer_address[0] = encoding ; buffer_address[4] = (((unsigned int) dbuf_length) >> 24) & 0xFF ; buffer_address[5] = (((unsigned int) dbuf_length) >> 16) & 0xFF; buffer_address[6] = (((unsigned int) dbuf_length) >> 8) & 0xFF ; buffer_address[7] = ((unsigned int) dbuf_length) & 0xFF; /* compose message, sort out tag/context and remote endpoints */ buffer_desc[0].segment_ptr = buffer_address ; buffer_desc[0].segment_length = sbuf_length+8; //+offset; /* send message */ //printf("native:send sending \n"); rc = mx_isend(local_endpoint, buffer_desc, 1, * dest, match_send, NULL, &send_handle); //printf("native:send sent \n"); mx_segment_t dbuf_desc [1] ; dbuf_tag = SEC_MATCH(context, myRank, tag); mx_request_t dbufsend_handle; dbuf_desc[0].segment_ptr = dBuffer ; dbuf_desc[0].segment_length = dbuf_length; if(dbuf_length > 0) { rc = mx_isend(local_endpoint, dbuf_desc, 1, * dest, dbuf_tag, NULL, &dbufsend_handle); } // so which one should be called here first? rc = mx_wait(local_endpoint, &send_handle, MX_INFINITE, &status, &result); if(dbuf_length > 0) { rc = mx_wait(local_endpoint, &dbufsend_handle, MX_INFINITE, &status, &result); } //printf("nativeSend finished \n"); fflush(stdout); }/* * Class: xdev_mxdev_MXDevice * Method: nativeIssend * Signature: (Lmpjbuf/Buffer;Lxdev/ProcessID;IILxdev/mxdev/MXSendRequest;)V */JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeIssend (JNIEnv *env, jobject obj1, jobject buf, jobject dstID, jint tag, jint context, jint sbuf_length, jint dbuf_length, jobject req) { //.. .. int offset = 8; /* stuff required for non-blocking */ jclass req_class = (*env)->GetObjectClass(env, req); jfieldID reqhandleID = (*env)->GetFieldID(env, req_class, "handle", "J") ; jfieldID dbufreqhandleID = (*env)->GetFieldID(env, req_class, "dbufHandle", "J"); jfieldID lepID = (*env)->GetFieldID(env, req_class, "localEndpointHandle", "J") ; jfieldID bufferhandleID = (*env)->GetFieldID(env, req_class, "bufferHandle", "Lmpjbuf/Buffer;") ; jfieldID dbuflenID = (*env)->GetFieldID(env, req_class, "dbuflen", "I") ; /* MX accessories for calling mx_isend */ mx_return_t rc ; mx_request_t send_handle; mx_status_t status; mx_segment_t buffer_desc[1]; uint64_t match_send, dbuf_tag ; uint32_t result; mx_endpoint_addr_t * dest; dest = (mx_endpoint_addr_t *) ((*env)->GetLongField(env, dstID, processhandleID )) ; match_send = PRI_MATCH(context, myRank, tag); //printf("send_recv U32 <%x> \n",MX_U32(match_send));fflush(stdout); //printf("send_recv L32 <%x> \n",MX_L32(match_send));fflush(stdout); /* static buffer related declarations */ char *buffer_address=NULL; jobject staticBuffer; jbyteArray directbuffer; /* dynamic buffer related declarations */ jboolean isCopy=JNI_TRUE; jbyteArray dynamicBuffer ; jbyte* dBuffer; /* get static buffer related stuff */ staticBuffer = (*env)->GetObjectField(env,buf,FID_mpjbuf_Buffer_staticBuffer); directbuffer = (jbyteArray) (*env)->GetObjectField(env, staticBuffer, FID_mpjbuf_NIOBuffer_buffer); buffer_address = (char *)(*env)->GetDirectBufferAddress(env, (jobject)directbuffer); /* get dynamic buffer related stuff */ dynamicBuffer = (jbyteArray) (*env)->GetObjectField(env,buf, FID_mpjbuf_Buffer_dynamicBuffer); if(dbuf_length > 0) { dBuffer = (*env)->GetByteArrayElements(env, dynamicBuffer, &isCopy); } //.. write the first eight bytes .. // _________________________ // | E | X | X | X | DSIZE | // ------------------------- char encoding = 1; buffer_address[0] = encoding ; buffer_address[4] = (((unsigned int) dbuf_length) >> 24) & 0xFF ; buffer_address[5] = (((unsigned int) dbuf_length) >> 16) & 0xFF; buffer_address[6] = (((unsigned int) dbuf_length) >> 8) & 0xFF ; buffer_address[7] = ((unsigned int) dbuf_length) & 0xFF; /* compose message, sort out tag/context and remote endpoints */ buffer_desc[0].segment_ptr = buffer_address ; buffer_desc[0].segment_length = sbuf_length+offset; /* send message */ //printf("native:send sending \n"); rc = mx_issend(local_endpoint, buffer_desc, 1, * dest, match_send, NULL, &send_handle); //printf("native:send sent \n"); (*env)->SetLongField(env,req,reqhandleID,(jlong)send_handle); (*env)->SetLongField(env,req,lepID,(jlong)local_endpoint); (*env)->SetObjectField(env,req,bufferhandleID,buf); mx_segment_t dbuf_desc [1] ; mx_request_t dbufsend_handle; dbuf_desc[0].segment_ptr = dBuffer ; dbuf_desc[0].segment_length = dbuf_length; if(dbuf_length > 0) { dbuf_tag = SEC_MATCH(context, myRank, tag); rc = mx_isend(local_endpoint, dbuf_desc, 1, * dest, dbuf_tag, NULL, &dbufsend_handle); } (*env)->SetIntField(env,req,dbuflenID,dbuf_length); (*env)->SetLongField(env,req,dbufreqhandleID,(jlong)dbufsend_handle );}/* * Class: xdev_mxdev_MXDevice * Method: nativeIsend * Signature: (Lmpjbuf/Buffer;Lxdev/ProcessID;IILxdev/mxdev/MXSendRequest;I)V */JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeIsend (JNIEnv *env, jobject obj1, jobject buf, jobject dstID, jint tag, jint context, jint sbuf_length, jint dbuf_length, jobject req, jint offset) { /* stuff required for non-blocking */ jclass req_class = (*env)->GetObjectClass(env, req); jfieldID reqhandleID = (*env)->GetFieldID(env, req_class, "handle", "J") ; jfieldID dbufreqhandleID = (*env)->GetFieldID(env, req_class, "dbufHandle", "J"); jfieldID lepID = (*env)->GetFieldID(env, req_class, "localEndpointHandle", "J") ; jfieldID bufferhandleID = (*env)->GetFieldID(env, req_class, "bufferHandle", "Lmpjbuf/Buffer;") ; jfieldID dbuflenID = (*env)->GetFieldID(env, req_class, "dbuflen", "I") ; /* MX accessories for calling mx_isend */ mx_return_t rc ; mx_request_t send_handle; mx_status_t status; mx_segment_t buffer_desc[1]; uint64_t match_send, dbuf_tag ; uint32_t result; mx_endpoint_addr_t * dest; dest = (mx_endpoint_addr_t *) ((*env)->GetLongField(env, dstID, processhandleID )) ; match_send = PRI_MATCH(context, myRank, tag); //printf("send_recv U32 <%x> \n",MX_U32(match_send));fflush(stdout); //printf("send_recv L32 <%x> \n",MX_L32(match_send));fflush(stdout); /* static buffer related declarations */ char *buffer_address=NULL; jobject staticBuffer; jbyteArray directbuffer; /* dynamic buffer related declarations */ jboolean isCopy=JNI_TRUE; jbyteArray dynamicBuffer ; jbyte* dBuffer; /* get static buffer related stuff */ staticBuffer = (*env)->GetObjectField(env,buf,FID_mpjbuf_Buffer_staticBuffer); directbuffer = (jbyteArray) (*env)->GetObjectField(env, staticBuffer, FID_mpjbuf_NIOBuffer_buffer); buffer_address = (char *)(*env)->GetDirectBufferAddress(env, (jobject)directbuffer); /* get dynamic buffer related stuff */ dynamicBuffer = (jbyteArray) (*env)->GetObjectField(env,buf, FID_mpjbuf_Buffer_dynamicBuffer); if(dbuf_length > 0) { dBuffer = (*env)->GetByteArrayElements(env, dynamicBuffer, &isCopy); } //.. write the first eight bytes .. // _________________________ // | E | X | X | X | DSIZE | // ------------------------- char encoding = 1; buffer_address[0] = encoding ; buffer_address[4] = (((unsigned int) dbuf_length) >> 24) & 0xFF ; buffer_address[5] = (((unsigned int) dbuf_length) >> 16) & 0xFF; buffer_address[6] = (((unsigned int) dbuf_length) >> 8) & 0xFF ; buffer_address[7] = ((unsigned int) dbuf_length) & 0xFF; /* compose message, sort out tag/context and remote endpoints */ buffer_desc[0].segment_ptr = buffer_address ; buffer_desc[0].segment_length = sbuf_length+offset; /* send message */ //printf("native:send sending \n"); fflush(stdout); rc = mx_isend(local_endpoint, buffer_desc, 1, * dest, match_send, NULL, &send_handle); if(rc != MX_SUCCESS) { printf("error in isend"); fflush(stdout); } //printf("native:send sent \n"); fflush(stdout); (*env)->SetLongField(env,req,reqhandleID,(jlong)send_handle); (*env)->SetLongField(env,req,lepID,(jlong)local_endpoint); (*env)->SetObjectField(env,req,bufferhandleID,buf); mx_segment_t dbuf_desc [1] ; dbuf_tag = SEC_MATCH(context, myRank, tag); mx_request_t dbufsend_handle; dbuf_desc[0].segment_ptr = dBuffer ; dbuf_desc[0].segment_length = dbuf_length; if(dbuf_length > 0) { rc = mx_isend(local_endpoint, dbuf_desc, 1, * dest, dbuf_tag, NULL, &dbufsend_handle); } (*env)->SetLongField(env,req,dbufreqhandleID,(jlong)dbufsend_handle ); (*env)->SetIntField(env,req,dbuflenID,dbuf_length);}/* * Class: xdev_mxdev_MXDevice * Method: nativeRecv * Signature: (Lmpjbuf/Buffer;Lxdev/ProcessID;IILmpjdev/Status;I)V */JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeRecv (JNIEnv *env, jobject this, jobject buf, jobject srcID, jint tag, jint context, jobject status, jint any_src) { jboolean isCopy = JNI_TRUE; //jclass req_class = (*env)->GetObjectClass(env, req); //we probably need to get hold of status here ... int src_id = ((*env)->GetIntField(env, srcID, processidID )) ; /* static buffer declarations */ char *buffer_address=NULL; jobject staticBuffer; jbyteArray directbuffer; //int * sbuf_length; int capacity ; /* dynamic buffer declarations */ jbyteArray darr; jbyte* dBuffer; /* MX related declarations */ mx_return_t rc ; mx_request_t recv_handle ; mx_segment_t buffer_desc[1]; uint64_t match_recv, match_recv2, match_mask , proc_mask, tag_mask ; mx_status_t recv_status; uint32_t result; if(tag == ANY_TAG) { tag_mask = EMPTY ; } else { tag_mask = MASK_TAG ; } if(any_src == 1) { src_id = 0 ; proc_mask = EMPTY; } else { proc_mask = MASK_SRC ; } match_mask = MATCH_CONTEXT | proc_mask | tag_mask ; if(tag == ANY_TAG) { match_recv = PRI_MATCH(context, src_id, 0); match_recv2 = SEC_MATCH(context, src_id, 0); } else { match_recv = PRI_MATCH(context, src_id, tag); match_recv2 = SEC_MATCH(context, src_id, tag); } //printf("src_id <%d> \n", src_id ); //printf("context <%d> \n", context); //printf("match_mask U32 <%x> \n",MX_U32(match_mask)); //printf("match_recv U32 <%x> \n",MX_U32(match_recv)); //printf("match_mask L32 <%x> \n",MX_L32(match_mask)); //printf("match_recv L32 <%x> \n",MX_L32(match_recv)); //fflush(stdout); /* get static buffer related stuff */ staticBuffer = (*env)->GetObjectField(env,buf,FID_mpjbuf_Buffer_staticBuffer); directbuffer = (jbyteArray) (*env)->GetObjectField(env, staticBuffer, FID_mpjbuf_NIOBuffer_buffer); buffer_address = (char *)(*env)->GetDirectBufferAddress(env, (jobject)directbuffer); capacity = (unsigned int) (*env)->GetIntField(env,buf,FID_mpjbuf_Buffer_capacity); /* compose message ,sort out tag/context*/ buffer_desc[0].segment_ptr = buffer_address; buffer_desc[0].segment_length = capacity+8 ;//+offset .. /* recv the message */ rc = mx_irecv(local_endpoint, buffer_desc, 1, match_recv, match_mask, NULL, &recv_handle); /* wait for it complete */ rc = mx_wait(local_endpoint, & recv_handle, MX_INFINITE, &recv_status, &result); //printf("********** \n"); //printf("tag <%d> \n", GET_TAG(recv_status.match_info)); //printf("src <%d> \n", GET_SRC(recv_status.match_info)); //printf("********** \n"); //status.src = recv_status.match_info ; //need a translation back from src to srcID ... //status.tag = recv_status.match_info ; (*env)->SetIntField(env,status, status_src_ID, GET_SRC(recv_status.match_info) ); (*env)->SetIntField(env,status, status_tag_ID, GET_TAG(recv_status.match_info) ); (*env)->SetIntField(env,status, countInBytesID , recv_status.msg_length - 16); //-8 is offset ... //printf("recv:req called wait for control message \n"); fflush(stdout); char encoding = 1; int dbuf_length ; //.. dbuf_length .. encoding = buffer_address[0] ; dbuf_length = (((int)(unsigned char) buffer_address[4]) << 24) | (((int)(unsigned char) buffer_address[5]) << 16) | (((int)(unsigned char) buffer_address[6]) << 8) | (((int)(unsigned char) buffer_address[7]) ); //printf("dbuf_length [after strange] <%d> \n",dbuf_length); //int dbuf_length = byte22int(buffer_address,0); int sbuf_length = recv_status.msg_length - 8 ; //- offset if(dbuf_length > 0) { darr = (*env)->NewByteArray (env, dbuf_length); dBuffer = (*env)->GetByteArrayElements(env, darr, &isCopy); buffer_desc[0].segment_ptr = dBuffer; buffer_desc[0].segment_length = dbuf_length; rc = mx_irecv(local_endpoint, buffer_desc, 1, match_recv2, match_mask, NULL, &recv_handle); /* wait for it complete */ rc = mx_wait(local_endpoint, & recv_handle, MX_INFINITE, &recv_status, &result); (*env)->SetByteArrayRegion(env,darr,0,dbuf_length,dBuffer); jmethodID setdbuf = (*env)->GetMethodID(env, CL_mpjbuf_Buffer, "setDynamicBuffer", "([B)V"); (*env)->CallVoidMethod(env, buf, setdbuf, darr); } jmethodID set_size = (*env)->GetMethodID(env, CL_mpjbuf_Buffer, "setSize", "(I)V"); (*env)->CallVoidMethod(env, buf, set_size, sbuf_length ); //---------------- this is non-blocking recv ...}/* * Class: xdev_mxdev_MXDevice * Method: nativeIrecv * Signature: (Lmpjbuf/Buffer;Lxdev/ProcessID;IILmpjdev/Status;Lxdev/mxdev/MXRecvRequest;I)V */JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeIrecv (JNIEnv *env, jobject obj1, jobject buf, jobject srcID, jint tag, jint context, jobject status, jobject req, jint any_src) { jboolean isCopy = JNI_TRUE; /* We could move these to init methods */ jclass req_class = (*env)->GetObjectClass(env, req); jfieldID matchrecvhandleID = (*env)->GetFieldID(env, req_class, "matchRecvHandle", "J") ; jfieldID ctrlmsghandleID = (*env)->GetFieldID(env, req_class, "ctrlMsgHandle", "J") ; jfieldID matchrecvmaskhandleID = (*env)->GetFieldID(env, req_class, "matchRecvMaskHandle", "J") ; jfieldID bufferaddresshandleID = (*env)->GetFieldID(env, req_class, "bufferAddressHandle", "J") ; jfieldID reqhandleID = (*env)->GetFieldID(env, req_class, "handle", "J") ; jfieldID lepID = (*env)->GetFieldID(env, req_class, "localEndpointHandle", "J") ; jfieldID bufferhandleID = (*env)->GetFieldID(env, req_class, "bufferHandle", "Lmpjbuf/Buffer;") ; int src_id = ((*env)->GetIntField(env, srcID, processidID )) ; /* static buffer declarations */ char *buffer_address = NULL;
?? 快捷鍵說(shuō)明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號(hào)
Ctrl + =
減小字號(hào)
Ctrl + -