- #include <iostream>
- #include "InterfaceApiImp.h"
- #include "ConfigManager.h"
- #include "Log4cxxWrapper.h"
- #include "DataShow.h"
- #include "json/json.h"
- #include <execinfo.h>
- #include <string.h>
- #include "AppResultSubmit.h"
- using namespace cdy;
- SessionManagerImp::SessionManagerImp(/*SessionManager &wrapper*/)
- //:m_appEventListener(NULL)
- //,m_sessionUser(NULL)
- :m_idleConnectionChecker(-1)
- ,m_connectionTimeout(-1)
- //,m_timerHandler(NULL)
- ,m_dataShow(NULL)
- {
-
- struct timeval tv1={8, 0};
- m_idleConnectionChecker = EventLooper::GetInstance().ScheduleTimer(&tv1, TF_FIRE_PERIODICALLY, this);
- struct timeval tv2={1, 0};
- m_connectionTimeout = EventLooper::GetInstance().ScheduleTimer(&tv2, TF_FIRE_PERIODICALLY, this);
-
- }
- SessionManagerImp::~SessionManagerImp()
- {
- if(m_appListener){
- m_appListener->Stop();
- delete m_appListener;
- }
- if(m_tsListener){
- m_tsListener->Stop();
- delete m_tsListener;
- }
- EventLooper::GetInstance().CancelTimer(m_idleConnectionChecker);
- EventLooper::GetInstance().CancelTimer(m_connectionTimeout);
- }
- TimerID SessionManagerImp::NewTimer(unsigned int dwInterval,TimerHandler pTimerHander )
- {
- TimerID timerForApp;
- struct timeval tv={dwInterval, 0};
-
- timerForApp = EventLooper::GetInstance().ScheduleTimer(&tv, TF_FIRE_PERIODICALLY, this);
- m_timerMap[timerForApp] = pTimerHander;
- printf("timerID: %d %s:%d\n",timerForApp,__FUNCTION__,__LINE__);
- //if(m_timerHandler == NULL)
- // m_timerHandler = pTimerHander;
- return timerForApp;
- }
-
- void SessionManagerImp::KillTimer(TimerID id)
- {
- EventLooper::GetInstance().CancelTimer(id);
- map<TimerID,TimerHandler>::iterator iTimer = m_timerMap.find(id);
- if(iTimer != m_timerMap.end())
- m_timerMap.erase(iTimer);
- //if(m_timerHandler != NULL)
- // m_timerHandler = NULL;
- }
- SessionManagerImp& SessionManagerImp::GetInstance()
- {
- static SessionManagerImp instance_;
- return instance_;
- }
- bool SessionManagerImp::StartListenForAppServer(IAppNewFrame &appNewFrame)
- {
- LoggerWrapper &loggerUser = LoggerWrapper::GetInstance();
- short port;
- //IAppEventListener listener;
-
- //m_appEventListener = &listener;
-
- m_appNewFrame = &appNewFrame;
- m_appResultSubmit = &DocAppResultSubmit::GetInstance();//&appResultSubmit;
- m_appResultSubmit->add(&VideoAppResultSubmit::GetInstance());
- m_appResultSubmit->add(&DpptAppResultSubmit::GetInstance());
- m_appResultSubmit->add(&NoUseTsAppResultSubmit::GetInstance());
-
- const string& addr = ConfigManager::GetIp();//appInfo.GetAppListenIp();
- port = (short)ConfigManager::GetAppPort();//(short)appInfo.GetAppListenPort();
- m_appListener = new Acceptor(addr, port,this);
-
- return m_appListener->StartListen() == 0;
- }
- bool SessionManagerImp::StartListenForTS(ITsNewFrame &tsNewFrame,ITsServerLeave &tsServerLeave)
- {
- m_tsNewFrame = &tsNewFrame;
- m_tsServerLeave = &tsServerLeave;
-
- m_tsListener = new Acceptor(ConfigManager::GetIp(), (short)ConfigManager::GetTsPort(),this);
-
-
- return m_tsListener->StartListen() == 0;
- }
- bool SessionManagerImp::DestoryConnectToServer(string ip,unsigned int port)
- {
- map<TargetAttributeT *,IConnectorAcceptor *>::iterator it;
-
- for(it = m_longConnectors.begin();it != m_longConnectors.end();it ++){
- if(((TargetAttributeT *)it->first)->ip == ip && ((TargetAttributeT *)it->first)->port == port){
- // TODO: close this connection;
- if(it->first != NULL)
- delete it->first;
-
- if(it->second)
- delete it->second;
-
- m_longConnectors.erase(it);
-
- break;
- }
- }
- }
- void SessionManagerImp::SetDataShowProvider(IDataShowProvider &dataShow)
- {
- m_dataShow = &dataShow;
- printf("####################### datashow: 0x%x\n",m_dataShow);
- }
- void print_trace(void)
- {
- int i;
- const int MAX_CALLSTACK_DEPTH = 32; /* 需要打印堆栈的最大深度 */
- void *traceback[MAX_CALLSTACK_DEPTH]; /* 用来存储调用堆栈中的地址 */
- /* 利用 addr2line 命令可以打印出一个函数地址所在的源代码位置
- * 调用格式为: addr2line -f -e /tmp/a.out 0x400618
- * 使用前,源代码编译时要加上 -rdynamic -g 选项
- */
- char cmd[512] = "addr2line -f -e ";
- char *prog = cmd + strlen(cmd);
- /* 得到当前可执行程序的路径和文件名 */
- int r = readlink("/proc/self/exe",prog,sizeof(cmd)-(prog-cmd)-1);
- /* popen会fork出一个子进程来调用/bin/sh, 并执行cmd字符串中的命令,
- * 同时,会创建一个管道,由于参数是'w', 管道将与标准输入相连接,
- * 并返回一个FILE的指针fp指向所创建的管道,以后只要用fp往管理里写任何内容,
- * 内容都会被送往到标准输入,
- * 在下面的代码中,会将调用堆栈中的函数地址写入管道中,
- * addr2line程序会从标准输入中得到该函数地址,然后根据地址打印出源代码位置和函数名。
- */
- FILE *fp = popen(cmd, "w");
- /* 得到当前调用堆栈中的所有函数地址,放到traceback数组中 */
- int depth = backtrace(traceback, MAX_CALLSTACK_DEPTH);
- for (i = 0; i < depth; i++)
- {
- /* 得到调用堆栈中的函数的地址,然后将地址发送给 addr2line */
- fprintf(fp, "%p\n", traceback[i]);
- /* addr2line 命令在收到地址后,会将函数地址所在的源代码位置打印到标准输出 */
- }
- fclose(fp);
- }
- void SessionManagerImp::SendAppMessage(const string addr,const unsigned int port,const string msg,void* pTaskMessage)
- {//连接失败后尝试三次间隔时间暂定5分钟
- static unsigned int ConnectTimes = 0;
- Connector *connector=NULL;
- static unsigned int connectErrorTimes = 0;
- struct ConnectorInfo *connectorInfo;
- LoggerWrapper& dsLog = LoggerWrapper::GetInstance();
- if(addr == "" || msg == "" || port < 0){
- printf("addr: %s msg: %s port:%d\n",addr.c_str(),msg.c_str(),port);
- // print_trace();
- dsLog.Log(true,LOGGER_ERROR," parameter error! line:%d\n",__LINE__);
- return;
- }
-
- if (!connector)
- connector = new Connector(addr, port, this);
- try_connect:
-
- if (connector->Connect(300) == 0){
- connectorInfo = new struct ConnectorInfo;//(struct ConnectorInfo *)malloc(sizeof(struct ConnectorInfo));
- if(connectorInfo == NULL){
- dsLog.Log(true,LOGGER_ERROR," new struct ConnectorInfo error! \n");
- return;
- }
-
- connectorInfo->msg = msg;
- connectorInfo->re_connect_times = 0;
- connectorInfo->pTaskMessage = pTaskMessage;
- m_connectors[connector] = connectorInfo;
- connectErrorTimes = 0;
-
- } else {
-
- delete connector;
- connectErrorTimes ++;
-
- if (connectErrorTimes < 3){
- connector = new Connector(addr, port, this);
- goto try_connect;
- }
- connectErrorTimes = 0;
-
- }
- return;
- }
- void SessionManagerImp::OnNewSession(SessionImp &session)
- {
- list<SessionImp*>::iterator alit;
-
- if(session.m_sessionType == SESSION_APP_CLIENT){
-
- } else if(session.m_sessionType == SESSION_APP_SERVER){
- }
- }
- void SessionManagerImp::OnNewSession(TsSession &session)
- {
- list<struct TsSessionInfo*>::iterator tlit;
-
- }
- void SessionManagerImp::OnSessionClose(SessionImp &session)
- {
- LoggerWrapper& dsLog = LoggerWrapper::GetInstance();
-
- list<SessionImp*>::iterator alit;
- SessionImp *session_imp;
-
- if(session.m_sessionType == SESSION_APP_CLIENT){
-
- for (alit = m_appClientSessionList.begin();alit != m_appClientSessionList.end();alit ++) {
- session_imp = *alit;
- if(session_imp == &session){
- m_appClientSessionList.erase(alit);
-
- break;
- }
- }
-
- } else if(session.m_sessionType == SESSION_APP_SERVER){
- for (alit = m_appServerSessionList.begin();alit != m_appServerSessionList.end();alit ++) {
- session_imp = *alit;
- if(session_imp == &session){
- m_appServerSessionList.erase(alit);
-
- break;
- }
- }
- }
- }
- void SessionManagerImp::OnSessionClose(TsSession &session)
- {
- struct TsSessionInfo *tsSessionInfo;
- list<struct TsSessionInfo*>::iterator tlit;
- if(m_tsServerLeave != NULL)
- m_tsServerLeave->OnServerLeave(session);
-
- for (tlit = m_tsSessionList.begin();tlit != m_tsSessionList.end();tlit ++) {//printf("@@@@@@@@@@@ leng:%d 0x%x (%d)\n",m_tsSessionList.size(),tlit,__LINE__);
- tsSessionInfo = *tlit;
- if(tsSessionInfo->session == &session){
-
- m_tsSessionList.erase(tlit);
- delete tsSessionInfo;
-
- break;
- }
- }
- }
- void SessionManagerImp::OnConnection(Connection *conn, IConnectorAcceptor *ca)
- {
- int ret;
-
- TsSession *ts_session;
- struct TsSessionInfo *tsSessionInfo;
- //TsSessionImp *ts_session_imp;
- TsSessionImp *ts_session_imp;
- SessionImp *session_imp;
- TsClientSession *tsClientSession;
- //TsClientSessionImp *tsClientSessionImp;
- map<IConnectorAcceptor*,struct ConnectorInfo *>::iterator it;
- map<TargetAttributeT *,IConnectorAcceptor *>::iterator lcit;
- LoggerWrapper& dsLog = LoggerWrapper::GetInstance();
- if(!ca){
- return;
- }
- if (conn){
- if (ca->IsConnector() == false){//accepter
- if (ca == m_tsListener) {//ts
- ts_session = new TsSession();
- if(ts_session == NULL){
- dsLog.Log(true,LOGGER_ERROR," new TsSession error! \n");
- return;
- }
-
- conn->SetConnectionSink(ts_session->m_imp);
- tsSessionInfo = new struct TsSessionInfo;
- if (tsSessionInfo == NULL){
-
- dsLog.Log(true,LOGGER_ERROR,"malloc error!\n");
- }
- tsSessionInfo->session = ts_session;
-
- ts_session_imp = (TsSessionImp *)ts_session->m_imp;
- //ts_session_imp->SetSessionUser(this->m_sessionUser);
- ts_session_imp->SetConnection(conn);
- tsSessionInfo->timeOut = 0;
-
- m_tsSessionList.push_back(tsSessionInfo);
- OnNewSession(*ts_session);
-
- } else if (ca == m_appListener){ //app
- session_imp = new AppServerSessionImp(conn/*,this->m_appEventListener*/);
- if(session_imp == NULL){
- dsLog.Log(true,LOGGER_ERROR," new AppServerSessionImp error! \n");
- return;
- }
-
- conn->SetConnectionSink(session_imp);
- session_imp->SetConnection(conn);
- m_appServerSessionList.push_back(session_imp);
- OnNewSession(*session_imp);
-
- } else {
-
- }
-
- } else {//connector
- for(lcit = m_longConnectors.begin();lcit != m_longConnectors.end();lcit ++){
- if((IConnectorAcceptor *)lcit->second == ca){
- tsClientSession = new TsClientSession();
- if(session_imp == NULL){
- dsLog.Log(true,LOGGER_ERROR," new AppClientSessionImp error! \n");
- return;
- }
- //conn->SetConnectionSink(session_imp);
- //session_imp->SetConnection(conn);
- conn->SetConnectionSink(tsClientSession->m_imp);
- tsClientSession->m_imp->SetConnection(conn);
- TsClientSessionInfoT *info = new TsClientSessionInfoT;
- info->session =tsClientSession;//->m_imp->m_session;
- info->ip = ((TargetAttributeT *)lcit->first)->ip;
- info->port = ((TargetAttributeT *)lcit->first)->port;
-
- m_tsClinetSessionList.push_back(info);
- // TODO:OnNewSession(*session_imp);
-
- return;
- }
- }
-
- it = m_connectors.find(ca);
- if(it != m_connectors.end()){ //
- session_imp = new AppClientSessionImp(conn);
- if(session_imp == NULL){
- dsLog.Log(true,LOGGER_ERROR," new AppClientSessionImp error! \n");
- return;
- }
-
- conn->SetConnectionSink(session_imp);
- session_imp->SetConnection(conn);
- m_appClientSessionList.push_back(session_imp);
- OnNewSession(*session_imp);
- if(it->second != NULL){
- ret = session_imp->SendMessage(it->second->msg);
- m_appResultSubmit->OnResultSubmit(ret,it->second->pTaskMessage);
- }
- dsLog.Log(true,LOGGER_INFO,"Send app message:%s, %s:%s:%d",it->second->msg.c_str(),__FILE__,__PRETTY_FUNCTION__,__LINE__);
-
- delete ca;
- if(it->second != NULL)
- delete it->second;
- m_connectors.erase(it);
-
- } else { // error ,Can not find connector
- dsLog.Log(true,LOGGER_ERROR,"Can not find connector! :%d",__LINE__);
- delete ca;
- }
- }
- } else {
- //Added by wwz 160424 for test
- printf("conn is NULL\n");
-
- // cout << "not connected, try again!" << endl;
- if (ca->IsConnector() == true){//connector
- it = m_connectors.find(ca);
- if(it == m_connectors.end()){ //This connector does not exist
- cout << "This connector does not exist :%d" << endl;
- } else {
- if(it->second != NULL){
- if(it->second->re_connect_times == 0){
- it->second->re_connect_times = 3; //Join the Error handling map
- } else {
- if(it->second->re_connect_times == 1){ //Connection failed three times
- //Added by wwz 160424 for test,for connection failure
- printf("connection failure,send task result fail!\n");
- m_appResultSubmit->OnResultSubmit(ret,it->second->pTaskMessage);
-
- delete ca;
- delete it->second;
- m_connectors.erase(it); //it->second
- } else {
- it->second->re_connect_times -= 1;
- }
- }
- }
- }
- }
- }
- }
- void SessionManagerImp::OnTimer(TimerID tid)
- {
- struct TsSessionInfo *tsSessionInfo;
- string json;
- Json::Value root;
- root["type"] = "SvrLinkCheck";
- map<TimerID,TimerHandler>::iterator iTimer;
- //root.toStyledString();
- json = root.toStyledString();
-
- //SvrLinkCheck svrLinkCheck;
- map<IConnectorAcceptor*,struct ConnectorInfo *>::iterator mit;
- list<struct TsSessionInfo *>::iterator it;
- map<TargetAttributeT *,IConnectorAcceptor *>::iterator lcit;
- //TsMessageBase *msg;
- TsSession *tsSession;
- LoggerWrapper& dsLog = LoggerWrapper::GetInstance();
- Connector *connector=NULL;
-
- if (tid == m_connectionTimeout){
- for(mit = m_connectors.begin();mit != m_connectors.end();){
- if(mit->second != NULL){
- if(mit->second->re_connect_times >= 1){
- if(((Connector *)(mit->first))->Connect(300) != 0){// Remove from map directly
- if(mit->first != NULL)
- delete mit->first;
- delete mit->second;
- m_connectors.erase(mit ++);
- } else {
- mit ++;
- }
-
- } else {
- mit ++;
- }
- }
- else
- {
- mit ++;
- }
- }
- for(lcit = m_longConnectors.begin();lcit != m_longConnectors.end();lcit ++){
- if (lcit->second == NULL){
- if(((TargetAttributeT *)lcit->first)->reconnectInterval >= LONG_CONNECT_RETRY_INTERFAL){
- if (!connector)
- connector = new Connector(((TargetAttributeT *)lcit->first)->ip,((TargetAttributeT *)lcit->first)->port, this);
- if (connector->Connect(300) == 0){
- lcit->second = connector;
- } else {
- ((TargetAttributeT *)lcit->first)->reconnectInterval = 0;
- }
- } else {
- ((TargetAttributeT *)lcit->first)->reconnectInterval += 1;
- }
- }
- }
- } else if(tid == m_idleConnectionChecker) {
- for(it = m_tsSessionList.begin();it != m_tsSessionList.end();) {//printf("@@@@@@@@@@@ leng:%d 0x%x (%d)\n",m_tsSessionList.size(),it,__LINE__);
-
- tsSessionInfo = *(it++);
- if(tsSessionInfo == NULL){
- dsLog.Log(true,LOGGER_ERROR," tsSessionInfo is NUL!");
- return;
- }
-
- tsSession = tsSessionInfo->session;
- if(tsSession == NULL){
- dsLog.Log(true,LOGGER_ERROR," tsSession is NUL!");
- return;
- }
- if(tsSessionInfo->timeOut == 1) { // Timeout handling
- dsLog.Log(true,LOGGER_DEBUG,"Timeout handling! session:0x%x sessionimp:0x%x\n",tsSessionInfo->session,tsSessionInfo->session->m_imp);
- OnSessionClose(*tsSessionInfo->session);
- tsSession->Close();
- } else {
-
- tsSessionInfo->session->SendMessage(/**msg*/json);
- tsSessionInfo->timeOut = 1;
- }
- }
- //}else if(tid == m_timerForApp){
- // m_timerHandler();
- } else { //app timer handle
-
- for(iTimer = m_timerMap.begin();iTimer != m_timerMap.end();++ iTimer){
- if(tid == iTimer->first){
- ((TimerHandler)iTimer->second)();
- break;
- }
- }
- }
- }
- //IAppEventListener* SessionManagerImp::GetAppEventListener() const
- //{
- // return m_appEventListener;
- //}
- IAppNewFrame *SessionManagerImp::GetAppNewFrame() const
- {
- return m_appNewFrame;
- }
- //ITsSessionUser* SessionManagerImp::GetSessionUser() const
- //{
- // return m_sessionUser;
- //}
- ITsNewFrame* SessionManagerImp::GetTsNewFrame() const
- {
- return m_tsNewFrame;
- }
- void SessionManagerImp::ShowConnectors()
- {
- int i;
- map<IConnectorAcceptor*,struct ConnectorInfo *>::iterator mit;
-
- if(m_dataShow == NULL)
- return;
-
- m_dataShow->DataPrint("\n\n\n++++++ connectors:\n");
- for(mit = m_connectors.begin(),i = 0;mit != m_connectors.end();mit ++,i ++){
- m_dataShow->DataPrint("----------------the %dth elem: \n",i);
- m_dataShow->DataPrint("connector: 0x%x\n",(Connector *)(mit->first));
- m_dataShow->DataPrint("msg: %s\n",((struct ConnectorInfo *)mit->second)->msg.data());
- m_dataShow->DataPrint("re_connect_times: %d\n",((struct ConnectorInfo *)mit->second)->re_connect_times);
- m_dataShow->DataPrint("\n\n\n");
- }
-
- }
- void SessionManagerImp::ShowTsSessionList()
- {
- int i;
- struct TsSessionInfo *tsSessionInfo;
- list<struct TsSessionInfo *>::iterator it;
-
- if(m_dataShow == NULL)
- return;
-
- m_dataShow->DataPrint("\n\n\n++++++ ts_session_list:\n");
- for(it = m_tsSessionList.begin(),i = 0;it != m_tsSessionList.end();it ++,i ++) {//printf("@@@@@@@@@@@ leng:%d 0x%x (%d)\n",m_tsSessionList.size(),it,__LINE__);
- tsSessionInfo = *it;
- m_dataShow->DataPrint("----------------the %dth elem: \n",i);
- m_dataShow->DataPrint("session: 0x%x\n",tsSessionInfo->session);
- m_dataShow->DataPrint("timeOut:%d\n",tsSessionInfo->timeOut);
- m_dataShow->DataPrint("\n\n\n");
- }
- }
- void SessionManagerImp::ShowAppServerSessionList()
- {
- int i;
- list<SessionImp*>::iterator alit;
-
- if(m_dataShow == NULL)
- return;
- m_dataShow->DataPrint("\n\n\n++++++ app_server_session_list:\n");
- for (alit = m_appServerSessionList.begin(),i = 0;alit != m_appServerSessionList.end();alit ++,i ++) {
- m_dataShow->DataPrint("----------------the %dth elem: \n",i);
- m_dataShow->DataPrint("session_imp: 0x%x\n",*alit);
- m_dataShow->DataPrint("\n\n\n");
- }
-
- }
- void SessionManagerImp::ShowAppClientSessionList()
- {
- int i;
- list<SessionImp*>::iterator alit;
-
- if(m_dataShow == NULL)
- return;
-
- m_dataShow->DataPrint("\n\n\n++++++ app_client_session_list:\n");
- for (alit = m_appClientSessionList.begin();alit != m_appClientSessionList.end();alit ++,i ++) {
- m_dataShow->DataPrint("----------------the %dth elem: \n",i);
- m_dataShow->DataPrint("session_imp: 0x%x\n",*alit);
- m_dataShow->DataPrint("\n\n\n");
- }
- }