Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:vizhestkov:xs114b:p311
saltbundle-zeromq
bsc1176258.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File bsc1176258.patch of Package saltbundle-zeromq
Index: zeromq-4.2.3/src/generic_mtrie.hpp =================================================================== --- /dev/null +++ zeromq-4.2.3/src/generic_mtrie.hpp @@ -0,0 +1,117 @@ +/* +Copyright (c) 2018 Contributors as noted in the AUTHORS file + +This file is part of libzmq, the ZeroMQ core engine in C++. + +libzmq is free software; you can redistribute it and/or modify it under +the terms of the GNU Lesser General Public License (LGPL) as published +by the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +As a special exception, the Contributors give you permission to link +this library with independent modules to produce an executable, +regardless of the license terms of these independent modules, and to +copy and distribute the resulting executable under terms of your choice, +provided that you also meet, for each linked independent module, the +terms and conditions of the license of that module. An independent +module is a module which is not derived from or based on this library. +If you modify this library, you must extend this exception to your +version of the library. + +libzmq is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_GENERIC_MTRIE_HPP_INCLUDED__ +#define __ZMQ_GENERIC_MTRIE_HPP_INCLUDED__ + +#include <stddef.h> +#include <set> + +#include "macros.hpp" +#include "stdint.hpp" + +namespace zmq +{ +// Multi-trie (prefix tree). Each node in the trie is a set of pointers. +template <typename T> class generic_mtrie_t +{ + public: + typedef T value_t; + typedef const unsigned char *prefix_t; + + enum rm_result + { + not_found, + last_value_removed, + values_remain + }; + + generic_mtrie_t (); + ~generic_mtrie_t (); + + // Add key to the trie. Returns true iff no entry with the same prefix_ + // and size_ existed before. + bool add (prefix_t prefix_, size_t size_, value_t *value_); + + // Remove all entries with a specific value from the trie. + // The call_on_uniq_ flag controls if the callback is invoked + // when there are no entries left on a prefix only (true) + // or on every removal (false). The arg_ argument is passed + // through to the callback function. + template <typename Arg> + void rm (value_t *value_, + void (*func_) (const unsigned char *data_, size_t size_, Arg arg_), + Arg arg_, + bool call_on_uniq_); + + // Removes a specific entry from the trie. + // Returns the result of the operation. + rm_result rm (prefix_t prefix_, size_t size_, value_t *value_); + + // Calls a callback function for all matching entries, i.e. any node + // corresponding to data_ or a prefix of it. The arg_ argument + // is passed through to the callback function. + template <typename Arg> + void match (prefix_t data_, + size_t size_, + void (*func_) (value_t *value_, Arg arg_), + Arg arg_); + + private: + bool is_redundant () const; + + typedef std::set<value_t *> pipes_t; + pipes_t *_pipes; + + unsigned char _min; + unsigned short _count; + unsigned short _live_nodes; + union _next_t + { + class generic_mtrie_t<value_t> *node; + class generic_mtrie_t<value_t> **table; + } _next; + + struct iter + { + generic_mtrie_t<value_t> *node; + generic_mtrie_t<value_t> *next_node; + prefix_t prefix; + size_t size; + unsigned short current_child; + unsigned char new_min; + unsigned char new_max; + bool processed_for_removal; + }; + + ZMQ_NON_COPYABLE_NOR_MOVABLE (generic_mtrie_t) +}; +} + +#endif Index: zeromq-4.2.3/src/generic_mtrie_impl.hpp =================================================================== --- /dev/null +++ zeromq-4.2.3/src/generic_mtrie_impl.hpp @@ -0,0 +1,586 @@ +/* +Copyright (c) 2018 Contributors as noted in the AUTHORS file + +This file is part of libzmq, the ZeroMQ core engine in C++. + +libzmq is free software; you can redistribute it and/or modify it under +the terms of the GNU Lesser General Public License (LGPL) as published +by the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +As a special exception, the Contributors give you permission to link +this library with independent modules to produce an executable, +regardless of the license terms of these independent modules, and to +copy and distribute the resulting executable under terms of your choice, +provided that you also meet, for each linked independent module, the +terms and conditions of the license of that module. An independent +module is a module which is not derived from or based on this library. +If you modify this library, you must extend this exception to your +version of the library. + +libzmq is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_GENERIC_MTRIE_IMPL_HPP_INCLUDED__ +#define __ZMQ_GENERIC_MTRIE_IMPL_HPP_INCLUDED__ + + +#include <stdlib.h> + +#include <new> +#include <algorithm> +#include <list> + +#include "err.hpp" +#include "macros.hpp" +#include "generic_mtrie.hpp" + +namespace zmq +{ +template <typename T> +generic_mtrie_t<T>::generic_mtrie_t () : + _pipes (0), + _min (0), + _count (0), + _live_nodes (0) +{ +} + +template <typename T> generic_mtrie_t<T>::~generic_mtrie_t () +{ + LIBZMQ_DELETE (_pipes); + + if (_count == 1) { + zmq_assert (_next.node); + LIBZMQ_DELETE (_next.node); + } else if (_count > 1) { + for (unsigned short i = 0; i != _count; ++i) { + LIBZMQ_DELETE (_next.table[i]); + } + free (_next.table); + } +} + +template <typename T> +bool generic_mtrie_t<T>::add (prefix_t prefix_, size_t size_, value_t *pipe_) +{ + generic_mtrie_t<value_t> *it = this; + + while (size_) { + const unsigned char c = *prefix_; + + if (c < it->_min || c >= it->_min + it->_count) { + // The character is out of range of currently handled + // characters. We have to extend the table. + if (!it->_count) { + it->_min = c; + it->_count = 1; + it->_next.node = NULL; + } else if (it->_count == 1) { + const unsigned char oldc = it->_min; + generic_mtrie_t *oldp = it->_next.node; + it->_count = (it->_min < c ? c - it->_min : it->_min - c) + 1; + it->_next.table = static_cast<generic_mtrie_t **> ( + malloc (sizeof (generic_mtrie_t *) * it->_count)); + alloc_assert (it->_next.table); + for (unsigned short i = 0; i != it->_count; ++i) + it->_next.table[i] = 0; + it->_min = std::min (it->_min, c); + it->_next.table[oldc - it->_min] = oldp; + } else if (it->_min < c) { + // The new character is above the current character range. + const unsigned short old_count = it->_count; + it->_count = c - it->_min + 1; + it->_next.table = static_cast<generic_mtrie_t **> (realloc ( + it->_next.table, sizeof (generic_mtrie_t *) * it->_count)); + alloc_assert (it->_next.table); + for (unsigned short i = old_count; i != it->_count; i++) + it->_next.table[i] = NULL; + } else { + // The new character is below the current character range. + const unsigned short old_count = it->_count; + it->_count = (it->_min + old_count) - c; + it->_next.table = static_cast<generic_mtrie_t **> (realloc ( + it->_next.table, sizeof (generic_mtrie_t *) * it->_count)); + alloc_assert (it->_next.table); + memmove (it->_next.table + it->_min - c, it->_next.table, + old_count * sizeof (generic_mtrie_t *)); + for (unsigned short i = 0; i != it->_min - c; i++) + it->_next.table[i] = NULL; + it->_min = c; + } + } + + // If next node does not exist, create one. + if (it->_count == 1) { + if (!it->_next.node) { + it->_next.node = new (std::nothrow) generic_mtrie_t; + alloc_assert (it->_next.node); + ++(it->_live_nodes); + } + + ++prefix_; + --size_; + it = it->_next.node; + } else { + if (!it->_next.table[c - it->_min]) { + it->_next.table[c - it->_min] = + new (std::nothrow) generic_mtrie_t; + alloc_assert (it->_next.table[c - it->_min]); + ++(it->_live_nodes); + } + + ++prefix_; + --size_; + it = it->_next.table[c - it->_min]; + } + } + + // We are at the node corresponding to the prefix. We are done. + const bool result = !it->_pipes; + if (!it->_pipes) { + it->_pipes = new (std::nothrow) pipes_t; + alloc_assert (it->_pipes); + } + it->_pipes->insert (pipe_); + + return result; +} + +template <typename T> +template <typename Arg> +void generic_mtrie_t<T>::rm (value_t *pipe_, + void (*func_) (prefix_t data_, + size_t size_, + Arg arg_), + Arg arg_, + bool call_on_uniq_) +{ + // This used to be implemented as a non-tail recursive travesal of the trie, + // which means remote clients controlled the depth of the recursion and the + // stack size. + // To simulate the non-tail recursion, with post-recursion changes depending on + // the result of the recursive call, a stack is used to re-visit the same node + // and operate on it again after children have been visisted. + // A boolean is used to record whether the node had already been visited and to + // determine if the pre- or post- children visit actions have to be taken. + // In the case of a node with (N > 1) children, the node has to be re-visited + // N times, in the correct order after each child visit. + std::list<struct iter> stack; + unsigned char *buff = NULL; + size_t maxbuffsize = 0; + struct iter it = {this, NULL, NULL, 0, 0, 0, false}; + stack.push_back (it); + + while (!stack.empty ()) { + it = stack.back (); + stack.pop_back (); + + if (!it.processed_for_removal) { + // Remove the subscription from this node. + if (it.node->_pipes && it.node->_pipes->erase (pipe_)) { + if (!call_on_uniq_ || it.node->_pipes->empty ()) { + func_ (buff, it.size, arg_); + } + + if (it.node->_pipes->empty ()) { + LIBZMQ_DELETE (it.node->_pipes); + } + } + + // Adjust the buffer. + if (it.size >= maxbuffsize) { + maxbuffsize = it.size + 256; + buff = + static_cast<unsigned char *> (realloc (buff, maxbuffsize)); + alloc_assert (buff); + } + + switch (it.node->_count) { + case 0: + // If there are no subnodes in the trie, we are done with this node + // pre-processing. + break; + case 1: { + // If there's one subnode (optimisation). + + buff[it.size] = it.node->_min; + // Mark this node as pre-processed and push it, so that the next + // visit after the operation on the child can do the removals. + it.processed_for_removal = true; + stack.push_back (it); + struct iter next = { + it.node->_next.node, NULL, NULL, ++it.size, 0, 0, false}; + stack.push_back (next); + break; + } + default: { + // If there are multiple subnodes. + // When first visiting this node, initialize the new_min/max parameters + // which will then be used after each child has been processed, on the + // post-children iterations. + if (it.current_child == 0) { + // New min non-null character in the node table after the removal + it.new_min = it.node->_min + it.node->_count - 1; + // New max non-null character in the node table after the removal + it.new_max = it.node->_min; + } + + // Mark this node as pre-processed and push it, so that the next + // visit after the operation on the child can do the removals. + buff[it.size] = it.node->_min + it.current_child; + it.processed_for_removal = true; + stack.push_back (it); + if (it.node->_next.table[it.current_child]) { + struct iter next = { + it.node->_next.table[it.current_child], + NULL, + NULL, + it.size + 1, + 0, + 0, + false}; + stack.push_back (next); + } + } + } + } else { + // Reset back for the next time, in case this node doesn't get deleted. + // This is done unconditionally, unlike when setting this variable to true. + it.processed_for_removal = false; + + switch (it.node->_count) { + case 0: + // If there are no subnodes in the trie, we are done with this node + // post-processing. + break; + case 1: + // If there's one subnode (optimisation). + + // Prune the node if it was made redundant by the removal + if (it.node->_next.node->is_redundant ()) { + LIBZMQ_DELETE (it.node->_next.node); + it.node->_count = 0; + --it.node->_live_nodes; + zmq_assert (it.node->_live_nodes == 0); + } + break; + default: + // If there are multiple subnodes. + { + if (it.node->_next.table[it.current_child]) { + // Prune redundant nodes from the mtrie + if (it.node->_next.table[it.current_child] + ->is_redundant ()) { + LIBZMQ_DELETE ( + it.node->_next.table[it.current_child]); + + zmq_assert (it.node->_live_nodes > 0); + --it.node->_live_nodes; + } else { + // The node is not redundant, so it's a candidate for being + // the new min/max node. + // + // We loop through the node array from left to right, so the + // first non-null, non-redundant node encountered is the new + // minimum index. Conversely, the last non-redundant, non-null + // node encountered is the new maximum index. + if (it.current_child + it.node->_min + < it.new_min) + it.new_min = + it.current_child + it.node->_min; + if (it.current_child + it.node->_min + > it.new_max) + it.new_max = + it.current_child + it.node->_min; + } + } + + // If there are more children to visit, push again the current + // node, so that pre-processing can happen on the next child. + // If we are done, reset the child index so that the ::rm is + // fully idempotent. + ++it.current_child; + if (it.current_child >= it.node->_count) + it.current_child = 0; + else { + stack.push_back (it); + continue; + } + + // All children have been visited and removed if needed, and + // all pre- and post-visit operations have been carried. + // Resize/free the node table if needed. + zmq_assert (it.node->_count > 1); + + // Free the node table if it's no longer used. + switch (it.node->_live_nodes) { + case 0: + free (it.node->_next.table); + it.node->_next.table = NULL; + it.node->_count = 0; + break; + case 1: + // Compact the node table if possible + + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation + zmq_assert (it.new_min == it.new_max); + zmq_assert (it.new_min >= it.node->_min); + zmq_assert (it.new_min + < it.node->_min + it.node->_count); + { + generic_mtrie_t *node = + it.node->_next + .table[it.new_min - it.node->_min]; + zmq_assert (node); + free (it.node->_next.table); + it.node->_next.node = node; + } + it.node->_count = 1; + it.node->_min = it.new_min; + break; + default: + if (it.new_min > it.node->_min + || it.new_max < it.node->_min + + it.node->_count - 1) { + zmq_assert (it.new_max - it.new_min + 1 + > 1); + + generic_mtrie_t **old_table = + it.node->_next.table; + zmq_assert (it.new_min > it.node->_min + || it.new_max + < it.node->_min + + it.node->_count - 1); + zmq_assert (it.new_min >= it.node->_min); + zmq_assert (it.new_max + <= it.node->_min + + it.node->_count - 1); + zmq_assert (it.new_max - it.new_min + 1 + < it.node->_count); + + it.node->_count = + it.new_max - it.new_min + 1; + it.node->_next.table = + static_cast<generic_mtrie_t **> ( + malloc (sizeof (generic_mtrie_t *) + * it.node->_count)); + alloc_assert (it.node->_next.table); + + memmove (it.node->_next.table, + old_table + + (it.new_min - it.node->_min), + sizeof (generic_mtrie_t *) + * it.node->_count); + free (old_table); + + it.node->_min = it.new_min; + } + } + } + } + } + } + + free (buff); +} + +template <typename T> +typename generic_mtrie_t<T>::rm_result +generic_mtrie_t<T>::rm (prefix_t prefix_, size_t size_, value_t *pipe_) +{ + // This used to be implemented as a non-tail recursive travesal of the trie, + // which means remote clients controlled the depth of the recursion and the + // stack size. + // To simulate the non-tail recursion, with post-recursion changes depending on + // the result of the recursive call, a stack is used to re-visit the same node + // and operate on it again after children have been visisted. + // A boolean is used to record whether the node had already been visited and to + // determine if the pre- or post- children visit actions have to be taken. + rm_result ret = not_found; + std::list<struct iter> stack; + struct iter it = {this, NULL, prefix_, size_, 0, 0, 0, false}; + stack.push_back (it); + + while (!stack.empty ()) { + it = stack.back (); + stack.pop_back (); + + if (!it.processed_for_removal) { + if (!it.size) { + if (!it.node->_pipes) { + ret = not_found; + continue; + } + + typename pipes_t::size_type erased = + it.node->_pipes->erase (pipe_); + if (it.node->_pipes->empty ()) { + zmq_assert (erased == 1); + LIBZMQ_DELETE (it.node->_pipes); + ret = last_value_removed; + continue; + } + + ret = (erased == 1) ? values_remain : not_found; + continue; + } + + it.current_child = *it.prefix; + if (!it.node->_count || it.current_child < it.node->_min + || it.current_child >= it.node->_min + it.node->_count) { + ret = not_found; + continue; + } + + it.next_node = + it.node->_count == 1 + ? it.node->_next.node + : it.node->_next.table[it.current_child - it.node->_min]; + if (!it.next_node) { + ret = not_found; + continue; + } + + it.processed_for_removal = true; + stack.push_back (it); + struct iter next = { + it.next_node, NULL, it.prefix + 1, it.size - 1, 0, 0, 0, false}; + stack.push_back (next); + } else { + it.processed_for_removal = false; + + if (it.next_node->is_redundant ()) { + LIBZMQ_DELETE (it.next_node); + zmq_assert (it.node->_count > 0); + + if (it.node->_count == 1) { + it.node->_next.node = NULL; + it.node->_count = 0; + --it.node->_live_nodes; + zmq_assert (it.node->_live_nodes == 0); + } else { + it.node->_next.table[it.current_child - it.node->_min] = 0; + zmq_assert (it.node->_live_nodes > 1); + --it.node->_live_nodes; + + // Compact the table if possible + if (it.node->_live_nodes == 1) { + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation + unsigned short i; + for (i = 0; i < it.node->_count; ++i) + if (it.node->_next.table[i]) + break; + + zmq_assert (i < it.node->_count); + it.node->_min += i; + it.node->_count = 1; + generic_mtrie_t *oldp = it.node->_next.table[i]; + free (it.node->_next.table); + it.node->_next.table = NULL; + it.node->_next.node = oldp; + } else if (it.current_child == it.node->_min) { + // We can compact the table "from the left" + unsigned short i; + for (i = 1; i < it.node->_count; ++i) + if (it.node->_next.table[i]) + break; + + zmq_assert (i < it.node->_count); + it.node->_min += i; + it.node->_count -= i; + generic_mtrie_t **old_table = it.node->_next.table; + it.node->_next.table = + static_cast<generic_mtrie_t **> (malloc ( + sizeof (generic_mtrie_t *) * it.node->_count)); + alloc_assert (it.node->_next.table); + memmove (it.node->_next.table, old_table + i, + sizeof (generic_mtrie_t *) * it.node->_count); + free (old_table); + } else if (it.current_child + == it.node->_min + it.node->_count - 1) { + // We can compact the table "from the right" + unsigned short i; + for (i = 1; i < it.node->_count; ++i) + if (it.node->_next.table[it.node->_count - 1 - i]) + break; + + zmq_assert (i < it.node->_count); + it.node->_count -= i; + generic_mtrie_t **old_table = it.node->_next.table; + it.node->_next.table = + static_cast<generic_mtrie_t **> (malloc ( + sizeof (generic_mtrie_t *) * it.node->_count)); + alloc_assert (it.node->_next.table); + memmove (it.node->_next.table, old_table, + sizeof (generic_mtrie_t *) * it.node->_count); + free (old_table); + } + } + } + } + } + + return ret; +} + +template <typename T> +template <typename Arg> +void generic_mtrie_t<T>::match (prefix_t data_, + size_t size_, + void (*func_) (value_t *pipe_, Arg arg_), + Arg arg_) +{ + for (generic_mtrie_t *current = this; current; data_++, size_--) { + // Signal the pipes attached to this node. + if (current->_pipes) { + for (typename pipes_t::iterator it = current->_pipes->begin (), + end = current->_pipes->end (); + it != end; ++it) { + func_ (*it, arg_); + } + } + + // If we are at the end of the message, there's nothing more to match. + if (!size_) + break; + + // If there are no subnodes in the trie, return. + if (current->_count == 0) + break; + + if (current->_count == 1) { + // If there's one subnode (optimisation). + if (data_[0] != current->_min) { + break; + } + current = current->_next.node; + } else { + // If there are multiple subnodes. + if (data_[0] < current->_min + || data_[0] >= current->_min + current->_count) { + break; + } + current = current->_next.table[data_[0] - current->_min]; + } + } +} + +template <typename T> bool generic_mtrie_t<T>::is_redundant () const +{ + return !_pipes && _live_nodes == 0; +} +} + + +#endif Index: zeromq-4.2.3/src/mtrie.hpp =================================================================== --- zeromq-4.2.3.orig/src/mtrie.hpp +++ zeromq-4.2.3/src/mtrie.hpp @@ -30,73 +30,23 @@ #ifndef __ZMQ_MTRIE_HPP_INCLUDED__ #define __ZMQ_MTRIE_HPP_INCLUDED__ -#include <stddef.h> -#include <set> +#include "generic_mtrie.hpp" -#include "stdint.hpp" +#if __cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER > 1600) +#define ZMQ_HAS_EXTERN_TEMPLATE 1 +#else +#define ZMQ_HAS_EXTERN_TEMPLATE 0 +#endif namespace zmq { +class pipe_t; - class pipe_t; - - // Multi-trie. Each node in the trie is a set of pointers to pipes. - - class mtrie_t - { - public: - - mtrie_t (); - ~mtrie_t (); - - // Add key to the trie. Returns true if it's a new subscription - // rather than a duplicate. - bool add (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_); - - // Remove all subscriptions for a specific peer from the trie. - // The call_on_uniq_ flag controls if the callback is invoked - // when there are no subscriptions left on some topics or on - // every removal. - void rm (zmq::pipe_t *pipe_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_, bool call_on_uniq_); - - // Remove specific subscription from the trie. Return true is it was - // actually removed rather than de-duplicated. - bool rm (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_); - - // Signal all the matching pipes. - void match (unsigned char *data_, size_t size_, - void (*func_) (zmq::pipe_t *pipe_, void *arg_), void *arg_); - - private: - - bool add_helper (unsigned char *prefix_, size_t size_, - zmq::pipe_t *pipe_); - void rm_helper (zmq::pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_, bool call_on_uniq_); - bool rm_helper (unsigned char *prefix_, size_t size_, - zmq::pipe_t *pipe_); - bool is_redundant () const; - - typedef std::set <zmq::pipe_t*> pipes_t; - pipes_t *pipes; - - unsigned char min; - unsigned short count; - unsigned short live_nodes; - union { - class mtrie_t *node; - class mtrie_t **table; - } next; - - mtrie_t (const mtrie_t&); - const mtrie_t &operator = (const mtrie_t&); - }; +#if ZMQ_HAS_EXTERN_TEMPLATE +extern template class generic_mtrie_t<pipe_t>; +#endif +typedef generic_mtrie_t<pipe_t> mtrie_t; } #endif - Index: zeromq-4.2.3/src/macros.hpp =================================================================== --- zeromq-4.2.3.orig/src/macros.hpp +++ zeromq-4.2.3/src/macros.hpp @@ -10,3 +10,12 @@ } /******************************************************************************/ + + +#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname) \ + public: \ + classname (const classname &) = delete; \ + classname &operator= (const classname &) = delete; \ + classname (classname &&) = delete; \ + classname &operator= (classname &&) = delete; + Index: zeromq-4.2.3/src/mtrie.cpp =================================================================== --- zeromq-4.2.3.orig/src/mtrie.cpp +++ zeromq-4.2.3/src/mtrie.cpp @@ -28,407 +28,10 @@ */ #include "precompiled.hpp" -#include <stdlib.h> - -#include <new> -#include <algorithm> - -#include "err.hpp" -#include "pipe.hpp" -#include "macros.hpp" #include "mtrie.hpp" +#include "generic_mtrie_impl.hpp" -zmq::mtrie_t::mtrie_t () : - pipes (0), - min (0), - count (0), - live_nodes (0) -{ -} - -zmq::mtrie_t::~mtrie_t () -{ - LIBZMQ_DELETE(pipes); - - if (count == 1) { - zmq_assert (next.node); - LIBZMQ_DELETE(next.node); - } - else if (count > 1) { - for (unsigned short i = 0; i != count; ++i) { - LIBZMQ_DELETE(next.table[i]); - } - free (next.table); - } -} - -bool zmq::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_) -{ - return add_helper (prefix_, size_, pipe_); -} - -bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ - // We are at the node corresponding to the prefix. We are done. - if (!size_) { - bool result = !pipes; - if (!pipes) { - pipes = new (std::nothrow) pipes_t; - alloc_assert (pipes); - } - pipes->insert (pipe_); - return result; - } - - unsigned char c = *prefix_; - if (c < min || c >= min + count) { - - // The character is out of range of currently handled - // characters. We have to extend the table. - if (!count) { - min = c; - count = 1; - next.node = NULL; - } - else - if (count == 1) { - unsigned char oldc = min; - mtrie_t *oldp = next.node; - count = (min < c ? c - min : min - c) + 1; - next.table = (mtrie_t**) - malloc (sizeof (mtrie_t*) * count); - alloc_assert (next.table); - for (unsigned short i = 0; i != count; ++i) - next.table [i] = 0; - min = std::min (min, c); - next.table [oldc - min] = oldp; - } - else - if (min < c) { - // The new character is above the current character range. - unsigned short old_count = count; - count = c - min + 1; - next.table = (mtrie_t**) realloc (next.table, - sizeof (mtrie_t*) * count); - alloc_assert (next.table); - for (unsigned short i = old_count; i != count; i++) - next.table [i] = NULL; - } - else { - // The new character is below the current character range. - unsigned short old_count = count; - count = (min + old_count) - c; - next.table = (mtrie_t**) realloc (next.table, - sizeof (mtrie_t*) * count); - alloc_assert (next.table); - memmove (next.table + min - c, next.table, - old_count * sizeof (mtrie_t*)); - for (unsigned short i = 0; i != min - c; i++) - next.table [i] = NULL; - min = c; - } - } - - // If next node does not exist, create one. - if (count == 1) { - if (!next.node) { - next.node = new (std::nothrow) mtrie_t; - alloc_assert (next.node); - ++live_nodes; - } - return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_); - } - else { - if (!next.table [c - min]) { - next.table [c - min] = new (std::nothrow) mtrie_t; - alloc_assert (next.table [c - min]); - ++live_nodes; - } - return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_); - } -} - - -void zmq::mtrie_t::rm (pipe_t *pipe_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_, bool call_on_uniq_) -{ - unsigned char *buff = NULL; - rm_helper (pipe_, &buff, 0, 0, func_, arg_, call_on_uniq_); - free (buff); -} - -void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_, bool call_on_uniq_) -{ - // Remove the subscription from this node. - if (pipes && pipes->erase (pipe_)) { - if (!call_on_uniq_ || pipes->empty ()) { - func_ (*buff_, buffsize_, arg_); - } - - if (pipes->empty ()) { - LIBZMQ_DELETE(pipes); - } - } - - // Adjust the buffer. - if (buffsize_ >= maxbuffsize_) { - maxbuffsize_ = buffsize_ + 256; - *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); - alloc_assert (*buff_); - } - - // If there are no subnodes in the trie, return. - if (count == 0) - return; - - // If there's one subnode (optimisation). - if (count == 1) { - (*buff_) [buffsize_] = min; - buffsize_++; - next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, - func_, arg_, call_on_uniq_); - - // Prune the node if it was made redundant by the removal - if (next.node->is_redundant ()) { - LIBZMQ_DELETE(next.node); - count = 0; - --live_nodes; - zmq_assert (live_nodes == 0); - } - return; - } - - // If there are multiple subnodes. - // - // New min non-null character in the node table after the removal - unsigned char new_min = min + count - 1; - // New max non-null character in the node table after the removal - unsigned char new_max = min; - for (unsigned short c = 0; c != count; c++) { - (*buff_) [buffsize_] = min + c; - if (next.table [c]) { - next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1, - maxbuffsize_, func_, arg_, call_on_uniq_); - - // Prune redundant nodes from the mtrie - if (next.table [c]->is_redundant ()) { - LIBZMQ_DELETE(next.table[c]); - - zmq_assert (live_nodes > 0); - --live_nodes; - } - else { - // The node is not redundant, so it's a candidate for being - // the new min/max node. - // - // We loop through the node array from left to right, so the - // first non-null, non-redundant node encountered is the new - // minimum index. Conversely, the last non-redundant, non-null - // node encountered is the new maximum index. - if (c + min < new_min) - new_min = c + min; - if (c + min > new_max) - new_max = c + min; - } - } - } - - zmq_assert (count > 1); - - // Free the node table if it's no longer used. - if (live_nodes == 0) { - free (next.table); - next.table = NULL; - count = 0; - } - // Compact the node table if possible - else - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - zmq_assert (new_min == new_max); - zmq_assert (new_min >= min && new_min < min + count); - mtrie_t *node = next.table [new_min - min]; - zmq_assert (node); - free (next.table); - next.node = node; - count = 1; - min = new_min; - } - else - if (new_min > min || new_max < min + count - 1) { - zmq_assert (new_max - new_min + 1 > 1); - - mtrie_t **old_table = next.table; - zmq_assert (new_min > min || new_max < min + count - 1); - zmq_assert (new_min >= min); - zmq_assert (new_max <= min + count - 1); - zmq_assert (new_max - new_min + 1 < count); - - count = new_max - new_min + 1; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - alloc_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (mtrie_t*) * count); - free (old_table); - - min = new_min; - } -} - -bool zmq::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_) -{ - return rm_helper (prefix_, size_, pipe_); -} - -bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ - if (!size_) { - if (pipes) { - pipes_t::size_type erased = pipes->erase (pipe_); - zmq_assert (erased == 1); - if (pipes->empty ()) { - LIBZMQ_DELETE(pipes); - } - } - return !pipes; - } - - unsigned char c = *prefix_; - if (!count || c < min || c >= min + count) - return false; - - mtrie_t *next_node = - count == 1 ? next.node : next.table [c - min]; - - if (!next_node) - return false; - - bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_); - - if (next_node->is_redundant ()) { - LIBZMQ_DELETE(next_node); - zmq_assert (count > 0); - - if (count == 1) { - next.node = 0; - count = 0; - --live_nodes; - zmq_assert (live_nodes == 0); - } - else { - next.table [c - min] = 0; - zmq_assert (live_nodes > 1); - --live_nodes; - - // Compact the table if possible - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - unsigned short i; - for (i = 0; i < count; ++i) - if (next.table [i]) - break; - - zmq_assert (i < count); - min += i; - count = 1; - mtrie_t *oldp = next.table [i]; - free (next.table); - next.node = oldp; - } - else - if (c == min) { - // We can compact the table "from the left" - unsigned short i; - for (i = 1; i < count; ++i) - if (next.table [i]) - break; - - zmq_assert (i < count); - min += i; - count -= i; - mtrie_t **old_table = next.table; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - alloc_assert (next.table); - memmove (next.table, old_table + i, sizeof (mtrie_t*) * count); - free (old_table); - } - else - if (c == min + count - 1) { - // We can compact the table "from the right" - unsigned short i; - for (i = 1; i < count; ++i) - if (next.table [count - 1 - i]) - break; - - zmq_assert (i < count); - count -= i; - mtrie_t **old_table = next.table; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - alloc_assert (next.table); - memmove (next.table, old_table, sizeof (mtrie_t*) * count); - free (old_table); - } - } - } - - return ret; -} - -void zmq::mtrie_t::match (unsigned char *data_, size_t size_, - void (*func_) (pipe_t *pipe_, void *arg_), void *arg_) -{ - mtrie_t *current = this; - while (true) { - - // Signal the pipes attached to this node. - if (current->pipes) { - for (pipes_t::iterator it = current->pipes->begin (); - it != current->pipes->end (); ++it) - func_ (*it, arg_); - } - - // If we are at the end of the message, there's nothing more to match. - if (!size_) - break; - - // If there are no subnodes in the trie, return. - if (current->count == 0) - break; - - // If there's one subnode (optimisation). - if (current->count == 1) { - if (data_ [0] != current->min) - break; - current = current->next.node; - data_++; - size_--; - continue; - } - - // If there are multiple subnodes. - if (data_ [0] < current->min || data_ [0] >= - current->min + current->count) - break; - if (!current->next.table [data_ [0] - current->min]) - break; - current = current->next.table [data_ [0] - current->min]; - data_++; - size_--; - } -} - -bool zmq::mtrie_t::is_redundant () const +namespace zmq { - return !pipes && live_nodes == 0; +template class generic_mtrie_t<pipe_t>; } Index: zeromq-4.2.3/src/xpub.cpp =================================================================== --- zeromq-4.2.3.orig/src/xpub.cpp +++ zeromq-4.2.3/src/xpub.cpp @@ -35,6 +35,7 @@ #include "err.hpp" #include "msg.hpp" #include "macros.hpp" +#include "generic_mtrie_impl.hpp" zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), @@ -205,7 +206,7 @@ int zmq::xpub_t::xsetsockopt (int option return 0; } -static void stub (unsigned char *data_, size_t size_, void *arg_) +static void stub (zmq::mtrie_t::prefix_t data_, size_t size_, void *arg_) { LIBZMQ_UNUSED(data_); LIBZMQ_UNUSED(size_); @@ -222,7 +223,7 @@ void zmq::xpub_t::xpipe_terminated (pipe // Remove pipe without actually sending the message as it was taken // care of by the manual call above. subscriptions is the real mtrie, // so the pipe must be removed from there or it will be left over. - subscriptions.rm (pipe_, stub, NULL, false); + subscriptions.rm (pipe_, stub, static_cast<void*>(NULL), false); } else { @@ -235,9 +236,8 @@ void zmq::xpub_t::xpipe_terminated (pipe dist.pipe_terminated (pipe_); } -void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) +void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, xpub_t *self) { - xpub_t *self = (xpub_t*) arg_; self->dist.match (pipe_); } @@ -317,11 +317,9 @@ bool zmq::xpub_t::xhas_in () return !pending_data.empty (); } -void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, - void *arg_) +void zmq::xpub_t::send_unsubscription (zmq::mtrie_t::prefix_t data_, size_t size_, + xpub_t *self) { - xpub_t *self = (xpub_t*) arg_; - if (self->options.type != ZMQ_PUB) { // Place the unsubscription to the queue of pending (un)subscriptions // to be retrieved by the user later on. Index: zeromq-4.2.3/src/xpub.hpp =================================================================== --- zeromq-4.2.3.orig/src/xpub.hpp +++ zeromq-4.2.3/src/xpub.hpp @@ -70,11 +70,11 @@ namespace zmq // Function to be applied to the trie to send all the subscriptions // upstream. - static void send_unsubscription (unsigned char *data_, size_t size_, - void *arg_); + static void send_unsubscription (zmq::mtrie_t::prefix_t data_, size_t size_, + xpub_t *self); // Function to be applied to each matching pipes. - static void mark_as_matching (zmq::pipe_t *pipe_, void *arg_); + static void mark_as_matching (zmq::pipe_t *pipe_, xpub_t *arg_); // List of all subscriptions mapped to corresponding pipes. mtrie_t subscriptions;
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor