打印

之前的代码好久没看了,开始看不懂,马上就又看懂了。

[复制链接]
1066|0
手机看帖
扫描二维码
随时随地手机跟帖
跳转到指定楼层
楼主
keer_zu|  楼主 | 2015-11-13 09:58 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
#include <iostream>
#include "interface_api_imp.h"

#include "ds_tele_server.h"
#include "log4cxx_wrapper.h"




using namespace cdy;



SessionManagerImp::SessionManagerImp(/*SessionManager &wrapper*/)
        :m_appEventListener(NULL)
        ,m_sessionUser(NULL)
        ,m_idleConnectionChecker(-1)
        ,m_connectionTimeout(-1)
        ,m_timerHandler(NULL)
{
       
        struct timeval tv1={10, 0};
        m_idleConnectionChecker = EventLooper::GetInstance().ScheduleTimer(&tv1, TF_FIRE_PERIODICALLY, this);


        struct timeval tv2={1, 0};
        m_connectionTimeout = EventLooper::GetInstance().ScheduleTimer(&tv2, TF_FIRE_PERIODICALLY, this);

       
}

SessionManagerImp::~SessionManagerImp()
{
        if(m_appListener){
                m_appListener->Stop();
                delete  m_appListener;
        }

        if(m_tsListener){
                m_tsListener->Stop();
                delete m_tsListener;
        }

        EventLooper::GetInstance().CancelTimer(m_idleConnectionChecker);
        EventLooper::GetInstance().CancelTimer(m_connectionTimeout);
}

void SessionManagerImp::NewTimer(unsigned int dwInterval,TimerHandler pTimerHander )
{
        struct timeval tv={dwInterval, 0};
       
        m_timerForApp = EventLooper::GetInstance().ScheduleTimer(&tv, TF_FIRE_PERIODICALLY, this);

        if(m_timerHandler == NULL)
                m_timerHandler = pTimerHander;
}
       
void SessionManagerImp::KillTimer(void)
{
        EventLooper::GetInstance().CancelTimer(m_timerForApp);
        if(m_timerHandler != NULL)
                m_timerHandler = NULL;
}


SessionManagerImp& SessionManagerImp::GetInstance()
{
        static  SessionManagerImp instance_;
        return instance_;

}

bool SessionManagerImp::StartListenForAppServer(IAppEventListener &listener)
{
        LoggerWrapper &loggerUser = LoggerWrapper::GetInstance();
        short port;
       
        m_appEventListener = &listener;

        const string& addr = listener.GetAppListenIp();
        port =  (short)listener.GetAppListenPort();

        m_appListener = new  Acceptor(addr, port,this);
       
        return m_appListener->StartListen() == 0;
}


bool SessionManagerImp::StartListenForTS(ITsSessionUser &sessionUser)
{
        short port;

        const string& addr = sessionUser.GetTsListenIp();

        port = (short)sessionUser.GetTsListenPort();

        m_sessionUser = &sessionUser;
       
        m_tsListener = new  Acceptor(addr, port,this);
       
       
        return m_tsListener->StartListen() == 0;
}

void SessionManagerImp::SendAppMessage(AppMessageBase& appMessage)
{//连接失败后尝试三次间隔时间暂定5分钟

        string json;
        string addr;
        u32 port;
        static unsigned int ConnectTimes = 0;
        Connector *connector=NULL;
        static unsigned int connectErrorTimes = 0;

        struct ConnectorInfo *connectorInfo;

        LoggerWrapper& dsLog = LoggerWrapper::GetInstance();

        appMessage.GetAppTargetInfo(addr,port);
       
        if (!connector)
                connector = new Connector(addr, port, this);

try_connect:
       
        if (connector->Connect(300) == 0){

               
                connectorInfo = new struct ConnectorInfo;//(struct ConnectorInfo *)malloc(sizeof(struct ConnectorInfo));
                if(connectorInfo == NULL){
                        dsLog.Log(TRUE,LOGGER_ERROR," new struct ConnectorInfo error! \n");
                        return;
                }
               
                appMessage.Encode(connectorInfo->msg);
                connectorInfo->re_connect_times = 0;
                m_connectors[connector] = connectorInfo;

                connectErrorTimes = 0;
               
        } else {
       
                delete connector;

                connectErrorTimes ++;
               
                if (connectErrorTimes < 3){
                        connector = new Connector(addr, port, this);
                        goto try_connect;
                }

                connectErrorTimes = 0;
               
        }

        return;

}

void SessionManagerImp::OnNewSession(SessionImp &session)
{
        list<SessionImp*>::iterator alit;
       
        if(session.m_sessionType = SESSION_APP_CLIENT){
               
        } else if(session.m_sessionType = SESSION_APP_SERVER){

        }
}

void SessionManagerImp::OnNewSession(TsSession &session)
{
        list<struct TsSessionInfo*>::iterator tlit;
       
}




void SessionManagerImp::OnSessionClose(SessionImp &session)
{
        LoggerWrapper& dsLog = LoggerWrapper::GetInstance();
       
        list<SessionImp*>::iterator alit;
        SessionImp *session_imp;

       
        if(session.m_sessionType = SESSION_APP_CLIENT){
               
                for (alit = m_appClientSessionList.begin();alit != m_appClientSessionList.end();alit ++) {
                        session_imp = *alit;
                        if(session_imp == &session){
                                m_appClientSessionList.erase(alit);  
                               
                                break;
                        }
                }
               
        } else if(session.m_sessionType = SESSION_APP_SERVER){
                for (alit = m_appServerSessionList.begin();alit != m_appServerSessionList.end();alit ++) {
                        session_imp = *alit;
                        if(session_imp == &session){
                                m_appServerSessionList.erase(alit);   
                               
                                break;
                        }
                }

        }
}

void SessionManagerImp::OnSessionClose(TsSession &session)
{
        struct TsSessionInfo *tsSessionInfo;
        list<struct TsSessionInfo*>::iterator tlit;

        if(m_sessionUser != NULL)
                m_sessionUser->OnServerLeave(session);
       
        for (tlit = m_tsSessionList.begin();tlit != m_tsSessionList.end();tlit ++) {
                tsSessionInfo = *tlit;
                if(tsSessionInfo->session == &session){
                       
                        m_tsSessionList.erase(tlit);   

                        delete tsSessionInfo;
                       
                        break;
                }
        }
}

void SessionManagerImp::OnConnection(Connection *conn, IConnectorAcceptor *ca)
{
        int ret;
       
        TsSession *ts_session;
        struct TsSessionInfo *tsSessionInfo;
        TsSessionImp *ts_session_imp;
        SessionImp *session_imp;


        map<IConnectorAcceptor*,struct ConnectorInfo *>::iterator it;

        LoggerWrapper& dsLog = LoggerWrapper::GetInstance();

        if(!ca){
                return;
        }

        if (conn){
                if (ca->IsConnector() == false){//accepter
                        if (ca == m_tsListener) {//ts

                                ts_session = new TsSession();
                                if(ts_session == NULL){
                                        dsLog.Log(TRUE,LOGGER_ERROR," new TsSession error! \n");
                                        return;
                                }
                               
                                conn->SetConnectionSink(ts_session->m_imp);

                                tsSessionInfo = new struct TsSessionInfo;
                                if (tsSessionInfo == NULL){
                                       
                                        dsLog.Log(TRUE,LOGGER_ERROR,"malloc error!\n");
                                }

                                tsSessionInfo->session = ts_session;
                               
                                ts_session_imp = ts_session->m_imp;
                                ts_session_imp->SetSessionUser(this->m_sessionUser);
                                ts_session_imp->SetConnection(conn);

                                tsSessionInfo->timeOut = 0;
                               
                                m_tsSessionList.push_back(tsSessionInfo);
                                OnNewSession(*ts_session);
                               
                        } else if (ca == m_appListener){ //app

                                session_imp = new AppServerSessionImp(conn,this->m_appEventListener);
                                if(session_imp == NULL){
                                        dsLog.Log(TRUE,LOGGER_ERROR," new AppServerSessionImp error! \n");
                                        return;
                                }
                               
                                conn->SetConnectionSink(session_imp);
                                session_imp->SetConnection(conn);

                                m_appServerSessionList.push_back(session_imp);
                                OnNewSession(*session_imp);
                               
                        } else {
                               
                        }
                       
                } else {//connector
                        it = m_connectors.find(ca);
                        if(it != m_connectors.end()){  //

                                session_imp = new AppClientSessionImp(conn);
                                if(session_imp == NULL){
                                        dsLog.Log(TRUE,LOGGER_ERROR," new AppClientSessionImp error! \n");
                                        return;
                                }
                               
                                conn->SetConnectionSink(session_imp);
                                session_imp->SetConnection(conn);

                                m_appClientSessionList.push_back(session_imp);
                                OnNewSession(*session_imp);

                                if(it->second != NULL)
                                        session_imp->SendMessage(it->second->msg);
                                       
                                delete ca;

                                if(it->second != NULL)
                                        delete it->second;
                                m_connectors.erase(it);
                               
                        } else {  // error ,Can not find connector
                                dsLog.Log(TRUE,LOGGER_ERROR,"Can not find connector! :%d",__LINE__);
                                delete ca;
                        }
                }
        } else {     
   
//                cout << "not connected, try again!" << endl;
                if (ca->IsConnector() == true){//connector
                        it = m_connectors.find(ca);
                        if(it == m_connectors.end()){  //This connector does not exist
                                cout << "This connector does not exist :%d" << endl;
                        } else {
                                if(it->second != NULL){
                                        if(it->second->re_connect_times == 0){
                                                it->second->re_connect_times = 3;    //Join the Error handling map
                                        } else {
                                                if(it->second->re_connect_times == 1){  //Connection failed three times
                                                        delete ca;
                                                        delete it->second;
                                                        m_connectors.erase(it); //it->second
                                                } else {
                                                        it->second->re_connect_times -= 1;
                                                }
                                        }
                                }
                        }
                }
        }
}


void SessionManagerImp::OnTimer(TimerID tid)
{
        struct TsSessionInfo *tsSessionInfo;
        string json;
        SvrLinkCheck svrLinkCheck;
        map<IConnectorAcceptor*,struct ConnectorInfo *>::iterator mit;
        list<struct TsSessionInfo *>::iterator it;
        TsMessageBase *msg;
        TsSession *tsSession;
        LoggerWrapper& dsLog = LoggerWrapper::GetInstance();
       
        if (tid == m_connectionTimeout){
                 for(mit = m_connectors.begin();mit != m_connectors.end();){
                         if(mit->second != NULL){
                                if(mit->second->re_connect_times >= 1){
                                        if(((Connector *)(mit->first))->Connect(300) != 0){// Remove from map directly
                                                if(mit->first != NULL)
                                                        delete mit->first;
                                                delete mit->second;
                                                m_connectors.erase(mit ++);
                                        } else {
                                                mit ++;
                                        }
                                       
                                } else {
                                        mit ++;
                                }
                         }
                 }
        } else if(tid == m_idleConnectionChecker) {
                for(it = m_tsSessionList.begin();it != m_tsSessionList.end();) {
                       
                        tsSessionInfo = *(it++);
                        if(tsSessionInfo == NULL){
                                dsLog.Log(TRUE,LOGGER_ERROR," tsSessionInfo is NUL!");
                                return;
                        }
                       
                        tsSession = tsSessionInfo->session;
                        if(tsSession == NULL){
                                dsLog.Log(TRUE,LOGGER_ERROR," tsSession is NUL!");
                                return;
                        }

                        if(tsSessionInfo->timeOut == 1) {  // Timeout handling
                       
                                OnSessionClose(*tsSessionInfo->session);
                                tsSession->Close();

                        } else {

                                msg = svrLinkCheck.Decode(json);
                               
                                if(msg){
                                       
                                        tsSessionInfo->session->SendMessage(*msg);
                                        tsSessionInfo->timeOut = 1;
                                       
                                } else {
                                        dsLog.Log(TRUE,LOGGER_ERROR,"Message svrLinkCheck err! :%d",__LINE__);
                                }
                        }
                }
        }else if(tid == m_timerForApp){
                        m_timerHandler();               
        }

}

IAppEventListener* SessionManagerImp::GetAppEventListener() const
{
        return m_appEventListener;
}

ITsSessionUser* SessionManagerImp::GetSessionUser() const
{
        return m_sessionUser;
}


void SessionManagerImp::ShowConnectors()
{
        int i;
        map<IConnectorAcceptor*,struct ConnectorInfo *>::iterator mit;
       
        DsPrintf("\n\n\n++++++ connectors:\n");
        for(mit = m_connectors.begin(),i = 0;mit != m_connectors.end();mit ++,i ++){
                DsPrintf("----------------the %dth elem: \n",i);
                DsPrintf("connector: 0x%x\n",(Connector *)(mit->first));
                DsPrintf("msg: %s\n",((struct ConnectorInfo *)mit->second)->msg.data());
                DsPrintf("re_connect_times: %d\n",((struct ConnectorInfo *)mit->second)->re_connect_times);
                DsPrintf("\n\n\n");
        }
       
}


void SessionManagerImp::ShowTsSessionList()
{
        int i;
        struct TsSessionInfo *tsSessionInfo;
        list<struct TsSessionInfo *>::iterator it;

        DsPrintf("\n\n\n++++++ ts_session_list:\n");
        for(it = m_tsSessionList.begin(),i = 0;it != m_tsSessionList.end();it ++,i ++) {
                tsSessionInfo = *it;
                DsPrintf("----------------the %dth elem:  \n",i);
                DsPrintf("session: 0x%x\n",tsSessionInfo->session);
                DsPrintf("timeOut:%d\n",tsSessionInfo->timeOut);
                DsPrintf("\n\n\n");
        }
}

void SessionManagerImp::ShowAppServerSessionList()
{
        int i;
        list<SessionImp*>::iterator alit;

        DsPrintf("\n\n\n++++++ app_server_session_list:\n");
        for (alit = m_appServerSessionList.begin(),i = 0;alit != m_appServerSessionList.end();alit ++,i ++) {
                DsPrintf("----------------the %dth elem:   \n",i);
                DsPrintf("session_imp: 0x%x\n",*alit);
                DsPrintf("\n\n\n");
        }
               
}

void SessionManagerImp::ShowAppClientSessionList()
{
        int i;
        list<SessionImp*>::iterator alit;

        DsPrintf("\n\n\n++++++ app_client_session_list:\n");
        for (alit = m_appClientSessionList.begin();alit != m_appClientSessionList.end();alit ++,i ++) {
                DsPrintf("----------------the %dth elem:    \n",i);
                DsPrintf("session_imp: 0x%x\n",*alit);
                DsPrintf("\n\n\n");
        }

}

void ShowSessionManagerDataStruct()
{
        SessionManagerImp& smi =  SessionManagerImp::GetInstance();

        smi.ShowConnectors();
        smi.ShowTsSessionList();
        smi.ShowAppServerSessionList();
        smi.ShowAppClientSessionList();
}




相关帖子

发新帖 我要提问
您需要登录后才可以回帖 登录 | 注册

本版积分规则

1352

主题

12436

帖子

53

粉丝