// Lines prefixed with "//D" are commented-out detailed debugging prints.
#define _REENTRANT
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
#include "common.h"
#include "List.h"
#include "net.h"
#include "Net.h"
#include "Packet.h"
#include "Disk.h"
#include "Buffer.h"
#include "StopWatch.h"
// Flags and shared objects that are defined in other translation units.
extern bool flag_v,flag_d,flag_bind;
extern bool last_stop;      // polled by data_transfer() before it returns
extern FileList *files;
extern ListIte *hostIte;
extern CntlIn *cntlin;
extern CntlOut *cntlout;
extern DataIn *datain;
extern DataOut *dataout;

// Running total of the buffer content sizes filled by readdisk().
long long int readDiskBytes=0;

//---------------
// Buffers
//---------------
#define NO_BANK 21 // Number of buffer banks
#define NO_BIND 10 // Bank distance used by the "bind" mechanism: when
                   // flag_bind is set, readdisk() waits for writenet() to
                   // release bank (bankno - NO_BIND) before filling bankno.
// NO_BIND must be less than NO_BANK and must follow the rule
//     NO_BANK >= 2*NO_BIND + 1
// The rule is enforced here so that a bad edit fails at compile time.
#if NO_BANK < (2*NO_BIND + 1)
#error "NO_BANK must be >= 2*NO_BIND + 1"
#endif

ServerBuffer *dataBuffer[NO_BANK];
DataPacket *dataPacket[NO_BANK];

/*---------------------------------------------------------------*
 * Global XOFF state: set by exception_reciever() on TYPE_XOFF,  *
 * cleared by writenet() after a successful send or a recovery.  *
 *---------------------------------------------------------------*/
bool writeNetXoff=false;
/*-----------------------*
 * Exception receive     *
 *-----------------------*/
//Name: [ exception_reciever() ]
//Func: Receive one command packet from `net` and act on its type.
//        TYPE_XOFF      : set writeNetXoff so writenet() retries sends.
//        TYPE_REPORT    : print the message carried by the packet.
//        TYPE_RECONNECT : not valid on the server side; terminate.
//        TYPE_STOP      : the last node has terminated; set last_stop.
//        anything else  : report the unknown type and set last_stop.
//      Afterwards the global control-in endpoint (cntlin) is closed and
//      re-created.
//Args: net - endpoint on which the control access was detected.
//Note: exits the process if the packet cannot be accepted or received.
void exception_reciever(NetRecv *net)
{
    int ret;
    CmdPacket cmdpack;
    Net::sr_retval srret;
    void (*save_function)(NetRecv*);

    if (flag_v) {
        fprintf(stderr,"!!!! get control packet. !!!!\n");
    }

    // Register NULL as the handler while this packet is processed and keep
    // the returned value so that it can be registered again below.
    save_function = net->register_exception(net,NULL);

    if (net->get_accept_sock() == (-1)) {
        if (net->nonblock_accept()!=Net::SUCCESS) {
            fprintf(stderr,"select() got access but cannot accept()."
                    " in ServThread::exception_reciever()");
            exit(1);
        }
    }

    srret=net->recv_packet(&cmdpack);
    if (srret!=Net::SR_success) /*error or timeout*/ {
        fprintf(stderr,"cannot recieve command packet from the host which "
                "made an exceptional access.\n");
        exit(1);
    }

    ret=cmdpack.get_type();
    switch (ret) {
    case TYPE_XOFF:
        writeNetXoff = true;
        if(flag_v) fprintf(stderr,"XOFF exception packet \n");
        break;
    case TYPE_REPORT:
        fprintf(stderr,"CmdMess:'%s'\n",cmdpack.get_message());
        break;
    case TYPE_RECONNECT :
        fprintf(stderr,"I am server. But recieved RECONNECT request."
                " in ServThread::exception_reciever() \n");
        exit(1);
    case TYPE_STOP :
        fprintf(stderr,"Got the last node's termination");
        // data_transfer() waits on last_stop; without this assignment a
        // genuine STOP packet could never end that wait from here.
        last_stop = true;
        break;
    default:
        fprintf(stderr,"Get unknown command packet(type=%d)\n"
                " (see type in packet.h)\n",ret);
        last_stop = true;
        break;
    }

    net->register_exception(net,save_function);

    // Re-create the control-in endpoint for the next control access.
    // NOTE(review): if `net` is the same object as `cntlin`, the handler
    // registered just above refers to the object deleted here - confirm how
    // register_exception() stores its first argument.
    cntlin->close_sock();
    delete cntlin;
    cntlin = new CntlIn();
    cntlin->open();
}
/*---------------------*
 * readdisk Thread     *
 *---------------------*/
//Name: [ readdisk() ]
//Func: Thread body. Opens every entry of `files` in turn and reads it into
//      the buffer banks dataBuffer[0..NO_BANK-1] in round-robin order.
//      A bank is filled only after wait_wnet_done() returns for it, and is
//      handed over with set_rdisk_done(). For the bank on which EOF is
//      detected the hand-over is deferred until the file is closed, so
//      that the bank can be tagged B_EOF (more files follow) or
//      B_FINAL_EOF (last file) first.
//      Progress is printed each time the count of 10MB units read changes.
//Ret : always NULL. Exits the process on open/read failure.
void *readdisk(void *)
{
    FileListIte *fileite = new FileListIte(files);
    FromDisk *afile; ServerBuffer *buff;
    int bankno,fileno;
    bool is_1st_bankround=true; // true until bankno has wrapped to 0 once
    StopWatch wait_watch, read_watch, all_watch;
    int ten_mbytes,prev_ten_mbytes;
    double now_time_sum=0, prev_time_sum=0;

    all_watch.start();
    bankno=0;
    prev_ten_mbytes=0;
    for (fileno=0;;fileno++) {
        afile = new FromDisk();
        if (!afile->open(fileite->get_inputname())) {
            fprintf(stderr,"in ServThread.cpp::readdisk()\n");
            exit(1);
        }
        if (flag_v) {
            fprintf(stderr,"fileno=%d '%s' opened.\n",fileno,
                    fileite->get_inputname());
        }
        int eof; eof=NOT_EOF;
        while (eof == NOT_EOF) {
            buff = dataBuffer[bankno];
            // Yield once every YIELD_BANKNO banks. The former spelling
            // `!bankno%YIELD_BANKNO` parsed as `(!bankno)%YIELD_BANKNO`
            // and therefore depended only on whether bankno was 0.
            if ((bankno % (YIELD_BANKNO)) == 0) {
                sched_yield();
            }
            wait_watch.start(); buff->wait_wnet_done(); wait_watch.stop();
            if (flag_bind && !is_1st_bankround) {
                // Bank NO_BIND positions behind the current one.
                int check_writenet_bank;
                check_writenet_bank=(bankno-NO_BIND+NO_BANK)%NO_BANK;
                DEBPR(printf("readdisk() waiting for bank=%d. I am in %d (seqno=%d+BANKNO(%d))\n",check_writenet_bank,bankno,dataBuffer[bankno]->get_seqno(),NO_BANK););
                /*
                 ** The twobind semaphore was seen not to work twice. **
                 ** In that case print unconditionally here instead of
                 ** through DEBPR.
                 */
                dataBuffer[check_writenet_bank]->wait_bindtwo_go();
                DEBPR(printf("wait done!\n"););
            }
            read_watch.start();
            buff->set_fileno(fileno);
            eof = afile->read_tobuff(buff);
            readDiskBytes += buff->get_contsize();
            read_watch.stop();
            if (eof == EOF_DETECT) {
                DEBPR(printf("+++++++++ readdisk() seqno=%d bankno=%d "
                             "fileno=%d ..EOF\n", buff->get_seqno(),bankno,fileno););
                //<--------- buffer unlock deferred until after close
            } else if (eof == NOT_EOF) {
                DEBPR(printf("+++++++++ readdisk() seqno=%d bankno=%d "
                             "fileno=%d \n",buff->get_seqno(),bankno,fileno););
                buff->set_rdisk_done(); //<--------- buffer unlock
            } else /* error */ {
                fprintf(stderr,
                        "fail to read disk in ServThread.cpp::readdisk()\n");
                exit(1);
            }
            bankno=(bankno+1)%NO_BANK; //<--------- bankno modified
            if (is_1st_bankround && bankno==0) is_1st_bankround=false;

            // Progress line, printed when the 10MB counter changes.
            ten_mbytes = readDiskBytes/10000000LL/*10MB*/;
            all_watch.lap();
            if (ten_mbytes && (prev_ten_mbytes!=ten_mbytes)) {
                now_time_sum = all_watch.get_sum();
                printf("%d0MB %.2fMB/s 10MB=%.1fMB/s (%.1fs 10MB=%.1fs)\n",
                       ten_mbytes,(double)(ten_mbytes*10)/now_time_sum,
                       10.0/(now_time_sum-prev_time_sum),
                       now_time_sum, now_time_sum-prev_time_sum);
                prev_time_sum = now_time_sum;
            }
            prev_ten_mbytes = ten_mbytes;
        } // until eof, bank loop
        afile->close(); delete afile;
        if(flag_v) fprintf(stderr,"fileno=%d closed\n",fileno);
        // `buff` is still the bank on which EOF was detected.
        if (! fileite->pop_entry() ) {
            buff->set_eof(B_FINAL_EOF);
            buff->set_rdisk_done(); //<-----------deferred unlock
            break;                  //<-----------finish fileloop
        } else {
            buff->set_eof(B_EOF);
            buff->set_rdisk_done(); //<-----------deferred unlock
        }
    } // file loop
    all_watch.stop();
    if (flag_v) {
        fprintf(stderr,"TotalTime=%.2f(sec) DiskRead=%.2f(sec) "
                "SemaphoreWait=%.2f(sec)\n",all_watch.get_sum(),
                read_watch.get_sum(), wait_watch.get_sum());
        fprintf(stderr,"readdisk() ended.. \n");
    }
    return NULL;
}
/*---------------------------*
* writenet Thread **
*---------------------------*/
//Name: [ recover_sending() ]
//Func: recover from sending error by redirecting alternate host.
void recover_sending(int bankno)
{
CmdPacket rec_pack,ack_pack; DataPacket *pack;
int ii,jj,ret;
Net::ret_value retn;
Net::sr_retval srret;
if (flag_v) {
fprintf(stderr,"Starting recovery process ...\n"); }
dataout->close_sock();
delete dataout;
hostIte->set_flag(NOAVIL_HOST);
if (!hostIte->search_flag(AVAIL_HOST)) {
fprintf(stderr,"Cannot find the next available host. "
"Server::writenet() \n");
hostIte->all_print(); exit(1); }
if (flag_v) {
fprintf(stderr,"start to connect to %s...\n",hostIte->get_name());}
cntlout = new CntlOut(hostIte->get_name());
if (cntlout->open_connect()!=Net::SUCCESS) {
fprintf(stderr,"the next host candidate %s is not available\n",
hostIte->get_name()); exit(1); }
if (flag_v) {
fprintf(stderr,"Send RECONNECT command packet.. \n"); }
rec_pack.make_command(TYPE_RECONNECT,0);
srret = cntlout->send_packet(&rec_pack);
if ( srret != Net::SR_success) {
fprintf(stderr,"Cannot send REconnect packet to %s in Server::"
"recover_sending() (ret=%d)\n", hostIte->get_name(),srret);
exit(1); }
if (flag_v) {
fprintf(stderr,"Waiting RECONNECT ACK ...\n"); }
int req_seqno, now_seqno, lowest_seqno, resend_count, re_bankno;
while (1) { // waiting for RECONNECT Ack
if (cntlout->recv_packet(&ack_pack) != Net::SR_success ) {
fprintf(stderr,"Server::recover_sending() reconnect Ack cannot"
"be recieved\n"); exit(1); }
if (ack_pack.get_type() == TYPE_RECONNECT_ACK) {
req_seqno=ack_pack.get_operand();
break; //<--- break from RECONNECT wait
} else {
fprintf(stderr,"type=%d is recieved, when RECONNECT_ACK is"
" expected \n",ack_pack.get_type()); exit(1);
} // wait for Ack packet loop
}// wait for RECONNECT ACK
if (flag_v) {
fprintf(stderr,"RECONNECT Ack(seqno=%d) is recieved from %s.\n"
, req_seqno, hostIte->get_name()); }
now_seqno = dataBuffer[bankno]->get_seqno();
// lowest_seqno =(now_seqno-NO_BANK-1); //-1 is by readnet()
lowest_seqno = (now_seqno-NO_BIND); //is guranteed by BIND mech.
if (req_seqno<lowest_seqno) {
fprintf(stderr,"I don't have the requested seq_no's bank data."
"request seqno(%d), I have (%d-%d)\n", req_seqno,
lowest_seqno,now_seqno);
fprintf(stderr,"writenet() recovery was failed."
"Server::recover_sending() \n");
exit(1);
} else if (now_seqno<req_seqno) {
fprintf(stderr,"Requested seqno(%d) is larger than my present processing "
"seqno(%d).\n It means your requested file chunk is the chunk"
" which is in the previous file. \n"
"For now, the program doesnot support such situation. sorry!\n",
req_seqno,now_seqno);
fprintf(stderr,"writenet() recovery was failed."
"ClntThread::recover_sending() \n");
exit(1);
}
if (flag_v)
fprintf(stderr,
"resend databuffer from seqno=%d(req) to seqno=%d(now)\n",
req_seqno,now_seqno);
dataout = new DataOut(hostIte->get_name());
sleep(1); // wait for reciever node ready 20010806
if (dataout->open_connect()!=Net::SUCCESS) {
fprintf(stderr,"fail to connect to %s(%d) in connection recovery.\n",
dataout->get_hostname(), dataout->get_portno());
exit(1);
}
resend_count= (now_seqno-req_seqno) +1; //+1: include myself
// print for verbose
ii = bankno;
fprintf(stderr,"now bankno=%d,seqno=%d \n",ii,
dataBuffer[ii]->get_seqno());
ii = (ii-1+NO_BANK) % NO_BANK;
fprintf(stderr,"-1 bankno=%d,seqno=%d \n",ii,
dataBuffer[ii]->get_seqno());
ii = (ii-1+NO_BANK) % NO_BANK;
fprintf(stderr,"-2 bankno=%d,seqno=%d \n",ii,
dataBuffer[ii]->get_seqno());
ii = (ii-1+NO_BANK) % NO_BANK;
fprintf(stderr,"-3 bankno=%d,seqno=%d \n",ii,
dataBuffer[ii]->get_seqno());
// end printing
re_bankno = (bankno + NO_BANK - (resend_count-1)) % NO_BANK;
int rr_bankno;
for (ii=resend_count,jj=0; ii>0; ii--,jj++) {
rr_bankno = (re_bankno + jj)%NO_BANK;
pack = dataPacket[rr_bankno];
if (dataBuffer[rr_bankno]->get_seqno() != (req_seqno+jj)) {
fprintf(stderr,"Requested filechunk(seqno=%d+%d) is not matched to "
"buffer packet filechunk(seqno=%d,bankno=%d)\n",
req_seqno,jj,dataBuffer[rr_bankno]->get_seqno(),rr_bankno);
exit(1);
}
pack->make(0,0);
srret = dataout->send_packet(pack);
if (srret==Net::SR_error) {
fprintf(stderr,"Double send error is not recoverable(1)\n");
exit(1);
}else if (srret==Net::SR_connectionlost) {
fprintf(stderr,"Double send error is not recoverable(TIMEOUT)\n");
exit(1);
}
if (flag_v)
fprintf(stderr,"sending seqno=%d(bank=%d) is succeeded\n",
dataBuffer[rr_bankno]->get_seqno(),rr_bankno);
} /* resend_count loop */
if (flag_v)
fprintf(stderr,"Return from recover_sending.\n");
}
/*--------------------*
 * writenet() Thread  *
 *--------------------*/
//Name: [ writenet() ]
//Func: Thread body. Walks the buffer banks in round-robin order, waits for
//      readdisk() to fill each one (wait_rdisk_done) and sends it through
//      `dataout`. A bank whose seqno is 0 is preceded by a FileNoPacket
//      carrying its file number.
//      On SR_connectionlost the send is retried while writeNetXoff is set
//      (at most MAX_XOFF_WAIT attempts); otherwise recover_sending() is
//      called. Running out of attempts is fatal, because the bank would
//      otherwise be released without having been sent.
//      The loop ends after the bank tagged B_FINAL_EOF has been sent.
//Ret : always NULL. Exits the process on unrecoverable send errors.
void *writenet(void *)
{
    int bankno;
    DataPacket *pack; ServerBuffer *buff;
    StopWatch all_watch,write_watch,wait_watch,misc_watch;
    Net::sr_retval srret;
    FileNoPacket filenopack;

    all_watch.start();
    cntlin = new CntlIn();
    cntlin->open();
    dataout->register_exception(cntlin,exception_reciever);
    filenopack.make(0,0);

    for (bankno=0;;bankno=(bankno+1)%NO_BANK) {
        misc_watch.start();
        buff = dataBuffer[bankno]; pack = dataPacket[bankno];
        // Yield once every YIELD_BANKNO banks. The former spelling
        // `!bankno%YIELD_BANKNO` parsed as `(!bankno)%YIELD_BANKNO`
        // and therefore depended only on whether bankno was 0.
        if ((bankno % (YIELD_BANKNO)) == 0) {
            sched_yield();
        }
        wait_watch.start(); buff->wait_rdisk_done(); wait_watch.stop();

        //////////////////////////
        // filenopacket sending //
        //////////////////////////
        // Sent in front of the chunk whose seqno is 0.
        if (buff->get_seqno()==0) {
            if (flag_v) {
                fprintf(stderr,"filenopacket(fileno=%d) sending \n",
                        buff->get_fileno());
            }
            filenopack.set_fileno((short int)buff->get_fileno());
            bool fileno_handled=false;
            int retry_count;
            for (retry_count=0;retry_count<MAX_XOFF_WAIT;retry_count++) {
                srret=dataout->send_packet(&filenopack);
                if (srret==Net::SR_success) /*send success*/ {
                    writeNetXoff=false; // reset XOFF
                    fileno_handled=true;
                    break;
                } else if (srret==Net::SR_connectionlost) /*timeout or EPIPE*/ {
                    if (writeNetXoff) {
                        continue; // retry sending
                    }
                    /* not Xoff status */
                    if (flag_v) {
                        fprintf(stderr,"fail to send 'filenopack' to %s\n",
                                hostIte->get_name());
                    }
                    recover_sending(bankno);
                    writeNetXoff=false;
                    fileno_handled=true;
                    break;
                } else /* send_packet error */ {
                    fprintf(stderr,"sned filno packet fail(fileno=%d)"
                            "in Server.cpp::writenet()\n",buff->get_fileno());
                    exit(1);
                }
            } // send filenopack retry loop
            if (!fileno_handled) {
                fprintf(stderr,"XOFF retry limit(%d) exceeded while sending "
                        "filenopacket(fileno=%d) in Server.cpp::writenet()\n",
                        (int)(MAX_XOFF_WAIT),buff->get_fileno());
                exit(1);
            }
        } // do I send filenopack ?

        ////////////////////////
        // datapacket sending //
        ////////////////////////
        pack->make(0,0);
        bool data_handled=false;
        int retry_count;
        for (retry_count=0;retry_count<MAX_XOFF_WAIT;retry_count++) {
            write_watch.start();
            //D printf("send_packet seq=%d bank=%d\n",buff->get_seqno(),bankno);
            srret=dataout->send_packet(pack);
            //D printf("send_packet done !(size=%d)\n",ret);
            write_watch.stop();
            if (srret==Net::SR_success) /*success*/ {
                writeNetXoff=false; //<---- reset XOFF
                data_handled=true;
                break;              //<---- break retry!
            } else if (srret==Net::SR_connectionlost) /*timeout or pipe broken*/ {
                if (writeNetXoff) {
                    if (flag_v) {
                        fprintf(stderr,"sending retry in writenet()\n");
                    }
                    continue;       //<---- retry !
                }
                if (flag_v) {
                    fprintf(stderr,"fail to send 'datapacket'(seqno=%d) to %s\n",
                            buff->get_seqno(),hostIte->get_name());
                }
                // recover_sending() resends up to and including this bank.
                recover_sending(bankno);
                writeNetXoff=false;
                data_handled=true;
                break;
            } else /* send_packet error */ {
                fprintf(stderr,"send data packet fail in "
                        "Server.cpp::writenet()\n");
                exit(1);
            }
        } // send data packet retry loop
        if (!data_handled) {
            // Releasing the bank now would silently drop this chunk.
            fprintf(stderr,"XOFF retry limit(%d) exceeded while sending "
                    "datapacket(seqno=%d) in Server.cpp::writenet()\n",
                    (int)(MAX_XOFF_WAIT),buff->get_seqno());
            exit(1);
        }

        int eof;
        eof = buff->get_eof(); // must be read before the bank is released
        DEBPR(printf("********write net done bank=%d seqno=%d\n",bankno,buff->get_seqno()););
        /* The twobind semaphore was seen not to work twice. In that case
           print the line above unconditionally instead of through DEBPR. */
        buff->set_wnet_done();
        if (flag_bind) buff->set_bindtwo_go();

        if (eof == B_FINAL_EOF ){
            DEBPR(printf("--------- final bankno=%d, seqno=%d eof=%d\n",
                         bankno,buff->get_seqno(),buff->get_eof()););
            misc_watch.stop(); // pair the start() of this last iteration
            break;
        }else{
            DEBPR(printf("...... writenet() bank=%d seqno=%d fileno=%d"
                         "eof=%d\n",bankno,buff->get_seqno(),buff->get_fileno(),
                         buff->get_eof()););
            misc_watch.stop();
        }
    }// bankno loop
    all_watch.stop();
    if (flag_v) {
        fprintf(stderr,"Totaltime=%.2f(sec) WriteNetTime=%.2f(sec)"
                " WaitSemaphoreTime=%.2f(sec) miscTime=%.2f(sec)\n",
                all_watch.get_sum(),write_watch.get_sum(), wait_watch.get_sum(),
                misc_watch.get_sum());
        fprintf(stderr,"writenet() ended..\n");
    }
    return NULL;
}
//****************************//
// read disk and write to net //
//****************************//
//Name: [ data_transfer() ]
//Func: Allocate the NO_BANK buffer/packet pairs, run the readdisk() and
//      writenet() threads to completion, print the transfer rate, then
//      poll last_stop once per second and print the final rate.
//Note: exits with status 2 if a thread cannot be created.
void data_transfer()
{
    int ii;
    int rc;
    StopWatch elapse_watch;
    double elapse_time;

    for (ii=0;ii<NO_BANK;ii++) {
        dataBuffer[ii]=new ServerBuffer();
        dataBuffer[ii]->set_wnet_done(); // initially all 'ready'
        // dataBuffer[ii]->set_bindtwo_go(); // initially 'go'
        dataPacket[ii]=new DataPacket();
        dataPacket[ii]->attach(dataBuffer[ii]);
    }

    elapse_watch.start();
    if (flag_v)
        fprintf(stderr,"starting threads \n");

    // pthread_create()/pthread_join() return the error number and do not
    // set errno, so the code is formatted with strerror() and not perror().
    void *readdisk(void *);
    pthread_t th_readdisk;
    rc = pthread_create(&th_readdisk,NULL,readdisk,NULL);
    if (rc) {
        fprintf(stderr,"Server pthread readdisk() creation failed: %s\n",
                strerror(rc));
        exit(2);
    }
    if (flag_v)
        fprintf(stderr,"readdisk() thread started.\n");

    void *writenet(void *);
    pthread_t th_writenet;
    rc = pthread_create(&th_writenet,NULL,writenet,NULL);
    if (rc) {
        fprintf(stderr,"Server pthread writenet() creation failed: %s\n",
                strerror(rc));
        exit(2);
    }
    if (flag_v)
        fprintf(stderr,"writenet() thread started. ALL are ready !\n");

    // A failed join is reported but is not fatal.
    rc = pthread_join(th_readdisk,NULL);
    if (rc) {
        fprintf(stderr,"readdisk() cannot join: %s\n",strerror(rc));
    }
    rc = pthread_join(th_writenet,NULL);
    if (rc) {
        fprintf(stderr,"writenet() cannot join: %s\n",strerror(rc));
    }

    elapse_time=elapse_watch.stop();
    printf("Transfer Bytes=%lldMB. Transfer Speed=%.2fMB/s \n",
           readDiskBytes/1000000LL,
           ((double)(readDiskBytes/1000000LL))/elapse_time);

    // Wait until last_stop has been set.
    while(1) {
        sleep(1);
        if ( last_stop ) break;
    }
    // NOTE(review): stop() is called a second time without a start() in
    // between - confirm that StopWatch returns the total elapsed time here.
    elapse_time=elapse_watch.stop();
    printf("Filnal Transfer Speed=%.2fMB/s \n",
           ((double)(readDiskBytes/1000000LL))/elapse_time);
}
// syntax highlighted by Code2HTML, v. 0.9.1