?? namecachemain.c
字號:
pdnp = NULL; /* so it won't be freed below (now the packet is referenced by the pending_q rbt...) */ } else { /* do nothing, maybe it's an unnecessary retry for timeout that is already being taken care of */ } } else { replyERR(pptp->pdnsio, pdm, pdnp, RCODE_SERVER_FAILURE); } pthread_mutex_unlock(&pptp->mutex); /* ///// UNLOCK ///// */ } } else { /* it's a response */ void *iter = NULL; raw_qa *pending_qa = NULL; rbt_StatusEnum rbt_status; /* see if there is a pending query matching it */ pthread_mutex_lock(&pptp->mutex); /* \\\\\\ LOCK \\\\\\ */#ifdef VERBOSE_DEBUG KadC_log("pending_q_rbt contains %d entries\n", rbt_size(pptp->pending_q_rbt));#endif rbt_status = rbt_find(pptp->pending_q_rbt, pdnp, &iter); if(rbt_status != RBT_STATUS_OK) { /* if not, discard the response (pdnp will be destroyed below) */ } else { /* if yes retrieve the pending query... */ pending_qa = rbt_value(iter); /* ...and remove it from the pending rbt */ rbt_status = rbt_erase(pptp->pending_q_rbt, iter); assert(rbt_status == RBT_STATUS_OK); /* copy retrieved ID, IP, port and fd into pdnp */ pdnp->buf[0] = pending_qa->q->buf[0]; pdnp->buf[1] = pending_qa->q->buf[1]; pdnp->remoteip = pending_qa->q->remoteip; pdnp->remoteport = pending_qa->q->remoteport; pdnp->fd = pending_qa->q->fd; /* forward pdnp as response to the original requestor */ if(pdnp->remoteip != 0) /* if question DNSpacket was not a pseudo, used for predictive caching */ DNSreply(pptp->pdnsio, pdnp); /* add the reply to the retrieved qa (whose a was NULL) */ pending_qa->a = pdnp; pdnp = NULL; /* prevent its deallocation, as it's referenced by pending_qa */ /* if positive result and there is at least one answer, try to place the result in the cache */ if(pdm->rcode == RCODE_NO_ERROR && pdm->answer_rr != NULL && pdm->answer_rr[0] != NULL) { time_t now = time(NULL); time_t answttl = pdm->answer_rr[0]->ttl; /* use TTL of first answer (if present) */ time_t texp = now + answttl;#ifdef DEBUG KadC_log("Caching answ for qt=%d on %s; TTL %d s, exp %s", pdm->questions[0]->type, pdm->questions[0]->name, answttl, ctime(&texp));#endif /* see if we are replacing an existing entry */ rbt_status = rbt_find(pptp->cache_q_rbt, pending_qa->q, &iter); pending_qa->expiry = texp; pending_qa->last_accessed = now; if(rbt_status == RBT_STATUS_OK) { raw_qa *pqa = rbt_value(iter); rbt_status = rbt_erase(pptp->cache_q_rbt, iter); assert(rbt_status == RBT_STATUS_OK); raw_qa_destroy(pqa);#ifdef DEBUG KadC_log("...after replacing existing record for predictive caching refresh\n");#endif /* the replacement will appear as NOT recently accessed. This prevents replacement loops */ pending_qa->last_accessed = 0; } rbt_status = rbt_insert(pptp->cache_q_rbt, pending_qa->q, pending_qa, 0); if(rbt_status == RBT_STATUS_OK) { pending_qa = NULL; /* preventing its deallocation */ /* keep size below limit by expunging oldest entries */ trim_qa_rbt(pptp->cache_q_rbt, pptp->cache_maxentries); } else if(rbt_status != RBT_STATUS_DUPLICATE_KEY) { assert(0); /* trouble if it's something else... */ } } /* if not placed in cache, destroy the retrieved qa */ raw_qa_destroy(pending_qa); } pthread_mutex_unlock(&pptp->mutex); /* ///// UNLOCK ///// */ } dns_msg_destroy(pdm); } DNSpacket_destroy(pdnp); } /* end processing of DNS packet retrieved from FIFO */ /* scan pending_q_rbt and cache_q_rbt to expire old records */ pthread_mutex_lock(&pptp->mutex); /* \\\\\\ LOCK \\\\\\ */ purge_expired(pptp->cache_q_rbt); pthread_mutex_unlock(&pptp->mutex); /* ///// UNLOCK ///// */ pthread_mutex_lock(&pptp->mutex); /* \\\\\\ LOCK \\\\\\ */ purge_expired(pptp->pending_q_rbt); pthread_mutex_unlock(&pptp->mutex); /* ///// UNLOCK ///// */ /* KadC_log("."); */ } return NULL;}int millisleep_exit(DNSIO *pdnsio, int millis) { int sf; pthread_mutex_lock(&pdnsio->mutex); /* \\\\\\ LOCK \\\\\\ */ sf = pdnsio->shutdown_flag; pthread_mutex_unlock(&pdnsio->mutex); /* ///// UNLOCK ///// */ if(sf) return 1; millisleep(500); return 0;}static void *publishing_thread(void *p){ processing_thread_params *pptp = p; int i; for(;;) { unsigned long int extip; extip = KadC_getextIP(pptp->pkcc); if(extip == 0 || pptp->nmy_pseudo <= 0) {#ifdef VERBOSE_DEBUG KadC_log("Waiting... our external IP address is %s, nmy_pseudo = %d.\n", htoa(extip), pptp->nmy_pseudo);#endif if(millisleep_exit(pptp->pdnsio, 500)) goto exit; } else { /* only if we are connected */#ifdef DEBUG KadC_log("Connected! Our external IP address is %s .\n", htoa(extip)); KadC_log("Going to publish records for the %d \"-d\" params\n", pptp->nmy_pseudo);#endif /* and even then, wait for 1 min to ensure the kbuckets are reasonably full * / for(i=0; i < 60*2; i++) { if(millisleep_exit(pptp->pdnsio, 500)) goto exit; } */ for(i=0; i < pptp->nmy_pseudo; i++) { char index[1024]; char value[64]; char metalist[1024]; char *mpd = strdup(pptp->my_pseudo[i]); char *arg; int i, n_published; assert(mpd != NULL); if((arg=strchr(mpd, '=')) != NULL) { *arg++ = 0; sprintf(metalist,"KadCapp=namecache;rrtype=A;ip=%s", arg); } else { sprintf(metalist,"KadCapp=namecache;rrtype=A;ip=%s", htoa(extip)); } sprintf(index, "KadC::namecache::%s", mpd); free(mpd); sprintf(value, "#1234"); /* any idea for a meaningful use? */ /* p KadC::namecache::www.xxx.yyy #12 KadCapp=namecache;rrtype=A;ip=1.2.3.4 */ for(i=0, n_published=0; i<5 && n_published < 15; i++) { int n; /* repeat up to 5 times, or until a total of at least 15 nodes have received the publishing. (Yes, the same nodes could receive it multiple times: nothing's perfect...) Usually one round suffices. */ n = KadC_republish(pptp->pkcc, index, value, metalist, 20, 30); /* 20 threads, max 30 s */#ifdef DEBUG KadC_log("Record published to %d peernodes\n", n);#endif n_published += n; }#ifdef DEBUG KadC_log("Totally, record published to %d peernodes\n", n_published );#endif } { int sleeptime = 2*3600-60; /* times 2 because each step of slep is 500 ms */ time_t wakeuptime = time(NULL)+sleeptime;#ifdef DEBUG KadC_log("Done. Going to sleep for %d seconds, till %s", sleeptime, ctime(&wakeuptime));#endif /* sleep for 2 hours before republishing */ for(i=0; i < sleeptime * 2; i++) { if(millisleep_exit(pptp->pdnsio, 500)) goto exit; } } } }exit:#ifdef DEBUG KadC_log("Thread publishing_thread() terminated.\n");#endif return NULL;}/* opens socket setting pdnsio->fd; returns pdnsio->fd or error status */static int udp_dns_init(DNSIO *pdnsio) { int fd, on = 1; struct sockaddr_in local; int status; memset(&(local), 0, sizeof(struct sockaddr_in)); local.sin_family = AF_INET; local.sin_addr.s_addr = htonl(pdnsio->ip); local.sin_port = htons(pdnsio->port); status = socket(AF_INET, SOCK_DGRAM, 0); if(status < 0) goto err; fd = status; status = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on)); if(status < 0) goto err; status = bind(fd, (struct sockaddr *)&local, (socklen_t)sizeof(struct sockaddr_in)); if(status < 0) goto err; pdnsio->err = 0; pdnsio->fd = fd; return fd;err:#ifdef __WIN32__ pdnsio->err = WSAGetLastError();#else pdnsio->err = errno;#endif return status;}void *dns_udp_recv_thread(void *p) { DNSIO *pdnsio = p; unsigned char buf[1024]; int bufsize = sizeof(buf); for(;;) { DNSpacket *dp; int nrecv; struct sockaddr_in remote; socklen_t sa_len = sizeof(struct sockaddr_in); /* NOTE: if a datagram longer than t->size arrives, the buffer is only filled in with the first t->size characters; however: - With Cygwin, the whole datagram size is returned in nrecv, WITHOUT TRUNCATION - With -mno-cygwin, nrecv returns -1 */ /* the following select() works around a problem with OpenBSD and possibly other BSD systems as well: close(fd) in one thread hangs if another thread is waiting for input. So, we make this thread wait in select(), BEFORE the recvfrom(). */ int status; fd_set rset; struct timeval timeout; int sf; pthread_mutex_lock(&pdnsio->mutex); /* \\\\\\ LOCK \\\\\\ */ sf = pdnsio->shutdown_flag; pthread_mutex_unlock(&pdnsio->mutex); /* ///// UNLOCK ///// */ if(sf) break; for(;;) { pthread_mutex_lock(&pdnsio->mutex); /* \\\\\\ LOCK \\\\\\ */ sf = pdnsio->shutdown_flag; pthread_mutex_unlock(&pdnsio->mutex); /* ///// UNLOCK ///// */ if(sf) goto exit; FD_ZERO(&rset); FD_SET(pdnsio->fd, &rset); timeout.tv_sec = 0; timeout.tv_usec = 500000; /* 500 ms timeout */ status = select(pdnsio->fd+1, &rset, NULL, NULL, &timeout); if(status > 0){ break; /* data available, go read it */ } }; /* wait outside recvfrom */ pthread_mutex_lock(&pdnsio->mutex); /* \\\\\\ LOCK UDPIO \\\\\\ */ nrecv = recvfrom( pdnsio->fd, (char *)buf, bufsize, 0, (struct sockaddr *)&remote, &sa_len); pthread_mutex_unlock(&pdnsio->mutex); /* ///// UNLOCK UDPIO ///// */ if(nrecv > bufsize) nrecv = -1; /* in UNIX as in WIN32 ignore oversize datagrams */ if(nrecv <= 0) {/* ...catch oversize datagrams */ continue; /* in case of other errors, just skip this datagram */ } /* allocate a DNSpacket */ dp = malloc(sizeof(DNSpacket)); assert(dp != NULL); dp->remoteip = ntohl(remote.sin_addr.s_addr); dp->remoteport = ntohs(remote.sin_port); dp->fd = -1; /* means "packet arrived over UDP" */ /* allocate a copy of buffer referenced by DNSpacket */ dp->buf = malloc(nrecv); assert(dp->buf != NULL); memcpy(dp->buf, buf, nrecv); dp->bufsize = nrecv;#ifdef VERBOSE_DEBUG /* DEBUG ONLY */ { int i; KadC_log("dns_udp_recv_thread - received from %s:%d %d bytes:", htoa(dp->remoteip), dp->remoteport, dp->bufsize); for(i=0; i < dp->bufsize /* && i < 48 */; i++) { if((i % 16) == 0) KadC_log("\n"); KadC_log("%02x ", dp->buf[i]); } } KadC_log("\n================================\n");#endif /* enqueue the DNSpacket */ if(pdnsio->fifo->enq(pdnsio->fifo, dp) != 0) { /* if FIFO full, drop the packet? */ free(dp->buf); free(dp); } }exit:#ifdef __WIN32__ pdnsio->err = closesocket(pdnsio->fd);#else pdnsio->err = close(pdnsio->fd);#endif return NULL;}/* blocking UDP send() */int udp_send(DNSIO *pd, unsigned char *buf, int buflen, unsigned long int destip, int destport) { int status; struct sockaddr_in destsockaddr; int fd = pd->fd; memset(&destsockaddr, 0, sizeof(struct sockaddr_in)); destsockaddr.sin_family = AF_INET; destsockaddr.sin_port = htons((unsigned short int)destport); destsockaddr.sin_addr.s_addr = htonl(destip); /* this lock protects the fd from concurrent access by separate threads either via sendto() recvfrom() */ pthread_mutex_lock(&pd->mutex); /* \\\\\\ LOCK UDPIO \\\\\\ */ status = sendto(fd, (char *)buf, buflen, 0, (struct sockaddr *)&destsockaddr, (socklen_t)sizeof(destsockaddr)); pthread_mutex_unlock(&pd->mutex); /* ///// UNLOCK UDPIO ///// */ return status;}/* end blocking send() *//* blocking TCP send() FIXME: not used at the moment, need to write listener */int tcp_send(DNSIO *pd, unsigned char *buf, int buflen, int fd) { int status;#ifdef MSG_NOSIGNAL status = send(fd, (char *)buf, buflen, MSG_NOSIGNAL);#else status = send(fd, (char *)buf, buflen, 0);#endif#ifdef __WIN32__ closesocket(fd);#else close(fd);#endif return status;}/* forward the packed DNS query in pdnp to the upstream server at the address ip */static int DNSquery(DNSIO *pd, DNSpacket *pdnp, unsigned long int ip) { int status; /* for the time being, only UDP */ status = udp_send(pd, pdnp->buf, pdnp->bufsize, ip, DNS_PORT); return status;}/* send back the reply packed in the DNSpacket pointed by pdnpreply */static int DNSreply(DNSIO *pd, DNSpacket *pdnpreply) { int status; if(pdnpreply->fd < 0) { /* use UDP */ if(pdnpreply->bufsize > 512) { pdnpreply->bufsize = 512; /* truncate to 512 */ pdnpreply->buf[2] |= 0x02; /* set the TC (truncation) flag */ } status = udp_send(pd, pdnpreply->buf, pdnpreply->bufsize, pdnpreply->remoteip, pdnpreply->remoteport); } else { /* use TCP on the same fd (FIXME: TBI) */ status = tcp_send(pd, pdnpreply->buf, pdnpreply->bufsize, pdnpreply->fd); } return status;}
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -