DataPresistence& DataPresistence::GetInstance(IQueueUser *queueUser)
{
static DataPresistence instance_(queueUser);
return instance_;
}
DataPresistence::DataPresistence(IQueueUser *queueUser)
:m_queueUser(queueUser)
{
}
//IDataPresistence &DataPresistence::CreatePresistence(IQueueUser *user)
//{
// return GetInstance(user);
//}
bool DataPresistence::DpGetMessage(DpMessageArryType &dpMsg,TaskType taskType)
{
set_task_dp_manager(taskType);
m_taskDpManager->GetTaskinfoFromDpFile(dpMsg);
return true;
}
bool DataPresistence::DpGetWaitMessage(DpMessageArryType &dpMsg,TaskType taskType)
{
return true;
}
bool DataPresistence::DpGetRunMessage(DpMessageArryType &dpMsg,TaskType taskType)
{
return true;
}
bool DataPresistence::CreateDpMsgFile(TaskType taskType)
{
set_task_dp_manager(taskType);
m_taskDpManager->CreateDpFile();
return true;
}
bool DataPresistence::CreateDpMsgFile(const DpMessageType &dpMsg,TaskType taskType)
{
set_task_dp_manager(taskType);
m_taskDpManager->CreateDpFile(dpMsg);
return true;
}
bool DataPresistence::CreateWaitMsgFile(TaskType taskType)
{
set_task_dp_manager(taskType);
m_taskDpManager->CreateDpFile(QueueWait);
}
bool DataPresistence::CreateRunMsgFile(TaskType taskType)
{
set_task_dp_manager(taskType);
m_taskDpManager->CreateDpFile(QueueRuning);
}
///////////////////////////////////// TransTaskDpManager ///////////////////////////////
bool TransTaskDpManager::WriteInfoToFile(DpMessageArryType &dpMsg)
{
unsigned short size;
//DpMessageType *msg;
char buf[1024];
int i,ret;
FILE*fp=NULL;
string folder = "dpfile";
LoggerWrapper dsLog= LoggerWrapper::GetInstance();
memset(buf,0,sizeof(buf));
string provFileName = "dpfile/" + m_filePrefix + m_queueType + ".prov";
string filename = "dpfile/" + m_filePrefix + m_queueType + ".data";
if(!IsExistedDir(folder)){
MakeDir(folder);
return false;
}
fp=fopen(provFileName.c_str(),"ab+");
if(NULL==fp){
dsLog.Log(true,LOGGER_INFO,"%s:%s:%d \n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
size = dpMsg.size();
dsLog.Log(true,LOGGER_INFO,"size:%d, %s:%s:%d \n",size,__FILE__,__PRETTY_FUNCTION__,__LINE__);
buf[0] = size/256;
buf[1] = size%256;
fwrite(buf, 1, 2, fp);
string strMsg;
for(i = 0;i < size;i ++){
//msg = dpMsg[i];
strMsg = BuildJsonStr(dpMsg[i]);
//delete(msg);
dpMsg[i].clear();
dsLog.Log(true,LOGGER_INFO,"strMsg:%s, %s:%s:%d \n",strMsg.c_str(),__FILE__,__PRETTY_FUNCTION__,__LINE__);
buf[0] = strMsg.length()/256;
buf[1] = strMsg.length()%256;
fwrite(buf, 1, 2, fp);
fwrite(strMsg.c_str(),1,strMsg.length(),fp);
}
fclose(fp);
string cmd0 = "rm -rf " + filename;
systemcmd(cmd0);
string cmd = "mv " + provFileName + " " + filename;
systemcmd(cmd);
return true;
}
bool TransTaskDpManager::GetTaskinfoFromDpFile(DpMessageArryType &dpMsgArray,QueueType qt)
{
switch(qt){
case QueueWait:
m_queueType = "_WAIT";
break;
case QueueRuning:
m_queueType = "_RUN";
break;
default:
return false;
}
return GetTaskinfoFromDpFile(dpMsgArray);
}
bool TransTaskDpManager::GetTaskinfoFromDpFile(DpMessageArryType &dpMsgArray)
{
int i,j;
char buf[2096];
memset(buf,0,sizeof(buf));
unsigned short msgNum,readLen;
size_t ret;
FILE*fp=NULL;
string folder = "dpfile";
string filename = "dpfile/" + m_filePrefix + m_queueType + ".data";
string provFileName = "dpfile/" + m_filePrefix + m_queueType + ".prov";
LoggerWrapper dsLog= LoggerWrapper::GetInstance();
if(!IsExistedDir(folder)){
MakeDir(folder);
return false;
}
fp=fopen(filename.c_str(),"r");
if(NULL==fp){
dsLog.Log(true,LOGGER_INFO,"%s:%s:%d \n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
fp = fopen(provFileName.c_str(),"r");
if(NULL==fp){
dsLog.Log(true,LOGGER_INFO,"%s:%s:%d \n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
string cmd = "mv " + provFileName + " " + filename;
systemcmd(cmd);
} else {
string cmd = "rm -rf " + provFileName;
systemcmd(cmd);
}
ret = fread(&msgNum,sizeof(unsigned short),1,fp);
msgNum = (msgNum%256)*256 + msgNum/256;
dsLog.Log(true,LOGGER_INFO,"%s:%s:%d msg num: %d\n",__FILE__,__PRETTY_FUNCTION__,__LINE__,msgNum);
for(i = 0;i < msgNum;i ++){
ret = fread(&readLen,sizeof(unsigned short),1,fp);
readLen = (readLen%256)*256 + readLen/256;
ret = fread(buf,1,readLen,fp);
string msg = buf;
DpMessageType dpMsg;// = new DpMessageType;
dsLog.Log(true,LOGGER_INFO,"%s:%s:%d msg : %s\n",__FILE__,__PRETTY_FUNCTION__,__LINE__,msg.c_str());
if(GetJsonMap(msg,dpMsg)){
dpMsgArray.push_back(dpMsg);
} else {
fclose(fp);
return false;
}
memset(buf,0,sizeof(buf));
}
fclose(fp);
return true;
}
bool TransTaskDpManager::CreateDpFile()
{
LoggerWrapper dsLog= LoggerWrapper::GetInstance();
DpMessageArryType dpMsgArry;
return GetInfoAndWriteToFile(dpMsgArry);
}
bool TransTaskDpManager::CreateDpFile(QueueType qt)
{
LoggerWrapper dsLog= LoggerWrapper::GetInstance();
DpMessageArryType dpMsgArry;
return GetInfoAndWriteToFile(dpMsgArry,qt);
}
bool TransTaskDpManager::CreateDpFile(const DpMessageType &dpMsg)
{
DpMessageArryType dpMsgArry;
//DpMessageType msg = dpMsg;
dpMsgArry.push_back(dpMsg);
return GetInfoAndWriteToFile(dpMsgArry);
}
bool TransTaskDpManager::GetInfoAndWriteToFile(DpMessageArryType &dpMsgArry)
{
LoggerWrapper dsLog= LoggerWrapper::GetInstance();
//if(!GetInfoFromSystem(dpMsgArry)){
// dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d GetInfoFromSystem error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
// return false;
//}
bool ret = false;
QueueInfo *qi = new QueueInfoAdapter(*m_queueUser);
//InfoComponent* pCom = new ConcreteInfoComponent(dpMsgArry);
ConcreteInfoComponent com(dpMsgArry);
InfoAdder* pDec = NULL;
pDec = new RunQueueInfoAdder(&com,*qi,m_taskType);
pDec = new WaitQueueInfoAdder(pDec,*qi,m_taskType);
ret = pDec->Operation();
delete qi;
delete pDec;
if(ret == false){
dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d GetQueueInfo error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
if(!WriteInfoToFile(dpMsgArry)){
dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d WriteInfoToFile error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
return true;
}
bool TransTaskDpManager::GetInfoAndWriteToFile(DpMessageArryType &dpMsgArry,QueueType qt)
{
LoggerWrapper dsLog= LoggerWrapper::GetInstance();
//if(!GetInfoFromSystem(dpMsgArry,qt)){
// dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d GetInfoFromSystem error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
// return false;
//}
QueueInfo *qi = new QueueInfoAdapter(*m_queueUser);
bool ret = false;
//InfoComponent* pCom = new ConcreteInfoComponent(dpMsgArry);
ConcreteInfoComponent com(dpMsgArry);
InfoAdder* pDec = NULL;
switch(qt){
case QueueWait:
m_queueType = "_WAIT";
pDec = new WaitQueueInfoAdder(&com,*qi,m_taskType);
break;
case QueueRuning:
m_queueType = "_RUN";
pDec = new RunQueueInfoAdder(&com,*qi,m_taskType);
break;
default:
ret = false;
}
ret = pDec->Operation();
delete qi;
delete pDec;
if(ret == false){
dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d GetQueueInfo error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
if(!WriteInfoToFile(dpMsgArry)){
dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d WriteInfoToFile error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
return true;
}
///////////////////////////////////// DocDpManager /////////////////////////
/*
bool DocDpManager::GetInfoAndWriteToFile(DpMessageArryType &dpMsgArry)
{
LoggerWrapper dsLog= LoggerWrapper::GetInstance();
if(!m_queueUser->GetDocWaitQueueMessage(dpMsgArry)){
dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d GetDocWaitQueueMessage error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
if(!m_queueUser->GetDocRunQueueMessage(dpMsgArry)){
dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d GetDocRunQueueMessage error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
if(!WriteInfoToFile(dpMsgArry)){
dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d WriteInfoToFile error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
return true;
}
bool DocDpManager::GetInfoFromSystem(DpMessageArryType &dpMsgArry)
{
QueueInfo *qi = new QueueInfoAdapter(*m_queueUser);
InfoComponent* pCom = new ConcreteInfoComponent(dpMsgArry);
ConcreteInfoComponent com(dpMsgArry);
InfoAdder* pDec = NULL;
pDec = new RunQueueInfoAdder(pCom,*qi,DocTask);
pDec = new WaitQueueInfoAdder(pDec,*qi,DocTask);
pDec->Operation();
delete qi;
delete pDec;
}
///////////////////////////////////// DpptDpManager /////////////////////////
bool DpptDpManager::GetInfoAndWriteToFile(DpMessageArryType &dpMsgArry)
{
LoggerWrapper dsLog= LoggerWrapper::GetInstance();
if(!m_queueUser->GetDpptWaitQueueMessage(dpMsgArry)){
dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d GetDpptWaitQueueMessage error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
if(!m_queueUser->GetDpptRunQueueMessage(dpMsgArry)){
dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d GetDpptRunQueueMessage error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
if(!WriteInfoToFile(dpMsgArry)){
dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d dppt WriteInfoToFile error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
return true;
}
bool DpptDpManager::GetInfoFromSystem(DpMessageArryType &dpMsgArry)
{
QueueInfo *qi = new QueueInfoAdapter(*m_queueUser);
InfoComponent* pCom = new ConcreteInfoComponent(dpMsgArry);
ConcreteInfoComponent com(dpMsgArry);
InfoAdder* pDec = NULL;
pDec = new RunQueueInfoAdder(pCom,*qi,DpptTask);
pDec = new WaitQueueInfoAdder(pDec,*qi,DpptTask);
pDec->Operation();
delete qi;
delete pDec;
}
///////////////////////////////////// VideoDpManager /////////////////////////
bool VideoDpManager::GetInfoAndWriteToFile(DpMessageArryType &dpMsgArry)
{
LoggerWrapper dsLog= LoggerWrapper::GetInstance();
if(!m_queueUser->GetVideoWaitQueueMessage(dpMsgArry)){
dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d GetVideoWaitQueueMessage error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
if(!m_queueUser->GetVideoRunQueueMessage(dpMsgArry)){
dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d GetVideoRunQueueMessage error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
if(!WriteInfoToFile(dpMsgArry)){
dsLog.Log(true,LOGGER_ERROR,"%s:%s:%d video WriteInfoToFile error.\n",__FILE__,__PRETTY_FUNCTION__,__LINE__);
return false;
}
return true;
}
bool VideoDpManager::GetInfoFromSystem(DpMessageArryType &dpMsgArry)
{
bool ret = false;
QueueInfo *qi = new QueueInfoAdapter(*m_queueUser);
InfoComponent* pCom = new ConcreteInfoComponent(dpMsgArry);
ConcreteInfoComponent com(dpMsgArry);
InfoAdder* pDec = NULL;
pDec = new RunQueueInfoAdder(pCom,*qi,VideoTask);
pDec = new WaitQueueInfoAdder(pDec,*qi,VideoTask);
ret = pDec->Operation();
delete qi;
delete pDec;
return ret;
}
*/