#include <iostream>
#include "interface_api_imp.h"
#include "ds_tele_server.h"
#include "log4cxx_wrapper.h"
using namespace cdy;
/**
 * Builds the session manager and schedules its two periodic timers on the
 * event loop: one firing every 10 seconds (m_idleConnectionChecker) and one
 * firing every second (m_connectionTimeout). OnTimer() dispatches on these ids.
 */
SessionManagerImp::SessionManagerImp(/*SessionManager &wrapper*/)
:m_appEventListener(NULL)
,m_sessionUser(NULL)
,m_idleConnectionChecker(-1)
,m_connectionTimeout(-1)
,m_timerHandler(NULL)
{
    // The destructor tests m_appListener / m_tsListener, and KillTimer() /
    // OnTimer() read m_timerForApp, possibly before StartListenFor*() or
    // NewTimer() has ever run. Give them defined values up front. They are
    // assigned here rather than in the initializer list because the member
    // declaration order is not visible from this file.
    m_appListener = NULL;
    m_tsListener = NULL;
    m_timerForApp = -1;   // same "no timer" sentinel as the two ids above

    struct timeval idleCheckPeriod = {10, 0};
    m_idleConnectionChecker = EventLooper::GetInstance().ScheduleTimer(&idleCheckPeriod, TF_FIRE_PERIODICALLY, this);

    struct timeval retryPeriod = {1, 0};
    m_connectionTimeout = EventLooper::GetInstance().ScheduleTimer(&retryPeriod, TF_FIRE_PERIODICALLY, this);
}
/**
 * Stops and frees whichever listeners were created, then cancels the two
 * periodic timers that the constructor scheduled.
 */
SessionManagerImp::~SessionManagerImp()
{
    // Listener created by StartListenForAppServer(), if any.
    if (m_appListener != NULL) {
        m_appListener->Stop();
        delete m_appListener;
    }

    // Listener created by StartListenForTS(), if any.
    if (m_tsListener != NULL) {
        m_tsListener->Stop();
        delete m_tsListener;
    }

    EventLooper::GetInstance().CancelTimer(m_idleConnectionChecker);
    EventLooper::GetInstance().CancelTimer(m_connectionTimeout);
}
/**
 * Schedules the application-level periodic timer and registers its callback.
 *
 * @param dwInterval   Period in whole seconds (stored in tv_sec).
 * @param pTimerHander Callback that OnTimer() invokes when this timer fires.
 *                     It is stored only if no handler is currently registered;
 *                     an existing handler is kept.
 *
 * NOTE(review): a timer scheduled by an earlier call is not cancelled here;
 * its id is simply overwritten in m_timerForApp.
 */
void SessionManagerImp::NewTimer(unsigned int dwInterval,TimerHandler pTimerHander )
{
    // Field-wise assignment instead of `{dwInterval, 0}`: brace-initialising
    // tv_sec from an unsigned int is a narrowing conversion (ill-formed since
    // C++11) on platforms where time_t cannot hold every unsigned int value.
    struct timeval tv;
    tv.tv_sec = dwInterval;
    tv.tv_usec = 0;

    m_timerForApp = EventLooper::GetInstance().ScheduleTimer(&tv, TF_FIRE_PERIODICALLY, this);
    if (m_timerHandler == NULL) {
        m_timerHandler = pTimerHander;
    }
}
/**
 * Cancels the application-level timer recorded in m_timerForApp and forgets
 * the registered callback.
 */
void SessionManagerImp::KillTimer(void)
{
    EventLooper::GetInstance().CancelTimer(m_timerForApp);

    // Clearing an already-empty handler is a no-op, so no test is needed first.
    m_timerHandler = NULL;
}
/**
 * Returns the single process-wide manager, constructed on first use as a
 * function-local static.
 */
SessionManagerImp& SessionManagerImp::GetInstance()
{
    static SessionManagerImp s_manager;
    return s_manager;
}
/**
 * Remembers the application event listener and starts accepting connections
 * on the IP/port it reports.
 *
 * @param listener Supplies the listen address (GetAppListenIp/GetAppListenPort);
 *                 its address is stored in m_appEventListener.
 * @return true when Acceptor::StartListen() returns 0.
 *
 * NOTE(review): the port is narrowed to `short`; confirm that ports above
 * 32767 reach the Acceptor constructor intact.
 */
bool SessionManagerImp::StartListenForAppServer(IAppEventListener &listener)
{
    // Nothing is logged here, but the logger singleton is still fetched as before.
    LoggerWrapper &unusedLogger = LoggerWrapper::GetInstance();
    (void)unusedLogger;

    m_appEventListener = &listener;

    const string &listenIp = listener.GetAppListenIp();
    short listenPort = (short)listener.GetAppListenPort();

    m_appListener = new Acceptor(listenIp, listenPort, this);
    if (m_appListener->StartListen() == 0) {
        return true;
    }
    return false;
}
/**
 * Remembers the TS session user and starts accepting TS connections on the
 * IP/port it reports.
 *
 * @param sessionUser Supplies the listen address (GetTsListenIp/GetTsListenPort);
 *                    its address is stored in m_sessionUser.
 * @return true when Acceptor::StartListen() returns 0.
 */
bool SessionManagerImp::StartListenForTS(ITsSessionUser &sessionUser)
{
    const string &listenIp = sessionUser.GetTsListenIp();
    short listenPort = (short)sessionUser.GetTsListenPort();

    m_sessionUser = &sessionUser;
    m_tsListener = new Acceptor(listenIp, listenPort, this);

    if (m_tsListener->StartListen() == 0) {
        return true;
    }
    return false;
}
/**
 * Starts an outbound connection to the target named by the message and parks
 * the encoded message until the connection result is reported.
 *
 * Every attempt uses a fresh Connector. When Connect() returns 0 the encoded
 * message is stored in m_connectors under that connector, where OnConnection()
 * looks it up. When Connect() fails the connector is discarded and another
 * attempt is made, up to three attempts in total; after that the message is
 * dropped without notification.
 *
 * @param appMessage Message to deliver; provides the target through
 *                   GetAppTargetInfo() and the payload through Encode().
 */
void SessionManagerImp::SendAppMessage(AppMessageBase& appMessage)
{
    // Original note: after a connection failure, retry three times; the
    // interval is tentatively 5 minutes.
    const unsigned int kMaxAttempts = 3;

    string addr;
    u32 port;
    LoggerWrapper& dsLog = LoggerWrapper::GetInstance();
    appMessage.GetAppTargetInfo(addr,port);

    // The attempt counter is a plain local (it used to be a function-level
    // static), so overlapping or re-entrant calls cannot disturb each other.
    for (unsigned int attempt = 0; attempt < kMaxAttempts; ++attempt) {
        Connector *connector = new Connector(addr, port, this);
        if (connector->Connect(300) != 0) {
            delete connector;
            continue;
        }

        struct ConnectorInfo *connectorInfo = new struct ConnectorInfo;
        if (connectorInfo == NULL) {
            dsLog.Log(TRUE,LOGGER_ERROR," new struct ConnectorInfo error! \n");
            delete connector;   // do not leak the connector on this path
            return;
        }
        appMessage.Encode(connectorInfo->msg);
        connectorInfo->re_connect_times = 0;
        m_connectors[connector] = connectorInfo;
        return;
    }
}
/**
 * Hook called from OnConnection() after an application session has been
 * created and added to its list. Both branches are currently empty.
 *
 * @param session The newly created application session.
 */
void SessionManagerImp::OnNewSession(SessionImp &session)
{
    // Compare with ==. The previous `=` overwrote session.m_sessionType on
    // every call instead of testing it.
    if (session.m_sessionType == SESSION_APP_CLIENT) {
        // No extra work for app-client sessions.
    } else if (session.m_sessionType == SESSION_APP_SERVER) {
        // No extra work for app-server sessions.
    }
}
/**
 * Hook called from OnConnection() after a TS session has been created and
 * registered. No work is performed at present.
 *
 * @param session The newly created TS session (unused).
 */
void SessionManagerImp::OnNewSession(TsSession &session)
{
    (void)session;
}
/**
 * Removes a closing application session from the list that tracks it.
 *
 * The list is chosen by session.m_sessionType: m_appClientSessionList for
 * SESSION_APP_CLIENT, m_appServerSessionList for SESSION_APP_SERVER. Only the
 * list entry is removed; the session object itself is not freed here.
 *
 * @param session The session being closed.
 */
void SessionManagerImp::OnSessionClose(SessionImp &session)
{
    // Compare with ==. The previous `=` overwrote m_sessionType and always
    // searched the same list regardless of the real session type.
    list<SessionImp*> *sessions = NULL;
    if (session.m_sessionType == SESSION_APP_CLIENT) {
        sessions = &m_appClientSessionList;
    } else if (session.m_sessionType == SESSION_APP_SERVER) {
        sessions = &m_appServerSessionList;
    }
    if (sessions == NULL) {
        return;   // unknown session type: nothing is tracked for it
    }

    for (list<SessionImp*>::iterator pos = sessions->begin(); pos != sessions->end(); ++pos) {
        if (*pos == &session) {
            sessions->erase(pos);
            break;   // stop at the first match; the iterator is no longer valid
        }
    }
}
/**
 * Handles a closing TS session: notifies the session user (if one is set)
 * through OnServerLeave(), then removes and frees the matching TsSessionInfo
 * record from m_tsSessionList. The TsSession object itself is not freed here.
 *
 * @param session The TS session being closed.
 */
void SessionManagerImp::OnSessionClose(TsSession &session)
{
    if (m_sessionUser != NULL) {
        m_sessionUser->OnServerLeave(session);
    }

    list<struct TsSessionInfo*>::iterator pos = m_tsSessionList.begin();
    while (pos != m_tsSessionList.end()) {
        struct TsSessionInfo *entry = *pos;
        if (entry->session != &session) {
            pos ++;
            continue;
        }
        // First match: unlink it, free the record and stop.
        m_tsSessionList.erase(pos);
        delete entry;
        return;
    }
}
/**
 * Connection callback for the acceptors and connectors owned by this manager.
 *
 * With a non-NULL conn (connection established):
 *   - from m_tsListener: creates a TsSession, wires it to the connection,
 *     records it in m_tsSessionList and calls OnNewSession();
 *   - from m_appListener: creates an AppServerSessionImp, records it in
 *     m_appServerSessionList and calls OnNewSession();
 *   - from a connector in m_connectors: creates an AppClientSessionImp,
 *     records it in m_appClientSessionList, sends the parked message, then
 *     frees the connector and its ConnectorInfo;
 *   - from an unknown connector: logs an error and frees the connector.
 *
 * With a NULL conn (connection failed) and a connector in m_connectors,
 * re_connect_times is stepped: 0 -> 3 on the first failure, then counted down;
 * when it is already 1 the connector and its ConnectorInfo are freed and the
 * entry is removed. OnTimer() retries entries whose counter is >= 1.
 *
 * @param conn The established connection, or NULL when connecting failed.
 * @param ca   The acceptor or connector reporting the event; ignored if NULL.
 */
void SessionManagerImp::OnConnection(Connection *conn, IConnectorAcceptor *ca)
{
    TsSession *ts_session;
    struct TsSessionInfo *tsSessionInfo;
    TsSessionImp *ts_session_imp;
    SessionImp *session_imp;
    map<IConnectorAcceptor*,struct ConnectorInfo *>::iterator it;
    LoggerWrapper& dsLog = LoggerWrapper::GetInstance();

    if(!ca){
        return;
    }

    if (conn){
        if (ca->IsConnector() == false){ // acceptor
            if (ca == m_tsListener) { // TS
                // Allocate the bookkeeping record first so that a failure of
                // either allocation can bail out before the connection has been
                // touched. Previously a failed TsSessionInfo allocation was only
                // logged and the NULL pointer was then dereferenced.
                tsSessionInfo = new struct TsSessionInfo;
                if (tsSessionInfo == NULL){
                    dsLog.Log(TRUE,LOGGER_ERROR,"malloc error!\n");
                    return;
                }
                ts_session = new TsSession();
                if(ts_session == NULL){
                    dsLog.Log(TRUE,LOGGER_ERROR," new TsSession error! \n");
                    delete tsSessionInfo;
                    return;
                }
                conn->SetConnectionSink(ts_session->m_imp);
                tsSessionInfo->session = ts_session;
                ts_session_imp = ts_session->m_imp;
                ts_session_imp->SetSessionUser(this->m_sessionUser);
                ts_session_imp->SetConnection(conn);
                tsSessionInfo->timeOut = 0;
                m_tsSessionList.push_back(tsSessionInfo);
                OnNewSession(*ts_session);
            } else if (ca == m_appListener){ // app
                session_imp = new AppServerSessionImp(conn,this->m_appEventListener);
                if(session_imp == NULL){
                    dsLog.Log(TRUE,LOGGER_ERROR," new AppServerSessionImp error! \n");
                    return;
                }
                conn->SetConnectionSink(session_imp);
                session_imp->SetConnection(conn);
                m_appServerSessionList.push_back(session_imp);
                OnNewSession(*session_imp);
            } else {
                // Connection from an acceptor this manager does not own: ignored.
            }
        } else { // connector
            it = m_connectors.find(ca);
            if(it != m_connectors.end()){
                session_imp = new AppClientSessionImp(conn);
                if(session_imp == NULL){
                    dsLog.Log(TRUE,LOGGER_ERROR," new AppClientSessionImp error! \n");
                    return;
                }
                conn->SetConnectionSink(session_imp);
                session_imp->SetConnection(conn);
                m_appClientSessionList.push_back(session_imp);
                OnNewSession(*session_imp);
                // Deliver the message that SendAppMessage() parked for this connector.
                if(it->second != NULL)
                    session_imp->SendMessage(it->second->msg);
                // The connector has done its job; release it and its record.
                delete ca;
                if(it->second != NULL)
                    delete it->second;
                m_connectors.erase(it);
            } else { // error: connector is not registered
                dsLog.Log(TRUE,LOGGER_ERROR,"Can not find connector! :%d",__LINE__);
                delete ca;
            }
        }
    } else {
        // Not connected; only connectors are handled on this path.
        if (ca->IsConnector() == true){
            it = m_connectors.find(ca);
            if(it == m_connectors.end()){ // this connector does not exist
                // Stream the line number; the old text printed a literal "%d".
                cout << "This connector does not exist :" << __LINE__ << endl;
            } else if(it->second != NULL){
                if(it->second->re_connect_times == 0){
                    it->second->re_connect_times = 3; // enter the retry bookkeeping
                } else if(it->second->re_connect_times == 1){ // retries exhausted
                    delete ca;
                    delete it->second;
                    m_connectors.erase(it);
                } else {
                    it->second->re_connect_times -= 1;
                }
            }
        }
    }
}
/**
 * Timer callback; dispatches on which of the manager's timers fired.
 *
 *  - m_connectionTimeout: for every m_connectors entry whose re_connect_times
 *    is >= 1, calls Connect(300) again; if that call fails, the connector and
 *    its ConnectorInfo are freed and the entry is removed.
 *  - m_idleConnectionChecker: for every TS session, either sends a
 *    SvrLinkCheck message and sets timeOut to 1, or, if timeOut is already 1,
 *    runs OnSessionClose() and closes the session.
 *  - m_timerForApp: invokes the handler registered through NewTimer().
 *
 * @param tid Id of the timer that fired.
 */
void SessionManagerImp::OnTimer(TimerID tid)
{
    struct TsSessionInfo *tsSessionInfo;
    string json;
    SvrLinkCheck svrLinkCheck;
    map<IConnectorAcceptor*,struct ConnectorInfo *>::iterator mit;
    list<struct TsSessionInfo *>::iterator it;
    TsMessageBase *msg;
    TsSession *tsSession;
    LoggerWrapper& dsLog = LoggerWrapper::GetInstance();

    if (tid == m_connectionTimeout){
        for(mit = m_connectors.begin();mit != m_connectors.end();){
            struct ConnectorInfo *info = mit->second;
            bool retryPending = (info != NULL) && (info->re_connect_times >= 1);
            if(retryPending && ((Connector *)(mit->first))->Connect(300) != 0){
                // The retry could not even be started: remove from the map directly.
                delete mit->first;
                delete info;
                m_connectors.erase(mit ++);   // advance before the node is destroyed
            } else {
                // Always advance. Previously an entry with a NULL info pointer
                // left the iterator untouched and the loop never terminated.
                mit ++;
            }
        }
    } else if(tid == m_idleConnectionChecker) {
        for(it = m_tsSessionList.begin();it != m_tsSessionList.end();) {
            // Step past the element first: OnSessionClose() below may erase it.
            tsSessionInfo = *(it++);
            if(tsSessionInfo == NULL){
                dsLog.Log(TRUE,LOGGER_ERROR," tsSessionInfo is NUL!");
                return;
            }
            tsSession = tsSessionInfo->session;
            if(tsSession == NULL){
                dsLog.Log(TRUE,LOGGER_ERROR," tsSession is NUL!");
                return;
            }
            if(tsSessionInfo->timeOut == 1) { // timeout handling
                // OnSessionClose() frees tsSessionInfo, so only tsSession is used afterwards.
                OnSessionClose(*tsSessionInfo->session);
                tsSession->Close();
            } else {
                // NOTE(review): ownership of the pointer returned by Decode() is not visible here.
                msg = svrLinkCheck.Decode(json);
                if(msg){
                    tsSessionInfo->session->SendMessage(*msg);
                    tsSessionInfo->timeOut = 1;
                } else {
                    dsLog.Log(TRUE,LOGGER_ERROR,"Message svrLinkCheck err! :%d",__LINE__);
                }
            }
        }
    }else if(tid == m_timerForApp){
        // KillTimer() clears the handler, so never call through a NULL one.
        if(m_timerHandler != NULL){
            m_timerHandler();
        }
    }
}
/**
 * @return The listener stored by StartListenForAppServer(), or NULL if none
 *         has been stored yet.
 */
IAppEventListener* SessionManagerImp::GetAppEventListener() const
{
    return this->m_appEventListener;
}
/**
 * @return The session user stored by StartListenForTS(), or NULL if none has
 *         been stored yet.
 */
ITsSessionUser* SessionManagerImp::GetSessionUser() const
{
    return this->m_sessionUser;
}
/**
 * Debug dump of m_connectors: for every entry prints the connector address,
 * the parked message and the retry counter through DsPrintf.
 */
void SessionManagerImp::ShowConnectors()
{
    int i;
    map<IConnectorAcceptor*,struct ConnectorInfo *>::iterator mit;
    DsPrintf("\n\n\n++++++ connectors:\n");
    for(mit = m_connectors.begin(),i = 0;mit != m_connectors.end();mit ++,i ++){
        DsPrintf("----------------the %dth elem: \n",i);
        // %p with a void* argument: passing a pointer to %x is undefined
        // behaviour and truncates the address where pointers exceed int.
        DsPrintf("connector: %p\n",(void *)(Connector *)(mit->first));
        // Other code treats the info pointer as possibly NULL, so guard it here too.
        if(mit->second != NULL){
            DsPrintf("msg: %s\n",mit->second->msg.data());
            DsPrintf("re_connect_times: %d\n",mit->second->re_connect_times);
        }
        DsPrintf("\n\n\n");
    }
}
/**
 * Debug dump of m_tsSessionList: for every record prints the session address
 * and its timeOut flag through DsPrintf.
 */
void SessionManagerImp::ShowTsSessionList()
{
    int i;
    struct TsSessionInfo *tsSessionInfo;
    list<struct TsSessionInfo *>::iterator it;
    DsPrintf("\n\n\n++++++ ts_session_list:\n");
    for(it = m_tsSessionList.begin(),i = 0;it != m_tsSessionList.end();it ++,i ++) {
        tsSessionInfo = *it;
        DsPrintf("----------------the %dth elem: \n",i);
        // %p with a void* argument: passing a pointer to %x is undefined
        // behaviour and truncates the address where pointers exceed int.
        DsPrintf("session: %p\n",(void *)tsSessionInfo->session);
        DsPrintf("timeOut:%d\n",tsSessionInfo->timeOut);
        DsPrintf("\n\n\n");
    }
}
/**
 * Debug dump of m_appServerSessionList: prints the address of every tracked
 * session through DsPrintf.
 */
void SessionManagerImp::ShowAppServerSessionList()
{
    int i;
    list<SessionImp*>::iterator alit;
    DsPrintf("\n\n\n++++++ app_server_session_list:\n");
    for (alit = m_appServerSessionList.begin(),i = 0;alit != m_appServerSessionList.end();alit ++,i ++) {
        DsPrintf("----------------the %dth elem: \n",i);
        // %p with a void* argument: passing a pointer to %x is undefined
        // behaviour and truncates the address where pointers exceed int.
        DsPrintf("session_imp: %p\n",(void *)*alit);
        DsPrintf("\n\n\n");
    }
}
/**
 * Debug dump of m_appClientSessionList: prints the address of every tracked
 * session through DsPrintf.
 */
void SessionManagerImp::ShowAppClientSessionList()
{
    int i;
    list<SessionImp*>::iterator alit;
    DsPrintf("\n\n\n++++++ app_client_session_list:\n");
    // `i = 0` was missing from the loop initialiser, so the element index that
    // gets printed was read from an uninitialised variable.
    for (alit = m_appClientSessionList.begin(),i = 0;alit != m_appClientSessionList.end();alit ++,i ++) {
        DsPrintf("----------------the %dth elem: \n",i);
        // %p with a void* argument: passing a pointer to %x is undefined
        // behaviour and truncates the address where pointers exceed int.
        DsPrintf("session_imp: %p\n",(void *)*alit);
        DsPrintf("\n\n\n");
    }
}
void ShowSessionManagerDataStruct()
{
SessionManagerImp& smi = SessionManagerImp::GetInstance();
smi.ShowConnectors();
smi.ShowTsSessionList();
smi.ShowAppServerSessionList();
smi.ShowAppClientSessionList();
}
|
|