?? xdev_mxdev_mxdevice.c
字號:
jobject staticBuffer; jbyteArray directbuffer; //int * sbuf_length; int capacity ; /* dynamic buffer declarations */ jbyteArray darr; jbyte* dBuffer; /* MX related declarations */ mx_return_t rc ; mx_request_t recv_handle ; mx_segment_t buffer_desc[1]; uint64_t match_recv, match_recv2, match_mask , proc_mask, tag_mask ; mx_status_t recv_status; uint32_t result; if(tag == ANY_TAG) { tag_mask = EMPTY ; } else { tag_mask = MASK_TAG ; } if(any_src == 1) { src_id = 0 ; proc_mask = EMPTY; } else { proc_mask = MASK_SRC ; match_mask = MATCH_CONTEXT | proc_mask | tag_mask ; if(tag == ANY_TAG) { match_recv = PRI_MATCH(context, src_id, 0); match_recv2 = SEC_MATCH(context, src_id, 0); } else { match_recv = PRI_MATCH(context, src_id, tag); match_recv2 = SEC_MATCH(context, src_id, tag); } //printf("src_id <%d> \n", src_id ); //printf("context <%d> \n", context); //printf("match_mask U32 <%x> \n",MX_U32(match_mask)); //printf("match_recv U32 <%x> \n",MX_U32(match_recv)); //printf("match_mask L32 <%x> \n",MX_L32(match_mask)); //printf("match_recv L32 <%x> \n",MX_L32(match_recv)); //fflush(stdout); /* get static buffer related stuff */ staticBuffer = (*env)->GetObjectField(env,buf,FID_mpjbuf_Buffer_staticBuffer); directbuffer = (jbyteArray) (*env)->GetObjectField(env, staticBuffer, FID_mpjbuf_NIOBuffer_buffer); buffer_address = (char *)(*env)->GetDirectBufferAddress(env, (jobject)directbuffer); capacity = (unsigned int) (*env)->GetIntField(env,buf,FID_mpjbuf_Buffer_capacity); /* compose message ,sort out tag/context*/ buffer_desc[0].segment_ptr = buffer_address; buffer_desc[0].segment_length = capacity+8 ;//+offset .. //JRequestStruct *jreq = NULL; //jreq = (JRequestStruct *) malloc(sizeof(JRequestStruct)) ; //jobject globalRequestObject = (*env)->NewGlobalRef(env, req); //jreq->jrequest = globalRequestObject ; /* recv the message */ rc = mx_irecv(local_endpoint, buffer_desc, 1, match_recv, match_mask, NULL, &recv_handle); //jreq instead of NULL if(rc != MX_SUCCESS) { printf("error in irecv"); fflush(stdout); } (*env)->SetLongField(env,req,reqhandleID,(jlong)recv_handle); (*env)->SetLongField(env,req,lepID,(jlong)local_endpoint); (*env)->SetObjectField(env,req,bufferhandleID,buf); (*env)->SetLongField(env,req,matchrecvhandleID,(jlong)match_recv2); (*env)->SetLongField(env,req,matchrecvmaskhandleID,(jlong)match_mask); (*env)->SetLongField(env,req,bufferaddresshandleID,(jlong)buffer_address); }/* * Class: xdev_mxdev_MXDevice * Method: nativeIprobe * Signature: (Lxdev/ProcessID;IILmpjdev/Status;II)I */JNIEXPORT jint JNICALL Java_xdev_mxdev_MXDevice_nativeIprobe (JNIEnv *env, jobject jthis, jobject srcID, jint tag, jint context, jobject status, jint any_src, jint isCompleted ) { //printf("native probe method \n"); mx_return_t rc; mx_status_t c_status; uint64_t result = EMPTY ; uint64_t match_recv, proc_mask, tag_mask, match_mask ; int src_id = ((*env)->GetIntField(env, srcID, processidID )) ; if(tag == ANY_TAG) { tag_mask = EMPTY ; } else { tag_mask = MASK_TAG ; } if(any_src == 1) { src_id = 0 ; proc_mask = EMPTY; } else { proc_mask = MASK_SRC ; } match_mask = MATCH_CONTEXT | proc_mask | tag_mask ; if(tag == ANY_TAG) { match_recv = PRI_MATCH(context, src_id, 0); } else { match_recv = PRI_MATCH(context, src_id, tag); } //printf(" result upper (b)<%d> \n", MX_U32(result)); //printf(" result lower (b)<%d> \n", MX_L32(result)); rc = mx_iprobe(local_endpoint, match_recv, match_mask, &c_status, &result); //printf(" result upper (a)<%d> \n", MX_U32(result)); //printf(" result lower (a)<%d> \n", MX_L32(result)); if(result == EMPTY) { //printf(" aint any message "); fflush(stdout); } else { //printf(" yes message "); fflush(stdout); isCompleted = 1; (*env)->SetIntField(env,status, status_src_ID, GET_SRC(c_status.match_info) ); (*env)->SetIntField(env,status, status_tag_ID, GET_TAG(c_status.match_info) ); (*env)->SetIntField(env,status, countInBytesID , c_status.msg_length - 16); //-8 is offset ... } return isCompleted ; }/* * Class: xdev_mxdev_MXDevice * Method: nativeProbe * Signature: (Lxdev/ProcessID;IILmpjdev/Status;I)V */JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeProbe (JNIEnv *env, jobject jthis, jobject srcID, jint tag, jint context, jobject status, jint any_src) { //printf("native iprobe method \n"); mx_return_t rc; mx_status_t c_status; uint64_t result ; uint64_t match_recv, proc_mask, tag_mask, match_mask ; int src_id = ((*env)->GetIntField(env, srcID, processidID )) ; if(tag == ANY_TAG) { tag_mask = EMPTY ; } else { tag_mask = MASK_TAG ; } if(any_src == 1) { src_id = 0 ; proc_mask = EMPTY; } else { proc_mask = MASK_SRC ; } match_mask = MATCH_CONTEXT | proc_mask | tag_mask ; if(tag == ANY_TAG) { match_recv = PRI_MATCH(context, src_id, 0); //match_recv2 = SEC_MATCH(context, src_id, 0); } else { match_recv = PRI_MATCH(context, src_id, tag); //match_recv2 = SEC_MATCH(context, src_id, tag); } rc = mx_probe(local_endpoint, MX_INFINITE, match_recv, match_mask, &c_status, &result); (*env)->SetIntField(env,status, status_src_ID, GET_SRC(c_status.match_info) ); (*env)->SetIntField(env,status, status_tag_ID, GET_TAG(c_status.match_info) ); (*env)->SetIntField(env,status, countInBytesID , c_status.msg_length - 16); //-8 is offset ...} /* * Class: xdev_mxdev_MXDevice * Method: nativePeek * Signature: (Lmpjdev/Status;)J */JNIEXPORT jlong JNICALL Java_xdev_mxdev_MXDevice_nativePeek (JNIEnv *env, jclass jthis, jobject status) { //printf(" nativePeek \n");fflush(stdout); mx_request_t peekedRequest ; mx_return_t rc ; mx_status_t status ; uint32_t result ; //JRequestStruct *jrequest ; //jobject javaRequest, statusInRequest ; rc = mx_peek(local_endpoint, MX_INFINITE, &peekedRequest, &result); if(rc != MX_SUCCESS) { printf(" error while peeking the message \n"); } if(result) { //printf(" message peeked successfully \n"); fflush(stdout); } /* commented because of hashtable approach ... rc = mx_wait( local_endpoint, &peekedRequest, MX_INFINITE, &status, &result); jrequest = (JRequestStruct *) status.context ; javaRequest = jrequest->jrequest ; statusInRequest = (*env)->GetObjectField( env, javaRequest, FID_xdev_mxdev_MXRequest_status ); //.. save javaRequest in Request's long field .. (*env)->SetLongField(env,javaRequest, FID_xdev_mxdev_MXRequest_requestStruct, (jlong)jrequest); //printf(" returning from native peek \n"); fflush(stdout); //jobject copiedJavaRequest = javaRequest ; //(*env)->DeleteGlobalRef(env, javaRequest); (*env)->SetIntField(env,statusInRequest, status_src_ID, GET_SRC(status.match_info) ); (*env)->SetIntField(env,statusInRequest, status_tag_ID, GET_TAG(status.match_info) ); (*env)->SetIntField(env,statusInRequest, countInBytesID, status.msg_length - 8); printf(" length <%d> \n ",status.msg_length) ; fflush(stdout); return javaRequest ; */ // .. returnthe address of request struct .. return peekedRequest ; }/* * Class: xdev_mxdev_MXDevice * Method: deletePeekedRequest * Signature: (Lxdev/mxdev/MXRequest;J)V */JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_deletePeekedRequest (JNIEnv *env, jclass jthis, jobject peekedRequest, jlong requestStruct) { (*env)->DeleteGlobalRef(env, peekedRequest); //JRequestStruct *jrequest = (JRequestStruct *) requestStruct ; //free(jrequest); }/* * Class: xdev_mxdev_MXDevice * Method: nativePeek * Signature: (Lmpjdev/Status;)VJNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativePeek (JNIEnv *env, jobject jthis, jobject requestObject) { printf(" nativePeek \n"); mx_request_t request ; mx_return_t rc ; uint32_t result ; rc = mx_peek( local_endpoint, MX_INFINITE, &request, &result); if(rc == MX_SUCCESS) { //printf(" mx_peek successful \n"); fflush(stdout); } if(result) { printf(" a message has been peeked in nativeIwaitany \n"); fflush(stdout); } // complete comms ... mx_status_t recv_status ; rc = mx_wait( local_endpoint, &request, MX_INFINITE, &recv_status, &result); JRequestStruct *jrequest = (JRequestStruct *) recv_status.context ; jobject javaRequest = jrequest->jrequest ; printf("1 \n"); fflush(stdout); jclass req_class = (*env)->GetObjectClass(env, javaRequest ); //.. get the comms .. if(rc != MX_SUCCESS) { printf(" error while calling mx_wait \n"); } //mx_status_t recv_status; mx_request_t reqhandle ; mx_endpoint_t mlep ; jobject buffer ; uint64_t match_recv; uint64_t match_mask; char * buffer_address; mx_segment_t buffer_desc[1]; reqhandle = (mx_request_t) ((*env)->GetLongField(env, javaRequest, reqhandleID )) ; mx_request_t dreq ; mlep = (mx_endpoint_t) ((*env)->GetLongField(env, javaRequest, m_local_endpoint )); printf("3 \n"); fflush(stdout); match_recv = (uint64_t) ((*env)->GetLongField(env, javaRequest, matchrecvhandleID)) ; match_mask = (uint64_t) ((*env)->GetLongField(env, javaRequest, matchrecvmaskhandleID)) ; buffer_address = (char *) ((*env)->GetLongField(env, javaRequest, bufferaddresshandleID)) ; buffer = ((*env)->GetObjectField(env, javaRequest, bufferhandleID )) ; jclass mpjbuf_class = (*env)->GetObjectClass(env, buffer); jbyteArray darr; jbyte* dBuffer; jboolean isCopy = JNI_TRUE; char encoding = 1; int dbuf_length ; //.. dbuf_length .. encoding = buffer_address[0] ; dbuf_length = (((int)(unsigned char) buffer_address[4]) << 24) | (((int)(unsigned char) buffer_address[5]) << 16) | (((int)(unsigned char) buffer_address[6]) << 8) | (((int)(unsigned char) buffer_address[7]) ); printf("dbuf_length [after strange] <%d> \n",dbuf_length); fflush(stdout); //int dbuf_length = byte22int(buffer_address,0); int sbuf_length = recv_status.msg_length - 8 ; //- offset if(dbuf_length > 0) { darr = (*env)->NewByteArray (env, dbuf_length); dBuffer = (*env)->GetByteArrayElements(env, darr, &isCopy); buffer_desc[0].segment_ptr = dBuffer; buffer_desc[0].segment_length = dbuf_length; //printf(" calling mx_irecv in nativeIwait \n"); fflush(stdout); rc = mx_irecv(mlep, buffer_desc, 1, match_recv, match_mask, NULL, &dreq); if(rc != MX_SUCCESS) { printf(" return code is not successful \n"); fflush(stdout); } rc = mx_wait(mlep, & dreq, MX_INFINITE, &recv_status, &result); if(rc != MX_SUCCESS) { printf(" return code is not successful \n"); fflush(stdout); } //printf(" received \n"); fflush(stdout); (*env)->SetByteArrayRegion(env,darr,0,dbuf_length,dBuffer); jmethodID setdbuf = (*env)->GetMethodID(env, mpjbuf_class, "setDynamicBuffer", "([B)V"); (*env)->CallVoidMethod(env, buffer, setdbuf, darr); } else { //printf(" no dynamic message \n") ; } jmethodID set_size = (*env)->GetMethodID(env, mpjbuf_class, "setSize", "(I)V"); (*env)->CallVoidMethod(env, buffer, set_size, sbuf_length ); //printf("recvRequest wait returns \n"); (*env)->SetIntField(env,javaRequest, testcalledID, 1); //printf("recv:req calling wait for control message \n"); fflush(stdout); //printf("status.msg_length <%d> \n", recv_status.msg_length ); //(*env)->SetIntField(env,status, status_src_ID, // GET_SRC(recv_status.match_info) ); //(*env)->SetIntField(env,status, status_tag_ID, // GET_TAG(recv_status.match_info) ); //(*env)->SetIntField(env,status, countInBytesID, // recv_status.msg_length - 16);//-16 //.. extract JRequest .. //.. figure out if it is send or recv request .. //if(recvrequest) { //.. copy paste code from iwait of recvrequest .. //} //if(sendrequest) { //.. copy paste code from iwait of sendrequest .. //}}*//* * Class: xdev_mxdev_MXDevice * Method: nativeFinish * Signature: ()V */ JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeFinish (JNIEnv *env, jobject obj1) { //printf("native:finish process <%d> starting finish \n",myRank ); //fflush(stdout); //sleep(5); mx_return_t rc; //printf("native:finish process <%d> will close endpoint \n",myRank ); //fflush(stdout); //fflush(stdout); rc = mx_close_endpoint(local_endpoint); if(rc != MX_SUCCESS) { //printf("error in nativeFinish "); fflush(stdout); } //printf("native:finish process <%d> closed endpoint \n",myRank ); //fflush(stdout); //fflush(stdout); free(peer_endpoints); peer_endpoints = NULL; if(rc == MX_SUCCESS) { //printf("closed endpoint \n"); } //printf("native:finish process <%d> calling finalize \n",myRank ) ; //fflush(stdout); rc = mx_finalize(); if(rc == MX_SUCCESS) { //printf("native:finish process <%d> called finalize \n", myRank ); //fflush(stdout); }}
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -