cdemodp.c
字號:
state = GET_RECORD; /* get some more input records*/ /* FALLTHROUGH */ } case GET_RECORD: { assert(lastoff < ctlp->nrow_ctl); /* array bounds check*/ recp = (text *)(ctlp->inbuf_ctl + (lastoff * sessp->maxreclen_sess)); if (fgets((char *)recp, (int)sessp->maxreclen_sess, inputfp) != (char *)NULL) { /* set column array offset to input record number map */ ctlp->otor_ctl[lastoff] = ++input_recnum; /* if ((input_recnum % 10000) == 0) fprintf(output_fp, "record number: %d\n", (int)input_recnum); */ state = FIELD_SET; /* FALLTHROUGH */ } else { if (lastoff) lastoff--; state = END_OF_INPUT; break; } } case FIELD_SET: { /* map input data fields to DB columns, set column array entries */ fsetrv = field_set(ctlp, tblp, (struct obj *) 0, recp, lastoff, 0); rowCnt = lastoff + 1; if (rowCnt == ctlp->nrow_ctl || fsetrv != FIELD_SET_COMPLETE) { /* array is full, or have a large partial column, or the * secondary buffer is in use by an OUTOFLINE field. */ state = DO_CONVERT; /* FALLTHROUGH */ } else { lastoff++; /* use next column array slot*/ state = GET_RECORD; /* get next record*/ break; } } case DO_CONVERT: { /* Either one of the following is true: * - the column array is full * - there is a large partial column * - the secondary buffer used by field_set() is in use * - previous conversion returned CONVERT_CONTINUE and * now the conversion is being resumed. * * In any case, convert and load the data. */ ub4 cvtBadRoff; /* bad row offset from conversion*/ ub2 cvtBadCoff; /* bad column offset from conversion*/ while (startoff <= lastoff) { ub4 cvtCntPerCall = 0; /* rows converted in one call to do_convert*/ /* note that each call to do_convert() will convert all contiguousrows * in the colarray until it hit a row in error while converting. 
*/ cvtrv = do_convert(ctlp, startoff, rowCnt, &cvtCntPerCall, &cvtBadCoff); cvtCnt += cvtCntPerCall; /* sum of rows converted so far in colarray*/ if (cvtrv == CONVERT_SUCCESS) { /* One or more rows converted successfully, break * out of the conversion loop and load the rows. */ assert(cvtCntPerCall > 0); state = DO_LOAD; break; } else if (cvtrv == CONVERT_ERROR) { /* Conversion error. Reject the bad record and * continue on with the next record (if any). * cvtBadRoff is the 0-based index of the bad row in * the column array. cvtBadCoff is the 0-based index * of the bad column (of the bad row) in the column * array. */ assert(cvtCntPerCall >= 0); cvtBadRoff = startoff + cvtCntPerCall; err_recnum = ctlp->otor_ctl[cvtBadRoff]; /* map to input_recnum*/ fprintf(output_fp, "Conversion Error on record %d, column %d\n", (int)err_recnum, (int)cvtBadCoff + 1); /* print err msg txt */ errprint((dvoid *)(ctlp->errhp_ctl), OCI_HTYPE_ERROR, (sb4 *)0); /* Check to see if the conversion error occurred on a * continuation of a partially loaded row. * If so, either (a) flush the partial row from the server, or * (b) mark the column as being 0 length and complete. * In the latter case (b), any data already loaded into the column * from a previous LoadStream call remains, and we can continue * field setting, conversion and loading with the next column. * Here, we implement (a), and flush the row from the server. */ if (err_recnum == load_recnum) { /* Conversion error occurred on record which has been * partially loaded (by a previous stream). * XXX May be better to have an attribute of the direct path * XXX context which indicates that the last row loaded was * XXX partial. * * Flush the output pipe. Note that on conversion error, * no part of the row data for the row in error makes it * into the stream buffer. * Here we flush the partial row from the server. The * stream state is reset if no rows are successfully * converted. 
*/ /* flush partial row from server */ (void) OCIDirPathFlushRow(ctlp->dpctx_ctl, ctlp->errhp_ctl); } if (cvtBadRoff == lastoff) { /* Conversion error occurred on the last populated slot * of the column array. * Flush the input stream of any data for this row, * and re-use this slot for another input record. */ field_flush(ctlp, lastoff); state = GET_RECORD; startoff = cvtBadRoff; /* only convert the last row*/ rowCnt = 0; /* already tried converting all rows in col array*/ assert(startoff <= lastoff); break; } else { /* Skip over bad row and continue conversion with next row. * We don't attempt to fill in this slot with another record. */ startoff = cvtBadRoff + 1; assert(startoff <= lastoff); continue; } } else if (cvtrv == CONVERT_NEED_DATA) /* partial col encountered*/ { /* Partial (large) column encountered, load the piece * and loop back up to field_set to get the rest of * the partial column. * startoff is set to the offset into the column array where * we need to resume conversion from, which should be the * last entry that we set (lastoff). */ state = DO_LOAD; /* Set our row position in column array to resume * conversion at when DO_LOAD transitions to DO_CONVERT. */ assert(cvtCntPerCall >= 0); startoff = startoff + cvtCntPerCall;/* assert(startoff == lastoff); */ break; } else if (cvtrv == CONVERT_CONTINUE) { /* The stream buffer is full and there is more data in * the column array which needs to be converted. * Load the stream (DO_LOAD) and transition back to * DO_CONVERT to convert the remainder of the column array, * without calling the field setting function in between. * The sequence {DO_CONVERT, DO_LOAD} may occur many times * for a long row or column. * Note that startoff becomes the offset into the column array * where we need to resume conversion from. */ cvtcontcnt++; state = DO_LOAD; /* Set our row position in column array (startoff) to * resume conversion at when we transition from the * DO_LOAD state back to DO_CONVERT. 
*/ assert(cvtCntPerCall >= 0); startoff = startoff + cvtCntPerCall; assert(startoff <= lastoff); break; } } /* end while*/ break; } case DO_LOAD: { ub4 loadCnt; /* count of rows loaded by do_load*/ ldrv = do_load(ctlp, &loadCnt); nxtLoadOff = nxtLoadOff + loadCnt; switch (ldrv) { case LOAD_SUCCESS: { /* The stream has been loaded successfully. What we do next * depends on the result of the previous conversion step. */ load_recnum = ctlp->otor_ctl[nxtLoadOff - 1]; if (cvtrv == CONVERT_SUCCESS || cvtrv == CONVERT_ERROR) { /* The column array was successfully converted (or the * last row was in error). * Fill up another array with more input records. */ state = RESET; } else if (cvtrv == CONVERT_CONTINUE) { /* There is more data in column array to convert and load. */ state = DO_CONVERT; /* Note that when do_convert returns CONVERT_CONTINUE that * startoff was set to the row offset into the column array * of where to resume conversion. The loadCnt returned by * OCIDirPathLoadStream is the number of rows successfully * loaded. * Do a sanity check on the attributes here. */ if (startoff != nxtLoadOff) /* sanity*/ fprintf(output_fp, "LOAD_SUCCESS/CONVERT_CONTINUE: %ld:%ld\n", (long)nxtLoadOff, startoff); /* Reset the direct stream state so conversion starts at * the beginning of the stream. */ (void) OCIDirPathStreamReset(ctlp->dpstr_ctl, ctlp->errhp_ctl); } else { /* Note that if the previous conversion step returned * CONVERT_NEED_DATA then the load step would have returned * LOAD_NEED_DATA too (not LOAD_SUCCESS). */ FATAL("DO_LOAD:LOAD_SUCCESS: unexpected cvtrv", cvtrv); } break; } case LOAD_ERROR: { sb4 oraerr; ub4 badRowOff; badRowOff = nxtLoadOff; nxtLoadOff += 1; /* account for bad row*/ err_recnum = ctlp->otor_ctl[badRowOff]; /* map to input_recnum*/ fprintf(output_fp, "Error on record %ld\n", (long)err_recnum); /* print err msg txt */ errprint((dvoid *)(ctlp->errhp_ctl), OCI_HTYPE_ERROR, &oraerr); /* On a load error, all rows up to the row in error are loaded. 
* account for that here by setting load_recnum only when some * rows have been loaded. */ if (loadCnt != 0) load_recnum = err_recnum - 1; if (oraerr == OER(600)) FATAL("DO_LOAD:LOAD_ERROR: server internal error", oraerr); if (err_recnum == input_recnum) { /* Error occurred on last input row, which may or may not * be in a partial state. Flush any remaining input for * the bad row. */ field_flush(ctlp, badRowOff); } if (err_recnum == load_recnum) { /* Server has part of this row already, flush it */ (void) OCIDirPathFlushRow(ctlp->dpctx_ctl, ctlp->errhp_ctl); } if (badRowOff == lastoff) { /* Error occurred on the last entry in the column array, * go process more input records and set up another array. */ state = RESET; } else { /* Otherwise, continue loading this stream. Note that the * stream positions itself to the next row on error. */ state = DO_LOAD; } break; } case LOAD_NEED_DATA: { load_recnum = ctlp->otor_ctl[nxtLoadOff]; if (cvtrv == CONVERT_NEED_DATA) state = FIELD_SET; /* need more input data*/ else if (cvtrv == CONVERT_CONTINUE) state = DO_CONVERT; /* have input data, continue with conversion*/ else FATAL("DO_LOAD:LOAD_NEED_DATA: unexpected cvtrv", cvtrv); /* Reset the direct stream state so conversion starts at * the beginning of the stream. */ (void) OCIDirPathStreamReset(ctlp->dpstr_ctl, ctlp->errhp_ctl); break; } case LOAD_NO_DATA: { /* Attempt to either load an empty stream, or a stream * which has been completely processed. */ if (cvtrv == CONVERT_CONTINUE) { /* Reset stream state so we convert into an empty stream buffer.*/ (void) OCIDirPathStreamReset(ctlp->dpstr_ctl, ctlp->errhp_ctl); state = DO_CONVERT; /* convert remainder of column array*/ } else state = RESET; /* get some more input records*/ break; } default:
快捷鍵說明
複製代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -