下載本文檔
版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
/*#ifndefUTILITY_H_#define#include<stdio.h>#include<stdlib.h>#include<string.h>#include<errno.h>#include<unistd.h>#include<netdb.h>#include<sys/types.h>#include<netinet/in.h>#include<arpa/inet.h>#include<algorithm>#include<fcntl.h>#include<string>#include<iostream>#include<sys/epoll.h>#include<sys/wait.h>#include<dirent.h>#include<sys/stat.h>#include<assert.h>#include<set>#include"fdfs_client.h"#include"fdfs_global.h"#include"logger.h"#include"hiredis/hiredis.h"#includeusingnamespace#defineBlock_Size(1024*externstringserver_home;externstringclient_home;externstringredisServerIP;externstringFastDFSconf;classintadd_register(intepoll_fd,Connection*conn,intintchange_register(intepoll_fd,Connection*conn,intevent_mask);intdel_register(intepoll_fd,Connection*conn,intevent_mask);structend_pointchar*ip_;intend_point(char*ip,intport):ip_(ip),port_(port){}char*ip(){returnip_;}intport(){returnport_;enumTYPEenumCOMMAND{structpacketinttype;intcomd;intbuf_len;charbuf[Block_Size+packet():type(0),comd(0),buf_len(0){memset(buf,0,Block_Size+}packet(constpacket&hp){memset(buf,0,Block_Size+10);type=hp.type;comd= 
buf_len=hp.buf_len;}intsize()returnsizeof(int)*3+}voidclear()memset(buf,0,Block_Size+type=comd=buf_len=}voidbuild_packet(inttype_,intcomd_,constchar*buf_,intlen_){type=type_;comd=comd_;buf_len=memcpy(buf,buf_,}voidprint()printf("type:%d\n",type);printf("comd:%d\n",comd);printf("buf_len:%d\n",buf_len);//printf("buf:%s\n",}classtypedefbool(*send_file_callback)(Connection*conn,packet*hp,FILE*f);classsend_file_state{send_file_callbackConnection*packet*FILE*intbytes_send;send_file_state(send_file_callbackcallback,Connection*conn,packet*hp,FILE*f){callback_=callback;conn_=conn;hp_=hp;f_=bytes_send=packets_send=}printf("\t%dbytessen%dpacketssent\n",bytes_send,}boolgo_ahead()boolis_finished=(*callback_)(conn_,hp_,f_);if(!is_finished){bytes_send+=hp_->buf_len;}return}#includestaticvoidset_eof(Connection*conn,packet*staticboolrecv_file_server(constchar*local_filename,Connection*conn,packet*hp,FILE*f,int&byte_recv,int&packet_recv);staticvoidsend_file_client(Connection*conn,packet*hp,FILE*f);staticboolrecv_file_client(Connection*conn,packet*hp,FILE*f,int&byte_recv,intendif/*UTILITY_H_*/#includestringserver_home="/home/mutou/ftp_server_simple/server";stringclient_home="/home/mutou/ftp_server_simple/client";intmake_socket_non_blocking(ints_fd){intflags,ret;flags=fcntl(s_fd,F_GETFL,0);if(flags==-1){return-1;}flags|=ret=fcntl(s_fd,F_SETFL,flags);if(ret==-1){return-1;}return}intadd_register(intepoll_fd,Connection*conn,intevent_mask){if(epoll_fd==-1)returnepoll_eventevent_.events=event_mask;intret=epoll_ctl(epoll_fd,EPOLL_CTL_ADD,conn->conn_sock(),&event_);returnret;}intchange_register(intepoll_fd,Connection*conn,intevent_mask){if(epoll_fd==-1)returnepoll_eventevent_.events=event_mask;intret=epoll_ctl(epoll_fd,EPOLL_CTL_MOD,conn->conn_sock(),&event_);returnret;}intdel_register(intepoll_fd,Connection*conn,intevent_mask){if(epoll_fd==-1)returnepoll_eventevent_.events=event_mask;intret=epoll_ctl(epoll_fd,EPOLL_CTL_DEL,conn->conn_sock(),&event_);returnret;}#ifndefCONNECTION_H_#defineCONN
ECTION_H_#include<queue>#include<iostream>#include"utility.h"#include"blocking_queue.hpp"usingnamespacestd;structypedefbool(*send_func)(SendJob*);enumJobStatus{NEW,//ING,//STOP,//除了緩沖區(qū)的內(nèi)容外,沒(méi)有的內(nèi)容要發(fā)送DEAD//classConnection; dJob{send_funcJobStatusstatus_;Connection*conn_;packet*hp_;intSendJob(packet*hp,Connection*conn){hp_=necket(*hp);status_=STOP;offest_=0;conn_=conn;callback_=NULL;}SendJob(send_funccallback,Connection*conn):callback_(callback){status_=NEW;hp_=necket();offest_=0;conn_=}~SendJob()deletehp_;hp_=NULL;}classConnection{Connection(intepoll_fd,intid,int~Connection()hp=NULL;}intintconn_sock()return}voidset_events(uint32_tevents){events_=events;}uint32_tevents()return}intid()return}intin_pool()returnin_pool_;}terminate_=}intis_terminate()return}voidregister_epoll(uint32_tevents);voidunregister_epoll(uint32_tevents);SendJob*voidappend_send_job(packet*voidappend_send_job(send_funcsender);boolschedule_send_job();ind(packet*hp,intoffest=0);intrecv(packet*hp);intrecvn(intintput_bytes;intconn_sock_;intepoll_fd_;intid_;int 
intterminate_;charread_buf[Block_Size*3];intread_sz;constintdeque<SendJob*>#endif/*CONNECTION_H_#includeConnection::Connection(intepoll_fd,intid,intMax_Buffer_len(Block_Size*10)id_=id;conn_sock_=conn_sock;epoll_fd_=epoll_fd;in_pool_=0;terminate_=put_bytes=put_packets=read_sz=memset(read_buf,0,sizeofread_buf);pthread_mutex_init(&mutex_,NULL);hp=(packet*)malloc(sizeof(packet));}voidConnection::register_epoll(uint32_t_event){if(epoll_fd_==-1)ScopedLock_(&mutex_);in_pool_=0;}voidConnection::unregister_epoll(uint32_tevents){if(epoll_fd_==-1)events_=events;in_pool_=1;}voidConnection::append_send_job(packet*hp){SendJob*job=newSendJob(hp,this);}voidConnection::append_send_job(send_funcsender){SendJob*job=newSendJob(sender,this);}SendJob*Connection::get_job(){SendJob*job=NULL;while(!s_jobs.empty()){job=if(job->status_==DEAD){deletejob;}else}}if(s_jobs.empty())return}return}boolConnection::schedule_send_job(){SendJob*job=get_job();if(job==NULL)return}if(job->status_==NEW||(job->offest_==job->hp_->size()&&job->status_==ING))//callbackjob->offest_=boolis_last_time=job->callback_(job);if(is_last_time){job->status_=}else}}
job->status_=//offestintnbytes=send(job->hp_,job->offest_);if(nbytes>0)job->offest_+=if(job->offest_==job->hp_->size()&&job->status_==STOP){job->status_=}return}intConnection::send(packet*hp,intoffest){intbyte_to_send=hp->size()-intnbytes=write(conn_sock_,((char*)hp)+offest,byte_to_send);if(nbytes==-1){}printf("Connection::send(),sentnbytes=%d,hp->size()=%d\n",nbytes,hp-return}intConnection::recvn(intbytes_to_recv){intnbytes=0;while(read_sz<bytes_to_recv)nbytes=read(conn_sock_,read_buf+read_sz,bytes_to_recv-if(nbytes==-1)if(errno!=EAGAIN)return-1;}}
return-if(nbytes==0)puts("connectionclose");del_register(epoll_fd_,this,0);return}read_sz+=}return}intConnection::recv(packet*hp){if(epoll_fd_!=-1)puts("recv()");//3inthead_len=sizeof(int)*if(read_sz<head_len)intnbytes=recvn(head_len);if(nbytes<=0)return}//buf_lenintbuf_len=*((int*)(read_buf+2*sizeof(int)));if(read_sz<head_len+buf_len){intnbytes=recvn(head_len+buf_len);if(nbytes<=0)return}memcpy((char*)hp,read_buf,3*sizeof(int)+buf_len);read_sz=0;returnhp-}#ifndefTHREAD_POOL_H_#define#include<stdio.h>#include<stdlib.h>#include<string.h>#include<errno.h>#include<unistd.h>#include<netdb.h>#include<sys/types.h>#include<netinet/in.h>#include<arpa/inet.h>#include<algorithm>#include<fcntl.h>#include<string>#include<iostream>#include<sys/epoll.h>#include<sys/wait.h>#include<dirent.h>#include<queue>#include<sys/stat.h>#include<pthread.h>usingnamespacestd;#include"blocking_queue.hpp"#defineMax_Thread_Num10010#defineDefault_Capacitytypedefvoid(*task_func)(void*);classtp_task{task_funccallback_;void*arg_;tp_task(task_funccallback,void*arg):callback_(callback),arg_(arg)~tp_task()void*thread_main(void*arg);void*gg(void*arg);classThreool;structThreadInfoThreool*intclassThreool{Threool(intthread_num,intcapacity):queue_(){thread_num_=thread_num;capacity_=capacity;terminate_=0;pthread_mutex_init(&mutex_,NULL);pthread_cond_init(&queue_not_empty_,NULL);pthread_cond_init(&queue_not_full_,NULL);for(inti=0;i<thread_num;i++)t_info->pool=this;t_info->t_id=pthread_create(&(thread_id[i]),NULL,thread_main,}pthread_create(&(thread_id[thread_num]),NULL,gg,}Threool(intthread_num):queue_(){thread_num_=thread_num;capacity_=Default_Capacity;pthread_mutex_init(&mutex_,NULL);pthread_cond_init(&queue_not_empty_,NULL);pthread_cond_init(&queue_not_full_,NULL);for(inti=0;i<thread_num;i++)t_info->pool=this;t_info->t_id=pthread_create(&(thread_id[i]),NULL,thread_main,}pthread_create(&(thread_id[thread_num]),NULL,gg,}~Threool(){}intvoidput_task(tp_task*task);tp_task*take_task();boolis_terminate();voidtermin
ate();intthread_num_;intcapacity_;intterminate_;deque<tp_task*>queue_;pthread_mutex_tmutex_;pthread_cond_tqueue_not_full_;pthread_t#endif/*THREAD_POOL_H_thread_poolcpp#include"thread_pool.h"#include<assert.h>#include<stdlib.h>voidThreool::put_task(tp_task*task){ScopedLock_(&mutex_);while(queue_.size()>=capacity_){}//printf("queuesize:%d\n",queue_size());if(queue_.size()==1){}}tp_task*Threool::take_task()//puts("takingtask");while(queue_.empty()&&!terminate_){}if(queue_.empty())returntp_task*task=queue_.front();if(queue_.size()==capacity_-1)}return}intThreool::get_size(){if(terminate_)return-return}boolThreool::is_terminate(){ScopedLock_(&mutex_);returnterminate_;}voidThreool::terminate(){terminate_=1;}voidThreool::wait_for_done(){void*ret;for(inti=0;i<thread_num_;i++){}}void*thread_main(void*arg)ThreadInfo*t_info=(ThreadInfo*)arg;Threool*tp=t_info->pool;intid=t_info-while(true)tp_task*task=tp->take_task();if(task==NULL)//printf("thread[%d]:",id);deletetask;task=NULL;}return}void*gg(void*arg)Threool*tp= FILE*f=fopen("net_size.txt","w");fprintf(f,"start\n");time_ttmp;while(true)intsz=tp->get_size();if(sz==-1)fprintf(f,"%s,size\t%d\n",ctime(&tmp),sz);}return}#include"utility.h"#include<assert.h>#includestringredisServerIP="09";voidFileTransfer::set_eof(Connection*conn,packet*hp){hp->comd=PUT;hp->type=intnbytes=conn-if(nbytes!=hp->size())}voidFileTransfer::send_file_client(Connection*conn,packet*hp,FILE*f){intbytes_read=0,packet_send=0;while(!feof(f))hp->comd=hp->buf_len=fread(hp->buf,1,Block_Size,bytes_read+=hp->buf_len;intnbytes=conn->send(hp);if(nbytes!=hp->size())//printf("\t%dbytessent.\n",//printf("\t%dpacketssent.\n",}set_eof(conn,printf("\t%dbytessent.\n",bytes_read);printf("\t%dpacketssent.\n",packet_send);}while(true)if(hp->type==DATA)byte_recv+=hp->buf_len;fwrite(hp->buf,1,hp->buf_len,}else
printf("\t%dbytesrecv.\n",byte_recv);}}return}boolFileTransfer::recv_file_server(constchar*local_filename,Connection*conn,packet*hp,FILE*byte_recv+=hp->buf_len;if(hp->type==DATA){//if(f!=intnbytes=fwrite(hp->buf,1,hp->buf_len,f);printf("writebyte:%d\n",//intnbytes=conn->bf->write_file(conn->write_f_node,hp->buf,hp->buf_len);if(nbytes==-1){perror("writefile}return}elseif(hp->type==EOF_)printf("\t%dbytesrecv.\n",byte_recv);//intnbytes=conn->bf->write_file(conn->write_f_node,hp->buf,-1);constchar*conf_filename;//charchargroup_name[FDFS_GROUP_NAME_MAX_LEN+1];TrackerServerInfo*pTrackerServer;intintstore_path_index;charfile_id[128];redisReply*reply=NULL;if(rcontext!=NULL&&rcontext->err!=0){printf("ConnecttoredisServer}g_log_context.log_level=conf_filename=if((result=fdfs_client_init(conf_filename))!=0){returnresult;}if(pTrackerServer==NULL){returnerrno!=0?errno:}"errorno:%d,errorinfo:%s\n",\result,STRERROR(result));return}local_filename,NULL,\NULL,0,group_name,reply=(redisReply*) mand(rcontext,"SET%s%s",local_filename,file_id);if(!(reply->type==REDIS_REPLY_STATUS&&strcasecmp(reply->str,"OK")==0))printf("Failedtoexecute}printf("%d%s%s\n",result,local_filename,return}return}#ifndefBLOCKING_QUEUE_HPP_#define#include<stdio.h>#include<iostream>#include<pthread.h>#include<queue>usingnamespacestd;#defineDefault_Queue_Size2010classScopedLock{mutex_=mutex;}~ScopedLock()}classBlockingQueue{BlockingQueue()capacity_=Default_Queue_Size;terminate_=0;pthread_mutex_init(&mutex_,NULL);pthread_cond_init(&queue_not_empty_,NULL);pthread_cond_init(&queue_not_full_,NULL);}BlockingQueue(intcapacity){capacity_=capacity;terminate_=0;pthread_mutex_init(&mutex_,NULL);pthread_cond_init(&queue_not_empty_,NULL);pthread_cond_init(&queue_not_full_,NULL);}~BlockingQueue()}voidput(void*data)ScopedLockwhile(queue_.size()>=capacity_){}if(queue_.size()==1){}}intget_size()if(terminate_)return-1;return}void*take()ScopedLockwhile(queue_.empty()&&!terminate_){}if(queue_.empty())returnNULL;void*data=queue_.front(
);if(queue_.size()==capacity_-1){}return}boolis_terminate()returnterminate_;}voidterminate()terminate_=1;}intcapacity_;deque<void*>queue_;pthread_mutex_tmutex_;pthread_cond_tqueue_not_full_;#endif/*BLOCKING_QUEUE_HPP_#ifndefFTP_SERVER_H_#define#include"utility.h"#include"thread_pool.h"#defineMax_Connection#defineMax_Events#defineLIMIT_WEEKconststringbase_path="/home/mu classFtpServer;structServerCallArgsFtpServer*Connection*ServerCallArgs(FtpServer*server,Connection*conn){server_=server;conn_=}voidhandle_write(void*arg);voidhandle_read(void*classFtpServer{FtpServer(intport,intthread_num):port_(port),tp(thread_num){for(inti=0;i<Max_Connection;i++)conn[i]=}~FtpServer()for(inti=0;i< t;i++){deleteconn[i];conn[i]=}}Connection*get_connection(intconn_sock);voidaccept_client();voidinit_server();voidrun();voidsend_rsp(Connection*conn,packet*hp,intvoidput(Connection*conn,packet*hp);intlisten_sock, intport_;intepoll_fd;stringConnection*conn[Max_Connection];Threooltp;#endif/*FTP_SERVER_H_#include//createandvoidFtpServer::init_server()//creaistenif((listen_sock=socket(AF_INET,SOCK_STREAM,0))==-1){fprintf(stderr,"socketerror:%s\n",strerror(errno));}//fillsockaddrstructuresockaddr_inserver_addr;bzero(&server_addr,sizeof(sockaddr_in));server_addr.sin_family=AF_INET;//if(bind(listen_sock,(sockaddr*)(&server_addr),sizeof(sockaddr))==-1){fprintf(stderr,"binderror:%s\n",strerror(errno));}}voidFtpServer::run()//if(listen(listen_sock,5)==-1){perror("listenerror");}intret;epoll_event*epoll_fd=epoll_create1(0);if(epoll_fd==-1){}event.events=EPOLLIN;ret=epoll_ctl(epoll_fd,EPOLL_CTL_ADD,listen_sock,&event);if(ret==-1){}//bufferwhereeventsare//theeventloopwhile(true){intn=epoll_wait(epoll_fd,events,Max_Events,-for(inti=0;i<n;i++)if(events[i].data.fd==listen_sock)//anewclienthascome}else
//getdatafromConnection*conn=if(conn->in_pool())//printf("wastingtime,events=%d\n",events[i]events);}//printf("epoll%d,event%d\n",i,events[i].events);if(events[i].events&EPOLLIN){//puts("putreadServerCallArgs*arg=(ServerCallArgs*)malloc(sizeof(ServerCallArgs));arg->server_=this;arg->conn_=tp.put_task(newtp_task(handle_read,}elseif(events[i].events&EPOLLOUT)//puts("putwriteServerCallArgs*arg=(ServerCallArgs*)malloc(sizeof(ServerCallArgs));arg->server_=this;arg->conn_=tp.put_task(newtp_task(handle_write,}}}}}voidFtpServer::accept_client(){while(true){sockaddr_insocklen_tsin_size= t]=accept(listen_sock,(sockaddr*)&client_addr,if t]==-1)if(errno==EAGAIN||errno==EWOULDBLOCK)//wehaveprocessedall ingconnections.}else}}printf("newconnectionclient[%d]intret=make_socket_non_blocking(conn_sock[ if(ret==-1)t]=newt,if(ret==-1){}}}Connection*FtpServer::get_connection(intconn_sock){Connection*conn_c=NULL;for(inti=0;i t;i++)if(conn[i]->conn_sock()==conn_sock){conn_c=conn[i];}}return}voidhandle_write(void*arg)ServerCallArgs*args=(ServerCallArgs*)arg;FtpServer*server=args->server_;Connection*conn=args->conn_;intis_empty=conn->schedule_send_job();if(args)if(is_empty)}else}}
voidhandle_read(void*arg)ServerCallArgs*args=(ServerCallArgs*)arg;FtpServer*server=args->server_;Connection*conn=args->conn_;intpacket*hp=conn->hp;count=conn->recv(hp);if(count==-1){//iferror==EAGAIN,thatmeanswehavereadalldata.Sogobacktothemainloopif(errno!=EAGAIN){perror("readdatafrom}if(!conn->is_terminate())}}elseif(count==0)if(!conn->is_terminate())}}switch(hp->comd){casePUT:}if(args)}}voidFtpServer::send_rsp(Connection*conn,packet*hp,intstatus){hp->type=hp->comd=printf("type=%d,comd=%d\n",hp->type,hp->comd);}voidFtpServer::put(Connection*conn,packet*hp){charf_name[100];staticFILE*if(hp->type==REQ){strcpy(f_name,hp->buf);stringstr=base_path+"/"+f_name;filename=str;if(access(str.c_str(),W_OK)==0)printf("accessOK\n");if(f==NULL)perror("put,openfileerror");send_rsp(conn,hp,FAIL);return;}elseprintf("Openfile}send_rsp(conn,hp,OK);conn->put_bytes=0;conn->put_packets=}elsebooldone=FileTransfer::recv_file_server(filename.c_str(),conn,hp,}}#include<iostream>#include<cstdio>#include<cstdlib>#include<unistd.h>#include<string.h>#include<string>#include<errno.h>#include<netinet/in.h>#include<sys/select.h>#include<sys/types.h>#include<pthread.h>#include<arpa/inet.h>#include<signal.h>namespaceconstintOK1;constintDOWN=0;constintHOST=1;constintBACKUP=0;typedefstruct{intbackupState;//1表示正常,0typedefstruct{int_HostOrBackup;//1表示主,0voidinitBackupState(BState*bs);voidsetBackupStateOn(BState*bs);intgetBackupState(BState*bs);intvoidinitHostOrBackup(HOBFlagvoidsetHostFlag(HOBFlag*hb);voidsetBackupFlag(HOBFlag*hb);intIsHost(HOBFlag*hb);voidusecsleep(intsec,intusec);voidsetIntervalTime(inttimeval);voidsetServerIPandPort(constchar*ip,constunsigned/r
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等。壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性,同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025-2030全球光學透明粘合帶行業調研及趨勢分析報告
- 2025合同范本勞務派遣合同模板書人力資源和企業新
- 2025用戶服務合同
- 2025委托律師代理合同范本范文
- 土地轉讓居間合同
- 美容師勞動合同書
- 消殺服務合同范文
- 2025公司用工合同范本
- 戰略合作協議書合同
- 小區監控改造方案
- 第1課+古代亞非(教學設計)【中職專用】《世界歷史》(高教版2023基礎模塊)
- 新教科版六年級下冊科學全冊教案
- 物業客服管家的培訓課件
- 2024年房地產行業的樓市調控政策解讀培訓
- 《統計學-基于Python》 課件全套 第1-11章 數據與Python語言-時間序列分析和預測
- 裝飾定額子目(河南省)
- 【高速鐵路乘務工作存在的問題及對策研究9800字】
- 北師大版英語課文同步字帖三年級下冊課文對話原文及翻譯衡水體英語字帖三年級起點
- GB/T 2550-2016氣體焊接設備焊接、切割和類似作業用橡膠軟管
- GB/T 21295-2014服裝理化性能的技術要求
評論
0/150
提交評論