//D is a keyward for debug printing #define _REENTRANT #include #include #include #include #include #include #include "net.h" #include "Net.h" #include "Packet.h" #include "Disk.h" #include "StopWatch.h" // just for getTime() #include "common.h" extern bool flag_v,flag_d,flag_bind; extern char * serverName; extern char * myName; extern char * nextName; extern bool isNextServer; extern PacketIte *hostIte; extern PacketIte *fileIte; extern DataOut *dataout; extern DataIn *datain; extern CntlOut *cntlout; extern CntlIn *cntlin; extern CntlOut *repout; bool writeNetXoff=false; int readSeqNo=0; /* for REC_ACK. */ //******************************** //*- Data Transfer ---------* //******************************** //#define NO_BANK 21 //#define NO_BIND 10 /* Max. seqno distance from readnet() to writenet() * #define NO_BANK 21 #define NO_BIND 10 /* Max. seqno distance from readnet() to writenet() * * NO_BIND must less than NO_BANK and follow the rule * * NO_BANK >= 2*NO_BIND +1 */ ClientBuffer *dataBuffer[NO_BANK]; DataPacket *dataPacket[NO_BANK]; // These bank no was originally local to each subroutine // But became global to report bank status. int readnet_bankno; int writnet_bankno; int writdsk_bankno; int diff_rwnet=-1; int diff_wnet_dsk=-1; //********************** //** readnet Thread ** //********************** class ReconnectRequest { }; void exception_reciever(NetRecv *net ); //Name: [ readnet() ] void *readnet(void *) { int seqno,prev_bankno; bool is_1st_bankround=true; // the bank 1st access=0, 2nd> =1 int fileno,no_file; ClientBuffer *buff; DataPacket *pack; FileNoPacket *filenopack; Net::sr_retval srret; extern int readnet_bankno; //* cntlin is newed in Client.cpp --> changed 20011128 //* cntlin is now created writenet() //* if the following line is active. sometimes //* cntlin is not created yet. // datain->register_exception(cntlin,exception_reciever); if (isNextServer) { // if next is Server, writenet doesnot exit // So exception catch is done here. cntlin = new CntlIn(); cntlin->open(); datain->register_exception(cntlin,exception_reciever); fprintf(stderr,"exception is registrated in readnet() \n"); } no_file = fileIte->get_noentry(); if (flag_v) { fprintf(stderr,"start readnet process %d files. %s\n", no_file,getTime()); } filenopack = new FileNoPacket(); readnet_bankno=0; fileno=0; while (filenorecv_packet(filenopack); if (srret != Net::SR_success) { fprintf(stderr, "cannot recieve filenopack in Client.cpp::readnet() (%d=recv_packet()) %s\n" ,srret,getTime()); exit(1);} // data integrity check if (fileno != filenopack->get_fileno()) { fprintf(stderr, "fileno(%d) in 'filenopacket' is not matched to the expected fileno=%d.!! %s\n" ,filenopack->get_fileno(), fileno,getTime()); exit(1);} if (flag_v) { fprintf(stderr,"file No.=%d processing...%s\n",fileno,getTime()); } // Sometimes 'writedisk openfile' takes more than 1min. // I guess it happens because readnet monopolyze CPU and writedisk() // cannot get CPU for open the file. // So I put sleep() for giving a time for file opening to writedisk() sched_yield(); for (seqno=0;;seqno++) { readSeqNo=seqno; /* for REC_ACK */ buff = dataBuffer[readnet_bankno]; pack = dataPacket[readnet_bankno]; // very tricky code if (!readnet_bankno%YIELD_BANKNO) { sched_yield(); } buff->wait_wdisk_done(); // if ((readnet_bankno!=prev_bankno) && flag_bind ) { if (flag_bind && !is_1st_bankround && !isNextServer) { int check_writenet_bank; check_writenet_bank=(readnet_bankno-NO_BIND+NO_BANK)%NO_BANK; //= readnet_bankno-NO_BIND dataBuffer[check_writenet_bank]->wait_bindtwo_go(); } buff->set_fileno(fileno); //2001.0514 try { // reconnect event capture try. (happen in recv_packet()) DEBPR(printf("reading seqno=%d\n",seqno);); srret = datain->recv_packet(pack); DEBPR(printf("done! (seqno=%d)\n",seqno);); } catch (ReconnectRequest) { //reconnect event captured. datain->close_sock(); delete datain; datain = new DataIn(); if (datain->open_accept() != Net::SUCCESS) { fprintf(stderr,"fail to accept connection in recovery process. %s\n" ,getTime()); exit(2); } if(flag_v) fprintf(stderr,"reconnected .. \n"); srret=datain->recv_packet(pack); } // with same fileno,seqno & bankno. if (srret!=Net::SR_success) { //0=timeout will have exception throw fprintf(stderr,"DataIn recieve Packet failed. " "datain->recv_packet(SPack) in ClntThread::readnet(). %s\n", getTime()); exit(1); } if (seqno != buff->get_seqno()) { fprintf(stderr, "buffer seq no(%d) in packet and expected seqno(%d) was not same! %s\n", buff->get_seqno(),seqno,getTime()); exit(1); } //Dfprintf(stderr,"manabe recv seqno=%d..\n",seqno); if (buff->get_eof()) { if (flag_v) { fprintf(stderr,"readnet() detect EOF packet for" " fileno=%d/#_of_files=%d (fileno=0,1..)\n" ,fileno,no_file); } if ((fileno+1) == no_file) { // if the last file to recv. fprintf(stderr,"readnet() detect Final EOF packet\n"); buff->set_eof(B_FINAL_EOF); } buff->set_rnet_done(); readnet_bankno=(readnet_bankno+1)%NO_BANK; //<------- bankno modified if (is_1st_bankround && readnet_bankno==0) is_1st_bankround=false; prev_bankno=readnet_bankno; break; //<--------- go to next fileno }else /* =not eof */{ buff->set_rnet_done(); readnet_bankno=(readnet_bankno+1)%NO_BANK; //<------- bankno modified if (is_1st_bankround && readnet_bankno==0) is_1st_bankround=false; prev_bankno=readnet_bankno; } // eof IF } // seqno loop (bankno loop) fileno++; //<--------- fieno modified } // file loop if (flag_v) { fprintf(stderr,"readnet() ended ....\n"); } delete filenopack; return NULL; } //********************** //** writenet Thread ** //********************** //Name: [ exception_reciever ] //Function: manage exceptional command packet void exception_reciever(NetRecv *net ) { CmdPacket cmdpack,reqack_pack; int ret; void (*save_function)(NetRecv*); char message[256]; Net::sr_retval srret; save_function = net->register_exception(net,NULL); if (flag_v) { fprintf(stderr,"!!!! get control packet !!! \n"); } if (net->get_accept_sock() == (-1)) { if (net->nonblock_accept()!=Net::SUCCESS) { fprintf(stderr, "cannot accept at the port in ClntThread::exception_reciever(). %s\n" ,getTime()); exit(1); } } srret=net->recv_packet(&cmdpack); if (srret!=Net::SR_success) { fprintf(stderr, "cannot recieve any command packet from the host which made expt. access.%s\n" ,getTime()); exit(1); } ret=cmdpack.get_type(); switch (ret) { case TYPE_XOFF: writeNetXoff = true; if(flag_v) fprintf(stderr,"XOFF exception packet recv.\n"); break; case TYPE_RECONNECT : if(flag_v) fprintf(stderr,"RECONNECT exception packet recv.\n"); reqack_pack.make_command(TYPE_RECONNECT_ACK,readSeqNo); srret=net->send_packet(&reqack_pack); if (srret != Net::SR_success) { fprintf(stderr, "Cannot send REconnectAck to %s in ClntThread::exception_recver(). %s\n" ,net->get_hostname(),getTime()); exit(1); } throw ReconnectRequest(); break; case TYPE_REQBANKST: if(flag_v) fprintf(stderr,"REPORT REQ exception packet recv.\n"); fprintf(stderr,"Bank information; [%d-%d-%d] read-writenet(%d)," "write net-disk(%d)\n",readnet_bankno,writnet_bankno,writdsk_bankno, diff_rwnet,diff_wnet_dsk); if (diff_rwnet == -1) { sprintf(message,"I am ready to connect.[%d-%d-%d] (%d-%d)", readnet_bankno,writnet_bankno,writdsk_bankno,diff_rwnet, diff_rwnet,diff_wnet_dsk); }else { if (flag_bind) { sprintf(message,"BankSt: Bank(%d),BIND(%d),[%d-%d-%d]," "read-writenet(%d),write net-disk(%d)", NO_BANK,NO_BIND, readnet_bankno,writnet_bankno,writdsk_bankno, diff_rwnet,diff_wnet_dsk); } else { sprintf(message,"BankSt: Bank(%d),[%d-%d-%d]," "read-writenet(%d),write net-disk(%d)\n", NO_BANK,readnet_bankno,writnet_bankno,writdsk_bankno, diff_rwnet,diff_wnet_dsk); } } reqack_pack.make_command(TYPE_ACKBANKST,message,strlen(message)); srret=net->send_packet(&reqack_pack); if (srret != Net::SR_success) { fprintf(stderr, "Cannot send Ack against BankStatusRequest to %s in ClntThread::exception_recver(). %s\n" ,net->get_hostname(),getTime()); exit(1); } break; default: fprintf(stderr,"Get unknown command packet(type=%d) (see packet.h). %s\n", ret,getTime()); exit(1); break; } net->register_exception(net,save_function); cntlin->close_sock(); delete cntlin; cntlin = new CntlIn(); cntlin->open(); } // Name : set_noav_and_find_nexthost() // Return: success(char *), fail(NULL) char * set_noav_and_find_nexthost(PacketIte *hostite) { int len; if (isNextServer) { fprintf(stderr,"Now the next host is the server, so there are no " " alternatives for the next host.\n"); return(NULL); } hostIte->set_flag(NOAVIL_HOST); delete nextName; if (hostIte->search_flag(AVAIL_HOST)!=0) { isNextServer=false; // not needed. but for easy to read len = hostite->get_name_len(); nextName = new char(len+1); strncpy(nextName,hostite->get_name(),len); nextName[len]='\0'; } else { // reached to the bottom of the host list isNextServer=true; len = strlen(serverName); // strlen is without '\0' nextName = new char(len+1); strncpy(nextName,serverName,len); nextName[len]='\0'; } return(nextName); } // Name: [ recover_sending() ] // void recover_sending(int bankno) { CmdPacket rec_pack,ack_pack,rep_pack; DataPacket *pack; int ii,jj,ret; char *next; char message[256]; Net::ret_value retn; Net::sr_retval srret; fprintf(stderr,"Starting recovery process ...\n"); if (flag_v) { fprintf(stderr,"Reporting the situation to the server.\n"); } if (!isNextServer) { if (repout == NULL) { repout = new CntlOut(serverName); retn = repout->open_connect(); if (retn != Net::SUCCESS) { fprintf(stderr,"cannot connect to the server in reporting" "reconnection (ClntThread.cpp::recover_sending()). %s\n", getTime()); exit(1); } } sprintf(message,"(<%s): Stop to sending to %s.",myName,nextName); ret = strlen(message); rep_pack.make_command(TYPE_REPORT,message,ret); if (repout->send_packet(&rep_pack) != Net::SR_success) { fprintf(stderr, "cannot send report packet(%s) to the server." "(ClntThread.cpp::recover_sending(). %s\n", message,getTime()); exit(1); } } dataout->close_sock(); delete dataout; next=set_noav_and_find_nexthost(hostIte); if (next==NULL) { fprintf(stderr,"Cannot find the next available host. " "clntThread.cpp::recover_sending(). %s\n",getTime()); hostIte->all_print(); exit(1); } if (flag_v) { fprintf(stderr,"start to connect %s(manabe=%s)\n",nextName,next); } if (isNextServer) { fprintf(stderr,"Reconnecting... but the next node find to be server." " Do nothing.\n"); return; } else { /* next node is not server */ cntlout = new CntlOut(next); if (cntlout->open_connect()!=Net::SUCCESS) { fprintf(stderr,"the next host candidate %s is not available. %s\n", nextName,getTime()); exit(1); } } if (flag_v) { fprintf(stderr,"Send RECONNECT command packet.. \n"); } rec_pack.make_command(TYPE_RECONNECT,0); srret=cntlout->send_packet(&rec_pack); if (srret!=Net::SR_success) { fprintf(stderr,"Cannot send REconnect packet to %s " "in ClntThread.cpp::recover_sending(). %s\n", nextName,getTime()); exit(1); } if (flag_v) { fprintf(stderr,"Waiting RECONNECT ACK ....\n"); } int req_seqno, now_seqno, lowest_seqno, resend_count, re_bankno; while(1) { // waiting for RECONNECT Ack srret=cntlout->recv_packet(&ack_pack); if (srret!=Net::SR_success) { fprintf(stderr, "ClntThread.cpp::recover_sending() reconnect Ack cannotbe recieved. %s\n" ,getTime()); exit(1); } if (ack_pack.get_type() == TYPE_RECONNECT_ACK){ req_seqno=ack_pack.get_operand(); break; // <------- break from RECONNECT wait }else{ // other packet than RECONNECT fprintf(stderr, "type=%d is recieved, when RECONNECT_ACK is expected. %s\n" ,ack_pack.get_type(),getTime()); exit(1); } // wait for Ack packet loop }// wait for RECONNECT ACK if (flag_v) { fprintf(stderr,"RECONNECT Ack(seqno=%d) is recieved from %s.\n" ,req_seqno, nextName); } now_seqno=dataBuffer[bankno]->get_seqno(); // lowest_seqno=(now_seqno-NO_BANK-1); //-1 is by readdisk() lowest_seqno=(now_seqno-NO_BIND); //is guranteed by BIND mech. if (req_seqnoopen_connect()!=Net::SUCCESS) { fprintf(stderr,"fail to reconnect in recorver sending. %s\n",getTime()); exit(1); } re_bankno = (bankno+NO_BANK-(resend_count-1)) % NO_BANK; int rr_bankno; for (ii=resend_count,jj=0;ii>0;ii--,jj++) { rr_bankno = (re_bankno+jj) % NO_BANK; pack = dataPacket[rr_bankno]; if (dataBuffer[rr_bankno]->get_seqno() != (req_seqno+jj)) { fprintf(stderr, "Requested filechunk(seqno=%d+%d) is not matched to buffer packet filechunk(seqno=%d,bankno=%d). %s\n", req_seqno,jj, dataBuffer[re_bankno]->get_seqno(),rr_bankno, getTime()); exit(1); } pack->make(0,0); srret=dataout->send_packet(pack); if (srret != Net::SR_success) { fprintf(stderr,"Double send error is not recoverable(1). %s\n", getTime()); exit(1); } if (flag_v) { fprintf(stderr,"sending seqno=%d(bank=%d) is succeeded\n", dataBuffer[rr_bankno]->get_seqno(),rr_bankno); } } /* resend_count loop */ if (flag_v) fprintf(stderr,"Return from recover_sending.\n"); cntlout->close_sock(); delete cntlout; } // Name: [ writenet() ] // void *writenet(void *) { DataPacket *pack; ClientBuffer *buff; int ret; int thread_return_code; FileNoPacket filenopack; extern int writnet_bankno,diff_rwnet; Net::sr_retval srret; if (isNextServer) { // if next is Server, this routine is useless. if(flag_v) fprintf(stderr,"Next is server, writenet() exit.\n"); thread_return_code =1; pthread_exit(&thread_return_code); } else { cntlin = new CntlIn(); cntlin->open(); dataout->register_exception(cntlin,exception_reciever); } filenopack.make(0,0); for (writnet_bankno=0;;writnet_bankno=(writnet_bankno+1)%NO_BANK) { buff=dataBuffer[writnet_bankno]; pack=dataPacket[writnet_bankno]; //D printf("\trnet wait\n"); //very tricky code if (!writnet_bankno%5) { sched_yield(); } buff->wait_rnet_done(); //D printf("\trnet wait done\n"); diff_rwnet = readnet_bankno - writnet_bankno ; if (diff_rwnet < 0) diff_rwnet += NO_BANK; //Before sending the 1st chunk of file, send 'fileno' packet //////////////////////////// // file no packet sending // //////////////////////////// if (buff->get_seqno() == 0) { if (flag_v) { fprintf(stderr,"filenopacket(fileno=%d) sending.\n", buff->get_fileno()); } filenopack.set_fileno((short int)buff->get_fileno()); int retry_count; for (retry_count=0;retry_countsend_packet(&filenopack); if (srret==Net::SR_success) { // Success writeNetXoff=false; // stop Xoff (reset) break; //<----retry break; }else if (srret==Net::SR_connectionlost) { //timeout or EPIPE if (flag_v) { fprintf(stderr,"filenopack(%d) send timeout.\n", buff->get_fileno()); } if (writeNetXoff) { //in XOFF waiting. continue; //<----retry (XOFF waiting)! }else{ //first recovery if (flag_v) { fprintf(stderr,"Fail to send 'filenopacket' to %s\n", nextName); } recover_sending(writnet_bankno); writeNetXoff=false; // stop Xoff (reset) break; //<----retry break; } }else{ fprintf(stderr, "send fileno packet fail. (fileno=%d) (ClntTread::writenet). %s\n" ,buff->get_fileno(),getTime()); exit(1); } }//send filenopack retry loop; }// send ? filenopack /////////////////////////// // datapacket sending // /////////////////////////// int retry_count; for (retry_count=0;retry_countsend_packet(pack); //D printf("\tsend_packet done\n"); if (srret==Net::SR_success) { //Success writeNetXoff=false; //<----- reset XOFF break; //<----- break retry!; } else if(srret==Net::SR_connectionlost) /*timeout or EPIPE*/{ if (flag_v) { fprintf(stderr,"datapacket send timeout.\n"); } if (writeNetXoff) { //in XOFF waiting. continue; //<----- retry!; } else { //First recovery process if (flag_v) { fprintf(stderr,"fail to send 'datapacket' to %s\n", nextName); } recover_sending(writnet_bankno); writeNetXoff=false; // stop Xoff (reset) break; //<----retry break; } } else /* send_packet error */ { fprintf(stderr, "send data packet fail. (seqno=%d) (ClntTread::writenet). %s\n" ,buff->get_seqno(),getTime()); exit(1); } } // datapacket sending retry loop int eof; eof = buff->get_eof(); //D printf("\tmanabe set wnet done sem\n"); buff->set_wnet_done(); if (flag_bind) buff->set_bindtwo_go(); if (isNextServer) { // if next becomes Server, this routine should stop. if(flag_v) fprintf(stderr,"Next node becomes server, writenet() exit.\n"); thread_return_code =1; pthread_exit(&thread_return_code); } if (eof == B_FINAL_EOF) { break; //<--------- break to the end!! } }// bank loop if (flag_v) { fprintf(stderr,"writnet() stop ... \n"); } return NULL; } //************************ //** writedisk Thread ** //************************ // Name: writedisk // Func: Main routine in writedisk thread void writedisk(void) { ToDisk *afile; ClientBuffer *buff; int eof; const char *filename; int flag; extern int writdsk_bankno,diff_wnet_dsk; writdsk_bankno=0; while (1) /* fileno loop */{ afile = new ToDisk(); filename = fileIte->get_name(); flag = fileIte->get_flag(); if (flag_v) fprintf(stderr,"open before(%s)-",getTime()); if (!afile->open(filename,flag)) { fprintf(stderr,"\nfail to open output file in " "ClntThread.cpp::writedisk(). %s\n",getTime()); exit(1); } if (flag_v) fprintf(stderr,"(%s)after \n",getTime()); if(flag_v) { fprintf(stderr,"writing to '%s' \n",fileIte->get_name()); } while(1) /* bankno loop*/ { buff = dataBuffer[writdsk_bankno]; // very tricky code if (!readnet_bankno%YIELD_BANKNO) { sched_yield(); } if (isNextServer) buff->wait_rnet_done(); else buff->wait_wnet_done(); diff_wnet_dsk = writnet_bankno - writdsk_bankno; if (diff_wnet_dsk < 0) diff_wnet_dsk += NO_BANK; if (!afile->write_frombuff(buff)) { fprintf(stderr,"Disk write failed. %s\n",getTime()); exit(2); } eof=buff->get_eof(); buff->set_wdisk_done(); writdsk_bankno=(writdsk_bankno+1)%NO_BANK; if (eof) { if(flag_v) { fprintf(stderr,"writedisk(%s) packet seq=EOF is detected.\n" ,filename); } break; //<------ break from bankno loop; } }// bank loop afile->close(); if (flag_v) { fprintf(stderr,"%s is closed.\n",filename); } delete afile; if( eof == B_FINAL_EOF) { if(flag_v) { fprintf(stderr,"All files are written \n"); } break; //<------ break from fileno loop; } if(!fileIte->pop_entry()) { fprintf(stderr,"file list becomes empty. Client::writedisk() \n" "but cannot detect B_FINAL_EOF in buffer. %s\n",getTime()); exit(1); } }// file loop if(flag_v){ fprintf(stderr,"writedisk() ended ... \n"); } } //************************************************ // read from net, write to net and write to disk * //************************************************ void data_transfer() { int ii; ClientBuffer *buff; DataPacket *pack; for (ii=0;iiset_wdisk_done(); // initial 'ready' // buff->set_bindtwo_go(); // initial 'go' pack=dataPacket[ii]=new DataPacket(); pack->attach(buff); } void *writenet(void *); void *readnet(void *); pthread_t th_readnet,th_writenet; if (pthread_create(&th_readnet,NULL,readnet,NULL)){ perror("Client pthread readnet() creation failed"); exit(2); } if (flag_v) fprintf(stderr,"readnet() thread started \n"); if (pthread_create(&th_writenet,NULL,writenet,NULL)){ perror("Client pthread writenet() creation failed"); exit(2); } if (flag_v) fprintf(stderr,"writenet() thread started. ALL are ready ! \n"); writedisk(); //Final ending report to server. I am the last node. if (isNextServer) { CmdPacket rep_pack; Net::ret_value retn; char message[100]; int ret; if (repout == NULL) { repout = new CntlOut(serverName); retn = repout->open_connect(); if (retn != Net::SUCCESS) { fprintf(stderr,"cannot connect to the server for ending report. %s\n", getTime()); exit(1); } sprintf(message,"END:(<%s): The last node finish recieving.",myName); ret = strlen(message); rep_pack.make_command(TYPE_STOP,message,ret); if (repout->send_packet(&rep_pack) != Net::SR_success) { fprintf(stderr, "cannot send end report packet(%s) to the server. %s", message,getTime()); exit(1); } } } if (repout!=NULL) { repout->close_sock(); delete repout; } }