comet之 Tomcat6 的servlet异步处理实现 - birdmen - Jav...

来源:百度文库 编辑:神马文学网 时间:2024/04/27 16:41:48

comet之 Tomcat6 的servlet异步处理实现

文章分类:Java编程Comet支持
  Comet支持允许一个servlet异步处理IO,当数据在连接上可读的时候(而不是使用阻塞读)和往连接异步(最有可能的是来自一些其它原引发的事件)写回数据的时候接收事件。

CometEvent
  实现了org.apache.catalina.CometProcessor接口的Sevlets有他们的事件激活方法而不是使用平常的服务方法,依照谁发生的事件。事件对象引发存取通常的request和respose对象,它可能使用平常的方式。重要的区别是这些对象在任何时候保持有效和全功能,在事件BEGIN开始到END或者ERROR事件结束。下面是这些事件类型:

◆ EventType.BEGIN: 在连接处理开始将被调用。它能被用来初始化任何在request和response对象中使用的相关字段。 在这个事件处理之后和结束处理开始或者错误事件之间,是可以使用response对象来在连接上进行异步写。注意,reponse对象和依靠的 OutputStream和Writer仍然不是同步的,因此当多个线程存取的时候,同步是强制的。在初始事件处理之后,request才会提交。
◆ EventType.READ: 这个指示输入数据可以用了,可以无阻塞的读了。可用的和InputStream或者Read的读方法可以用来决定是否有阻塞风险: servlet会读报告可用的数据,还进一步读取可用的数据。当遇到读错误的时候,servlet通过抛出异常来报告它。抛出异常会导致一个错误事件被激活,连接会被关闭。另外,可以捕捉异常,清理sevlet用到的数据结构,然后调用事件的close方法。尝试读取这个方法执行的request对象的数据是不允许的。
  在一些平台上,例如Windows,一个客户端断开会有一个READ事件。从流读数据可能返回-1, IOException 或者 EOFException。确认你处理了所有这三种情况。如果你没有处理,Tomcat会立刻抛出你改捕捉事件,这个时候你会被通知错误。
◆ EventType.END:request 结束时调用。在开始方法中初始化的字段会被重置。在这个事件处理之后,request和response对象, 还有哪些依赖的对象,会被重新使用,用来处理其它的请求。 End在 数据可用和读文件结束的时候也会被调用。
◆ EventType.ERROR: 当容器在连接上遇到IO错误或者类似不可恢复的错误时,这个错误出现。在begin方法初始化的数据会被重置。这个事件处理之后,request和 response对象,还有一些依赖的对象,会被重新被其它请求使用。

这些事事件子类型,允许更详细的事件处理 (注意: 一些事件需要使用org.apache.catalina.valves.CometConnectionManagerValve 值):
◆ EventSubType.TIMEOUT: 连接超时 (ERROR子类型);注意这个错误类型不是致命的,连接不会被关闭除非servlet使用了事件的close方法.
◆ EventSubType.CLIENT_DISCONNECT: 客户端连接被关闭(ERROR子类型).事件的方法.
◆ EventSubType.IOEXCEPTION: 一个IO异常发生, 例如无效内容,一个无效的大块 (ERROR子类型).
◆ EventSubType.WEBAPP_RELOAD: web程序正在被重新加载T(END子类型).
◆ EventSubType.SESSION_END: sevlet结束会话 (END子类型).
   就像上面描述的,典型的Comet请求生命周期将会包含一系列的事件:BEGIN -> READ -> READ -> READ -> ERROR/TIMEOUT. 在任何时候,servlet可以使用事件对象的close方法关闭请求的处理。

CometFilter

类似普通的过滤器,一个过滤器链会被激活当comet事件处理的时候。这些过滤器要应用CometFilter接口(他和平常的过滤器接口一样), 要被声明和影射在部署描述符中,和常规过滤器相同的方式。当处理一个事件的时候,过滤器链仅仅包括哪些匹配所使用影射规则的过滤器,也要实现 CometFiler接口.

例子代码
  下面的伪代码servlet,实现了异步聊天功能,使用上面描述的API:
Java代码
  1. public class ChatServlet   
  2.        extends HttpServlet implements CometProcessor {   
  3.   
  4.        protected ArrayList connections =   
  5.            new ArrayList();   
  6.        protected MessageSender messageSender = null;   
  7.          
  8.        public void init() throws ServletException {   
  9.            messageSender = new MessageSender();   
  10.            Thread messageSenderThread =   
  11.                new Thread(messageSender, "MessageSender[" + getServletContext().getContextPath() + "]");   
  12.            messageSenderThread.setDaemon(true);   
  13.            messageSenderThread.start();   
  14.        }   
  15.   
  16.        public void destroy() {   
  17.            connections.clear();   
  18.            messageSender.stop();   
  19.            messageSender = null;   
  20.        }   
  21.   
  22.        /**  
  23.         * Process the given Comet event.  
  24.         *  
  25.         * @param event The Comet event that will be processed  
  26.         * @throws IOException  
  27.         * @throws ServletException  
  28.         */  
  29.        public void event(CometEvent event)   
  30.            throws IOException, ServletException {   
  31.            HttpServletRequest request = event.getHttpServletRequest();   
  32.            HttpServletResponse response = event.getHttpServletResponse();   
  33.            if (event.getEventType() == CometEvent.EventType.BEGIN) {   
  34.                log("Begin for session: " + request.getSession(true).getId());   
  35.                PrintWriter writer = response.getWriter();   
  36.                writer.println("");   
  37.                writer.println("JSP Chat");   
  38.                writer.flush();   
  39.                synchronized(connections) {   
  40.                    connections.add(response);   
  41.                }   
  42.            } else if (event.getEventType() == CometEvent.EventType.ERROR) {   
  43.                log("Error for session: " + request.getSession(true).getId());   
  44.                synchronized(connections) {   
  45.                    connections.remove(response);   
  46.                }   
  47.                event.close();   
  48.            } else if (event.getEventType() == CometEvent.EventType.END) {   
  49.                log("End for session: " + request.getSession(true).getId());   
  50.                synchronized(connections) {   
  51.                    connections.remove(response);   
  52.                }   
  53.                PrintWriter writer = response.getWriter();   
  54.                writer.println("");   
  55.                event.close();   
  56.            } else if (event.getEventType() == CometEvent.EventType.READ) {   
  57.                InputStream is = request.getInputStream();   
  58.                byte[] buf = new byte[512];   
  59.                do {   
  60.                    int n = is.read(buf); //can throw an IOException   
  61.                    if (n > 0) {   
  62.                        log("Read " + n + " bytes: " + new String(buf, 0, n)   
  63.                                + " for session: " + request.getSession(true).getId());   
  64.                    } else if (n < 0) {   
  65.                        error(event, request, response);   
  66.                        return;   
  67.                    }   
  68.                } while (is.available() > 0);   
  69.            }   
  70.        }   
  71.   
  72.        public class MessageSender implements Runnable {   
  73.   
  74.            protected boolean running = true;   
  75.            protected ArrayList messages = new ArrayList();   
  76.              
  77.            public MessageSender() {   
  78.            }   
  79.              
  80.            public void stop() {   
  81.                running = false;   
  82.            }   
  83.   
  84.            /**  
  85.             * Add message for sending.  
  86.             */  
  87.            public void send(String user, String message) {   
  88.                synchronized (messages) {   
  89.                    messages.add("[" + user + "]: " + message);   
  90.                    messages.notify();   
  91.                }   
  92.            }   
  93.   
  94.            public void run() {   
  95.   
  96.                while (running) {   
  97.   
  98.                    if (messages.size() == 0) {   
  99.                        try {   
  100.                            synchronized (messages) {   
  101.                                messages.wait();   
  102.                            }   
  103.                        } catch (InterruptedException e) {   
  104.                            // Ignore   
  105.                        }   
  106.                    }   
  107.   
  108.                    synchronized (connections) {   
  109.                        String[] pendingMessages = null;   
  110.                        synchronized (messages) {   
  111.                            pendingMessages = messages.toArray(new String[0]);   
  112.                            messages.clear();   
  113.                        }   
  114.                        // Send any pending message on all the open connections   
  115.                        for (int i = 0; i < connections.size(); i++) {   
  116.                            try {   
  117.                                PrintWriter writer = connections.get(i).getWriter();   
  118.                                for (int j = 0; j < pendingMessages.length; j++) {   
  119.                                    writer.println(pendingMessages[j] + "
    "
    );   
  120.                                }   
  121.                                writer.flush();   
  122.                            } catch (IOException e) {   
  123.                                log("IOExeption sending message", e);   
  124.                            }   
  125.                        }   
  126.                    }   
  127.   
  128.                }   
  129.   
  130.            }   
  131.   
  132.        }   
  133.   
  134.    }  


Comet timeouts
如果你正在使用NIO连接器,你可以设置不同的延时为不同的comet连接。设置一个延时,像下面简单的设置一个属性就可以:
CometEvent event.... event.setTimeout(30*1000);
或者
event.getHttpServletRequest().setAttribute("org.apache.tomcat.comet.timeout", new Integer(30 * 1000));

这里设置延时30秒. 重要提示, 为了设置这个延时, 必须在BEGIN 事件之前设置, 默认值是soTimeout。 如果你正在使用APR连接器, 所有的Comet连接有相同的延时值,是soTimeout*50。

异步写

当 APR或者NIO启用的时候,Tomcat支持发送大的静态文件。这些写操作,系统负载一增加,就在更高效的方式上被异步执行。而不是使用阻塞模式发送大的response,写内容到一个静态文件时可以能的,写它用一个sendfile代码。一个缓存值可以用来缓存response数据到一个文件而不是内存。 Sendfile支持是可用的,如果request属性org.apache.tomcat.sendfile.support被设置成 Boolean.TRUE.
  任何一个Servlet都可以指示Tomcat执行一个sendfile通过设置相应的response属性。当使用Sendfile的时候,最好确认request和response都没被包装,因为response主体将被connector随后发送,他不能被过滤。除了设置3个需要的response属性,servlet不发送任何response数据,但是它可以用来修改response头,例如设置 cookies。

◆ org.apache.tomcat.sendfile.filename: Canonical文件名, String类型
◆ org.apache.tomcat.sendfile.start: Start 偏移量,Long类型
◆ org.apache.tomcat.sendfile.start: End 偏移量,Long类型