changeset 2492:343e76e622f9 draft

Merge pull request #1101 from jgarzik/http11 Multithreaded JSON-RPC with HTTP 1.1 Keep-Alive support
author Jeff Garzik <jgarzik@exmulti.com>
date Fri, 11 May 2012 09:57:08 -0700
parents 1a8ec9ad0a62 (current diff) 7011ff7102f6 (diff)
children 4e8661575024 693e64d4233e f22c048e5427
files src/bitcoinrpc.cpp src/net.cpp
diffstat 4 files changed, 105 insertions(+), 42 deletions(-) [+]
line wrap: on
line diff
--- a/src/bitcoinrpc.cpp
+++ b/src/bitcoinrpc.cpp
@@ -46,6 +46,8 @@
 
 const Object emptyobj;
 
+void ThreadRPCServer3(void* parg);
+
 Object JSONRPCError(int code, const string& message)
 {
     Object error;
@@ -2021,7 +2023,7 @@
         throw JSONRPCError(-10, "Bitcoin is downloading blocks...");
 
     typedef map<uint256, pair<CBlock*, CScript> > mapNewBlock_t;
-    static mapNewBlock_t mapNewBlock;
+    static mapNewBlock_t mapNewBlock;    // FIXME: thread safety
     static vector<CBlock*> vNewBlock;
     static CReserveKey reservekey(pwalletMain);
 
@@ -2355,7 +2357,7 @@
     return string(buffer);
 }
 
-static string HTTPReply(int nStatus, const string& strMsg)
+static string HTTPReply(int nStatus, const string& strMsg, bool keepalive)
 {
     if (nStatus == 401)
         return strprintf("HTTP/1.0 401 Authorization Required\r\n"
@@ -2384,7 +2386,7 @@
     return strprintf(
             "HTTP/1.1 %d %s\r\n"
             "Date: %s\r\n"
-            "Connection: close\r\n"
+            "Connection: %s\r\n"
             "Content-Length: %d\r\n"
             "Content-Type: application/json\r\n"
             "Server: bitcoin-json-rpc/%s\r\n"
@@ -2393,12 +2395,13 @@
         nStatus,
         cStatus,
         rfc1123Time().c_str(),
+        keepalive ? "keep-alive" : "close",
         strMsg.size(),
         FormatFullVersion().c_str(),
         strMsg.c_str());
 }
 
-int ReadHTTPStatus(std::basic_istream<char>& stream)
+int ReadHTTPStatus(std::basic_istream<char>& stream, int &proto)
 {
     string str;
     getline(stream, str);
@@ -2406,6 +2409,10 @@
     boost::split(vWords, str, boost::is_any_of(" "));
     if (vWords.size() < 2)
         return 500;
+    proto = 0;
+    const char *ver = strstr(str.c_str(), "HTTP/1.");
+    if (ver != NULL)
+        proto = atoi(ver+7);
     return atoi(vWords[1].c_str());
 }
 
@@ -2440,7 +2447,8 @@
     strMessageRet = "";
 
     // Read status
-    int nStatus = ReadHTTPStatus(stream);
+    int nProto;
+    int nStatus = ReadHTTPStatus(stream, nProto);
 
     // Read header
     int nLen = ReadHTTPHeader(stream, mapHeadersRet);
@@ -2455,6 +2463,16 @@
         strMessageRet = string(vch.begin(), vch.end());
     }
 
+    string sConHdr = mapHeadersRet["connection"];
+
+    if ((sConHdr != "close") && (sConHdr != "keep-alive"))
+    {
+        if (nProto >= 1)
+            mapHeadersRet["connection"] = "keep-alive";
+        else
+            mapHeadersRet["connection"] = "close";
+    }
+
     return nStatus;
 }
 
@@ -2507,7 +2525,7 @@
     if (code == -32600) nStatus = 400;
     else if (code == -32601) nStatus = 404;
     string strReply = JSONRPCReply(Value::null, objError, id);
-    stream << HTTPReply(nStatus, strReply) << std::flush;
+    stream << HTTPReply(nStatus, strReply, false) << std::flush;
 }
 
 bool ClientAllowed(const string& strAddress)
@@ -2573,20 +2591,34 @@
     SSLStream& stream;
 };
 
+class AcceptedConnection
+{
+    public:
+    SSLStream sslStream;
+    SSLIOStreamDevice d;
+    iostreams::stream<SSLIOStreamDevice> stream;
+
+    ip::tcp::endpoint peer;
+
+    AcceptedConnection(asio::io_service &io_service, ssl::context &context,
+     bool fUseSSL) : sslStream(io_service, context), d(sslStream, fUseSSL),
+     stream(d) { ; }
+};
+
 void ThreadRPCServer(void* parg)
 {
     IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer(parg));
     try
     {
-        vnThreadsRunning[THREAD_RPCSERVER]++;
+        vnThreadsRunning[THREAD_RPCLISTENER]++;
         ThreadRPCServer2(parg);
-        vnThreadsRunning[THREAD_RPCSERVER]--;
+        vnThreadsRunning[THREAD_RPCLISTENER]--;
     }
     catch (std::exception& e) {
-        vnThreadsRunning[THREAD_RPCSERVER]--;
+        vnThreadsRunning[THREAD_RPCLISTENER]--;
         PrintException(&e, "ThreadRPCServer()");
     } catch (...) {
-        vnThreadsRunning[THREAD_RPCSERVER]--;
+        vnThreadsRunning[THREAD_RPCLISTENER]--;
         PrintException(NULL, "ThreadRPCServer()");
     }
     printf("ThreadRPCServer exiting\n");
@@ -2664,55 +2696,78 @@
     loop
     {
         // Accept connection
-        SSLStream sslStream(io_service, context);
-        SSLIOStreamDevice d(sslStream, fUseSSL);
-        iostreams::stream<SSLIOStreamDevice> stream(d);
-
-        ip::tcp::endpoint peer;
-        vnThreadsRunning[THREAD_RPCSERVER]--;
-        acceptor.accept(sslStream.lowest_layer(), peer);
-        vnThreadsRunning[4]++;
+        AcceptedConnection *conn =
+                    new AcceptedConnection(io_service, context, fUseSSL);
+
+        vnThreadsRunning[THREAD_RPCLISTENER]--;
+        acceptor.accept(conn->sslStream.lowest_layer(), conn->peer);
+        vnThreadsRunning[THREAD_RPCLISTENER]++;
+
         if (fShutdown)
+        {
+            delete conn;
             return;
-
-        // Restrict callers by IP
-        if (!ClientAllowed(peer.address().to_string()))
+        }
+
+        // Restrict callers by IP.  It is important to
+        // do this before starting client thread, to filter out
+        // certain DoS and misbehaving clients.
+        if (!ClientAllowed(conn->peer.address().to_string()))
         {
             // Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake.
             if (!fUseSSL)
-                stream << HTTPReply(403, "") << std::flush;
-            continue;
+                conn->stream << HTTPReply(403, "", false) << std::flush;
+            delete conn;
+        }
+
+        // start HTTP client thread
+        else if (!CreateThread(ThreadRPCServer3, conn)) {
+            printf("Failed to create RPC server client thread\n");
+            delete conn;
         }
-
+    }
+}
+
+void ThreadRPCServer3(void* parg)
+{
+    IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer3(parg));
+    vnThreadsRunning[THREAD_RPCHANDLER]++;
+    AcceptedConnection *conn = (AcceptedConnection *) parg;
+
+    bool fRun = true;
+    loop {
+        if (fShutdown || !fRun)
+        {
+            conn->stream.close();
+            delete conn;
+            --vnThreadsRunning[THREAD_RPCHANDLER];
+            return;
+        }
         map<string, string> mapHeaders;
         string strRequest;
 
-        boost::thread api_caller(ReadHTTP, boost::ref(stream), boost::ref(mapHeaders), boost::ref(strRequest));
-        if (!api_caller.timed_join(boost::posix_time::seconds(GetArg("-rpctimeout", 30))))
-        {   // Timed out:
-            acceptor.cancel();
-            printf("ThreadRPCServer ReadHTTP timeout\n");
-            continue;
-        }
+        ReadHTTP(conn->stream, mapHeaders, strRequest);
 
         // Check authorization
         if (mapHeaders.count("authorization") == 0)
         {
-            stream << HTTPReply(401, "") << std::flush;
-            continue;
+            conn->stream << HTTPReply(401, "", false) << std::flush;
+            break;
         }
         if (!HTTPAuthorized(mapHeaders))
         {
-            printf("ThreadRPCServer incorrect password attempt from %s\n",peer.address().to_string().c_str());
+            printf("ThreadRPCServer incorrect password attempt from %s\n", conn->peer.address().to_string().c_str());
             /* Deter brute-forcing short passwords.
                If this results in a DOS the user really
                shouldn't have their RPC port exposed.*/
             if (mapArgs["-rpcpassword"].size() < 20)
                 Sleep(250);
 
-            stream << HTTPReply(401, "") << std::flush;
-            continue;
+            conn->stream << HTTPReply(401, "", false) << std::flush;
+            break;
         }
+        if (mapHeaders["connection"] == "close")
+            fRun = false;
 
         Value id = Value::null;
         try
@@ -2750,17 +2805,22 @@
 
             // Send reply
             string strReply = JSONRPCReply(result, Value::null, id);
-            stream << HTTPReply(200, strReply) << std::flush;
+            conn->stream << HTTPReply(200, strReply, fRun) << std::flush;
         }
         catch (Object& objError)
         {
-            ErrorReply(stream, objError, id);
+            ErrorReply(conn->stream, objError, id);
+            break;
         }
         catch (std::exception& e)
         {
-            ErrorReply(stream, JSONRPCError(-32700, e.what()), id);
+            ErrorReply(conn->stream, JSONRPCError(-32700, e.what()), id);
+            break;
         }
     }
+
+    delete conn;
+    vnThreadsRunning[THREAD_RPCHANDLER]--;
 }
 
 json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_spirit::Array &params) const
--- a/src/bitcoinrpc.h
+++ b/src/bitcoinrpc.h
@@ -9,6 +9,7 @@
 #include <string>
 #include <map>
 
+#define BOOST_SPIRIT_THREADSAFE
 #include "json/json_spirit_reader_template.h"
 #include "json/json_spirit_writer_template.h"
 #include "json/json_spirit_utils.h"
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -1839,12 +1839,13 @@
     if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n");
     if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n");
     if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n");
-    if (vnThreadsRunning[THREAD_RPCSERVER] > 0) printf("ThreadRPCServer still running\n");
+    if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n");
+    if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n");
     if (fHaveUPnP && vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n");
     if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n");
     if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n");
     if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n");
-    while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCSERVER] > 0)
+    while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0)
         Sleep(20);
     Sleep(50);
     DumpAddresses();
--- a/src/net.h
+++ b/src/net.h
@@ -92,11 +92,12 @@
     THREAD_OPENCONNECTIONS,
     THREAD_MESSAGEHANDLER,
     THREAD_MINER,
-    THREAD_RPCSERVER,
+    THREAD_RPCLISTENER,
     THREAD_UPNP,
     THREAD_DNSSEED,
     THREAD_ADDEDCONNECTIONS,
     THREAD_DUMPADDRESS,
+    THREAD_RPCHANDLER,
 
     THREAD_MAX
 };