?? main_mpi_ms.c
字號:
/* * Ray2mesh : software for geophysicists. * Compute various scores attached to the mesh cells, based on geometric information that rays bring when they traverse cells. * * Copyright (C) 2003, St閜hane Genaud and Marc Grunberg * * This tool is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the Free * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */#include <stdio.h>#include <stddef.h> /* for offsetof() */#include <sys/time.h>#include <sys/resource.h>#include <unistd.h>#include <sparse/sparse.h>#include "r2mfile.h"#include "merge.h"#include "util.h" /* get_date_stamp() */#include "save_to_disk.h"#include "memusage.h"#include "renumber.h"#include "filter.h"#define ROOT 0 /* the MPI rank of sender */#define MPI_MSG_DATA 1001#define MPI_MSG_RAYIDOFFSET 1002#define MPI_MSG_RAYDATASIZE 1003#define MPI_MSG_RESULTS 1004#define MPI_MSG_NBRAY 1005#define MAX_LENGTH_RANK 10/** \brief cast raydata_t to MPI data types */MPI_Datatype *convert_to_mpi_type(const struct raydata_t * raydata){ MPI_Datatype *mpi_raydata; int blen[10]; /* length in bytes of each field */ MPI_Datatype oldtypes[10]; /* type of each field */ MPI_Aint indices[10]; /* offset in bytes of each field */ /* now, root and slaves cast data as MPI types for send/reveive */ blen[0] = 1; blen[1] = 1; blen[2] = 1; blen[3] = 1; blen[4] = 1; blen[5] = 1; blen[6] = 1; blen[7] = RAYCODE_MAX_STRING_LENGTH + 1; blen[8] = 1; blen[9] = 1; oldtypes[0] = MPI_LONG; oldtypes[1] = MPI_DOUBLE; oldtypes[2] = MPI_DOUBLE; oldtypes[3] = MPI_DOUBLE; oldtypes[4] = MPI_DOUBLE; oldtypes[5] = MPI_DOUBLE; oldtypes[6] = MPI_DOUBLE; oldtypes[7] = MPI_CHAR; oldtypes[8] = MPI_DOUBLE; oldtypes[9] = MPI_UB; indices[0] = offsetof(struct raydata_t, event_id); indices[1] = offsetof(struct raydata_t, src) + offsetof(struct coord_geo_t, lat); indices[2] = offsetof(struct raydata_t, src) + offsetof(struct coord_geo_t, lon); indices[3] = offsetof(struct raydata_t, src) + offsetof(struct coord_geo_t, prof); indices[4] = offsetof(struct raydata_t, dest) + offsetof(struct coord_geo_t, lat); indices[5] = offsetof(struct raydata_t, dest) + offsetof(struct coord_geo_t, lon); indices[6] = offsetof(struct raydata_t, dest) + offsetof(struct coord_geo_t, prof); indices[7] = offsetof(struct raydata_t, phase); indices[8] = offsetof(struct raydata_t, ray_travel_time); indices[9] = sizeof(struct raydata_t); mpi_raydata = (MPI_Datatype *) malloc(sizeof(MPI_Datatype)); MPI_Type_struct(10, blen, indices, oldtypes, mpi_raydata); MPI_Type_commit(mpi_raydata); return (mpi_raydata);}/** \brief receives a given number of ray data lines from a given MPI process. * * @param size the number of lines * @param rank the MPI process number (just for display) **/struct raydata_t *recv_raydata(int size, int rank){ struct raydata_t *rbuff;/* receive buffer */ MPI_Status status; MPI_Datatype *t; rbuff = (struct raydata_t *) malloc(size * sizeof(struct raydata_t)); if (rbuff == NULL) { MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } t = convert_to_mpi_type(rbuff); MPI_Recv(rbuff, size, *t, ROOT, MPI_MSG_DATA, MPI_COMM_WORLD, &status); MPI_Type_free(t); fprintf(stdout, "*** [process %d] did receive %d rays\n", rank, size); return (rbuff);}/** \brief sends a given number of ray data lines to a given MPI process. * * @param raydata the data buffer (structured) * @param size the number of lines * @param destproc the MPI process number * **/void send_raydata(struct raydata_t * raydata, const int size, const int destproc){ MPI_Datatype *t; t = convert_to_mpi_type(raydata); MPI_Send(raydata, size, *t, destproc, MPI_MSG_DATA, MPI_COMM_WORLD); fprintf(stdout, "*** [root] issued Send %d rays to [process %d]\n", size, destproc); MPI_Type_free(t);}void flush_output(int signal){ fprintf(stderr, "*** Stopped by signal %d.\n", signal); fprintf(stdout, "*** Stopped by signal %d.\n", signal); fflush(stdout); fflush(stderr);}/************************************************************//* renumbering stuff *//************************************************************//** \brief get the offset to apply to dataset * * @param rank the process rank * @param nbprocs the number of processes involved * @param rays total number of rays computed by the slave * */int get_offset_for_ray_renumbering(const int rank, const int nbprocs, const int rays){ MPI_Status status; int i; int offset = 0; int *nb_ray_slave; /* * if i am the master, collects the number of rays computed by each * slave */ if (rank == ROOT) { int prefix_sum; nb_ray_slave = (int *) malloc((nbprocs - 1) * sizeof(int)); assert(nb_ray_slave); fprintf(stdout, "*** [root] ray re-numbering.\n"); for (i = 1; i < nbprocs; i++) { MPI_Recv(&nb_ray_slave[i - 1], 1, MPI_INT, i, MPI_MSG_NBRAY, MPI_COMM_WORLD, &status); fprintf(stdout, "*** [root] re-numbering : receive from [process %d] <--%d\n", i, nb_ray_slave[i - 1]); } prefix_sum = nb_ray_slave[0]; for (i = 2; i < nbprocs; i++) { fprintf(stdout, "*** [root] re-numbering : send to [process %d] -->%d\n", i, prefix_sum); MPI_Send(&prefix_sum, 1, MPI_INT, i, MPI_MSG_RAYIDOFFSET, MPI_COMM_WORLD); prefix_sum += nb_ray_slave[i - 1]; } free(nb_ray_slave); return (0); } else { /* SLAVE */ /* * send the number of ray i computed and wait for an offset * to re-index my ids */ int val = rays; MPI_Send(&val, 1, MPI_INT, ROOT, MPI_MSG_NBRAY, MPI_COMM_WORLD); if (rank > 1) { /* for process of rank 1, offset=0 */ MPI_Recv(&offset, 1, MPI_INT, ROOT, MPI_MSG_RAYIDOFFSET, MPI_COMM_WORLD, &status); } fprintf(stdout, "*** [process %d] have to renumber rays with offset %d\n", rank, offset); return (offset); }}/************************************************************//* main *//************************************************************/int main(int argc, char **argv){ /* return the nb of ray computed */ struct ray_config_t ray_config; struct ray_filter_t ray_filter; char *meshfile = NULL; char *inputdatafile = NULL; char *rayfilter_filename = NULL; char *tmpdir = NULL; int chunksize = 0; /* size of ray data block sent by * root process */ float limit = -1; int nb_ray_computed = 0; int nb_ray_rejected = 0; int nb_ray_total = 0; int nb_ray_computed_this_round = 0; int nb_ray_computed_by_all_slaves = 0; int nbreq; int nbread = 0; int round = 0; /* round of computation for slave */ int nberr = 0; /* nb of line with error */ /* res & sparse files */ FILE *sparse_fd; FILE *res_fd; FILE *event_fd; FILE *filter_fd = NULL; /* renumbering */ int ray_offset; int want_sparse_file = 0; /* set to FALSE in // */ /*---------------- MPI related --------------------*/ int nbprocs;/* number of processes in the group */ int size; /* offset of remainder of raydata not treated */ int xml_buffer_size; /* number of bytes of the xml * file */ char *xml_buffer = NULL; struct raydata_t *recv_data; /* transmitted ray data buffer */ struct raydata_t *raydata; double start_time, tmp_time, comm_time, *merge_times, score_time, ray_time_start = 0, ray_time_end = 0; char *date_stamp; int nb_ray_slave; int root_rank[1] = {0}; MPI_Request req_ask; MPI_Status status; MPI_Group orig_group, new_group; MPI_Comm new_comm; /* catch Ctrl-C signal */ signal(SIGINT, emergency_halt); signal(SIGUSR2, flush_output); /* MPI initialization */ MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &nbprocs); /* cmd line */ memset(&ray_filter, 0, sizeof(struct ray_filter_t)); parse_command_line(argc, argv, &ray_config, &ray_filter, &meshfile, &celldatafilename, &inputdatafile, &rayfilter_filename, &limit, &output_format, &tmpdir, &chunksize);#ifdef DEBUG if (rank == ROOT) { fprintf(stderr, "%s:main():%d command line parsing done.\n", __FILE__, __LINE__); }#endif if (limit > 0 && rank == ROOT) { if (strchr(output_format, 's')) { fprintf(stderr, "[ROOT] can not use *sco* output format with -L option\n"); MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } fprintf(stderr, "[ROOT] info, limit memory usage to %.1fM\n", limit); } /* checks if the basename for files is provided */ if (!celldatafilename) { fprintf(stderr, "No output file specified. Exiting.\n"); MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } start_time = MPI_Wtime(); /*************************************************************/ /* first check that files are accessible (on root side only) */ /*************************************************************/ if (rank == ROOT) { /********/ /* ROOT */ /********/ fprintf(stdout, "*** [root] MPI process started (MPI_Init).\n"); fprintf(stdout, "*** [root] %s running on %d processors.\n", PACKAGE, nbprocs); ray2mesh_info(&ray_config); check_file_access(meshfile); check_file_access(inputdatafile);#ifdef RAYTRACING_ONLY fprintf(stdout, "%s : only ray-tracing executed (compiled with -DRAYTRACING_ONLY)\n", PACKAGE);#endif#ifdef DEBUG fprintf(stderr, "%s:main():%d [root] after check access .\n", __FILE__, __LINE__);#endif /* init of the mesh->parameters struct */ if ((xml_buffer = load_file_to_memory(meshfile, &xml_buffer_size)) == NULL) { fprintf(stderr, "%s : could not load %s to memory (%d bytes). Exiting.\n", PACKAGE, meshfile, xml_buffer_size); MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } /* open the file containing the rays */ if ((fdinput = fopen(inputdatafile, "r")) == NULL) { fprintf(stderr, "Can not open input datafile '%s' for reading. Exiting.\n", inputdatafile); MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } } else { /**********/ /* SLAVES */ /**********/ date_stamp = get_date_stamp(); fprintf(stdout, "*** [process %d] MPI process started (MPI_Init) as process %d (%s)\n", rank, getpid(), date_stamp); free(date_stamp); /* open the filter file if needed */ if (rayfilter_filename) { char *filename; char *ext = "sei"; filename = (char *) malloc(sizeof(char) * ( strlen(rayfilter_filename) + strlen("-p") + RANK_SIZE + strlen(".") + strlen("sei") + 1)); assert(filename); sprintf(filename, "%s-p%.3d.%s", rayfilter_filename, rank, ext); if ((filter_fd = fopen(filename, "w")) == NULL) { fprintf(stderr, "Cannot open %s for writing ray filtered.\n", filename); MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } free(filename); } } /* * root tells all processes (itself as well) what size is the * xml_buffer */ MPI_Bcast(&xml_buffer_size, 1, MPI_INT, ROOT, MPI_COMM_WORLD); fprintf(stdout, "*** [process %d] received xml_buffer_size=%d bytes\n", rank, xml_buffer_size); /* xml buffer alloc for slave */ if (rank != ROOT) { xml_buffer = (char *) malloc(xml_buffer_size * sizeof(char)); if (xml_buffer == NULL) { MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } } /* root send all processes (also to itself) the xml_buffer */ MPI_Bcast(xml_buffer, xml_buffer_size, MPI_CHAR, ROOT, MPI_COMM_WORLD); fprintf(stdout, "*** [process %d] did received xml_buffer\n", rank); /* Timing */ fprintf(stdout, "*** [process %d] TIME: mesh(xml) send/receive operations= %f s.\n", rank, MPI_Wtime() - start_time); /* everyone parses the xml_buffer string */ if (!(mesh = mesh_init_from_memory(xml_buffer, xml_buffer_size, meshfile))) { fprintf(stderr, "[process %d] could not alloc mesh (mesh_init_from_memory failed)\n", rank); MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } /* everyone frees the xml_buffer since it has been parsed now */ if (rank == ROOT) { unload_file_from_memory(xml_buffer, xml_buffer_size); /* * Only ROOT has the raydata input file, how many rays to * trace ? */ fprintf(stdout, "*** [root] counting how many rays to trace\n"); size = get_number_of_lines(fdinput); } else { free(xml_buffer); } fflush(stdout); /* root send the nb of line in the ray file */ MPI_Bcast(&size, 1, MPI_INT, ROOT, MPI_COMM_WORLD); fprintf(stdout,
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -