Mercurial > hg > bitcoin
changeset 2172:fb6b62155b4a draft
Merge pull request #1036 from gavinandresen/pubsubcleanup
Remove half-implemented publish/subscribe system
author | Gavin Andresen <gavinandresen@gmail.com> |
---|---|
date | Thu, 05 Apr 2012 07:33:54 -0700 |
parents | 76a92e10d72d (current diff) 512987fd1698 (diff) |
children | 5d2c2f86d380 |
files | |
diffstat | 2 files changed, 0 insertions(+), 167 deletions(-) [+] |
line wrap: on
line diff
--- a/src/net.cpp +++ b/src/net.cpp @@ -289,105 +289,6 @@ -void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1) -{ - // If the dialog might get closed before the reply comes back, - // call this in the destructor so it doesn't get called after it's deleted. - CRITICAL_BLOCK(cs_vNodes) - { - BOOST_FOREACH(CNode* pnode, vNodes) - { - CRITICAL_BLOCK(pnode->cs_mapRequests) - { - for (map<uint256, CRequestTracker>::iterator mi = pnode->mapRequests.begin(); mi != pnode->mapRequests.end();) - { - CRequestTracker& tracker = (*mi).second; - if (tracker.fn == fn && tracker.param1 == param1) - pnode->mapRequests.erase(mi++); - else - mi++; - } - } - } - } -} - - - - - - - -// -// Subscription methods for the broadcast and subscription system. -// Channel numbers are message numbers, i.e. MSG_TABLE and MSG_PRODUCT. -// -// The subscription system uses a meet-in-the-middle strategy. -// With 100,000 nodes, if senders broadcast to 1000 random nodes and receivers -// subscribe to 1000 random nodes, 99.995% (1 - 0.99^1000) of messages will get through. -// - -bool AnySubscribed(unsigned int nChannel) -{ - if (pnodeLocalHost->IsSubscribed(nChannel)) - return true; - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode->IsSubscribed(nChannel)) - return true; - return false; -} - -bool CNode::IsSubscribed(unsigned int nChannel) -{ - if (nChannel >= vfSubscribe.size()) - return false; - return vfSubscribe[nChannel]; -} - -void CNode::Subscribe(unsigned int nChannel, unsigned int nHops) -{ - if (nChannel >= vfSubscribe.size()) - return; - - if (!AnySubscribed(nChannel)) - { - // Relay subscribe - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode != this) - pnode->PushMessage("subscribe", nChannel, nHops); - } - - vfSubscribe[nChannel] = true; -} - -void CNode::CancelSubscribe(unsigned int nChannel) -{ - if (nChannel >= vfSubscribe.size()) - return; - - // Prevent from relaying cancel if wasn't subscribed - if (!vfSubscribe[nChannel]) - return; - vfSubscribe[nChannel] = false; - - if (!AnySubscribed(nChannel)) - { - // Relay subscription cancel - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode != this) - pnode->PushMessage("sub-cancel", nChannel); - } -} - - - - - - - CNode* FindNode(const CNetAddr& ip) @@ -486,13 +387,6 @@ void CNode::Cleanup() { - // All of a nodes broadcasts and subscriptions are automatically torn down - // when it goes down, so a node has to stay up to keep its broadcast going. - - // Cancel subscriptions - for (unsigned int nChannel = 0; nChannel < vfSubscribe.size(); nChannel++) - if (vfSubscribe[nChannel]) - CancelSubscribe(nChannel); }
--- a/src/net.h +++ b/src/net.h @@ -29,7 +29,6 @@ inline unsigned int ReceiveBufferSize() { return 1000*GetArg("-maxreceivebuffer", 10*1000); } inline unsigned int SendBufferSize() { return 1000*GetArg("-maxsendbuffer", 10*1000); } -static const unsigned int PUBLISH_HOPS = 5; bool RecvLine(SOCKET hSocket, std::string& strLine); bool GetMyExternalIP(CNetAddr& ipRet); @@ -37,8 +36,6 @@ CNode* FindNode(const CNetAddr& ip); CNode* FindNode(const CService& ip); CNode* ConnectNode(CAddress addrConnect, int64 nTimeout=0); -void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1); -bool AnySubscribed(unsigned int nChannel); void MapPort(bool fMapPort); bool BindListenPort(std::string& strError=REF(std::string())); void StartNode(void* parg); @@ -160,9 +157,6 @@ CCriticalSection cs_inventory; std::multimap<int64, CInv> mapAskFor; - // publish and subscription - std::vector<char> vfSubscribe; - CNode(SOCKET hSocketIn, CAddress addrIn, bool fInboundIn=false) { nServices = 0; @@ -192,7 +186,6 @@ hashLastGetBlocksEnd = 0; nStartingHeight = -1; fGetAddr = false; - vfSubscribe.assign(256, false); nMisbehavior = 0; setInventoryKnown.max_size(SendBufferSize() / 1000); @@ -634,58 +627,4 @@ } - - - - - - -// -// Templates for the publish and subscription system. -// The object being published as T& obj needs to have: -// a set<unsigned int> setSources member -// specializations of AdvertInsert and AdvertErase -// Currently implemented for CTable and CProduct. -// - -template<typename T> -void AdvertStartPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj) -{ - // Add to sources - obj.setSources.insert(pfrom->addr.ip); - - if (!AdvertInsert(obj)) - return; - - // Relay - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel))) - pnode->PushMessage("publish", nChannel, nHops, obj); -} - -template<typename T> -void AdvertStopPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj) -{ - uint256 hash = obj.GetHash(); - - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel))) - pnode->PushMessage("pub-cancel", nChannel, nHops, hash); - - AdvertErase(obj); -} - -template<typename T> -void AdvertRemoveSource(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj) -{ - // Remove a source - obj.setSources.erase(pfrom->addr.ip); - - // If no longer supported by any sources, cancel it - if (obj.setSources.empty()) - AdvertStopPublish(pfrom, nChannel, nHops, obj); -} - #endif