root/blogbird/src/main/scala/org/aiotrade/httpd/ClientConnection.scala @ 266:99aa5be7e923

Revision 266:99aa5be7e923, 7.1 KB (checked in by dcaoyuan, 4 months ago)

Moved actors to lib.util.io

Line 
1package org.aiotrade.httpd
2
3import java.io.IOException
4import java.nio.ByteBuffer
5import java.nio.channels.SocketChannel
6
7import org.aiotrade.lib.util.io.ChannelListener
8import org.aiotrade.lib.util.io.SelectActor.Read
9import org.aiotrade.lib.util.io.SelectActor.Write
10import scala.actors.Actor
11import Debugger._
12
/**
 * Represents an open HTTP connection coming from a client.
 *
 * The connection is an actor that reacts to `Read` and `Write` messages:
 * on `Read` it pulls bytes from the channel into the request parser, and on
 * `Write` it pushes queued reply buffers back to the channel.
 *
 * @param channel the socket channel this connection reads from and writes to.
 */
class ClientConnection(val channel: SocketChannel) extends Actor with ChannelListener {
  private var _isOpen = true

  // Set to null by takeRequest once the request has been handed off
  // (Keep-Alive is not supported, so there is no follow-up parser).
  private var requestParser = new HttpRequestParser

  /**
   * Stores input during reads.
   */
  private val readBuf = ByteBuffer.allocate(8192)

  /**
   * Guards `writeBuffers`, the list of buffers that still need to be written.
   * NOTE(review): `send` is presumably called from outside the actor thread
   * while `flushWriteBuffers` runs inside it -- confirm against callers.
   */
  private val writeMutex = new Object()
  private var writeBuffers = List[ByteBuffer]()

  /**
   * Is this connection currently open?
   */
  def isOpen: Boolean = _isOpen && channel.isOpen && channel.isConnected

  /**
   * Closes the connection: marks it closed, closes the channel and then
   * calls the actor's `exit` (presumably terminating this actor, in which
   * case this method does not return normally -- verify against the
   * scala.actors version in use).
   */
  def close: Unit = {
    _isOpen = false
    channel.close()
    super.exit()
  }

  /**
   * Message loop: handles `Read` and `Write` notifications.
   * IOExceptions raised while reading or writing are logged and swallowed.
   */
  def act = loop {
    react {
      case Read(sender) =>
        try {
          val requestReady = readMoreInput

          // add back this connection to the read selector.
          sender.addListener(this)

          // was enough input read to complete a request?
          if (requestReady) {
            ConnectionManager.selectActor.produce(takeRequest)
          }
        } catch {
          case ex: IOException => debugln(ex.getMessage)
        }

      case Write(sender) =>
        debugln(" ** Sending reply!")
        try {
          if (flushWriteBuffers) {
            close
          } else {
            // add back this connection to the write selector.
            sender.addListener(this)
          }
        } catch {
          case ex: IOException => debugln(ex.getMessage)
        }
    }
  }

  /**
   * Has this connection parsed a request (that has not been taken yet)?
   */
  def hasRequest: Boolean = (requestParser != null) && requestParser.parsed

  /**
   * Produces the request parsed by this connection's parser and discards
   * the parser.
   */
  def takeRequest: Request = {
    val req = requestParser.toRequest(this)
    // TODO: Needs to be a secondary request parser
    // for Connection: Keep-Alive
    requestParser = null
    req
  }

  /**
   * Reads more input, if any, and passes it to the http request parser.
   *
   * @return true if a complete request is now available; false if not.
   * @throws IOException if the connection is already closed, the read fails,
   *                     or the remote side has shut the socket down.
   */
  def readMoreInput: Boolean = {
    debugln(" ** Connection has input.")

    if (!_isOpen)
      throw new IOException("Cannot read a closed connection!")

    readBuf.clear()
    val numRead =
      try {
        channel.read(readBuf)
      } catch {
        case ioe: IOException =>
          debugln(ioe.getMessage)
          close
          throw ioe
      }

    if (numRead == -1) {
      close
      debugln("Remote entity shut down socket (cleanly).")
      throw new IOException("Remote entity shut down socket (cleanly).")
    }

    // Once the request has been taken the parser is gone; any further input
    // is dropped instead of dereferencing null.
    val parser = requestParser
    if (parser != null) {
      val bytes = readBuf.array
      var i = 0
      while (i < numRead) {
        // Mask to 0..255: a plain Byte-to-Char conversion sign-extends bytes
        // >= 0x80 into \uFF80..\uFFFF.
        parser.consume((bytes(i) & 0xff).toChar)
        i += 1
      }
    }

    hasRequest
  }

  /**
   * Queues a reply to send to this connection. The reply's buffers are
   * appended after any buffers that are still pending, header first.
   *
   * <code>flushWriteBuffers</code> must be called until it returns true to
   * guarantee the reply has been sent.
   */
  def send(reply: Reply): Unit = writeMutex.synchronized {
    writeBuffers = writeBuffers ::: List(reply.headerByteBuffer, reply.dataByteBuffer)
  }

  /**
   * Sends as much data from the first queued buffer as it can without
   * blocking, dropping that buffer once it is fully written.
   *
   * @return True if all data has been sent (or the connection is already
   *         closed); false if not.
   * @throws IOException if the write fails; the connection is closed first.
   */
  def flushWriteBuffers: Boolean = {
    if (!_isOpen) {
      System.err.println("ERROR: Attempt to write to closed channel.")
      true
    } else writeMutex.synchronized {
      writeBuffers match {
        case head :: rest =>
          if (head.remaining > 0) {
            try {
              channel.write(head)
            } catch {
              case ioe: IOException =>
                close
                throw ioe
            }
          }
          if (head.remaining == 0) {
            writeBuffers = rest
          }
        case Nil =>
      }

      writeBuffers.isEmpty
    }
  }

  // Started last so that every field above is initialised before the actor
  // can begin processing messages.
  start()
  println("clientConnection started")
}
172
173
/**
 * Marker type for an HTTP request method.
 *
 * Each method is modelled as a singleton case object extending this trait.
 */
trait HttpMethod

/** The HTTP GET method. */
case object GET extends HttpMethod

/** The HTTP POST method. */
case object POST extends HttpMethod
181
/**
 * A continuation-based HTTP request parser.
 *
 * The parser consumes one character at a time,
 * which means that the parsing process can be suspended at any time.
 *
 * At the moment, this does not support Keep-Alive HTTP connections.
 */
class HttpRequestParser {

  /**
   * Indicates the current position of the parser.
   */
  private sealed trait State
  /** Accumulating the request line. */
  private case class FirstLine(line: String) extends State
  /** Accumulating one header line. */
  private case class HeaderLine(line: String) extends State
  /** Just after a line break; `line` holds the terminator characters seen so far. */
  private case class NextLine(line: String) extends State
  /** Accumulating the request body. */
  private case class Data(data: StringBuilder) extends State
  /** The body (if any) has been read completely. */
  private case object End extends State
  /** The blank line ending the headers has been seen. */
  private case object EndHeaders extends State

  /**
   * Marks the current state of the parser.
   */
  private var state: State = FirstLine("")

  /* Components of the request. */
  private var method: HttpMethod = _
  private var resource: String = _
  private var headers = Map[String, String]()
  private var data: String = _
  private var contentLength = -1

  /**
   * Has a complete request been parsed?
   */
  def parsed: Boolean = state == End || method == GET && state == EndHeaders

  /**
   * Extracts the method and resource from the request line.
   *
   * @throws Exception if the method is neither GET nor POST, or the line
   *                   carries no resource after the method.
   */
  private def processFirstLine(line: String): Unit = {
    val parts = line.split(" ")
    val parsedMethod = parts(0) match {
      case "GET"  => GET
      case "POST" => POST
      case other  => throw new Exception("Unknown HTTP method: " + other)
    }
    // Guard the index instead of failing with ArrayIndexOutOfBoundsException.
    if (parts.length < 2)
      throw new Exception("Malformed HTTP request line: " + line)
    method = parsedMethod
    resource = parts(1)
  }

  /**
   * Records one "Key: value" header line; lines without that shape are
   * ignored. A Content-Length header also sets the expected body length.
   *
   * @throws NumberFormatException if the Content-Length value is not an integer.
   */
  private def processHeaderLine(line: String): Unit = {
    // Limit 2: split only at the first ": " so values that themselves
    // contain ": " are kept instead of being silently dropped.
    line.split(": ", 2).toList match {
      case List(key, value) if value.length > 0 =>
        headers += (key -> value)
        // Header field names are case-insensitive.
        if (key.equalsIgnoreCase("Content-Length")) contentLength = value.trim.toInt
      case _ =>
    }
  }

  /**
   * State to enter once the blank line after the headers has been read.
   * A POST without a (positive) Content-Length has no body to wait for, so
   * it completes immediately with an empty body.
   */
  private def headersDone: State =
    if (method == POST && contentLength <= 0) {
      this.data = ""
      End
    } else EndHeaders

  /**
   * Adds one body character; finishes the request once `contentLength`
   * characters have been collected.
   */
  private def appendBody(sb: StringBuilder, c: Char): State = {
    sb.append(c)
    if (sb.length >= contentLength) {
      this.data = sb.toString
      End
    } else Data(sb)
  }

  /**
   * Update the state of the parser with the next character.
   *
   * @throws Exception if the character is not acceptable in the current
   *                   state (e.g. input after a completed request).
   */
  def consume(c: Char): Unit = {
    state = (state, c) match {
      case (FirstLine(ln), '\r' | '\n') => processFirstLine(ln); NextLine("" + c)
      case (FirstLine(ln), _) => FirstLine(ln + c)

      // A blank line (LF LF or CR LF CR LF) ends the headers.
      case (NextLine("\n"),     '\n') => headersDone
      case (NextLine("\r\n\r"), '\n') => headersDone
      case (NextLine("\r"),     '\n') => NextLine("\r\n")
      case (NextLine("\r\n"),   '\r') => NextLine("\r\n\r")

      case (NextLine(_), ch) => HeaderLine(String.valueOf(ch))

      case (HeaderLine(ln), '\r' | '\n') => processHeaderLine(ln); NextLine("" + c)
      case (HeaderLine(ln), _) => HeaderLine(ln + c)

      // "> 0" (not "> 1") so that a one-character body is accepted too.
      case (EndHeaders, ch) if contentLength > 0 => appendBody(new StringBuilder, ch)

      case (Data(sb), ch) => appendBody(sb, ch)

      case _ => throw new Exception(state + "/" + c + "\n\n" + headers)
    }
  }

  /**
   * If a request is completed, produce the Request object.
   *
   * @param conn the connection the request arrived on.
   * @throws Exception if the request has not been completely parsed yet.
   */
  def toRequest(conn: ClientConnection): Request = {
    if (this.parsed)
      Request(conn, method, resource, headers, data)
    else
      throw new Exception("HTTP request not completed!")
  }
}
Note: See TracBrowser for help on using the browser.