import java.awt.List;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.Pipe.SourceChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;
import org.apache.commons.beanutils.PropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sun.star.io.ConnectException;
import net.sf.ezmorph.Morpher;
import net.sf.ezmorph.MorpherRegistry;
import net.sf.ezmorph.bean.BeanMorpher;
import net.sf.json.JSONArray;
import net.sf.json.JSONException;
import net.sf.json.JSONFunction;
import net.sf.json.JSONObject;
import net.sf.json.util.JSONUtils;
/**
 * Life-cycle states of the client's connection to the server.
 * The constant names (including the spelling "idel") are referenced by
 * NIOClient and are therefore kept as they are.
 */
enum ConnectState {
    /** Nothing has been opened yet; the state the program starts in. */
    idel,
    /** Pipe, socket channel and selector are open, the connection is not established. */
    socket_channel_open,
    /** The connection to the server has been established. */
    connect,
    /** An established connection has been closed. */
    disconnect,
    /** A connection attempt could not be completed. */
    connect_fail
}
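/**
* NIO client that keeps a non-blocking socket connection to the server and
* exchanges NUL-terminated JSON messages with it. An internal pipe is
* multiplexed on the same selector as the socket.
* @author
*/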
public class NIOClient {
// Selector (channel manager) on which the server socket and the pipe's
// source end are registered.
private Selector selector;
// Internal pipe, (re)created by OpenChannelSelector().
private static Pipe NioPipe;
// Write end of the pipe. Package-visible and never written to in this file;
// presumably other classes write conversion results into it - confirm.
static Pipe.SinkChannel sinkChannel;
// Read end of the pipe; registered for OP_READ and drained by read_source().
private static Pipe.SourceChannel sourceChannel;
// Current connection state; ReConnectTimer() decides what to do based on it.
private static ConnectState cs;
// The most recent "Transfer" request received from the server; read again by
// SendResultMsg() when the result is reported. Only one request is remembered.
private static RecvMessage recvMsg;
// Socket channel connected to the server.
private static SocketChannel SockChannel;
// Not referenced by any live code in this file.
private SocketChannel NativeChannel;
// Collects the bytes of the message currently being received in read().
private static ByteBuffer m_msgBuffer = null;
private static Logger logger = LoggerFactory.getLogger(NIOClient.class);
/**
 * Opens the resources used by one connection attempt: the internal pipe
 * (sink and source ends), the socket channel to the server and the selector.
 * <p>
 * Whatever a previous attempt left open is closed first, so that repeated
 * reconnects do not accumulate open pipes, sockets and selectors. On success
 * the connection state becomes {@code socket_channel_open}. If a resource
 * cannot be opened the error is logged, the state becomes
 * {@code connect_fail} and the method returns without opening the rest.
 * <p>
 * NOTE(review): {@code sinkChannel} is package-visible; code outside this
 * file that still holds the previous sink will now see a closed channel
 * instead of writing into a pipe nobody reads - confirm that is acceptable.
 */
public void OpenChannelSelector() {
    releasePreviousResources();

    final Pipe pipe;
    try {
        pipe = Pipe.open();
    } catch (IOException e) {
        // Without a pipe there is nothing to take sink()/source() from.
        logger.error("Unable to open the internal pipe", e);
        cs = ConnectState.connect_fail;
        return;
    }
    NioPipe = pipe;
    sinkChannel = pipe.sink();
    sourceChannel = pipe.source();

    try {
        SockChannel = SocketChannel.open();
    } catch (IOException e) {
        logger.error("Unable to open the socket channel", e);
        cs = ConnectState.connect_fail;
        return;
    }

    try {
        this.selector = Selector.open();
    } catch (IOException e) {
        logger.error("Unable to open the selector", e);
        cs = ConnectState.connect_fail;
        return;
    }

    cs = ConnectState.socket_channel_open;
}

/** Closes the selector, socket and pipe ends left over from an earlier attempt, if any. */
private void releasePreviousResources() {
    if (this.selector != null) {
        try {
            this.selector.close();
        } catch (IOException e) {
            logger.warn("Error while closing the previous selector", e);
        }
    }
    closeQuietly(SockChannel);
    closeQuietly(sinkChannel);
    closeQuietly(sourceChannel);
}

/** Closes {@code channel} if it is non-null; a failure is logged and otherwise ignored. */
private static void closeQuietly(java.nio.channels.Channel channel) {
    if (channel == null) {
        return;
    }
    try {
        channel.close();
    } catch (IOException e) {
        logger.warn("Error while closing " + channel, e);
    }
}
/**
 * Prepares the channels opened by {@link #OpenChannelSelector()} for use:
 * switches the socket and both pipe ends to non-blocking mode, starts the
 * connection to the server and registers the socket (for OP_CONNECT) and the
 * pipe's source end (for OP_READ) with the selector. The remembered Transfer
 * request is cleared.
 * <p>
 * The connection is only started here; it is completed in
 * {@link #EventSelect()} via {@code finishConnect()}.
 * <p>
 * NOTE(review): in non-blocking mode {@code connect} may complete
 * immediately (it then returns {@code true}); whether an OP_CONNECT event is
 * still delivered in that case should be confirmed, because the registration
 * message is only sent from the OP_CONNECT handler.
 *
 * @param ip   host name or address of the server
 * @param port port of the server
 * @throws IOException if the channels have not been opened, cannot be
 *                     configured or registered, or if the address cannot be
 *                     resolved (the state is then set to {@code connect_fail})
 */
public void initClient(String ip,int port) throws IOException {
    recvMsg = null;

    SocketChannel channel = SockChannel;
    Selector sel = this.selector;
    if (channel == null || sel == null || sinkChannel == null || sourceChannel == null) {
        // Report a missing OpenChannelSelector() through the declared exception
        // instead of a NullPointerException further down.
        cs = ConnectState.connect_fail;
        throw new IOException("initClient called before the channels and the selector were opened");
    }

    channel.configureBlocking(false);
    sinkChannel.configureBlocking(false);
    sourceChannel.configureBlocking(false);

    try {
        channel.connect(new InetSocketAddress(ip, port));
    } catch (UnresolvedAddressException e) {
        // Registering a channel that never started connecting would leave the
        // selector waiting for an event that cannot arrive, so fail loudly.
        System.out.println("Connect error!");
        cs = ConnectState.connect_fail;
        throw new IOException("Cannot resolve server address " + ip + ":" + port, e);
    }

    channel.register(sel, SelectionKey.OP_CONNECT);
    sourceChannel.register(sel, SelectionKey.OP_READ);
}
/**
 * Runs the selector loop: blocks in {@code select()} and dispatches every
 * ready key. A connectable key completes the connection and sends the
 * "SvrRegister" message; a readable key is handed to {@link #read} (server
 * socket) or {@link #read_source} (pipe).
 * <p>
 * The method returns when the connection attempt fails (state
 * {@code connect_fail}) or when the state has become {@code disconnect};
 * in both cases the selector is closed before returning.
 *
 * @throws IOException if selecting, reading or registering fails
 * @throws com.sun.star.io.IOException not thrown by this implementation;
 *         kept in the signature because callers catch it
 */
public void EventSelect() throws IOException, com.sun.star.io.IOException {
    ConfigEnv cfg = ConfigEnv.getInstance();
    while (true) {
        this.selector.select();
        System.out.println("**********1");
        Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
        while (ite.hasNext()) {
            SelectionKey key = ite.next();
            // Remove the key so it is not processed again on the next pass.
            ite.remove();
            if (!key.isValid()) {
                // Handling an earlier key may have closed this key's channel;
                // querying its ready-ops would throw CancelledKeyException.
                continue;
            }
            if (key.isConnectable()) {
                if (!completeConnection(key, cfg)) {
                    return;
                }
            } else if (key.isReadable()) {
                if (key.channel() == SockChannel) {
                    read(key);
                } else if (key.channel() == sourceChannel) {
                    read_source(key);
                }
            } else if (key.isWritable()) {
                System.out.println("write able!");
            } else {
                System.out.println("------valid");
            }
        }
        if (cs == ConnectState.disconnect) {
            // Mirror the connect-failure exit, which also closes the selector.
            this.selector.close();
            break;
        }
    }
}

/**
 * Completes a pending connection and announces this server with a
 * "SvrRegister" message, then switches the channel's interest to OP_READ.
 *
 * @return {@code false} if the connection could not be completed (socket
 *         and selector are then closed and the state is
 *         {@code connect_fail}); {@code true} otherwise
 */
private boolean completeConnection(SelectionKey key, ConfigEnv cfg) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    if (channel.isConnectionPending()) {
        System.out.println("**********31");
        try {
            channel.finishConnect();
        } catch (Exception e) {
            cs = ConnectState.connect_fail;
            System.out.println( "!!!!!!!!!!! Exception " + e);
            SockChannel.close();
            this.selector.close();
            return false;
        }
        cs = ConnectState.connect;
    }
    channel.configureBlocking(false);

    Map<String, String> registration = new HashMap<String, String>();
    registration.put("type","SvrRegister");
    registration.put("id","" + cfg.GetServerId());
    registration.put("ip","" + cfg.GetDsIp());
    registration.put("servertype","DOC");
    try {
        SendMsgInJson(registration, channel);
    } catch (IOException e) {
        logger.error("Sending the registration message failed", e);
    }

    if (!channel.isOpen()) {
        // SendMsgInJson closes the channel when the write fails. Registering a
        // closed channel would throw and leave the state at "connect".
        cs = ConnectState.disconnect;
        return true;
    }
    // From now on we are interested in data sent by the server.
    channel.register(this.selector, SelectionKey.OP_READ);
    return true;
}
/**
 * Serialises {@code map} as a JSON object, appends the NUL terminator that
 * frames messages on the wire and writes the result to {@code channel}.
 * <p>
 * The text is encoded as UTF-8, the charset the receive path decodes with.
 * Because the channel is non-blocking a single {@code write} may accept only
 * part of the buffer, so writing is repeated until the buffer is empty.
 * <p>
 * If writing fails, an open channel is closed and
 * {@code OpenOfficePdfConvert.stopService()} is called; the exception is
 * logged, not rethrown.
 *
 * @param map     key/value pairs that make up the message
 * @param channel channel to the server
 * @throws IOException if closing the channel after a failed write fails
 */
public static void SendMsgInJson(Map<String, String> map,SocketChannel channel) throws IOException{
    JSONObject jsonObject = JSONObject.fromObject( map );
    String sendmsg = jsonObject.toString() + "\0";
    System.out.println("---------- SendMsgInJson: " + sendmsg);
    logger.info("SendMsgInJson: " + sendmsg);
    ByteBuffer outBuffer = ByteBuffer.wrap(sendmsg.getBytes("UTF-8"));
    try {
        while (outBuffer.hasRemaining()) {
            if (channel.write(outBuffer) == 0) {
                // Send buffer is full; let other threads run before retrying.
                Thread.yield();
            }
        }
    } catch (IOException e) {
        if (channel.isOpen()) {
            channel.close();
            OpenOfficePdfConvert.stopService();
        }
        logger.error("Writing message to the server failed", e);
    }
}
/**
 * Reports the outcome of a conversion to the server as a "TransResult"
 * message. The request details (file, id, prefix, target folder) are taken
 * from the most recently recorded Transfer request.
 * <p>
 * For a result of type "TransOk" the PDF is additionally converted with
 * {@code SysTools.convert_pdf2swf} and the result is reported as success
 * (error code "0"); any other type is reported as failure (error code "10").
 * If there is no recorded request, no result or no socket, the call is
 * logged and ignored.
 * <p>
 * NOTE(review): the value returned by {@code convert_pdf2swf} is only
 * printed; "Success" is reported regardless - confirm this is intended.
 *
 * @param result conversion result to report
 */
public static void SendResultMsg(ConvertResult result){
    RecvMessage rmsg = recvMsg;
    SocketChannel channel = SockChannel;
    if (result == null || rmsg == null || channel == null) {
        // Nothing to correlate the result with (e.g. the request was cleared
        // by a reconnect) or nowhere to send it.
        logger.warn("Dropping conversion result " + result
                + ": no pending Transfer request or no server channel");
        return;
    }

    String file = rmsg.getFile();
    String id = rmsg.getId();
    String prefix = rmsg.getPrefix();
    String targetfolder = rmsg.getTargetfolder();

    String errorCode;
    String strResult;
    // Constant-first comparison tolerates a result without a type.
    if ("TransOk".equals(result.getType())) {
        Boolean ret = SysTools.convert_pdf2swf(rmsg);
        System.out.println("SysTools.convert_pdf2swf: " + ret);
        errorCode = "0";
        strResult = "Success";
    } else {
        errorCode = "10";
        strResult = "Fail";
    }

    String pdfPath = targetfolder + "/" + prefix + ".pdf";
    int page_num = SysTools.GetPdfPageNum(pdfPath);
    System.out.println(pdfPath + " " + "page num:" + page_num);

    Map<String, String> reply = new HashMap<String, String>();
    reply.put("type","TransResult");
    reply.put("id",id);
    reply.put("errorcode",errorCode);
    reply.put("errordetail","");
    reply.put("file",file);
    reply.put("pages","" + page_num);
    reply.put("result",strResult);
    reply.put("servertype","DOC");
    reply.put("targetfolder",targetfolder);
    reply.put("timeused","0");
    try {
        SendMsgInJson(reply, channel);
    } catch (IOException e) {
        logger.error("Sending the TransResult message failed", e);
    }
}
/**
 * Handles a readable event on the server socket.
 * <p>
 * Messages are framed by a terminating NUL byte. Everything currently
 * available is read; bytes are appended to the shared message buffer and
 * each time a NUL is seen the collected bytes are decoded as UTF-8 and
 * passed to {@code handlePacket}. Bytes of an unfinished message stay in the
 * buffer, so a message may arrive split over several reads, and the buffer
 * grows when a message is longer than its current capacity.
 * <p>
 * End-of-stream or an I/O error while reading is treated as a lost
 * connection: the channel is closed, a partially received message is
 * dropped and the state becomes {@code disconnect}.
 *
 * @param key selection key whose channel is the server socket
 * @throws IOException if closing the channel fails
 */
public static void read(SelectionKey key) throws IOException{
    SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer chunk = ByteBuffer.allocate(1000);
    if (m_msgBuffer == null) {
        m_msgBuffer = ByteBuffer.allocate(1000);
    }

    while (true) {
        chunk.clear();
        int count;
        try {
            count = channel.read(chunk);
        } catch (IOException e) {
            // E.g. connection reset: handle like end-of-stream so that the
            // state changes and the caller reconnects.
            logger.warn("Reading from the server failed", e);
            count = -1;
        }

        if (count == 0) {
            // Nothing more available right now.
            return;
        }
        if (count < 0) {
            channel.close();
            logger.info("客户端"+channel.toString()+"连接关闭!");
            cs = ConnectState.disconnect;
            // A half-received message must not be glued to the next connection's data.
            m_msgBuffer.clear();
            return;
        }

        chunk.flip();
        while (chunk.hasRemaining()) {
            byte b = chunk.get();
            if (b != 0) {
                if (!m_msgBuffer.hasRemaining()) {
                    // Message longer than the buffer: double the capacity.
                    ByteBuffer bigger = ByteBuffer.allocate(m_msgBuffer.capacity() * 2);
                    m_msgBuffer.flip();
                    bigger.put(m_msgBuffer);
                    m_msgBuffer = bigger;
                }
                m_msgBuffer.put(b);
            } else {
                // Terminator: decode exactly the bytes collected for this message.
                String recv_msg = new String(m_msgBuffer.array(), 0, m_msgBuffer.position(), "UTF-8");
                m_msgBuffer.clear();
                try {
                    handlePacket(channel, recv_msg);
                } catch (Exception e) {
                    logger.error("Handling a message from the server failed", e);
                }
            }
        }
    }
}
// public void ProcessFrame(const char *buf, int length)
/**
 * Processes one complete message received from the server.
 * <p>
 * The message is parsed as JSON and mapped onto a {@code RecvMessage}; what
 * happens next depends on its type:
 * <ul>
 * <li>"SvrLinkCheck" - a "LinkResponse" message is sent back;</li>
 * <li>"Transfer" - an "AcceptTask" message is sent back, the request is
 *     remembered for the later result report and the conversion of the file
 *     to {@code <targetfolder>/<prefix>.pdf} is submitted;</li>
 * <li>"ResultResponse" and anything else - ignored.</li>
 * </ul>
 * Empty messages, messages that are not valid JSON and messages without a
 * type are ignored.
 *
 * @param channel channel to the server, used for the replies
 * @param msg     message text without the terminating NUL
 */
private static void handlePacket(SocketChannel channel,String msg/* ByteBuffer buffer*/) {
    final ConfigEnv cfg = ConfigEnv.getInstance();
    System.out.println(msg);
    System.out.println("++++ msg len:" + msg.length());
    if (msg.length() <= 0) {
        return;
    }

    JSONObject obj;
    try {
        obj = JSONObject.fromObject(msg);
    } catch (JSONException e) {
        System.out.println("wrong msg!");
        return;
    }

    RecvMessage rmsg = (RecvMessage) JSONObject.toBean(obj, RecvMessage.class);
    System.out.println(rmsg);
    if (rmsg == null || rmsg.getType() == null) {
        logger.warn("Ignoring message without a type: " + msg);
        return;
    }

    Object type = rmsg.getType();
    if ("SvrLinkCheck".equals(type)) {
        Map<String, String> reply = new HashMap<String, String>();
        reply.put("id","" + cfg.GetServerId());
        reply.put("ip", "127.0.0.1");
        reply.put("servertype", "DOC");
        reply.put("type", "LinkResponse");
        try {
            SendMsgInJson(reply, channel);
        } catch (IOException e) {
            logger.error("Sending the LinkResponse message failed", e);
        }
    } else if ("Transfer".equals(type)) {
        Map<String, String> reply = new HashMap<String, String>();
        reply.put("id",rmsg.getId());
        reply.put("type","AcceptTask");
        try {
            SendMsgInJson(reply, channel);
        } catch (IOException e) {
            logger.error("Sending the AcceptTask message failed", e);
        }
        // TODO: check args & mount remote file system.
        // Remember the request before submitting the task: SendResultMsg()
        // reads it, and it is not visible here how soon after submission a
        // result can be reported.
        recvMsg = rmsg;
        String source = rmsg.getFile();
        String target = rmsg.getTargetfolder() + "/" + rmsg.getPrefix() + ".pdf";
        ConverterManager.SubmitTransTask(source, target);
    }
    // "ResultResponse" needs no action.
}
/**
 * Handles a readable event on the internal pipe: reads what is available
 * from the pipe's source end, turns it into a ConvertResult via JSON and
 * passes that result to SendResultMsg().
 *
 * NOTE(review): every loop iteration reads into the same 1000-byte buffer
 * from position 0, so a later chunk overwrites an earlier one, and the
 * message text is obtained from the buffer only after the final clear().
 * Which bytes SysTools.ByteBuffToString() decodes in that state is not
 * visible from this file - verify before changing the loop.
 *
 * @param key selection key whose channel is the pipe's source end
 * @throws IOException if reading from or closing the pipe fails
 */
public static void read_source(SelectionKey key) throws IOException{
// cfg is not used in this method.
final ConfigEnv cfg = ConfigEnv.getInstance();
SourceChannel channel = (SourceChannel) key.channel();
// Buffer the pipe data is read into.
ByteBuffer buffer = ByteBuffer.allocate(1000);
while (channel.read(buffer) > 0) {
buffer.flip();
// Debug output; decodes the whole backing array, not only the bytes just read.
System.out.println("Receive from sink:"
+ new String(buffer.array(), "UTF-8"));
// Resets position/limit only; the bytes stay in the backing array.
buffer.clear();
}
String msg = SysTools.ByteBuffToString(buffer);
System.out.println("---------- read_source:" + msg);
System.out.println(msg);
System.out.println("++++ msg len:" + msg.length());
if(msg.length() <= 0){
return;
}
JSONObject obj = null;
try{
obj = JSONObject.fromObject(msg);
}catch(JSONException e){
System.out.println("wrong msg!");
// Closes the pipe's source end; nothing in this method reopens it.
sourceChannel.close();
return;
}
// The instance created here is replaced immediately by the toBean() result.
ConvertResult result = new ConvertResult();
result = (ConvertResult)JSONObject.toBean(obj, ConvertResult.class);
System.out.println("&&&& " + result);
System.out.println("---------- send source:");
SendResultMsg(result);
}
/**
 * Performs one connect-and-serve cycle if the client is currently not
 * connected.
 * <p>
 * When the state is {@code idel}, {@code disconnect} or
 * {@code connect_fail} the channels are opened, the connection is started
 * and the selector loop runs until the connection fails or is lost. For a
 * retry (state {@code disconnect} or {@code connect_fail} on entry) the
 * method then waits three seconds before returning, which spaces out
 * reconnect attempts. In any other state the method does nothing.
 * <p>
 * On return from a cycle the state is always {@code disconnect} or
 * {@code connect_fail}, so the next call retries.
 */
public void ReConnectTimer() {
    final ConfigEnv cfg = ConfigEnv.getInstance();
    final ConnectState entryState = cs;
    if (entryState != ConnectState.idel
            && entryState != ConnectState.disconnect
            && entryState != ConnectState.connect_fail) {
        return;
    }

    connectAndServe(cfg);

    if (entryState != ConnectState.idel) {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever is running this loop.
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Opens the channels, connects to the configured server and runs the
 * selector loop until it ends. Leaves the state at {@code disconnect} or
 * {@code connect_fail}.
 */
private void connectAndServe(ConfigEnv cfg) {
    OpenChannelSelector();
    if (cs != ConnectState.socket_channel_open) {
        // Opening failed; there is nothing to connect or select on.
        cs = ConnectState.connect_fail;
        return;
    }

    try {
        initClient(cfg.GetDsIp(), cfg.GetDsPort());
    } catch (IOException e) {
        logger.error("Starting the connection to the server failed", e);
        cs = ConnectState.connect_fail;
        return;
    }

    try {
        EventSelect();
    } catch (com.sun.star.io.IOException e) {
        logger.error("Selector loop ended with an error", e);
    } catch (IOException e) {
        logger.error("Selector loop ended with an error", e);
    }

    if (cs != ConnectState.disconnect && cs != ConnectState.connect_fail) {
        // The loop ended through an exception without updating the state;
        // without this the next call would do nothing and the caller would spin.
        cs = ConnectState.disconnect;
    }
}
/**
 * Program entry point.
 * <p>
 * Expects the numeric server id as first argument. The id is stored in the
 * configuration, the converter port is set to {@code id + 5000}, the
 * converter is started and the client then connects (and reconnects) to the
 * server forever.
 * <p>
 * A missing or non-numeric argument and a configuration that cannot be
 * loaded terminate the program with exit status 1.
 *
 * @param args command line; {@code args[0]} is the server id
 * @throws com.sun.star.io.IOException declared for compatibility; not thrown by the visible code
 * @throws IOException declared for compatibility; not thrown by the visible code
 */
public static void main(String[] args) throws com.sun.star.io.IOException, IOException {
    if (args == null || args.length == 0) {
        System.out.println("no args!!!");
        System.exit(1);
    }

    int id = 0;
    try {
        id = Integer.parseInt(args[0]);
    } catch (NumberFormatException e) {
        System.out.println("Parameter error!");
        System.exit(1);
    }

    cs = ConnectState.idel;

    ConfigEnv cfg = null;
    try {
        cfg = ConfigEnv.getInstance();
    } catch (ExceptionInInitializerError e) {
        System.out.println("config file error!");
        System.exit(1);
    }
    cfg.SetServerId(id);
    cfg.SetConvertImpPort(id + 5000);

    try {
        ConverterManager.StartConverter();
    } catch (IOException e) {
        // The client is started even if the converter could not be.
        logger.error("Starting the converter failed", e);
    }

    NIOClient client = new NIOClient();
    while (true) {
        client.ReConnectTimer();
    }
}
}
|