打印

分享最近一个JAVA项目:通信部分。

[复制链接]
823|2
手机看帖
扫描二维码
随时随地手机跟帖
跳转到指定楼层
楼主
keer_zu|  楼主 | 2015-11-5 09:07 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
沙发
keer_zu|  楼主 | 2015-11-5 09:08 | 只看该作者
import java.awt.List;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;  
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.Pipe.SourceChannel;
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;

import org.apache.commons.beanutils.PropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sun.star.io.ConnectException;

import net.sf.ezmorph.Morpher;
import net.sf.ezmorph.MorpherRegistry;
import net.sf.ezmorph.bean.BeanMorpher;
import net.sf.json.JSONArray;
import net.sf.json.JSONException;
import net.sf.json.JSONFunction;
import net.sf.json.JSONObject;
import net.sf.json.util.JSONUtils;  

enum ConnectState{idel,socket_channel_open,connect,disconnect,connect_fail}
/**
* NIO客户端
* @author
*/  
public class NIOClient {  
    //通道管理器  
    private  Selector selector;  
    private static Pipe NioPipe;
    static Pipe.SinkChannel sinkChannel;
    private static Pipe.SourceChannel sourceChannel;

    private static ConnectState cs;

    private static RecvMessage recvMsg;
    private static SocketChannel SockChannel;
    private SocketChannel NativeChannel;
//    private static Vector<Byte> m_recvBuf = new Vector();
//    private static String m_recv_msg;
    private static ByteBuffer m_msgBuffer = null;

    private static Logger logger = LoggerFactory.getLogger(NIOClient.class);
    /**
     * 获得一个Socket通道,并对该通道做一些初始化的工作
     * @param ip 连接的服务器的ip
     * @param port  连接的服务器的端口号         
     * @throws IOException
     */  
    public  void OpenChannelSelector(){
            try {
                        NioPipe = Pipe.open();
                } catch (IOException e2) {
                        // TODO Auto-generated catch block
                        e2.printStackTrace();
                }
           
            sinkChannel = NioPipe.sink();
            sourceChannel = NioPipe.source();
           
            try {
                        SockChannel = SocketChannel.open();
                } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                        return;
                }
/*          
            try {
                        NativeChannel = SocketChannel.open();
                } catch (IOException e1) {
                        // TODO Auto-generated catch block
                        e1.printStackTrace();
                }
   */        
            try {
                        this.selector = Selector.open();
                } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                        return;
                }
           
            cs = ConnectState.socket_channel_open;
    }
    public  void initClient(String ip,int port) throws IOException {  
            recvMsg = null;
        // 获得一个Socket通道  
        SocketChannel channel = SockChannel;//SocketChannel.open();  
        // 设置通道为非阻塞  
        channel.configureBlocking(false);

        sinkChannel.configureBlocking(false);
        sourceChannel.configureBlocking(false);


        // 获得一个通道管理器  
        Selector selector = this.selector;
        //this.selector = Selector.open();  
        //cs = ConnectState.socket_channel_open;
        // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调  
        //用channel.finishConnect();才能完成连接  
        try {
                channel.connect(new InetSocketAddress(ip,port));  
        }catch (UnresolvedAddressException e){
                System.out.println("Connect error!");
        }
            //SockChannel.connect(new InetSocketAddress(ip,port));
        //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。  
        channel.register(selector, SelectionKey.OP_CONNECT);  
            //SockChannel.register(selector, SelectionKey.OP_CONNECT);

        //sinkChannel.register(selector,SelectionKey.OP_READ);
        sourceChannel.register(selector,SelectionKey.OP_READ);

    }  

    /**
     * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
     * @throws IOException
     * @throws com.sun.star.io.IOException
     */  
//    @SuppressWarnings("unchecked")  
    public void EventSelect() throws IOException, com.sun.star.io.IOException {
            //IniReader iniReader = new IniReader("./fileconver_config.ini");
            ConfigEnv cfg = ConfigEnv.getInstance();
        // 轮询访问selector  
        while (true) {  
            this.selector.select();  
            System.out.println("**********1");
            // 获得selector中选中的项的迭代器  
            Iterator<?> ite = this.selector.selectedKeys().iterator();  
            while (ite.hasNext()) {  
                SelectionKey key = (SelectionKey) ite.next();  
                // 删除已选的key,以防重复处理  
                ite.remove();  
                // 连接事件发生  
                if (key.isConnectable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    // 如果正在连接,则完成连接  
                    if(channel.isConnectionPending()){  
                            //channel.finishConnect();
                            System.out.println("**********31");
                            try{
                                    channel.finishConnect();
                            } catch (Exception e){
                                    cs = ConnectState.connect_fail;
                                    System.out.println( "!!!!!!!!!!! Exception " + e);
                                    SockChannel.close();
                                    this.selector.close();
                                    // Start up timer for reconnection
                                    //OpenOfficePdfConvert.stopService();
                                    return;
                            }
                            cs = ConnectState.connect;
                    }
                    // 设置成非阻塞  
                    channel.configureBlocking(false);  
                    //String native_id = iniReader.getValue("config", "ID");
                    //String native_ip = iniReader.getValue("config", "IP");
                    int native_id = cfg.GetServerId();
                    String native_ip = cfg.GetDsIp();



                    //String a = new String("{\"id\" : " + native_id + ",\"ip\" : " + native_ip + ",\"servertype\" : \"DOC\",\"type\" : \"SvrRegister\"}");
                    Map<String, String> map2 = new HashMap<String, String>();
                        map2.put("type","SvrRegister");
                        map2.put("id","" + native_id);
                        map2.put("ip","" + native_ip);
                        map2.put("servertype","DOC");
                       
                        try {
                                    SendMsgInJson(map2,channel);
                            } catch (IOException e) {
                                    // TODO Auto-generated catch block
                                    e.printStackTrace();
                            }
                       
                    //System.out.println(a);
                    //在这里可以给服务端发送信息哦  
                    // channel.write(ByteBuffer.wrap((a+"\0").getBytes("utf-8")));  
                    //在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。  
                    channel.register(selector, SelectionKey.OP_READ);
//                    channel.register(this.selector, SelectionKey.OP_WRITE);

                    // 获得了可读的事件  
                } else if (key.isReadable()) {  
                        if( key.channel() == SockChannel)
                        read(key);  
                        else if (key.channel() == sourceChannel)
                                read_source(key);
                } else if (key.isWritable()) {
                       
                        System.out.println("write able!");
                } else if (key.isValid()){
                       
                        System.out.println("------valid");
                }
                //System.out.println("-=================== 1");

            }  
            //System.out.println("-=================== 2");
            if(cs == ConnectState.disconnect)
                    break;
           }
                //System.out.println("-=================== 3");
        }  



    public static void SendMsgInJson(Map<String, String> map,SocketChannel channel) throws IOException{
            JSONObject jsonObject = JSONObject.fromObject( map );
            String sendmsg = jsonObject.toString() + "\0";
            System.out.println("---------- SendMsgInJson: " + sendmsg);
            logger.info("SendMsgInJson: " + sendmsg);
            ByteBuffer outBuffer = ByteBuffer.wrap(sendmsg.getBytes());
            try{
                    channel.write(outBuffer);
            } catch (IOException e){
                    //System.out.println("SendMsgInJson err");
                    if (channel.isOpen()) {
                            channel.close();
                            OpenOfficePdfConvert.stopService();
                        }
                    e.printStackTrace();
            }
    }

    public static void SendResultMsg(ConvertResult result){
            RecvMessage rmsg = recvMsg;
            SocketChannel channel = SockChannel;
            String file = rmsg.getFile();
            String id = rmsg.getId();
            String prefix = rmsg.getPrefix();
            String targetfolder = rmsg.getTargetfolder();
//           String transtype = rmsg.getTranstype();
            String errorCode = null;
            String strResult = null;
           
           
            Map<String, String> map2 = new HashMap<String, String>();
           
            if(result.getType().equals("TransOk")){
                    Boolean ret = SysTools.convert_pdf2swf(rmsg);
                    System.out.println("SysTools.convert_pdf2swf: " + ret);
                    errorCode = "0";
                    strResult = "Success";
                   
            } else {

                    errorCode = "10";
                    strResult = "Fail";
            }
           
           
            int page_num = SysTools.GetPdfPageNum(targetfolder + "/" + prefix + ".pdf");
            System.out.println(targetfolder + "/" + prefix + ".pdf   " + "page num:" + page_num);
           
            String ErrorDetail = "";
           
            map2.put("type","TransResult");
            map2.put("id",id);
            map2.put("errorcode",errorCode);
            map2.put("errordetail",ErrorDetail);
            map2.put("file",file);
            map2.put("pages","" + page_num);
            map2.put("result",strResult);
            map2.put("servertype","DOC");
            map2.put("targetfolder",targetfolder);
            map2.put("timeused","0");
            try {
                        SendMsgInJson(map2,channel);
                } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
    }



    /**
     * 处理读取服务端发来的信息 的事件
     * @param key
     * @throws IOException  
     */  
    public static void read(SelectionKey key) throws IOException{  
        //和服务端的read方法一样  
            int count = 0,i = 0,j = 0;
             // 服务器可读取消息:得到事件发生的Socket通道  
//           final ConfigEnv cfg = ConfigEnv.getInstance();
        SocketChannel channel = (SocketChannel) key.channel();  
        // 创建读取的缓冲区  

        //获取事件key中的channel

        ByteBuffer buffer = ByteBuffer.allocate(1000);
        if(m_msgBuffer == null)
                m_msgBuffer = ByteBuffer.allocate(1000);

        do {
                //清理缓冲区,便于使用
                buffer.clear();
                //将channel中的字节流读入缓冲区
                count = channel.read(buffer);
                
                if (count > 0) {
                buffer.flip();
                //处理粘包

                for( i= 0,j = 0; i < buffer.remaining(); i ++) {
                        Byte b = buffer.get(i);
                        if(b != '\0') {

                                m_msgBuffer.put(j,b);
                                j += 1;
                        } else {
                                j = 0;
                               
                                String recv_msg = new String(m_msgBuffer.array(), "UTF-8");
                                try {
                                handlePacket(channel,recv_msg);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                               
                                if(i < buffer.remaining()){
                                        m_msgBuffer = ByteBuffer.allocate(1000);
                                        continue;
                                } else {
                                       
                                }
                        }
                }
                
                buffer.clear();
                
                } else if (count == 0) {
                   break;
                } else {
                    channel.close();
                    logger.info("客户端"+channel.toString()+"连接关闭!");
                    cs = ConnectState.disconnect;
                    return;
                }
                
        } while (count > 0);

    }


  //  public void ProcessFrame(const char *buf, int length)


    private static void handlePacket(SocketChannel channel,String msg/* ByteBuffer buffer*/) {
                // TODO Auto-generated method stub
            final ConfigEnv cfg = ConfigEnv.getInstance();
           
        System.out.println(msg);  

        System.out.println("++++ msg len:" + msg.length());
        if(msg.length() <= 0){
                return;
        }
        JSONObject obj = null;
        try{
                obj = JSONObject.fromObject(msg);
        }catch(JSONException e){
                System.out.println("wrong msg!");

                return;
        }
        RecvMessage rmsg = new RecvMessage();

                rmsg = (RecvMessage)JSONObject.toBean(obj, RecvMessage.class);
        System.out.println(rmsg);

//        String tp = rmsg.getType();
//        System.out.println("type:" + tp);
        if(rmsg.getType().equals("SvrLinkCheck")){
                Map<String, String> map = new HashMap<String, String>();
                map.put("id","" + cfg.GetServerId());
                map.put("ip", "127.0.0.1");
                map.put("servertype", "DOC");
                map.put("type", "LinkResponse");
               
                try {
                                SendMsgInJson(map,channel);
                        } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
        } else if (rmsg.getType().equals("Transfer")){
                Map<String, String> map = new HashMap<String, String>();
                map.put("id",rmsg.getId());
                map.put("type","AcceptTask");
               
                try {
                                SendMsgInJson(map,channel);
                        } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
               
                // TODO: check args & mount remote file system.
               
                String file = rmsg.getFile();
//                String id = rmsg.getId();
                String prefix = rmsg.getPrefix();
                String targetfolder = rmsg.getTargetfolder();
//               String transtype = rmsg.getTranstype();
               

                FileTransResult res = ConverterManager.SubmitTransTask(file,targetfolder + "/" + prefix + ".pdf");
                recvMsg = rmsg;
        } else if (rmsg.getType().equals("ResultResponse")) {
               
        }

//       buffer.clear();
        }

        public static void read_source(SelectionKey key) throws IOException{  
        //和服务端的read方法一样  
             // 服务器可读取消息:得到事件发生的Socket通道  
            final ConfigEnv cfg = ConfigEnv.getInstance();
            SourceChannel channel = (SourceChannel) key.channel();  
        //SockChannel = channel;
        // 创建读取的缓冲区  
        ByteBuffer buffer = ByteBuffer.allocate(1000);


        while (channel.read(buffer) > 0) {
                        buffer.flip();
                        System.out.println("Receive from sink:"
                                        + new String(buffer.array(), "UTF-8"));
                        buffer.clear();
                }

        String msg = SysTools.ByteBuffToString(buffer);

        System.out.println("---------- read_source:" + msg);

        System.out.println(msg);  
        System.out.println("++++ msg len:" + msg.length());
        if(msg.length() <= 0){
                return;
        }
        JSONObject obj = null;
        try{
                obj = JSONObject.fromObject(msg);
        }catch(JSONException e){
                System.out.println("wrong msg!");
                sourceChannel.close();
                return;
        }
        ConvertResult result = new ConvertResult();

                result = (ConvertResult)JSONObject.toBean(obj, ConvertResult.class);
        System.out.println("&&&& " + result);
        System.out.println("---------- send source:");
        SendResultMsg(result);

    }  

    /**
     * 启动客户端测试
     * @throws com.sun.star.io.IOException
     * @throws IOException  
     * @throws NoSuchMethodException
     * @throws InvocationTargetException
     * @throws com.sun.star.io.IOException
     */  

    public  void ReConnectTimer() {
            final ConfigEnv cfg = ConfigEnv.getInstance();
           
            if(cs == ConnectState.idel){
                    OpenChannelSelector();
            try {
                                initClient(cfg.GetDsIp(),cfg.GetDsPort());
                        } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
            try {
                                EventSelect();
                        } catch (com.sun.star.io.IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
            }
    //        if((cs == ConnectState.disconnect) || (cs == ConnectState.connect_fail)){
         //       Timer timer = new Timer();
         //       timer.schedule(new TimerTask() {
         //           public void run() {
            else if(cs == ConnectState.disconnect){
                    OpenChannelSelector();
            try {
                                initClient(cfg.GetDsIp(),cfg.GetDsPort());
                        } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
            try {
                                EventSelect();
                        } catch (com.sun.star.io.IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
            try {
                                Thread.sleep(3000);
                        } catch (InterruptedException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
            } else if(cs == ConnectState.connect_fail){
                    OpenChannelSelector();
            try {
                                initClient(cfg.GetDsIp(),cfg.GetDsPort());
                        } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
            try {
                                EventSelect();
                        } catch (com.sun.star.io.IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
            try {
                                Thread.sleep(3000);
                        } catch (InterruptedException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
            }
                    //}
        //        }, 5000);// 设定指定的时间time,此处为2000毫秒
    //        }
    }


    public static void main(String[] args) throws com.sun.star.io.IOException, IOException  {
            String arg = null;
            if(args != null && args.length >0){
                    arg = args[0];
            } else {
                    System.out.println("no args!!!");
                    System.exit(0);
            }
            int id = 3000;
            try {
                    id = Integer.parseInt(arg);
            } catch (NumberFormatException e){
                    System.out.println("Parameter error!");
                    System.exit(0);
            }
           
           
            cs = ConnectState.idel;
            ConfigEnv cfg = null;
            try{
                    cfg = ConfigEnv.getInstance();
            }catch(ExceptionInInitializerError e){
                    System.out.println("config file error!");
                    System.exit(0);
            }
           
            cfg.SetServerId(id);
            cfg.SetConvertImpPort(id + 5000);
           
           
            try {
                        ConverterManager.StartConverter();
                } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
        NIOClient client = new NIOClient();
        //client.OpenChannelSelector();
        while(true){
                client.ReConnectTimer();
        }
           
    }  

}  

使用特权

评论回复
板凳
keer_zu|  楼主 | 2015-11-5 11:07 | 只看该作者
第一个java程序,写得有点乱,正在持续重构中。。。。。

使用特权

评论回复
发新帖 我要提问
您需要登录后才可以回帖 登录 | 注册

本版积分规则

1352

主题

12436

帖子

53

粉丝