main_mpi_ms.c
字號:
"*** [process %d] did received nb rays in the file (%d)\n", rank, size); /*****************************/ /* now, start the real stuff */ /*****************************/ comm_time = 0; if (rank == ROOT) { /****************/ /* ROOT */ /****************/ /***********************************************/ /* set chunk size value ie. nb of rays to send */ /***********************************************/ if (limit > 0) { /* if user specified a memory limit */ /* chunk size if 1/10 of the *estimated* value */ nbreq = limit * 1024 / ((nbprocs - 1) * 10 * ONE_RAY_MEM); } else { if (chunksize == 0) { /* chunk size has not been specified ? */ /* * chunk is 1/10 of the file by default (min * is 1 ray) */ nbreq = size / ((nbprocs - 1) * 10); } else { /* request by the user */ nbreq = chunksize; } } if (nbreq < 1) { nbreq = 1; } if (limit > 0 && chunksize != 0) { fprintf(stdout, "*** [root] memory limit specified ... forcing chunksize to %d.\n", nbreq); } else { fprintf(stdout, "*** [root] setting chunksize to %d.\n", nbreq); } fflush(stdout); /* begin pick chunks and distrib. to slaves */ while (nb_ray_total < size) { /* * wait for a slave to ask for work ; with request * comes the number of rays computed by the slave in * previous round (nb_ray_slave) */ MPI_Irecv(&nb_ray_slave, 1, MPI_INT, MPI_ANY_SOURCE, MPI_MSG_RESULTS, MPI_COMM_WORLD, &req_ask); fprintf(stdout, "*** [root] issued a Irecv\n"); raydata = get_raydata(fdinput, nbreq, &nbread, &nberr); nb_ray_total += (nbread + nberr); fprintf(stdout, "*** [root] read a chunk of %d lines (%d requested) (total %d/%d)\n", nbread, nbreq, nb_ray_total, size); if (!raydata) { fprintf(stderr, "*** raydata=NULL ... 
aborting !\n"); fflush(stderr); MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } fprintf(stdout, "*** [root] wait for any work request\n"); fflush(stdout); MPI_Wait(&req_ask, &status); fprintf(stdout, "*** [root] received work request from [process %d] <-- %d rays computed)\n", status.MPI_SOURCE, nb_ray_slave); nb_ray_computed += nb_ray_slave; if (nbread) { /* * send to slave the number of ray to wait * for */ MPI_Send(&nbread, 1, MPI_INT, status.MPI_SOURCE, MPI_MSG_RAYDATASIZE, MPI_COMM_WORLD); fprintf(stdout, "*** [root] replied to work request of [process %d] by sending chunk size=%d\n", status.MPI_SOURCE, nbread); send_raydata(raydata, nbread, status.MPI_SOURCE); free(raydata); } } /* end while */ fprintf(stdout, "*** [root] input file entirely distributed.\n"); fflush(stdout); fclose(fdinput); nbread = -1; /* TERMination signal */ /* all data is distributed, wait for slaves termination */ for (nbreq = 1; nbreq < nbprocs; nbreq++) { MPI_Recv(&nb_ray_slave, 1, MPI_INT, MPI_ANY_SOURCE, MPI_MSG_RESULTS, MPI_COMM_WORLD, &status); nb_ray_computed += nb_ray_slave; fprintf(stdout, "*** [root] collected last result from [%d]\n", status.MPI_SOURCE); fprintf(stdout, "*** [root] Send TERMINATION signal to [process %d]\n", status.MPI_SOURCE); MPI_Send(&nbread, 1, MPI_INT, status.MPI_SOURCE, MPI_MSG_RAYDATASIZE, MPI_COMM_WORLD); } fprintf(stdout, "*** [root] finally counted %d rays computed by slaves\n", nb_ray_computed); } else { float mem_used_start = 0; /* memory used at the * begin of the * raytracing process */ float mem_used_total = 0; int nb_save; /* nb of times we have dumped * memory to disk */ int nb_ray_stored; /* nb of ray used to dump * memory to disk */ float memsize = 0; /******************/ /* SLAVES */ /******************/ if (limit > 0) { mem_used_start = get_mem_usage(); } /* clean the xml file from previous sections if present */ mesh_remove_data_entry(mesh, SPARSE); mesh_remove_data_entry(mesh, RES); mesh_remove_data_entry(mesh, EVT); if (strchr(output_format, 
'r')) { mesh_remove_data_entry(mesh, R2M); } else { mesh_remove_data_entry(mesh, SCO); } /* open the sparse res and evt */ sparse_fd = NULL; res_fd = NULL; event_fd = NULL; change_to_next_files(mesh, celldatafilename, size, &sparse_fd, &res_fd, &event_fd, rank, 0 /* nb_save */ , want_sparse_file); /*************/ /* main loop */ /*************/ nbread = 0; nb_ray_computed_this_round = 0; nb_save = 0; nb_ray_stored = 0; /* start timer */ ray_time_start = MPI_Wtime(); while (1) { /* * say to root i want work, by telling how many rays * were computed in previous round */ fprintf(stdout, "*** [process %d] waiting for rays (nb ray computed=%d)\n", rank, nb_ray_computed_this_round); MPI_Send(&nb_ray_computed_this_round, 1, MPI_INT, ROOT, MPI_MSG_RESULTS, MPI_COMM_WORLD); tmp_time = MPI_Wtime(); /* wait for work (size) */ MPI_Recv(&nbread, 1, MPI_INT, ROOT, MPI_MSG_RAYDATASIZE, MPI_COMM_WORLD, &status); comm_time += MPI_Wtime() - tmp_time; if (nbread < 0) { /* stop the ray_tracing */ fprintf(stdout, "*** [process %d] received no more ray to trace\n", rank); break; } fprintf(stdout, "*** [process %d] received a chunk of %d rays\n", rank, nbread); /* Receive the data */ recv_data = recv_raydata(nbread, rank); fprintf(stdout, "*** [process %d] received data. 
Begin computation [round %d].\n", rank, round); fflush(stdout); /* do the raytracing threw the mesh */ nb_ray_computed_this_round = nb_ray_computed; bunch_of_ray(recv_data, nbread, 0, &ray_config, &ray_filter, &nb_ray_computed, &nb_ray_rejected, filter_fd, sparse_fd, res_fd, event_fd); nb_ray_computed_this_round = nb_ray_computed - nb_ray_computed_this_round; fprintf(stdout, "*** [process %d] %d rays computed at [round %d] (total so far computed,rejected=%d,%d)\n", rank, nb_ray_computed_this_round, round, nb_ray_computed, nb_ray_rejected); round++; /* shows memory usage */ mem_used_total = get_mem_usage(); memsize = mem_used_total - mem_used_start; fprintf(stdout, "*** [process %d] MEM(pid=%d)[data,total] = %.2fMB/%.2fMB\n", rank, getpid(), memsize, mem_used_total); /* save memory to disk */ /* * this behaviour will suppress the merge phase * between processes */ if ((limit > 0) && (memsize > limit) && (nb_ray_computed != nb_ray_stored)) { /* save to disk */ save_memory_to_disk(output_format, celldatafilename, cell_info, mesh, rank, nbprocs, nb_save); nb_save++; nb_ray_stored = nb_ray_computed; /* next set of files */ change_to_next_files(mesh, celldatafilename, size, &sparse_fd, &res_fd, &event_fd, rank, nb_save, want_sparse_file); } fflush(stdout); } /* save the data remaining in memory to disk */ if (limit > 0 && (nb_ray_computed != nb_ray_stored)) { save_memory_to_disk(output_format, celldatafilename, cell_info, mesh, rank, nbprocs, nb_save); } ray_time_end = MPI_Wtime(); /* no sense for master * process */ /* close sparse, res and evt files */ if (sparse_fd) fclose(sparse_fd); if (res_fd) fclose(res_fd); if (event_fd) fclose(event_fd); /* everyone close its filtered ray file (if filtering used) */ if (filter_fd) fclose(filter_fd); } /*************************/ /* Ray renumbering stuff */ /*************************/ /* collect ray ids from others, and re-index our ray ids */ ray_offset = get_offset_for_ray_renumbering(rank, nbprocs, nb_ray_computed); /* master 
send to all slaves the total number of rays computed */ if (rank == ROOT) { nb_ray_computed_by_all_slaves = nb_ray_computed; } MPI_Bcast(&nb_ray_computed_by_all_slaves, 1, MPI_INT, ROOT, MPI_COMM_WORLD); if (rank != ROOT) { char *filename; if (limit < 0) { /* all data still in memory */ if (rank != 1) { /* no need to renumber data from process 1 */ mesh_cellinfo_rayids_increment(cell_info, mesh, ray_offset); } /* * update in the sparse and res file ray-renumbering * stuff */ if (want_sparse_file) { filename = mesh->data[SPARSE]->filename[0]; renumber_sparse_file(ray_offset, nb_ray_computed_by_all_slaves, filename); } filename = mesh->data[RES]->filename[0]; renumber_res_file(ray_offset, nb_ray_computed_by_all_slaves, filename); filename = mesh->data[EVT]->filename[0]; renumber_evt_file(ray_offset, nb_ray_computed_by_all_slaves, filename); } else { int i, nb_file; char *r2m_buffer; int r2m_buffer_size; int nb; /* all data flushed to disk */ /* have to be reloaded and renumbered */ nb_file = mesh->data[R2M]->ndatafile; for (i = 0; i < nb_file; i++) { /* no need to renumber data from process 1 */ if (rank != 1) { /* update r2m */ filename = mesh->data[R2M]->filename[i]; fprintf(stdout, "*** [process %d] renumbering / r2m file %s\n", rank, filename); r2m_buffer = load_file_to_memory(filename, &r2m_buffer_size); nb = import_cell_buffer(cell_info, r2m_buffer, r2m_buffer_size, -1, -1, 0, 0); unload_file_from_memory(r2m_buffer, r2m_buffer_size); nb = mesh_cellinfo_rayids_increment(cell_info, mesh, ray_offset); make_domain_info_file(filename, cell_info, mesh, 0, 1, 0); } /* ray-renumbering stuff : */ /* insert the total number of rays computed */ filename = mesh->data[RES]->filename[i]; fprintf(stdout, "*** [process %d] renumbering / res file %s\n", rank, filename); renumber_res_file(ray_offset, nb_ray_computed_by_all_slaves, filename); filename = mesh->data[EVT]->filename[i]; fprintf(stdout, "*** [process %d] renumbering / event file %s\n", rank, filename); 
renumber_evt_file(ray_offset, nb_ray_computed_by_all_slaves, filename); } } } /****************************************/ /* Exchange domain - ALL TO ALL */ /* only when -L or --limit was NOT used */ /****************************************/ /* new communicator without the root process */ MPI_Comm_group(MPI_COMM_WORLD, &orig_group); MPI_Group_excl(orig_group, 1, root_rank, &new_group); MPI_Comm_create(MPI_COMM_WORLD, new_group, &new_comm); if (rank != ROOT && limit < 0) { /* Slave */#ifdef RAYTRACING_ONLY fprintf(stdout, "*** [process %d] TIME: comm= %f, total raytracing= %f s \n", rank, comm_time, ray_time_end - ray_time_start);#else /* merge */ merge_times = send_recv_cell_info(new_comm, cell_info, mesh, tmpdir); /* score computing */ date_stamp = get_date_stamp(); fprintf(stdout, "*** [process %d] computing score (%s)\n", rank, date_stamp); free(date_stamp); /* not usefull to produce r2m files */ /* kept to be able to compare to previous benchmark */ score_time = MPI_Wtime(); compute_score(cell_info, mesh); score_time = MPI_Wtime() - score_time; fprintf(stdout, "*** [process %d] computing TIME (s) comm,ray,merge,score,total = %.2f,%.2f,%.2f,%.2f,%.2f (%d non empty cells)\n", rank, comm_time, ray_time_end - ray_time_start, merge_times[8], score_time, MPI_Wtime() - start_time, count_non_empty_cell_in_cellinfo(cell_info, mesh)); fprintf(stdout, "*** [process %d] merge TIME (s) timers[0..9]= %.2f,%.2f,%.2f,%.2f,%.2f,%.2f,%.2f,%.2f,%.2f,%.2f \n", rank, merge_times[0], merge_times[1], merge_times[2], merge_times[3], merge_times[4], merge_times[5], merge_times[6], merge_times[7], merge_times[8],merge_times[9]); free(merge_times);#endif /* save results */ fprintf(stdout, "*** [process %d] dumps info to file\n", rank); if (strchr(output_format, 'r')) { char *r2m_file; /* r2m */ /* * nbprocs is used but there are only nbprocs-1 * sub-domains */ /* rank-1 is the domain_id in [0,nbprocs-1[ */ r2m_file = construct_filename(celldatafilename, "r2m", rank, 0); fprintf(stdout, 
"*** [process %d] writing cell data (r2m formated) in %s\n", rank, r2m_file); mesh_add_data_filename(mesh, R2M, r2m_file); make_domain_info_file(r2m_file, cell_info, mesh, rank - 1, nbprocs - 1, rank); free(r2m_file); } else { /* sco */ char *sco_file; sco_file = construct_filename(celldatafilename, "sco", rank, 0); fprintf(stdout, "*** [process %d] writing cell data (sco formated) in %s\n", rank, sco_file); mesh_add_data_filename(mesh, SCO, sco_file); mesh_cellinfo_write_sco(sco_file, cell_info, mesh); free(sco_file); } } /**************************************************************/ /* save the xml enrichied with SCO/R2M/SPARSE/RES sections */ /**************************************************************/ if (rank != ROOT) { char *xml_output; xml_output = (char *) malloc((strlen(celldatafilename) + strlen(".xml") + 1 + MAX_LENGTH_RANK) * sizeof(char)); assert(xml_output); sprintf(xml_output, "%s-%d.xml", celldatafilename, rank); mesh2xml(mesh, xml_output); free(xml_output); } /**********************/ /* This is the end :) */ /**********************/ MPI_Barrier(MPI_COMM_WORLD); date_stamp = get_date_stamp(); fprintf(stdout, "*** [process %d] applications ends normally (%s)\n", rank, date_stamp); free(date_stamp); free(celldatafilename); free_velocity_model(ray_config.velocity_model); MPI_Finalize(); return (0);}
快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -