Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:crameleon:LibertaCasa
python-iptables
python-iptables-1.0.0.obscpio
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File python-iptables-1.0.0.obscpio of Package python-iptables
07070100000000000081A40000000000000000000000015EE6923E000001DD000000000000000000000000000000000000002100000000python-iptables-1.0.0/.gitignore*.o *.so *.pyc *.pyo *.bak /build /doc/_build /.project /MANIFEST /dist /scripts /Vagrantfile /.vagrant /tags */__pycache__/ *.pyc /build/ /.pybuild/ *.egg-info # Debian build related files /debian/python-iptables/ /debian/python3-iptables/ /debian/python-iptables-dbg/ /debian/python3-iptables-dbg/ /debian/python-iptables-doc/ /debian/files /debian/outfile /debian/*.debhelper /debian/*.log /debian/*-stamp /debian/*.substvars # Added exclusion for PyCharm files .idea/* 07070100000001000081A40000000000000000000000015EE6923E00000108000000000000000000000000000000000000002200000000python-iptables-1.0.0/.travis.ymldist: xenial language: python python: - "2.7" - "3.4" - "3.5" - "3.6" - "3.7" install: - python setup.py build - python setup.py install - sudo pip install coveralls script: - sudo PATH=$PATH coverage run setup.py test after_success: coveralls 07070100000002000081A40000000000000000000000015EE6923E00000043000000000000000000000000000000000000002200000000python-iptables-1.0.0/MANIFEST.ininclude doc/* iptc/* libxtwrapper/* NOTICE README setup.py test.py 07070100000003000081A40000000000000000000000015EE6923E000029BD000000000000000000000000000000000000001D00000000python-iptables-1.0.0/NOTICE Copyright (c) 2010-, Vilmos Nebehaj and others Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS 07070100000004000081ED0000000000000000000000015EE6923E0000545A000000000000000000000000000000000000002000000000python-iptables-1.0.0/README.mdIntroduction ============ About python-iptables --------------------- **Iptables** is the tool that is used to manage **netfilter**, the standard packet filtering and manipulation framework under Linux. As the iptables manpage puts it: > Iptables is used to set up, maintain, and inspect the tables of IPv4 > packet filter rules in the Linux kernel. Several different tables may > be defined. > > Each table contains a number of built-in chains and may also contain > user- defined chains. > > Each chain is a list of rules which can match a set of packets. Each > rule specifies what to do with a packet that matches. This is called a > target, which may be a jump to a user-defined chain in the same table. `Python-iptables` provides a pythonesque wrapper via python bindings to iptables under Linux. Interoperability with iptables is achieved via using the iptables C libraries (`libiptc`, `libxtables`, and the iptables extensions), not calling the iptables binary and parsing its output. It is meant primarily for dynamic and/or complex routers and firewalls, where rules are often updated or changed, or Python programs wish to interface with the Linux iptables framework.. If you are looking for `ebtables` python bindings, check out [python-ebtables](https://github.com/ldx/python-ebtables/). `Python-iptables` supports Python 2.6, 2.7 and 3.4. [![Flattr](http://api.flattr.com/button/flattr-badge-large.png)](https://flattr.com/submit/auto?user_id=ldx&url=https%3A%2F%2Fgithub.com%2Fldx%2Fpython-iptables) [![Latest Release](https://img.shields.io/pypi/v/python-iptables.svg)](https://pypi.python.org/pypi/python-iptables) [![Build Status](https://travis-ci.org/ldx/python-iptables.png?branch=master)](https://travis-ci.org/ldx/python-iptables) [![Coverage Status](https://coveralls.io/repos/ldx/python-iptables/badge.svg?branch=codecoverage)](https://coveralls.io/r/ldx/python-iptables?branch=codecoverage) [![Code Health](https://landscape.io/github/ldx/python-iptables/codecoverage/landscape.svg)](https://landscape.io/github/ldx/python-iptables/codecoverage) [![Number of Downloads](https://img.shields.io/pypi/dm/python-iptables.svg)](https://pypi.python.org/pypi/python-iptables) [![License](https://img.shields.io/pypi/l/python-iptables.svg)](https://pypi.python.org/pypi/python-iptables) Installing via pip ------------------ The usual way: pip install --upgrade python-iptables Compiling from source --------------------- First make sure you have iptables installed (most Linux distributions install it by default). `Python-iptables` needs the shared libraries `libiptc.so` and `libxtables.so` coming with iptables, they are installed in `/lib` on Ubuntu. You can compile `python-iptables` in the usual distutils way: % cd python-iptables % python setup.py build If you like, `python-iptables` can also be installed into a `virtualenv`: % mkvirtualenv python-iptables % python setup.py install If you install `python-iptables` as a system package, make sure the directory where `distutils` installs shared libraries is in the dynamic linker's search path (it's in `/etc/ld.so.conf` or in one of the files in the folder `/etc/ld.co.conf.d`). Under Ubuntu `distutils` by default installs into `/usr/local/lib`. Now you can run the tests: % sudo PATH=$PATH python setup.py test WARNING: this test will manipulate iptables rules. Don't do this on a production machine. Would you like to continue? y/n y [...] The `PATH=$PATH` part is necessary after `sudo` if you have installed into a `virtualenv`, since `sudo` will reset your environment to a system setting otherwise.. Once everything is in place you can fire up python to check whether the package can be imported: % sudo PATH=$PATH python >>> import iptc >>> Of course you need to be root to be able to use iptables. Using a custom iptables install ------------------------------- If you are stuck on a system with an old version of `iptables`, you can install a more up to date version to a custom location, and ask `python-iptables` to use libraries at that location. To install `iptables` to `/tmp/iptables`: % git clone git://git.netfilter.org/iptables && cd iptables % ./autogen.sh % ./configure --prefix=/tmp/iptables % make % make install Make sure the dependencies `iptables` needs are installed. Now you can point `python-iptables` to this install path via: % sudo PATH=$PATH IPTABLES_LIBDIR=/tmp/iptables/lib XTABLES_LIBDIR=/tmp/iptables/lib/xtables python >>> import iptc >>> What is supported ----------------- The basic iptables framework and all the match/target extensions are supported by `python-iptables`, including IPv4 and IPv6 ones. All IPv4 and IPv6 tables are supported as well. Full documentation with API reference is available [here](http://ldx.github.com/python-iptables/). Examples ======== High level abstractions ----------------------- ``python-iptables`` implements a low-level interface that tries to closely match the underlying C libraries. The module ``iptc.easy`` improves the usability of the library by providing a rich set of high-level functions designed to simplify the interaction with the library, for example: >>> import iptc >>> iptc.easy.dump_table('nat', ipv6=False) {'INPUT': [], 'OUTPUT': [], 'POSTROUTING': [], 'PREROUTING': []} >>> iptc.easy.dump_chain('filter', 'OUTPUT', ipv6=False) [{'comment': {'comment': 'DNS traffic to Google'}, 'counters': (1, 56), 'dst': '8.8.8.8/32', 'protocol': 'udp', 'target': 'ACCEPT', 'udp': {'dport': '53'}}] >>> iptc.easy.add_chain('filter', 'TestChain') True >>> rule_d = {'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}} >>> iptc.easy.insert_rule('filter', 'TestChain', rule_d) >>> iptc.easy.dump_chain('filter', 'TestChain') [{'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}] >>> iptc.easy.delete_chain('filter', 'TestChain', flush=True) >>> # Example of goto rule // iptables -A FORWARD -p gre -g TestChainGoto >>> iptc.easy.add_chain('filter', 'TestChainGoto') >>> rule_goto_d = {'protocol': 'gre', 'target': {'goto': 'TestChainGoto'}} >>> iptc.easy.insert_rule('filter', 'FORWARD', rule_goto_d) Rules ----- In `python-iptables`, you usually first create a rule, and set any source/destination address, in/out interface and protocol specifiers, for example: >>> import iptc >>> rule = iptc.Rule() >>> rule.in_interface = "eth0" >>> rule.src = "192.168.1.0/255.255.255.0" >>> rule.protocol = "tcp" This creates a rule that will match TCP packets coming in on eth0, with a source IP address of 192.168.1.0/255.255.255.0. A rule may contain matches and a target. A match is like a filter matching certain packet attributes, while a target tells what to do with the packet (drop it, accept it, transform it somehow, etc). One can create a match or target via a Rule: >>> rule = iptc.Rule() >>> m = rule.create_match("tcp") >>> t = rule.create_target("DROP") Match and target parameters can be changed after creating them. It is also perfectly valid to create a match or target via instantiating them with their constructor, but you still need a rule and you have to add the matches and the target to their rule manually: >>> rule = iptc.Rule() >>> match = iptc.Match(rule, "tcp") >>> target = iptc.Target(rule, "DROP") >>> rule.add_match(match) >>> rule.target = target Any parameters a match or target might take can be set via the attributes of the object. To set the destination port for a TCP match: >>> rule = iptc.Rule() >>> rule.protocol = "tcp" >>> match = rule.create_match("tcp") >>> match.dport = "80" To set up a rule that matches packets marked with 0xff: >>> rule = iptc.Rule() >>> rule.protocol = "tcp" >>> match = rule.create_match("mark") >>> match.mark = "0xff" Parameters are always strings. You can supply any string as the parameter value, but note that most extensions validate their parameters. For example this: >>> rule = iptc.Rule() >>> rule.protocol = "tcp" >>> rule.target = iptc.Target(rule, "ACCEPT") >>> match = iptc.Match(rule, "state") >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> match.state = "RELATED,ESTABLISHED" >>> rule.add_match(match) >>> chain.insert_rule(rule) will work. However, if you change the state parameter: >>> rule = iptc.Rule() >>> rule.protocol = "tcp" >>> rule.target = iptc.Target(rule, "ACCEPT") >>> match = iptc.Match(rule, "state") >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> match.state = "RELATED,ESTABLISHED,FOOBAR" >>> rule.add_match(match) >>> chain.insert_rule(rule) `python-iptables` will throw an exception: Traceback (most recent call last): File "state.py", line 7, in <module> match.state = "RELATED,ESTABLISHED,FOOBAR" File "/home/user/Projects/python-iptables/iptc/ip4tc.py", line 369, in __setattr__ self.parse(name.replace("_", "-"), value) File "/home/user/Projects/python-iptables/iptc/ip4tc.py", line 286, in parse self._parse(argv, inv, entry) File "/home/user/Projects/python-iptables/iptc/ip4tc.py", line 516, in _parse ct.cast(self._ptrptr, ct.POINTER(ct.c_void_p))) File "/home/user/Projects/python-iptables/iptc/xtables.py", line 736, in new ret = fn(*args) File "/home/user/Projects/python-iptables/iptc/xtables.py", line 1031, in parse_match argv[1])) iptc.xtables.XTablesError: state: parameter error -2 (RELATED,ESTABLISHED,FOOBAR) Certain parameters take a string that optionally consists of multiple words. The comment match is a good example: >>> rule = iptc.Rule() >>> rule.src = "127.0.0.1" >>> rule.protocol = "udp" >>> rule.target = rule.create_target("ACCEPT") >>> match = rule.create_match("comment") >>> match.comment = "this is a test comment" >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> chain.insert_rule(rule) Note that this is still just one parameter value. However, when a match or a target takes multiple parameter values, that needs to be passed in as a list. Let's assume you have created and set up an `ipset` called `blacklist` via the `ipset` command. To create a rule with a match for this set: >>> rule = iptc.Rule() >>> m = rule.create_match("set") >>> m.match_set = ['blacklist', 'src'] Note how this time a list was used for the parameter value, since the `set` match `match_set` parameter expects two values. See the `iptables` manpages to find out what the extensions you use expect. See [ipset](http://ipset.netfilter.org/) for more information. When you are ready constructing your rule, add them to the chain you want it to show up in: >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> chain.insert_rule(rule) This will put your rule into the INPUT chain in the filter table. Chains and tables ----------------- You can of course also check what a rule's source/destination address, in/out inteface etc is. To print out all rules in the FILTER table: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> for chain in table.chains: >>> print "=======================" >>> print "Chain ", chain.name >>> for rule in chain.rules: >>> print "Rule", "proto:", rule.protocol, "src:", rule.src, "dst:", \ >>> rule.dst, "in:", rule.in_interface, "out:", rule.out_interface, >>> print "Matches:", >>> for match in rule.matches: >>> print match.name, >>> print "Target:", >>> print rule.target.name >>> print "=======================" As you see in the code snippet above, rules are organized into chains, and chains are in tables. You have a fixed set of tables; for IPv4: - `FILTER`, - `NAT`, - `MANGLE` and - `RAW`. For IPv6 the tables are: - `FILTER`, - `MANGLE`, - `RAW` and - `SECURITY`. To access a table: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> print table.name filter To create a new chain in the FILTER table: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = table.create_chain("testchain") $ sudo iptables -L -n [...] Chain testchain (0 references) target prot opt source destination To access an existing chain: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = iptc.Chain(table, "INPUT") >>> chain.name 'INPUT' >>> len(chain.rules) 10 >>> More about matches and targets ------------------------------ There are basic targets, such as `DROP` and `ACCEPT`. E.g. to reject packets with source address `127.0.0.1/255.0.0.0` coming in on any of the `eth` interfaces: >>> import iptc >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> rule = iptc.Rule() >>> rule.in_interface = "eth+" >>> rule.src = "127.0.0.1/255.0.0.0" >>> target = iptc.Target(rule, "DROP") >>> rule.target = target >>> chain.insert_rule(rule) To instantiate a target or match, we can either create an object like above, or use the `rule.create_target(target_name)` and `rule.create_match(match_name)` methods. For example, in the code above target could have been created as: >>> target = rule.create_target("DROP") instead of: >>> target = iptc.Target(rule, "DROP") >>> rule.target = target The former also adds the match or target to the rule, saving a call. Another example, using a target which takes parameters. Let's mark packets going to `192.168.1.2` UDP port `1234` with `0xffff`: >>> import iptc >>> chain = iptc.Chain(iptc.Table(iptc.Table.MANGLE), "PREROUTING") >>> rule = iptc.Rule() >>> rule.dst = "192.168.1.2" >>> rule.protocol = "udp" >>> match = iptc.Match(rule, "udp") >>> match.dport = "1234" >>> rule.add_match(match) >>> target = iptc.Target(rule, "MARK") >>> target.set_mark = "0xffff" >>> rule.target = target >>> chain.insert_rule(rule) Matches are optional (specifying a target is mandatory). E.g. to insert a rule to NAT TCP packets going out via `eth0`: >>> import iptc >>> chain = iptc.Chain(iptc.Table(iptc.Table.NAT), "POSTROUTING") >>> rule = iptc.Rule() >>> rule.protocol = "tcp" >>> rule.out_interface = "eth0" >>> target = iptc.Target(rule, "MASQUERADE") >>> target.to_ports = "1234" >>> rule.target = target >>> chain.insert_rule(rule) Here only the properties of the rule decide whether the rule will be applied to a packet. Matches are optional, but we can add multiple matches to a rule. In the following example we will do that, using the `iprange` and the `tcp` matches: >>> import iptc >>> rule = iptc.Rule() >>> rule.protocol = "tcp" >>> match = iptc.Match(rule, "tcp") >>> match.dport = "22" >>> rule.add_match(match) >>> match = iptc.Match(rule, "iprange") >>> match.src_range = "192.168.1.100-192.168.1.200" >>> match.dst_range = "172.22.33.106" >>> rule.add_match(match) >>> rule.target = iptc.Target(rule, "DROP") >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> chain.insert_rule(rule) This is the `python-iptables` equivalent of the following iptables command: # iptables -A INPUT -p tcp –destination-port 22 -m iprange –src-range 192.168.1.100-192.168.1.200 –dst-range 172.22.33.106 -j DROP You can of course negate matches, just like when you use `!` in front of a match with iptables. For example: >>> import iptc >>> rule = iptc.Rule() >>> match = iptc.Match(rule, "mac") >>> match.mac_source = "!00:11:22:33:44:55" >>> rule.add_match(match) >>> rule.target = iptc.Target(rule, "ACCEPT") >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> chain.insert_rule(rule) This results in: $ sudo iptables -L -n Chain INPUT (policy ACCEPT) target prot opt source destination ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 MAC ! 00:11:22:33:44:55 Chain FORWARD (policy ACCEPT) target prot opt source destination Chain OUTPUT (policy ACCEPT) target prot opt source destination Counters -------- You can query rule and chain counters, e.g.: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = iptc.Chain(table, 'OUTPUT') >>> for rule in chain.rules: >>> (packets, bytes) = rule.get_counters() >>> print packets, bytes However, the counters are only refreshed when the underlying low-level iptables connection is refreshed in `Table` via `table.refresh()`. For example: >>> import time, sys >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = iptc.Chain(table, 'OUTPUT') >>> for rule in chain.rules: >>> (packets, bytes) = rule.get_counters() >>> print packets, bytes >>> print "Please send some traffic" >>> sys.stdout.flush() >>> time.sleep(3) >>> for rule in chain.rules: >>> # Here you will get back the same counter values as above >>> (packets, bytes) = rule.get_counters() >>> print packets, bytes This will show you the same counter values even if there was traffic hitting your rules. You have to refresh your table to get update your counters: >>> import time, sys >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = iptc.Chain(table, 'OUTPUT') >>> for rule in chain.rules: >>> (packets, bytes) = rule.get_counters() >>> print packets, bytes >>> print "Please send some traffic" >>> sys.stdout.flush() >>> time.sleep(3) >>> table.refresh() # Here: refresh table to update rule counters >>> for rule in chain.rules: >>> (packets, bytes) = rule.get_counters() >>> print packets, bytes What is more, if you add: iptables -A OUTPUT -p tcp --sport 80 iptables -A OUTPUT -p tcp --sport 22 you can query rule and chain counters together with the protocol and sport(or dport), e.g.: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = iptc.Chain(table, 'OUTPUT') >>> for rule in chain.rules: >>> for match in rule.matches: >>> (packets, bytes) = rule.get_counters() >>> print packets, bytes, match.name, match.sport Autocommit ---------- `Python-iptables` by default automatically performs an iptables commit after each operation. That is, after you add a rule in `python-iptables`, that will take effect immediately. It may happen that you want to batch together certain operations. A typical use case is traversing a chain and removing rules matching a specific criteria. If you do this with autocommit enabled, after the first delete operation, your chain's state will change and you have to restart the traversal. You can do something like this: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> removed = True >>> chain = iptc.Chain(table, "FORWARD") >>> while removed == True: >>> removed = False >>> for rule in chain.rules: >>> if rule.out_interface and "eth0" in rule.out_interface: >>> chain.delete_rule(rule) >>> removed = True >>> break This is clearly not ideal and the code is not very readable. An alternative is to disable autocommits, traverse the chain, removing one or more rules, than commit it: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> table.autocommit = False >>> chain = iptc.Chain(table, "FORWARD") >>> for rule in chain.rules: >>> if rule.out_interface and "eth0" in rule.out_interface: >>> chain.delete_rule(rule) >>> table.commit() >>> table.autocommit = True The drawback is that Table is a singleton, and if you disable autocommit, it will be disabled for all instances of that Table. Easy rules with dictionaries ---------------------------- To simplify operations with ``python-iptables`` rules we have included support to define and convert Rules object into python dictionaries. >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = iptc.Chain(table, "INPUT") >>> # Create an iptc.Rule object from dictionary >>> rule_d = {'comment': {'comment': 'Match tcp.22'}, 'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}} >>> rule = iptc.easy.encode_iptc_rule(rule_d) >>> # Obtain a dictionary representation from the iptc.Rule >>> iptc.easy.decode_iptc_rule(rule) {'tcp': {'dport': '22'}, 'protocol': 'tcp', 'comment': {'comment': 'Match tcp.22'}, 'target': 'ACCEPT'} Known Issues ============ These issues are mainly caused by complex interaction with upstream's Netfilter implementation, and will require quite significant effort to fix. Workarounds are available. - The `hashlimit` match requires explicitly setting `hashlimit_htable_expire`. See [Issue \#201](https://github.com/ldx/python-iptables/issues/201). - The `NOTRACK` target is problematic; use `CT --notrack` instead. See [Issue \#204](https://github.com/ldx/python-iptables/issues/204). 07070100000005000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000001D00000000python-iptables-1.0.0/debian07070100000006000081A40000000000000000000000015EE6923E000000B9000000000000000000000000000000000000002400000000python-iptables-1.0.0/debian/READMEThe Debian Package python-iptables ---------------------------- Comments regarding the Package -- Juliano Martinez <juliano.martinez@locaweb.com.br> Tue, 13 Dec 2011 10:10:44 -0200 07070100000007000081A40000000000000000000000015EE6923E000000D4000000000000000000000000000000000000002B00000000python-iptables-1.0.0/debian/README.Debianpython-iptables for Debian -------------------------- <possible notes regarding this package - if none, delete this file> -- Juliano Martinez <juliano.martinez@locaweb.com.br> Tue, 13 Dec 2011 10:10:44 -0200 07070100000008000081A40000000000000000000000015EE6923E000000D1000000000000000000000000000000000000002B00000000python-iptables-1.0.0/debian/README.sourcepython-iptables for Debian -------------------------- <this file describes information about the source package, see Debian policy manual section 4.14. You WILL either need to modify or delete this file> 07070100000009000081A40000000000000000000000015EE6923E0000020C000000000000000000000000000000000000002700000000python-iptables-1.0.0/debian/changelogpython-iptables (0.12.0) xenial; urgency=low * update debian changelog -- Dan Fuhry <df@datto.com> Fri, 11 Nov 2016 14:03:32 -0500 python-iptables (0.5-git-20140925) precise; urgency=low * update debian/ remove cdbs build python3 packages build debug packages -- Markus Kötter <koetter@rrzn.uni-hannover.de> Thu, 25 Sep 2014 10:48:41 +0200 python-iptables (0.1.1) unstable; urgency=low * Initial Release. -- Juliano Martinez <juliano.martinez@locaweb.com.br> Tue, 13 Dec 2011 10:10:44 -0200 0707010000000A000081A40000000000000000000000015EE6923E00000002000000000000000000000000000000000000002400000000python-iptables-1.0.0/debian/compat7 0707010000000B000081A40000000000000000000000015EE6923E0000067C000000000000000000000000000000000000002500000000python-iptables-1.0.0/debian/controlSource: python-iptables Section: net Priority: extra Maintainer: Juliano Martinez <juliano@martinez.io> Build-depends: python-all-dev (>= 2.7), python-all-dbg (>= 2.7), python3-all-dev (>= 3.2), python3-all-dbg, debhelper (>= 7) X-Python-Version: >= 2.7 X-Python3-Version: >= 3.2 Standards-Version: 3.8.4 Homepage: https://github.com/ldx/python-iptables Package: python-iptables Architecture: any Depends: ${python:Depends}, ${shlibs:Depends}, ${misc:Depends} Provides: ${python:Provides} Description: Python bindings for iptables Python-iptables is a Python project that provides bindings to the iptables C libraries in Linux. Interoperability with iptables is achieved using the iptables C libraries (libiptc, libxtables, and iptables extensions), not calling the iptables executable and parsing its output as most other iptables wrapper libraries do; this makes python-iptables faster and not prone to parsing errors, at the same time leveraging all available iptables match and target extensions without further work. Package: python-iptables-dbg Section: debug Priority: extra Architecture: any Depends: ${python:Depends}, ${shlibs:Depends}, ${misc:Depends}, python-iptables (= ${binary:Version}) Provides: ${python:Provides} Description: Python bindings for iptables Package: python3-iptables Architecture: any Depends: ${python3:Depends}, ${shlibs:Depends}, ${misc:Depends} Description: Python3 bindings for iptables Package: python3-iptables-dbg Section: debug Priority: extra Architecture: any Depends: ${python3:Depends}, ${shlibs:Depends}, ${misc:Depends}, python3-iptables (= ${binary:Version}) Description: Python3 bindings for iptables 0707010000000C000081A40000000000000000000000015EE6923E00000444000000000000000000000000000000000000002700000000python-iptables-1.0.0/debian/copyrightThis work was packaged for Debian by: Juliano Martinez <juliano.martinez@locaweb.com.br> on Tue, 13 Dec 2011 10:10:44 -0200 It was downloaded from: https://github.com/ldx/python-iptables Upstream Author(s): Vilmos Nebehaj -> https://github.com/ldx Copyright: Vilmos Nebehaj License: Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. On Debian systems, the complete text of the Apache version 2.0 license can be found in "/usr/share/common-licenses/Apache-2.0". The Debian packaging is: Copyright (C) 2011 Juliano Martinez a.k.a (ncode) <juliano@martinez.io> 0707010000000D000081A40000000000000000000000015EE6923E0000000A000000000000000000000000000000000000002200000000python-iptables-1.0.0/debian/docsREADME.md 0707010000000E000081A40000000000000000000000015EE6923E0000001A000000000000000000000000000000000000003700000000python-iptables-1.0.0/debian/python3-iptables.postinst#!/bin/sh /sbin/ldconfig 0707010000000F000081ED0000000000000000000000015EE6923E000005ED000000000000000000000000000000000000002300000000python-iptables-1.0.0/debian/rules#!/usr/bin/make -f # This file was automatically generated by stdeb 0.6.0+git at # Thu, 14 Nov 2013 16:04:50 +0100 PY3VERS := $(shell py3versions -r) PY2VERS := $(shell pyversions -r) %: dh $@ --with python2,python3 --buildsystem=python_distutils .PHONY: override_dh_clean override_dh_clean: rm -rf build/* dh_clean .PHONY: override_dh_auto_install override_dh_auto_install: set -e; \ for py in $(PY3VERS); do \ $$py -B setup.py install --root debian/python3-iptables --install-layout deb; \ $$py-dbg -B setup.py install --root debian/python3-iptables-dbg --install-layout deb; \ done # On Ubuntu 14.04 and possibly other versions, /usr/lib/python3/dist-packages is not # included in the system library search path, so we add it here. set -e; \ mkdir -p $(CURDIR)/debian/python3-iptables/etc/ld.so.conf.d; \ echo "/usr/lib/python3/dist-packages" > $(CURDIR)/debian/python3-iptables/etc/ld.so.conf.d/python3-dist-packages.conf set -e; \ for py in $(PY2VERS); do \ $$py -B setup.py install --root debian/python-iptables --install-layout deb; \ $$py-dbg -B setup.py install --root debian/python-iptables-dbg --install-layout deb; \ done .PHONY: override_dh_strip override_dh_strip: set -e; \ dh_strip -ppython-ev --dbg-package=python-iptables-dbg; \ dh_strip -ppython3-ev --dbg-package=python3-iptables-dbg; override_dh_python2: dh_python2 -ppython-iptables dh_python2 -ppython-iptables-dbg override_dh_python3: dh_python3 -ppython3-iptables dh_python3 -ppython3-iptables-dbg 07070100000010000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000002400000000python-iptables-1.0.0/debian/source07070100000011000081A40000000000000000000000015EE6923E0000000D000000000000000000000000000000000000002B00000000python-iptables-1.0.0/debian/source/format3.0 (native) 07070100000012000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000001A00000000python-iptables-1.0.0/doc07070100000013000081A40000000000000000000000015EE6923E00000E14000000000000000000000000000000000000002300000000python-iptables-1.0.0/doc/Makefile# Makefile for Sphinx documentation # # You can set these variables from the command line. SPHINXOPTS = SPHINXBUILD = sphinx-build PAPER = BUILDDIR = _build # Internal variables. PAPEROPT_a4 = -D latex_paper_size=a4 PAPEROPT_letter = -D latex_paper_size=letter ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . GH_PAGES_SOURCES = ../doc ../iptc .PHONY: help clean html dirhtml pickle json htmlhelp qthelp latex changes linkcheck doctest help: @echo "Please use \`make <target>' where <target> is one of" @echo " html to make standalone HTML files" @echo " dirhtml to make HTML files named index.html in directories" @echo " pickle to make pickle files" @echo " json to make JSON files" @echo " htmlhelp to make HTML files and a HTML help project" @echo " qthelp to make HTML files and a qthelp project" @echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter" @echo " changes to make an overview of all changed/added/deprecated items" @echo " linkcheck to check all external links for integrity" @echo " doctest to run all doctests embedded in the documentation (if enabled)" clean: -rm -rf $(BUILDDIR)/* html: $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html @echo @echo "Build finished. The HTML pages are in $(BUILDDIR)/html." dirhtml: $(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml @echo @echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml." pickle: $(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle @echo @echo "Build finished; now you can process the pickle files." json: $(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json @echo @echo "Build finished; now you can process the JSON files." htmlhelp: $(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp @echo @echo "Build finished; now you can run HTML Help Workshop with the" \ ".hhp project file in $(BUILDDIR)/htmlhelp." qthelp: $(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp @echo @echo "Build finished; now you can run "qcollectiongenerator" with the" \ ".qhcp project file in $(BUILDDIR)/qthelp, like this:" @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/python-iptables.qhcp" @echo "To view the help file:" @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/python-iptables.qhc" latex: $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex @echo @echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex." @echo "Run \`make all-pdf' or \`make all-ps' in that directory to" \ "run these through (pdf)latex." changes: $(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes @echo @echo "The overview file is in $(BUILDDIR)/changes." linkcheck: $(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck @echo @echo "Link check complete; look for any errors in the above output " \ "or in $(BUILDDIR)/linkcheck/output.txt." doctest: $(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest @echo "Testing of doctests in the sources finished, look at the " \ "results in $(BUILDDIR)/doctest/output.txt." gh-pages: git checkout gh-pages git checkout master $(GH_PAGES_SOURCES) make html && cp -rf _build/html/* ../ && make clean git add ../*.* ../_* git commit -m "Generated gh-pages for `git log master -1 --pretty=short --abbrev-commit`" && git push origin gh-pages git reset --hard git clean -f .. git checkout master markdown: intro.rst examples.rst pandoc -o ../README.md -f rst -t markdown intro.rst examples.rst 07070100000014000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000002500000000python-iptables-1.0.0/doc/_templates07070100000015000081A40000000000000000000000015EE6923E00000229000000000000000000000000000000000000003100000000python-iptables-1.0.0/doc/_templates/layout.html{% extends "!layout.html" %} {% block footer %} {{ super() }} <script type="text/javascript"> var _gaq = _gaq || []; _gaq.push(['_setAccount', 'UA-38352912-1']); _gaq.push(['_trackPageview']); (function() { var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true; ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js'; var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s); })(); </script> {% endblock %} 07070100000016000081A40000000000000000000000015EE6923E00001A1B000000000000000000000000000000000000002200000000python-iptables-1.0.0/doc/conf.py# -*- coding: utf-8 -*- # # python-iptables documentation build configuration file, created by # sphinx-quickstart on Wed Oct 27 21:54:51 2010. # # This file is execfile()d with the current directory set to its containing dir. # # Note that not all possible configuration values are present in this # autogenerated file. # # All configuration values have a default; values that are commented out # serve to show the default. import sys, os # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. #sys.path.append(os.path.abspath('.')) sys.path.append(os.path.abspath('../')) # -- General configuration ----------------------------------------------------- # Add any Sphinx extension module names here, as strings. They can be extensions # coming with Sphinx (named 'sphinx.ext.*') or your custom ones. extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.intersphinx', 'sphinx.ext.todo', 'sphinx.ext.coverage', 'sphinx.ext.ifconfig'] # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] # The suffix of source filenames. source_suffix = '.rst' # The encoding of source files. #source_encoding = 'utf-8' # The master toctree document. master_doc = 'index' # General information about the project. project = u'python-iptables' copyright = u'2010-2014, Vilmos Nebehaj' # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. # # The short X.Y version. version = '0.4.0' # The full version, including alpha/beta/rc tags. release = '0.4.0-dev' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. #language = None # There are two options for replacing |today|: either, you set today to some # non-false value, then it is used: #today = '' # Else, today_fmt is used as the format for a strftime call. #today_fmt = '%B %d, %Y' # List of documents that shouldn't be included in the build. #unused_docs = [] # List of directories, relative to source directory, that shouldn't be searched # for source files. exclude_trees = ['_build'] # The reST default role (used for this markup: `text`) to use for all documents. #default_role = None # If true, '()' will be appended to :func: etc. cross-reference text. add_function_parentheses = True # If true, the current module name will be prepended to all description # unit titles (such as .. function::). #add_module_names = True # If true, sectionauthor and moduleauthor directives will be shown in the # output. They are ignored by default. #show_authors = False # The name of the Pygments (syntax highlighting) style to use. pygments_style = 'sphinx' # A list of ignored prefixes for module index sorting. #modindex_common_prefix = [] # -- Options for HTML output --------------------------------------------------- # The theme to use for HTML and HTML Help pages. Major themes that come with # Sphinx are currently 'default' and 'sphinxdoc'. html_theme = 'default' # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the # documentation. #html_theme_options = {} # Add any paths that contain custom themes here, relative to this directory. #html_theme_path = [] # The name for this set of Sphinx documents. If None, it defaults to # "<project> v<release> documentation". #html_title = None # A shorter title for the navigation bar. Default is the same as html_title. #html_short_title = None # The name of an image file (relative to this directory) to place at the top # of the sidebar. #html_logo = None # The name of an image file (within the static path) to use as favicon of the # docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 # pixels large. #html_favicon = None # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". html_static_path = ['_static'] # If not '', a 'Last updated on:' timestamp is inserted at every page bottom, # using the given strftime format. #html_last_updated_fmt = '%b %d, %Y' # If true, SmartyPants will be used to convert quotes and dashes to # typographically correct entities. #html_use_smartypants = True # Custom sidebar templates, maps document names to template names. #html_sidebars = {} # Additional templates that should be rendered to pages, maps page names to # template names. #html_additional_pages = {} # If false, no module index is generated. #html_use_modindex = True # If false, no index is generated. #html_use_index = True # If true, the index is split into individual pages for each letter. #html_split_index = False # If true, links to the reST sources are added to the pages. #html_show_sourcelink = True # If true, an OpenSearch description file will be output, and all pages will # contain a <link> tag referring to it. The value of this option must be the # base URL from which the finished HTML is served. #html_use_opensearch = '' # If nonempty, this is the file name suffix for HTML files (e.g. ".xhtml"). #html_file_suffix = '' # Output file base name for HTML help builder. htmlhelp_basename = 'python-iptablesdoc' # -- Options for LaTeX output -------------------------------------------------- # The paper size ('letter' or 'a4'). #latex_paper_size = 'letter' # The font size ('10pt', '11pt' or '12pt'). #latex_font_size = '10pt' # Grouping the document tree into LaTeX files. List of tuples # (source start file, target name, title, author, documentclass [howto/manual]). latex_documents = [ ('index', 'python-iptables.tex', u'python-iptables Documentation', u'Vilmos Nebehaj', 'manual'), ] # The name of an image file (relative to this directory) to place at the top of # the title page. #latex_logo = None # For "manual" documents, if this is true, then toplevel headings are parts, # not chapters. #latex_use_parts = False # Additional stuff for the LaTeX preamble. #latex_preamble = '' # Documents to append as an appendix to all manuals. #latex_appendices = [] # If false, no module index is generated. #latex_use_modindex = True # Example configuration for intersphinx: refer to the Python standard library. intersphinx_mapping = {'http://docs.python.org/': None} autoclass_content="both" 07070100000017000081A40000000000000000000000015EE6923E00004195000000000000000000000000000000000000002700000000python-iptables-1.0.0/doc/examples.rstExamples ======== High level abstractions ----------------------- ``python-iptables`` implements a low-level interface that tries to closely match the underlying C libraries. The module ``iptc.easy`` improves the usability of the library by providing a rich set of high-level functions designed to simplify the interaction with the library, for example: >>> import iptc >>> iptc.easy.dump_table('nat', ipv6=False) {'INPUT': [], 'OUTPUT': [], 'POSTROUTING': [], 'PREROUTING': []} >>> iptc.easy.dump_chain('filter', 'OUTPUT', ipv6=False) [{'comment': {'comment': 'DNS traffic to Google'}, 'counters': (1, 56), 'dst': '8.8.8.8/32', 'protocol': 'udp', 'target': 'ACCEPT', 'udp': {'dport': '53'}}] >>> iptc.easy.add_chain('filter', 'TestChain') True >>> rule_d = {'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}} >>> iptc.easy.insert_rule('filter', 'TestChain', rule_d) >>> iptc.easy.dump_chain('filter', 'TestChain') [{'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}] >>> iptc.easy.delete_chain('filter', 'TestChain', flush=True) >>> # Example of goto rule // iptables -A FORWARD -p gre -g TestChainGoto >>> iptc.easy.add_chain('filter', 'TestChainGoto') >>> rule_goto_d = {'protocol': 'gre', 'target': {'goto': 'TestChainGoto'}} >>> iptc.easy.insert_rule('filter', 'FORWARD', rule_goto_d) Rules ----- In ``python-iptables``, you usually first create a rule, and set any source/destination address, in/out interface and protocol specifiers, for example:: >>> import iptc >>> rule = iptc.Rule() >>> rule.in_interface = "eth0" >>> rule.src = "192.168.1.0/255.255.255.0" >>> rule.protocol = "tcp" This creates a rule that will match TCP packets coming in on eth0, with a source IP address of 192.168.1.0/255.255.255.0. A rule may contain matches and a target. A match is like a filter matching certain packet attributes, while a target tells what to do with the packet (drop it, accept it, transform it somehow, etc). One can create a match or target via a Rule:: >>> rule = iptc.Rule() >>> m = rule.create_match("tcp") >>> t = rule.create_target("DROP") Match and target parameters can be changed after creating them. It is also perfectly valid to create a match or target via instantiating them with their constructor, but you still need a rule and you have to add the matches and the target to their rule manually:: >>> rule = iptc.Rule() >>> match = iptc.Match(rule, "tcp") >>> target = iptc.Target(rule, "DROP") >>> rule.add_match(match) >>> rule.target = target Any parameters a match or target might take can be set via the attributes of the object. To set the destination port for a TCP match:: >>> rule = iptc.Rule() >>> rule.protocol = "tcp" >>> match = rule.create_match("tcp") >>> match.dport = "80" To set up a rule that matches packets marked with 0xff:: >>> rule = iptc.Rule() >>> rule.protocol = "tcp" >>> match = rule.create_match("mark") >>> match.mark = "0xff" Parameters are always strings. You can supply any string as the parameter value, but note that most extensions validate their parameters. For example this:: >>> rule = iptc.Rule() >>> rule.protocol = "tcp" >>> rule.target = iptc.Target(rule, "ACCEPT") >>> match = iptc.Match(rule, "state") >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> match.state = "RELATED,ESTABLISHED" >>> rule.add_match(match) >>> chain.insert_rule(rule) will work. However, if you change the `state` parameter:: >>> rule = iptc.Rule() >>> rule.protocol = "tcp" >>> rule.target = iptc.Target(rule, "ACCEPT") >>> match = iptc.Match(rule, "state") >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> match.state = "RELATED,ESTABLISHED,FOOBAR" >>> rule.add_match(match) >>> chain.insert_rule(rule) ``python-iptables`` will throw an exception:: Traceback (most recent call last): File "state.py", line 7, in <module> match.state = "RELATED,ESTABLISHED,FOOBAR" File "/home/user/Projects/python-iptables/iptc/ip4tc.py", line 369, in __setattr__ self.parse(name.replace("_", "-"), value) File "/home/user/Projects/python-iptables/iptc/ip4tc.py", line 286, in parse self._parse(argv, inv, entry) File "/home/user/Projects/python-iptables/iptc/ip4tc.py", line 516, in _parse ct.cast(self._ptrptr, ct.POINTER(ct.c_void_p))) File "/home/user/Projects/python-iptables/iptc/xtables.py", line 736, in new ret = fn(*args) File "/home/user/Projects/python-iptables/iptc/xtables.py", line 1031, in parse_match argv[1])) iptc.xtables.XTablesError: state: parameter error -2 (RELATED,ESTABLISHED,FOOBAR) Certain parameters take a string that optionally consists of multiple words. The comment match is a good example:: >>> rule = iptc.Rule() >>> rule.src = "127.0.0.1" >>> rule.protocol = "udp" >>> rule.target = rule.create_target("ACCEPT") >>> match = rule.create_match("comment") >>> match.comment = "this is a test comment" >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> chain.insert_rule(rule) Note that this is still just one parameter value. However, when a match or a target takes multiple parameter values, that needs to be passed in as a list. Let's assume you have created and set up an ``ipset`` called ``blacklist`` via the ``ipset`` command. To create a rule with a match for this set:: >>> rule = iptc.Rule() >>> m = rule.create_match("set") >>> m.match_set = ['blacklist', 'src'] Note how this time a list was used for the parameter value, since the ``set`` match ``match_set`` parameter expects two values. See the ``iptables`` manpages to find out what the extensions you use expect. See ipset_ for more information. .. _ipset: http://ipset.netfilter.org/ When you are ready constructing your rule, add them to the chain you want it to show up in:: >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> chain.insert_rule(rule) This will put your rule into the INPUT chain in the filter table. Chains and tables ----------------- You can of course also check what a rule's source/destination address, in/out inteface etc is. To print out all rules in the FILTER table:: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> for chain in table.chains: >>> print "=======================" >>> print "Chain ", chain.name >>> for rule in chain.rules: >>> print "Rule", "proto:", rule.protocol, "src:", rule.src, "dst:", \ >>> rule.dst, "in:", rule.in_interface, "out:", rule.out_interface, >>> print "Matches:", >>> for match in rule.matches: >>> print match.name, >>> print "Target:", >>> print rule.target.name >>> print "=======================" As you see in the code snippet above, rules are organized into chains, and chains are in tables. You have a fixed set of tables; for IPv4: * ``FILTER``, * ``NAT``, * ``MANGLE`` and * ``RAW``. For IPv6 the tables are: * ``FILTER``, * ``MANGLE``, * ``RAW`` and * ``SECURITY``. To access a table:: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> print table.name filter To create a new chain in the FILTER table:: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = table.create_chain("testchain") $ sudo iptables -L -n [...] Chain testchain (0 references) target prot opt source destination To access an existing chain:: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = iptc.Chain(table, "INPUT") >>> chain.name 'INPUT' >>> len(chain.rules) 10 >>> More about matches and targets ------------------------------ There are basic targets, such as ``DROP`` and ``ACCEPT``. E.g. to reject packets with source address ``127.0.0.1/255.0.0.0`` coming in on any of the ``eth`` interfaces:: >>> import iptc >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> rule = iptc.Rule() >>> rule.in_interface = "eth+" >>> rule.src = "127.0.0.1/255.0.0.0" >>> target = iptc.Target(rule, "DROP") >>> rule.target = target >>> chain.insert_rule(rule) To instantiate a target or match, we can either create an object like above, or use the ``rule.create_target(target_name)`` and ``rule.create_match(match_name)`` methods. For example, in the code above target could have been created as:: >>> target = rule.create_target("DROP") instead of:: >>> target = iptc.Target(rule, "DROP") >>> rule.target = target The former also adds the match or target to the rule, saving a call. Another example, using a target which takes parameters. Let's mark packets going to ``192.168.1.2`` UDP port ``1234`` with ``0xffff``:: >>> import iptc >>> chain = iptc.Chain(iptc.Table(iptc.Table.MANGLE), "PREROUTING") >>> rule = iptc.Rule() >>> rule.dst = "192.168.1.2" >>> rule.protocol = "udp" >>> match = iptc.Match(rule, "udp") >>> match.dport = "1234" >>> rule.add_match(match) >>> target = iptc.Target(rule, "MARK") >>> target.set_mark = "0xffff" >>> rule.target = target >>> chain.insert_rule(rule) Matches are optional (specifying a target is mandatory). E.g. to insert a rule to NAT TCP packets going out via ``eth0``:: >>> import iptc >>> chain = iptc.Chain(iptc.Table(iptc.Table.NAT), "POSTROUTING") >>> rule = iptc.Rule() >>> rule.protocol = "tcp" >>> rule.out_interface = "eth0" >>> target = iptc.Target(rule, "MASQUERADE") >>> target.to_ports = "1234" >>> rule.target = target >>> chain.insert_rule(rule) Here only the properties of the rule decide whether the rule will be applied to a packet. Matches are optional, but we can add multiple matches to a rule. In the following example we will do that, using the ``iprange`` and the ``tcp`` matches:: >>> import iptc >>> rule = iptc.Rule() >>> rule.protocol = "tcp" >>> match = iptc.Match(rule, "tcp") >>> match.dport = "22" >>> rule.add_match(match) >>> match = iptc.Match(rule, "iprange") >>> match.src_range = "192.168.1.100-192.168.1.200" >>> match.dst_range = "172.22.33.106" >>> rule.add_match(match) >>> rule.target = iptc.Target(rule, "DROP") >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> chain.insert_rule(rule) This is the ``python-iptables`` equivalent of the following iptables command:: # iptables -A INPUT -p tcp –destination-port 22 -m iprange –src-range 192.168.1.100-192.168.1.200 –dst-range 172.22.33.106 -j DROP You can of course negate matches, just like when you use ``!`` in front of a match with iptables. For example:: >>> import iptc >>> rule = iptc.Rule() >>> match = iptc.Match(rule, "mac") >>> match.mac_source = "!00:11:22:33:44:55" >>> rule.add_match(match) >>> rule.target = iptc.Target(rule, "ACCEPT") >>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT") >>> chain.insert_rule(rule) This results in:: $ sudo iptables -L -n Chain INPUT (policy ACCEPT) target prot opt source destination ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 MAC ! 00:11:22:33:44:55 Chain FORWARD (policy ACCEPT) target prot opt source destination Chain OUTPUT (policy ACCEPT) target prot opt source destination Counters -------- You can query rule and chain counters, e.g.:: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = iptc.Chain(table, 'OUTPUT') >>> for rule in chain.rules: >>> (packets, bytes) = rule.get_counters() >>> print packets, bytes However, the counters are only refreshed when the underlying low-level iptables connection is refreshed in ``Table`` via ``table.refresh()``. For example:: >>> import time, sys >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = iptc.Chain(table, 'OUTPUT') >>> for rule in chain.rules: >>> (packets, bytes) = rule.get_counters() >>> print packets, bytes >>> print "Please send some traffic" >>> sys.stdout.flush() >>> time.sleep(3) >>> for rule in chain.rules: >>> # Here you will get back the same counter values as above >>> (packets, bytes) = rule.get_counters() >>> print packets, bytes This will show you the same counter values even if there was traffic hitting your rules. You have to refresh your table to get update your counters:: >>> import time, sys >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = iptc.Chain(table, 'OUTPUT') >>> for rule in chain.rules: >>> (packets, bytes) = rule.get_counters() >>> print packets, bytes >>> print "Please send some traffic" >>> sys.stdout.flush() >>> time.sleep(3) >>> table.refresh() # Here: refresh table to update rule counters >>> for rule in chain.rules: >>> (packets, bytes) = rule.get_counters() >>> print packets, bytes What is more, if you add:: iptables -A OUTPUT -p tcp --sport 80 iptables -A OUTPUT -p tcp --sport 22 you can query rule and chain counters together with the protocol and sport(or dport), e.g.:: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = iptc.Chain(table, 'OUTPUT') >>> for rule in chain.rules: >>> for match in rule.matches: >>> (packets, bytes) = rule.get_counters() >>> print packets, bytes, match.name, match.sport Autocommit ---------- ``Python-iptables`` by default automatically performs an iptables commit after each operation. That is, after you add a rule in ``python-iptables``, that will take effect immediately. It may happen that you want to batch together certain operations. A typical use case is traversing a chain and removing rules matching a specific criteria. If you do this with autocommit enabled, after the first delete operation, your chain's state will change and you have to restart the traversal. You can do something like this:: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> removed = True >>> chain = iptc.Chain(table, "FORWARD") >>> while removed == True: >>> removed = False >>> for rule in chain.rules: >>> if rule.out_interface and "eth0" in rule.out_interface: >>> chain.delete_rule(rule) >>> removed = True >>> break This is clearly not ideal and the code is not very readable. An alternative is to disable autocommits, traverse the chain, removing one or more rules, than commit it:: >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> table.autocommit = False >>> chain = iptc.Chain(table, "FORWARD") >>> for rule in chain.rules: >>> if rule.out_interface and "eth0" in rule.out_interface: >>> chain.delete_rule(rule) >>> table.commit() >>> table.autocommit = True The drawback is that `Table` is a singleton, and if you disable autocommit, it will be disabled for all instances of that `Table`. Easy rules with dictionaries ---------------------------- To simplify operations with ``python-iptables`` rules we have included support to define and convert Rules object into python dictionaries. >>> import iptc >>> table = iptc.Table(iptc.Table.FILTER) >>> chain = iptc.Chain(table, "INPUT") >>> # Create an iptc.Rule object from dictionary >>> rule_d = {'comment': {'comment': 'Match tcp.22'}, 'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}} >>> rule = iptc.easy.encode_iptc_rule(rule_d) >>> # Obtain a dictionary representation from the iptc.Rule >>> iptc.easy.decode_iptc_rule(rule) {'tcp': {'dport': '22'}, 'protocol': 'tcp', 'comment': {'comment': 'Match tcp.22'}, 'target': 'ACCEPT'} Known Issues ============ These issues are mainly caused by complex interaction with upstream's Netfilter implementation, and will require quite significant effort to fix. Workarounds are available. - The ``hashlimit`` match requires explicitly setting ``hashlimit_htable_expire``. See `Issue #201 <https://github.com/ldx/python-iptables/issues/201>`_. - The ``NOTRACK`` target is problematic; use ``CT --notrack`` instead. See `Issue #204 <https://github.com/ldx/python-iptables/issues/204>`_. 07070100000018000081A40000000000000000000000015EE6923E000001DF000000000000000000000000000000000000002400000000python-iptables-1.0.0/doc/index.rst.. python-iptables documentation master file, created by sphinx-quickstart on Wed Oct 27 21:54:51 2010. You can adapt this file completely to your liking, but it should at least contain the root `toctree` directive. Welcome to python-iptables's documentation! =========================================== Contents: .. toctree:: :maxdepth: 2 intro usage examples Indices and tables ================== * :ref:`genindex` * :ref:`modindex` * :ref:`search` 07070100000019000081A40000000000000000000000015EE6923E0000142A000000000000000000000000000000000000002400000000python-iptables-1.0.0/doc/intro.rstIntroduction ============ About python-iptables --------------------- **Iptables** is the tool that is used to manage **netfilter**, the standard packet filtering and manipulation framework under Linux. As the iptables manpage puts it: Iptables is used to set up, maintain, and inspect the tables of IPv4 packet filter rules in the Linux kernel. Several different tables may be defined. Each table contains a number of built-in chains and may also contain user- defined chains. Each chain is a list of rules which can match a set of packets. Each rule specifies what to do with a packet that matches. This is called a `target`, which may be a jump to a user-defined chain in the same table. ``Python-iptables`` provides a pythonesque wrapper via python bindings to iptables under Linux. Interoperability with iptables is achieved via using the iptables C libraries (``libiptc``, ``libxtables``, and the iptables extensions), not calling the iptables binary and parsing its output. It is meant primarily for dynamic and/or complex routers and firewalls, where rules are often updated or changed, or Python programs wish to interface with the Linux iptables framework.. If you are looking for ``ebtables`` python bindings, check out `python-ebtables <https://github.com/ldx/python-ebtables/>`_. ``Python-iptables`` supports Python 2.6, 2.7 and 3.4. .. image:: http://api.flattr.com/button/flattr-badge-large.png :target: https://flattr.com/submit/auto?user_id=ldx&url=https%3A%2F%2Fgithub.com%2Fldx%2Fpython-iptables :alt: Flattr .. image:: https://pypip.in/v/python-iptables/badge.png :target: https://pypi.python.org/pypi/python-iptables :alt: Latest Release .. image:: https://travis-ci.org/ldx/python-iptables.png?branch=master :target: https://travis-ci.org/ldx/python-iptables :alt: Build Status .. image:: https://coveralls.io/repos/ldx/python-iptables/badge.svg?branch=codecoverage :target: https://coveralls.io/r/ldx/python-iptables?branch=codecoverage :alt: Coverage Status .. image:: https://landscape.io/github/ldx/python-iptables/codecoverage/landscape.svg :target: https://landscape.io/github/ldx/python-iptables/codecoverage :alt: Code Health .. image:: https://pypip.in/d/python-iptables/badge.png :target: https://pypi.python.org/pypi/python-iptables :alt: Number of Downloads .. image:: https://pypip.in/license/python-iptables/badge.png :target: https://pypi.python.org/pypi/python-iptables :alt: License Installing via pip ------------------ The usual way:: pip install --upgrade python-iptables Compiling from source ---------------------- First make sure you have iptables installed (most Linux distributions install it by default). ``Python-iptables`` needs the shared libraries ``libiptc.so`` and ``libxtables.so`` coming with iptables, they are installed in ``/lib`` on Ubuntu. You can compile ``python-iptables`` in the usual distutils way:: % cd python-iptables % python setup.py build If you like, ``python-iptables`` can also be installed into a ``virtualenv``:: % mkvirtualenv python-iptables % python setup.py install If you install ``python-iptables`` as a system package, make sure the directory where ``distutils`` installs shared libraries is in the dynamic linker's search path (it's in ``/etc/ld.so.conf`` or in one of the files in the folder ``/etc/ld.co.conf.d``). Under Ubuntu ``distutils`` by default installs into ``/usr/local/lib``. Now you can run the tests:: % sudo PATH=$PATH python setup.py test WARNING: this test will manipulate iptables rules. Don't do this on a production machine. Would you like to continue? y/n y [...] The ``PATH=$PATH`` part is necessary after ``sudo`` if you have installed into a ``virtualenv``, since ``sudo`` will reset your environment to a system setting otherwise.. Once everything is in place you can fire up python to check whether the package can be imported:: % sudo PATH=$PATH python >>> import iptc >>> Of course you need to be root to be able to use iptables. Using a custom iptables install ------------------------------- If you are stuck on a system with an old version of ``iptables``, you can install a more up to date version to a custom location, and ask ``python-iptables`` to use libraries at that location. To install ``iptables`` to ``/tmp/iptables``:: % git clone git://git.netfilter.org/iptables && cd iptables % ./autogen.sh % ./configure --prefix=/tmp/iptables % make % make install Make sure the dependencies ``iptables`` needs are installed. Now you can point ``python-iptables`` to this install path via:: % sudo PATH=$PATH IPTABLES_LIBDIR=/tmp/iptables/lib XTABLES_LIBDIR=/tmp/iptables/lib/xtables python >>> import iptc >>> What is supported ----------------- The basic iptables framework and all the match/target extensions are supported by ``python-iptables``, including IPv4 and IPv6 ones. All IPv4 and IPv6 tables are supported as well. Full documentation with API reference is available here_. .. _here: http://ldx.github.com/python-iptables/ 0707010000001A000081A40000000000000000000000015EE6923E00000900000000000000000000000000000000000000002400000000python-iptables-1.0.0/doc/usage.rstUsage ===== The python API in ``python-iptables`` tries to mimic the logic of iptables. You have * **Tables**, **Table.FILTER**, **Table.NAT**, **Table.MANGLE** and **Table.RAW** for IPv4; **Table6.FILTER**, **Table6.SECURITY**, **Table6.MANGLE** and **Table6.RAW** for IPv6. They can be used to filter packets, do network address translation or modify packets in various ways. * **Chains** inside tables. Each table has a few built-in chains, but you can also create your own chains and jump into them from other chains. When you create your chains you should also specify which table it will be used in. **Chains** have **Policies**, which tell what to do when the end of a chain is reached. * Each chain has zero or more **rules**. A rule specifies what kind of packets to match (matches, each rule can have zero, one or more matches) and what to do with them (target, each rule has one of them). Iptables implements a plethora of match and target extensions. For IPv4, the class implementing this is called *Rule*, for IPv6 it is called *Rule6*. * **Matches**, specifying when a rule needs to be applied to a packet. To create a match object you also has to specify the rule to which it belongs. * **Targets**, specifying what to do when a rule is applied to a packet. To create a target object you also has to specify the rule to which it belongs. The python API is quite high-level and hides the low-level details from the user. Using only the classes *Table*, *Chain*, *Rule*, *Match* and *Target* virtually anything can be achieved that you can do with iptables from the command line. .. currentmodule:: iptc Table ----- .. autoclass:: Table :members: Table6 ------ .. autoclass:: Table6 :members: :inherited-members: Chain ----- .. autoclass:: Chain :members: Policy ------ .. autoclass:: Policy :members: Match ----- .. autoclass:: Match :members: :inherited-members: Target ------ .. autoclass:: Target :members: :inherited-members: Rule ---- .. autoclass:: Rule :members: :inherited-members: Rule6 ----- .. autoclass:: Rule6 :members: :inherited-members: IPTCError --------- .. autoexception:: IPTCError 0707010000001B000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000001B00000000python-iptables-1.0.0/iptc0707010000001C000081A40000000000000000000000015EE6923E00000156000000000000000000000000000000000000002700000000python-iptables-1.0.0/iptc/__init__.py# -*- coding: utf-8 -*- """ .. module:: iptc :synopsis: Python bindings for libiptc. .. moduleauthor:: Vilmos Nebehaj """ from iptc.ip4tc import (is_table_available, Table, Chain, Rule, Match, Target, Policy, IPTCError) from iptc.ip6tc import is_table6_available, Table6, Rule6 from iptc.errors import * import iptc.easy __all__ = [] 0707010000001D000081A40000000000000000000000015EE6923E000043A4000000000000000000000000000000000000002300000000python-iptables-1.0.0/iptc/easy.py# -*- coding: utf-8 -*- # TODO: # - Add documentation # - Add HowToUse examples from .ip4tc import Rule, Table, Chain, IPTCError from .ip6tc import Rule6, Table6 _BATCH_MODE = False def flush_all(ipv6=False): """ Flush all available tables """ for table in get_tables(ipv6): flush_table(table, ipv6) def flush_table(table, ipv6=False, raise_exc=True): """ Flush a table """ try: iptc_table = _iptc_gettable(table, ipv6) iptc_table.flush() except Exception as e: if raise_exc: raise def flush_chain(table, chain, ipv6=False, raise_exc=True): """ Flush a chain in table """ try: iptc_chain = _iptc_getchain(table, chain, ipv6) iptc_chain.flush() except Exception as e: if raise_exc: raise def zero_all(table, ipv6=False): """ Zero all tables """ for table in get_tables(ipv6): zero_table(table, ipv6) def zero_table(table, ipv6=False): """ Zero a table """ iptc_table = _iptc_gettable(table, ipv6) iptc_table.zero_entries() def zero_chain(table, chain, ipv6=False): """ Zero a chain in table """ iptc_chain = _iptc_getchain(table, chain, ipv6) iptc_chain.zero_counters() def has_chain(table, chain, ipv6=False): """ Return True if chain exists in table False otherwise """ return _iptc_gettable(table, ipv6).is_chain(chain) def has_rule(table, chain, rule_d, ipv6=False): """ Return True if rule exists in chain False otherwise """ iptc_chain = _iptc_getchain(table, chain, ipv6) iptc_rule = encode_iptc_rule(rule_d, ipv6) return iptc_rule in iptc_chain.rules def add_chain(table, chain, ipv6=False, raise_exc=True): """ Return True if chain was added successfully to a table, raise Exception otherwise """ try: iptc_table = _iptc_gettable(table, ipv6) iptc_table.create_chain(chain) return True except Exception as e: if raise_exc: raise return False def add_rule(table, chain, rule_d, position=0, ipv6=False): """ Add a rule to a chain in a given position. 0=append, 1=first, n=nth position """ iptc_chain = _iptc_getchain(table, chain, ipv6) iptc_rule = encode_iptc_rule(rule_d, ipv6) if position == 0: # Insert rule in last position -> append iptc_chain.append_rule(iptc_rule) elif position > 0: # Insert rule in given position -> adjusted as iptables CLI iptc_chain.insert_rule(iptc_rule, position - 1) elif position < 0: # Insert rule in given position starting from bottom -> not available in iptables CLI nof_rules = len(iptc_chain.rules) _position = position + nof_rules # Insert at the top if the position has looped over if _position <= 0: _position = 0 iptc_chain.insert_rule(iptc_rule, _position) def insert_rule(table, chain, rule_d, ipv6=False): """ Add a rule to a chain in the 1st position """ add_rule(table, chain, rule_d, position=1, ipv6=ipv6) def delete_chain(table, chain, ipv6=False, flush=False, raise_exc=True): """ Delete a chain """ try: if flush: flush_chain(table, chain, ipv6, raise_exc) iptc_table = _iptc_gettable(table, ipv6) iptc_table.delete_chain(chain) except Exception as e: if raise_exc: raise def delete_rule(table, chain, rule_d, ipv6=False, raise_exc=True): """ Delete a rule from a chain """ try: iptc_chain = _iptc_getchain(table, chain, ipv6) iptc_rule = encode_iptc_rule(rule_d, ipv6) iptc_chain.delete_rule(iptc_rule) except Exception as e: if raise_exc: raise def get_tables(ipv6=False): """ Get all tables """ iptc_tables = _iptc_gettables(ipv6) return [t.name for t in iptc_tables] def get_chains(table, ipv6=False): """ Return the existing chains of a table """ iptc_table = _iptc_gettable(table, ipv6) return [iptc_chain.name for iptc_chain in iptc_table.chains] def get_rule(table, chain, position=0, ipv6=False, raise_exc=True): """ Get a rule from a chain in a given position. 0=all rules, 1=first, n=nth position """ try: if position == 0: # Return all rules return dump_chain(table, chain, ipv6) elif position > 0: # Return specific rule by position iptc_chain = _iptc_getchain(table, chain, ipv6) iptc_rule = iptc_chain.rules[position - 1] return decode_iptc_rule(iptc_rule, ipv6) elif position < 0: # Return last rule -> not available in iptables CLI iptc_chain = _iptc_getchain(table, chain, ipv6) iptc_rule = iptc_chain.rules[position] return decode_iptc_rule(iptc_rule, ipv6) except Exception as e: if raise_exc: raise def replace_rule(table, chain, old_rule_d, new_rule_d, ipv6=False): """ Replaces an existing rule of a chain """ iptc_chain = _iptc_getchain(table, chain, ipv6) iptc_old_rule = encode_iptc_rule(old_rule_d, ipv6) iptc_new_rule = encode_iptc_rule(new_rule_d, ipv6) iptc_chain.replace_rule(iptc_new_rule, iptc_chain.rules.index(iptc_old_rule)) def get_rule_counters(table, chain, rule_d, ipv6=False): """ Return a tuple with the rule counters (numberOfBytes, numberOfPackets) """ if not has_rule(table, chain, rule_d, ipv6): raise AttributeError('Chain <{}@{}> has no rule <{}>'.format(chain, table, rule_d)) iptc_chain = _iptc_getchain(table, chain, ipv6) iptc_rule = encode_iptc_rule(rule_d, ipv6) iptc_rule_index = iptc_chain.rules.index(iptc_rule) return iptc_chain.rules[iptc_rule_index].get_counters() def get_rule_position(table, chain, rule_d, ipv6=False): """ Return the position of a rule within a chain """ if not has_rule(table, chain, rule_d): raise AttributeError('Chain <{}@{}> has no rule <{}>'.format(chain, table, rule_d)) iptc_chain = _iptc_getchain(table, chain, ipv6) iptc_rule = encode_iptc_rule(rule_d, ipv6) return iptc_chain.rules.index(iptc_rule) def test_rule(rule_d, ipv6=False): """ Return True if the rule is a well-formed dictionary, False otherwise """ try: encode_iptc_rule(rule_d, ipv6) return True except: return False def test_match(name, value, ipv6=False): """ Return True if the match is valid, False otherwise """ try: iptc_rule = Rule6() if ipv6 else Rule() _iptc_setmatch(iptc_rule, name, value) return True except: return False def test_target(name, value, ipv6=False): """ Return True if the target is valid, False otherwise """ try: iptc_rule = Rule6() if ipv6 else Rule() _iptc_settarget(iptc_rule, {name:value}) return True except: return False def get_policy(table, chain, ipv6=False): """ Return the default policy of chain in a table """ iptc_chain = _iptc_getchain(table, chain, ipv6) return iptc_chain.get_policy().name def set_policy(table, chain, policy='ACCEPT', ipv6=False): """ Set the default policy of chain in a table """ iptc_chain = _iptc_getchain(table, chain, ipv6) iptc_chain.set_policy(policy) def dump_all(ipv6=False): """ Return a dictionary representation of all tables """ return {table: dump_table(table, ipv6) for table in get_tables(ipv6)} def dump_table(table, ipv6=False): """ Return a dictionary representation of a table """ return {chain: dump_chain(table, chain, ipv6) for chain in get_chains(table, ipv6)} def dump_chain(table, chain, ipv6=False): """ Return a list with the dictionary representation of the rules of a table """ iptc_chain = _iptc_getchain(table, chain, ipv6) return [decode_iptc_rule(iptc_rule, ipv6) for iptc_rule in iptc_chain.rules] def batch_begin(table = None, ipv6=False): """ Disable autocommit on a table """ _BATCH_MODE = True if table: tables = (table, ) else: tables = get_tables(ipv6) for table in tables: iptc_table = _iptc_gettable(table, ipv6) iptc_table.autocommit = False def batch_end(table = None, ipv6=False): """ Enable autocommit on table and commit changes """ _BATCH_MODE = False if table: tables = (table, ) else: tables = get_tables(ipv6) for table in tables: iptc_table = _iptc_gettable(table, ipv6) iptc_table.autocommit = True def batch_add_chains(table, chains, ipv6=False, flush=True): """ Add multiple chains to a table """ iptc_table = _batch_begin_table(table, ipv6) for chain in chains: if iptc_table.is_chain(chain): iptc_chain = Chain(iptc_table, chain) else: iptc_chain = iptc_table.create_chain(chain) if flush: iptc_chain.flush() _batch_end_table(table, ipv6) def batch_delete_chains(table, chains, ipv6=False): """ Delete multiple chains of a table """ iptc_table = _batch_begin_table(table, ipv6) for chain in chains: if iptc_table.is_chain(chain): iptc_chain = Chain(iptc_table, chain) iptc_chain.flush() iptc_table.delete_chain(chain) _batch_end_table(table, ipv6) def batch_add_rules(table, batch_rules, ipv6=False): """ Add multiple rules to a table with format (chain, rule_d, position) """ iptc_table = _batch_begin_table(table, ipv6) for (chain, rule_d, position) in batch_rules: iptc_chain = Chain(iptc_table, chain) iptc_rule = encode_iptc_rule(rule_d, ipv6) if position == 0: # Insert rule in last position -> append iptc_chain.append_rule(iptc_rule) elif position > 0: # Insert rule in given position -> adjusted as iptables CLI iptc_chain.insert_rule(iptc_rule, position-1) elif position < 0: # Insert rule in given position starting from bottom -> not available in iptables CLI nof_rules = len(iptc_chain.rules) iptc_chain.insert_rule(iptc_rule, position + nof_rules) _batch_end_table(table, ipv6) def batch_delete_rules(table, batch_rules, ipv6=False, raise_exc=True): """ Delete multiple rules from table with format (chain, rule_d) """ try: iptc_table = _batch_begin_table(table, ipv6) for (chain, rule_d) in batch_rules: iptc_chain = Chain(iptc_table, chain) iptc_rule = encode_iptc_rule(rule_d, ipv6) iptc_chain.delete_rule(iptc_rule) _batch_end_table(table, ipv6) except Exception as e: if raise_exc: raise def encode_iptc_rule(rule_d, ipv6=False): """ Return a Rule(6) object from the input dictionary """ # Sanity check assert(isinstance(rule_d, dict)) # Basic rule attributes rule_attr = ('src', 'dst', 'protocol', 'in-interface', 'out-interface', 'fragment') iptc_rule = Rule6() if ipv6 else Rule() # Set default target rule_d.setdefault('target', '') # Avoid issues with matches that require basic parameters to be configured first for name in rule_attr: if name in rule_d: setattr(iptc_rule, name.replace('-', '_'), rule_d[name]) for name, value in rule_d.items(): try: if name in rule_attr: continue elif name == 'counters': _iptc_setcounters(iptc_rule, value) elif name == 'target': _iptc_settarget(iptc_rule, value) else: _iptc_setmatch(iptc_rule, name, value) except Exception as e: #print('Ignoring unsupported field <{}:{}>'.format(name, value)) continue return iptc_rule def decode_iptc_rule(iptc_rule, ipv6=False): """ Return a dictionary representation of the Rule(6) object Note: host IP addresses are appended their corresponding CIDR """ d = {} if ipv6==False and iptc_rule.src != '0.0.0.0/0.0.0.0': _ip, _netmask = iptc_rule.src.split('/') _netmask = _netmask_v4_to_cidr(_netmask) d['src'] = '{}/{}'.format(_ip, _netmask) elif ipv6==True and iptc_rule.src != '::/0': d['src'] = iptc_rule.src if ipv6==False and iptc_rule.dst != '0.0.0.0/0.0.0.0': _ip, _netmask = iptc_rule.dst.split('/') _netmask = _netmask_v4_to_cidr(_netmask) d['dst'] = '{}/{}'.format(_ip, _netmask) elif ipv6==True and iptc_rule.dst != '::/0': d['dst'] = iptc_rule.dst if iptc_rule.protocol != 'ip': d['protocol'] = iptc_rule.protocol if iptc_rule.in_interface is not None: d['in-interface'] = iptc_rule.in_interface if iptc_rule.out_interface is not None: d['out-interface'] = iptc_rule.out_interface if ipv6 == False and iptc_rule.fragment: d['fragment'] = iptc_rule.fragment for m in iptc_rule.matches: if m.name not in d: d[m.name] = m.get_all_parameters() elif isinstance(d[m.name], list): d[m.name].append(m.get_all_parameters()) else: d[m.name] = [d[m.name], m.get_all_parameters()] if iptc_rule.target and iptc_rule.target.name and len(iptc_rule.target.get_all_parameters()): name = iptc_rule.target.name.replace('-', '_') d['target'] = {name:iptc_rule.target.get_all_parameters()} elif iptc_rule.target and iptc_rule.target.name: if iptc_rule.target.goto: d['target'] = {'goto':iptc_rule.target.name} else: d['target'] = iptc_rule.target.name # Get counters d['counters'] = iptc_rule.counters # Return a filtered dictionary return _filter_empty_field(d) ### INTERNAL FUNCTIONS ### def _iptc_table_available(table, ipv6=False): """ Return True if the table is available, False otherwise """ try: iptc_table = Table6(table) if ipv6 else Table(table) return True except: return False def _iptc_gettables(ipv6=False): """ Return an updated view of all available iptc_table """ iptc_cls = Table6 if ipv6 else Table return [_iptc_gettable(t, ipv6) for t in iptc_cls.ALL if _iptc_table_available(t, ipv6)] def _iptc_gettable(table, ipv6=False): """ Return an updated view of an iptc_table """ iptc_table = Table6(table) if ipv6 else Table(table) if _BATCH_MODE is False: iptc_table.commit() iptc_table.refresh() return iptc_table def _iptc_getchain(table, chain, ipv6=False, raise_exc=True): """ Return an iptc_chain of an updated table """ try: iptc_table = _iptc_gettable(table, ipv6) if not iptc_table.is_chain(chain): raise AttributeError('Table <{}> has no chain <{}>'.format(table, chain)) return Chain(iptc_table, chain) except Exception as e: if raise_exc: raise def _iptc_setcounters(iptc_rule, value): # Value is a tuple (numberOfBytes, numberOfPackets) iptc_rule.counters = value def _iptc_setmatch(iptc_rule, name, value): # Iterate list/tuple recursively if isinstance(value, list) or isinstance(value, tuple): for inner_value in value: _iptc_setmatch(iptc_rule, name, inner_value) # Assign dictionary value elif isinstance(value, dict): iptc_match = iptc_rule.create_match(name) [iptc_match.set_parameter(k, v) for k, v in value.items()] # Assign value directly else: iptc_match = iptc_rule.create_match(name) iptc_match.set_parameter(name, value) def _iptc_settarget(iptc_rule, value): # Target is dictionary - Use only 1st pair key/value if isinstance(value, dict): t_name, t_value = next(iter(value.items())) if t_name == 'goto': iptc_target = iptc_rule.create_target(t_value, goto=True) else: iptc_target = iptc_rule.create_target(t_name) [iptc_target.set_parameter(k, v) for k, v in t_value.items()] # Simple target else: iptc_target = iptc_rule.create_target(value) def _batch_begin_table(table, ipv6=False): """ Disable autocommit on a table """ iptc_table = _iptc_gettable(table, ipv6) iptc_table.autocommit = False return iptc_table def _batch_end_table(table, ipv6=False): """ Enable autocommit on table and commit changes """ iptc_table = _iptc_gettable(table, ipv6) iptc_table.autocommit = True return iptc_table def _filter_empty_field(data_d): """ Remove empty lists from dictionary values Before: {'target': {'CHECKSUM': {'checksum-fill': []}}} After: {'target': {'CHECKSUM': {'checksum-fill': ''}}} Before: {'tcp': {'dport': ['22']}}} After: {'tcp': {'dport': '22'}}} """ for k, v in data_d.items(): if isinstance(v, dict): data_d[k] = _filter_empty_field(v) elif isinstance(v, list) and len(v) != 0: v = [_filter_empty_field(_v) if isinstance(_v, dict) else _v for _v in v ] if isinstance(v, list) and len(v) == 1: data_d[k] = v.pop() elif isinstance(v, list) and len(v) == 0: data_d[k] = '' return data_d def _netmask_v4_to_cidr(netmask_addr): # Implement Subnet Mask conversion without dependencies return sum([bin(int(x)).count('1') for x in netmask_addr.split('.')]) ### /INTERNAL FUNCTIONS ### 0707010000001E000081A40000000000000000000000015EE6923E00000093000000000000000000000000000000000000002500000000python-iptables-1.0.0/iptc/errors.py# -*- coding: utf-8 -*- class XTablesError(Exception): """Raised when an xtables call fails for some reason.""" __all__ = ['XTablesError'] 0707010000001F000081A40000000000000000000000015EE6923E00010274000000000000000000000000000000000000002400000000python-iptables-1.0.0/iptc/ip4tc.py# -*- coding: utf-8 -*- import os import re import shlex import sys import ctypes as ct import socket import struct import weakref from .util import find_library, load_kernel, find_libc from .xtables import (XT_INV_PROTO, NFPROTO_IPV4, XTablesError, xtables, xt_align, xt_counters, xt_entry_target, xt_entry_match) __all__ = ["Table", "Chain", "Rule", "Match", "Target", "Policy", "IPTCError"] try: load_kernel("ip_tables") except: pass # Add IPPROTO_SCTP to socket module if not available if not hasattr(socket, 'IPPROTO_SCTP'): setattr(socket, 'IPPROTO_SCTP', 132) _IFNAMSIZ = 16 _libc = find_libc() _get_errno_loc = _libc.__errno_location _get_errno_loc.restype = ct.POINTER(ct.c_int) _malloc = _libc.malloc _malloc.restype = ct.POINTER(ct.c_ubyte) _malloc.argtypes = [ct.c_size_t] _free = _libc.free _free.restype = None _free.argtypes = [ct.POINTER(ct.c_ubyte)] # Make sure xt_params is set up. xtables(NFPROTO_IPV4) def is_table_available(name): try: Table(name) return True except IPTCError: pass return False class in_addr(ct.Structure): """This class is a representation of the C struct in_addr.""" _fields_ = [("s_addr", ct.c_uint32)] class ipt_ip(ct.Structure): """This class is a representation of the C struct ipt_ip.""" _fields_ = [("src", in_addr), ("dst", in_addr), ("smsk", in_addr), ("dmsk", in_addr), ("iniface", ct.c_char * _IFNAMSIZ), ("outiface", ct.c_char * _IFNAMSIZ), ("iniface_mask", ct.c_char * _IFNAMSIZ), ("outiface_mask", ct.c_char * _IFNAMSIZ), ("proto", ct.c_uint16), ("flags", ct.c_uint8), ("invflags", ct.c_uint8)] # flags IPT_F_FRAG = 0x01 # set if rule is a fragment rule IPT_F_GOTO = 0x02 # set if jump is a goto IPT_F_MASK = 0x03 # all possible flag bits mask # invflags IPT_INV_VIA_IN = 0x01 # invert the sense of IN IFACE IPT_INV_VIA_OUT = 0x02 # invert the sense of OUT IFACE IPT_INV_TOS = 0x04 # invert the sense of TOS IPT_INV_SRCIP = 0x08 # invert the sense of SRC IP IPT_INV_DSTIP = 0x10 # invert the sense of DST OP IPT_INV_FRAG = 0x20 # invert the sense of FRAG IPT_INV_PROTO = XT_INV_PROTO # invert the sense of PROTO (XT_INV_PROTO) IPT_INV_MASK = 0x7F # all possible flag bits mask def __init__(self): # default: full netmask self.smsk.s_addr = self.dmsk.s_addr = 0xffffffff class ipt_entry(ct.Structure): """This class is a representation of the C struct ipt_entry.""" _fields_ = [("ip", ipt_ip), ("nfcache", ct.c_uint), # mark with fields that we care about ("target_offset", ct.c_uint16), # size of ipt_entry + matches ("next_offset", ct.c_uint16), # size of e + matches + target ("comefrom", ct.c_uint), # back pointer ("counters", xt_counters), # packet and byte counters ("elems", ct.c_ubyte * 0)] # any matches then the target class IPTCError(Exception): """This exception is raised when a low-level libiptc error occurs. It contains a short description about the error that occurred while executing an iptables operation. """ _libiptc, _ = find_library("ip4tc", "iptc") # old iptables versions use iptc class iptc(object): """This class contains all libiptc API calls.""" iptc_init = _libiptc.iptc_init iptc_init.restype = ct.POINTER(ct.c_int) iptc_init.argstype = [ct.c_char_p] iptc_free = _libiptc.iptc_free iptc_free.restype = None iptc_free.argstype = [ct.c_void_p] iptc_commit = _libiptc.iptc_commit iptc_commit.restype = ct.c_int iptc_commit.argstype = [ct.c_void_p] iptc_builtin = _libiptc.iptc_builtin iptc_builtin.restype = ct.c_int iptc_builtin.argstype = [ct.c_char_p, ct.c_void_p] iptc_first_chain = _libiptc.iptc_first_chain iptc_first_chain.restype = ct.c_char_p iptc_first_chain.argstype = [ct.c_void_p] iptc_next_chain = _libiptc.iptc_next_chain iptc_next_chain.restype = ct.c_char_p iptc_next_chain.argstype = [ct.c_void_p] iptc_is_chain = _libiptc.iptc_is_chain iptc_is_chain.restype = ct.c_int iptc_is_chain.argstype = [ct.c_char_p, ct.c_void_p] iptc_create_chain = _libiptc.iptc_create_chain iptc_create_chain.restype = ct.c_int iptc_create_chain.argstype = [ct.c_char_p, ct.c_void_p] iptc_delete_chain = _libiptc.iptc_delete_chain iptc_delete_chain.restype = ct.c_int iptc_delete_chain.argstype = [ct.c_char_p, ct.c_void_p] iptc_rename_chain = _libiptc.iptc_rename_chain iptc_rename_chain.restype = ct.c_int iptc_rename_chain.argstype = [ct.c_char_p, ct.c_char_p, ct.c_void_p] iptc_flush_entries = _libiptc.iptc_flush_entries iptc_flush_entries.restype = ct.c_int iptc_flush_entries.argstype = [ct.c_char_p, ct.c_void_p] iptc_zero_entries = _libiptc.iptc_zero_entries iptc_zero_entries.restype = ct.c_int iptc_zero_entries.argstype = [ct.c_char_p, ct.c_void_p] # get the policy of a given built-in chain iptc_get_policy = _libiptc.iptc_get_policy iptc_get_policy.restype = ct.c_char_p iptc_get_policy.argstype = [ct.c_char_p, ct.POINTER(xt_counters), ct.c_void_p] # Set the policy of a chain iptc_set_policy = _libiptc.iptc_set_policy iptc_set_policy.restype = ct.c_int iptc_set_policy.argstype = [ct.c_char_p, ct.c_char_p, ct.POINTER(xt_counters), ct.c_void_p] # Get first rule in the given chain: NULL for empty chain. iptc_first_rule = _libiptc.iptc_first_rule iptc_first_rule.restype = ct.POINTER(ipt_entry) iptc_first_rule.argstype = [ct.c_char_p, ct.c_void_p] # Returns NULL when rules run out. iptc_next_rule = _libiptc.iptc_next_rule iptc_next_rule.restype = ct.POINTER(ipt_entry) iptc_next_rule.argstype = [ct.POINTER(ipt_entry), ct.c_void_p] # Returns a pointer to the target name of this entry. iptc_get_target = _libiptc.iptc_get_target iptc_get_target.restype = ct.c_char_p iptc_get_target.argstype = [ct.POINTER(ipt_entry), ct.c_void_p] # These functions return TRUE for OK or 0 and set errno. If errno == # 0, it means there was a version error (ie. upgrade libiptc). # Rule numbers start at 1 for the first rule. # Insert the entry `e' in chain `chain' into position `rulenum'. iptc_insert_entry = _libiptc.iptc_insert_entry iptc_insert_entry.restype = ct.c_int iptc_insert_entry.argstype = [ct.c_char_p, ct.POINTER(ipt_entry), ct.c_int, ct.c_void_p] # Atomically replace rule `rulenum' in `chain' with `e'. iptc_replace_entry = _libiptc.iptc_replace_entry iptc_replace_entry.restype = ct.c_int iptc_replace_entry.argstype = [ct.c_char_p, ct.POINTER(ipt_entry), ct.c_int, ct.c_void_p] # Append entry `e' to chain `chain'. Equivalent to insert with # rulenum = length of chain. iptc_append_entry = _libiptc.iptc_append_entry iptc_append_entry.restype = ct.c_int iptc_append_entry.argstype = [ct.c_char_p, ct.POINTER(ipt_entry), ct.c_void_p] # Delete the first rule in `chain' which matches `e', subject to # matchmask (array of length == origfw) iptc_delete_entry = _libiptc.iptc_delete_entry iptc_delete_entry.restype = ct.c_int iptc_delete_entry.argstype = [ct.c_char_p, ct.POINTER(ipt_entry), ct.POINTER(ct.c_ubyte), ct.c_void_p] # Delete the rule in position `rulenum' in `chain'. iptc_delete_num_entry = _libiptc.iptc_delete_num_entry iptc_delete_num_entry.restype = ct.c_int iptc_delete_num_entry.argstype = [ct.c_char_p, ct.c_uint, ct.c_void_p] # Check the packet `e' on chain `chain'. Returns the verdict, or # NULL and sets errno. # iptc_check_packet = _libiptc.iptc_check_packet # iptc_check_packet.restype = ct.c_char_p # iptc_check_packet.argstype = [ct.c_char_p, ct.POINTER(ipt), ct.c_void_p] # Get the number of references to this chain iptc_get_references = _libiptc.iptc_get_references iptc_get_references.restype = ct.c_int iptc_get_references.argstype = [ct.c_uint, ct.c_char_p, ct.c_void_p] # read packet and byte counters for a specific rule iptc_read_counter = _libiptc.iptc_read_counter iptc_read_counter.restype = ct.POINTER(xt_counters) iptc_read_counter.argstype = [ct.c_char_p, ct.c_uint, ct.c_void_p] # zero packet and byte counters for a specific rule iptc_zero_counter = _libiptc.iptc_zero_counter iptc_zero_counter.restype = ct.c_int iptc_zero_counter.argstype = [ct.c_char_p, ct.c_uint, ct.c_void_p] # set packet and byte counters for a specific rule iptc_set_counter = _libiptc.iptc_set_counter iptc_set_counter.restype = ct.c_int iptc_set_counter.argstype = [ct.c_char_p, ct.c_uint, ct.POINTER(xt_counters), ct.c_void_p] # Translates errno numbers into more human-readable form than strerror. iptc_strerror = _libiptc.iptc_strerror iptc_strerror.restype = ct.c_char_p iptc_strerror.argstype = [ct.c_int] class IPTCModule(object): """Superclass for Match and Target.""" pattern = re.compile( '\s*(!)?\s*--([-\w]+)\s+(!)?\s*"?([^"]*?)"?(?=\s*(?:!?\s*--|$))') def __init__(self): self._name = None self._rule = None self._module = None self._revision = None self._ptr = None self._ptrptr = None raise NotImplementedError() def set_parameter(self, parameter, value=None): """ Set a parameter for target or match extension, with an optional value. @param parameter: name of the parameter to set @type parameter: C{str} @param value: optional value of the parameter, defaults to C{None} @type value: C{str} or a C{list} of C{str} """ if value is None: value = "" return self.parse(parameter.replace("_", "-"), value) def parse(self, parameter, value): # Parameter name must always be a string. parameter = parameter.encode() # Check if we are dealing with an inverted parameter value. inv = ct.c_int(0) if len(value) > 0 and value[0] == "!": inv = ct.c_int(1) value = value[1:] # Value can be either a string, or a list of strings, e.g. "8888", # "!0:65535" or ["!", "example_set", "dst"]. args = [] is_str = isinstance(value, str) try: if not is_str: is_str = isinstance(value, unicode) except: pass if is_str: args = [value.encode()] else: try: args = [val.encode() for val in value] except: raise TypeError("Invalid parameter value: " "must be string or list of strings") if not self._module.extra_opts and not self._module.x6_options: raise AttributeError("%s: invalid parameter %s" % (self._module.name, parameter)) parameter = parameter.strip() N = len(args) argv = (ct.c_char_p * (N + 1))() argv[0] = parameter for i in range(N): argv[i + 1] = args[i] entry = self._rule.entry and ct.pointer(self._rule.entry) or None self._parse(argv, inv, entry) def _parse(self, argv, inv, entry): raise NotImplementedError() def final_check(self): if self._module: self._update_parameters() self._final_check() # subclasses override this def _final_check(self): raise NotImplementedError() def _get_saved_buf(self, ip): if not self._module or not self._module.save: return None # redirect C stdout to a pipe and read back the output of m->save # Flush stdout to avoid getting buffered results sys.stdout.flush() # Save the current C stdout. stdout = os.dup(1) try: # Create a pipe and use the write end to replace the original C # stdout. pipes = os.pipe() os.dup2(pipes[1], 1) self._xt.save(self._module, ip, self._ptr) # Use the read end to read whatever was written. buf = os.read(pipes[0], 1024) # Clean up the pipe. os.close(pipes[0]) os.close(pipes[1]) return buf finally: # Put the original C stdout back in place. os.dup2(stdout, 1) # Clean up the copy we made. os.close(stdout) def save(self, name): return self._save(name, self.rule.get_ip()) def _save(self, name, ip): buf = self._get_saved_buf(ip).decode() if buf is None: return None if not self._module or not self._module.save: return None if name: return self._get_value(buf, name) else: return self._get_all_values(buf) def _get_all_values(self, buf): table = {} # variable -> (value, inverted) res = re.findall(IPTCModule.pattern, buf) for x in res: value, invert = (x[3], x[0] or x[2]) table[x[1].replace("-", "_")] = "%s%s" % (invert and "!" or "", value) return table def _get_value(self, buf, name): table = {} # variable -> (value, inverted) res = re.findall(IPTCModule.pattern, buf) for x in res: table[x[1]] = (x[3], x[0] or x[2]) try: value, invert = table[name] return "%s%s" % (invert and "!" or "", value) except KeyError: return None def get_all_parameters(self): params = {} ip = self.rule.get_ip() buf = self._get_saved_buf(ip) if buf is None: return params if type(buf) != str: # In Python3, string and bytes are different types. buf = buf.decode() res = shlex.split(buf) res.reverse() inv = False key = None while len(res) > 0: x = res.pop() if x == '!': # Next parameter is negated. inv = True continue if x.startswith('--'): # This is a parameter name. key = x[2:] if inv: params[key] = ['!'] else: params[key] = [] inv = False continue # At this point key should be set, unless the output from save is # not formatted right. Let's be defensive, since some users # reported that problem. if key is not None: params[key].append(x) # This is a parameter value. return params def _update_parameters(self): params = self.get_all_parameters().items() self.reset() for k, v in params: self.set_parameter(k, v) def _get_alias_name(self): if not self._module or not self._ptr: return None alias = getattr(self._module, 'alias', None) if not alias: return None return self._module.alias(self._ptr).decode() def __setattr__(self, name, value): if not name.startswith('_') and name not in dir(self): self.parse(name.replace("_", "-"), value) else: object.__setattr__(self, name, value) def __getattr__(self, name): if not name.startswith('_'): return self.save(name.replace("_", "-")) def _get_parameters(self): return self.save(None) parameters = property(_get_parameters) """Dictionary with all parameters in the form of name -> value. A match or target might have default parameters as well, so this dictionary will contain those set by the module by default too.""" def _get_name(self): alias = self._get_alias_name() return alias and alias or self._name name = property(_get_name) """Name of this target or match.""" def _get_rule(self): return self._rule def _set_rule(self, rule): self._rule = rule rule = property(_get_rule, _set_rule) """The rule this target or match belong to.""" class _Buffer(object): def __init__(self, size=0): if size > 0: self.buffer = _malloc(size) if self.buffer is None: raise Exception("Can't allocate buffer") else: self.buffer = None def __del__(self): if self.buffer is not None: _free(self.buffer) class Match(IPTCModule): """Matches are extensions which can match for special header fields or other attributes of a packet. Target and match extensions in iptables have parameters. These parameters are implemented as instance attributes in python. However, to make the names of parameters legal attribute names they have to be converted. The rule is to cut the leading double dash from the name, and replace dashes in parameter names with underscores so they are accepted by python as attribute names. E.g. the *TOS* target has parameters *--set-tos*, *--and-tos*, *--or-tos* and *--xor-tos*; they become *target.set_tos*, *target.and_tos*, *target.or_tos* and *target.xor_tos*, respectively. The value of a parameter is always a string, if a parameter does not take any value in the iptables extension, an empty string *""* should be used. """ def __init__(self, rule, name=None, match=None, revision=None): """ *rule* is the Rule object this match belongs to; it can be changed later via *set_rule()*. *name* is the name of the iptables match extension (in lower case), *match* is the raw buffer of the match structure if the caller has it. Either *name* or *match* must be provided. *revision* is the revision number of the extension that should be used; different revisions use different structures in C and they usually only work with certain kernel versions. Python-iptables by default will use the latest revision available. """ if not name and not match: raise ValueError("can't create match based on nothing") if not name: name = match.u.user.name.decode() self._name = name self._rule = rule self._orig_parse = None self._orig_options = None self._xt = xtables(rule.nfproto) module = self._xt.find_match(name) real_name = module and getattr(module[0], 'real_name', None) or None if real_name: # Alias name, look up real module. self._name = real_name.decode() self._orig_parse = getattr(module[0], 'x6_parse', None) self._orig_options = getattr(module[0], 'x6_options', None) module = self._xt.find_match(real_name) if not module: raise XTablesError("can't find match %s" % (name)) self._module = module[0] self._module.mflags = 0 if revision is not None: self._revision = revision else: self._revision = self._module.revision self._match_buf = (ct.c_ubyte * self.size)() if match: ct.memmove(ct.byref(self._match_buf), ct.byref(match), self.size) self._update_pointers() self._check_alias() else: self.reset() def _check_alias(self): name = self._get_alias_name() if name is None: return alias_module = self._xt.find_match(name) if alias_module is None: return self._alias_module = alias_module[0] self._orig_parse = getattr(self._alias_module, 'x6_parse', None) self._orig_options = getattr(self._alias_module, 'x6_options', None) def __eq__(self, match): basesz = ct.sizeof(xt_entry_match) if (self.name == match.name and self.match_buf[basesz:self.usersize] == match.match_buf[basesz:match.usersize]): return True return False def __hash__(self): return (hash(self.match.u.match_size) ^ hash(self.match.u.user.name) ^ hash(self.match.u.user.revision) ^ hash(bytes(self.match_buf))) def __ne__(self, match): return not self.__eq__(match) def _final_check(self): self._xt.final_check_match(self._module) def _parse(self, argv, inv, entry): self._xt.parse_match(argv, inv, self._module, entry, ct.cast(self._ptrptr, ct.POINTER(ct.c_void_p)), self._orig_parse, self._orig_options) def _get_size(self): return xt_align(self._module.size + ct.sizeof(xt_entry_match)) size = property(_get_size) """This is the full size of the underlying C structure.""" def _get_user_size(self): return self._module.userspacesize + ct.sizeof(xt_entry_match) usersize = property(_get_user_size) """This is the size of the part of the underlying C structure that is used in userspace.""" def _update_pointers(self): self._ptr = ct.cast(ct.byref(self._match_buf), ct.POINTER(xt_entry_match)) self._ptrptr = ct.cast(ct.pointer(self._ptr), ct.POINTER(ct.POINTER(xt_entry_match))) self._module.m = self._ptr self._update_name() def _update_name(self): m = self._ptr[0] m.u.user.name = self.name.encode() def reset(self): """Reset the match. Parameters are set to their default values, any flags are cleared.""" ct.memset(ct.byref(self._match_buf), 0, self.size) self._update_pointers() m = self._ptr[0] m.u.match_size = self.size m.u.user.revision = self._revision if self._module.init: self._module.init(self._ptr) self._module.mflags = 0 udata_size = getattr(self._module, 'udata_size', 0) if udata_size > 0: udata_buf = (ct.c_ubyte * udata_size)() self._module.udata = ct.cast(ct.byref(udata_buf), ct.c_void_p) def _get_match(self): return ct.cast(ct.byref(self.match_buf), ct.POINTER(xt_entry_match))[0] match = property(_get_match) """This is the C structure used by the extension.""" def _get_match_buf(self): return self._match_buf match_buf = property(_get_match_buf) """This is the buffer holding the C structure used by the extension.""" class Target(IPTCModule): """Targets specify what to do with a packet when a match is found while traversing the list of rule entries in a chain. Target and match extensions in iptables have parameters. These parameters are implemented as instance attributes in python. However, to make the names of parameters legal attribute names they have to be converted. The rule is to cut the leading double dash from the name, and replace dashes in parameter names with underscores so they are accepted by python as attribute names. E.g. the *TOS* target has parameters *--set-tos*, *--and-tos*, *--or-tos* and *--xor-tos*; they become *target.set_tos*, *target.and_tos*, *target.or_tos* and *target.xor_tos*, respectively. The value of a parameter is always a string, if a parameter does not take any value in the iptables extension, an empty string i.e. "" should be used. """ def __init__(self, rule, name=None, target=None, revision=None, goto=None): """ *rule* is the Rule object this match belongs to; it can be changed later via *set_rule()*. *name* is the name of the iptables target extension (in upper case), *target* is the raw buffer of the target structure if the caller has it. Either *name* or *target* must be provided. *revision* is the revision number of the extension that should be used; different revisions use different structures in C and they usually only work with certain kernel versions. Python-iptables by default will use the latest revision available. If goto is True, then it converts '-j' to '-g'. """ if name is None and target is None: raise ValueError("can't create target based on nothing") if name is None: name = target.u.user.name.decode() self._name = name self._rule = rule self._orig_parse = None self._orig_options = None # NOTE: # get_ip() returns the 'ip' structure that contains (1)the 'flags' field, and # (2)the value for the GOTO flag. # We *must* use get_ip() because the actual name of the field containing the # structure apparently differs between implementation ipstruct = rule.get_ip() f_goto_attrs = [a for a in dir(ipstruct) if a.endswith('_F_GOTO')] if len(f_goto_attrs) == 0: raise RuntimeError('What kind of struct is this? It does not have "*_F_GOTO" constant!') _F_GOTO = getattr(ipstruct, f_goto_attrs[0]) if target is not None or goto is None: # We are 'decoding' existing Target self._goto = bool(ipstruct.flags & _F_GOTO) if goto is not None: assert isinstance(goto, bool) self._goto = goto if goto: ipstruct.flags |= _F_GOTO else: ipstruct.flags &= ~_F_GOTO self._xt = xtables(rule.nfproto) module = (self._is_standard_target() and self._xt.find_target('') or self._xt.find_target(name)) real_name = module and getattr(module[0], 'real_name', None) or None if real_name: # Alias name, look up real module. self._name = real_name.decode() self._orig_parse = getattr(module[0], 'x6_parse', None) self._orig_options = getattr(module[0], 'x6_options', None) module = self._xt.find_target(real_name) if not module: raise XTablesError("can't find target %s" % (name)) self._module = module[0] self._module.tflags = 0 if revision is not None: self._revision = revision else: self._revision = self._module.revision self._create_buffer(target) if self._is_standard_target(): self.standard_target = name elif target: self._check_alias() def _check_alias(self): name = self._get_alias_name() if name is None: return alias_module = self._xt.find_target(name) if alias_module is None: return self._alias_module = alias_module[0] self._orig_parse = getattr(self._alias_module, 'x6_parse', None) self._orig_options = getattr(self._alias_module, 'x6_options', None) def __eq__(self, targ): basesz = ct.sizeof(xt_entry_target) if (self.target.u.target_size != targ.target.u.target_size or self.target.u.user.name != targ.target.u.user.name or self.target.u.user.revision != targ.target.u.user.revision): return False if (self.target.u.user.name == b"" or self.target.u.user.name == b"standard" or self.target.u.user.name == b"ACCEPT" or self.target.u.user.name == b"DROP" or self.target.u.user.name == b"RETURN" or self.target.u.user.name == b"ERROR" or self._is_standard_target()): return True if (self._target_buf[basesz:self.usersize] == targ._target_buf[basesz:targ.usersize]): return True return False def __ne__(self, target): return not self.__eq__(target) def _create_buffer(self, target): self._buffer = _Buffer(self.size) self._target_buf = self._buffer.buffer if target: ct.memmove(self._target_buf, ct.byref(target), self.size) self._update_pointers() else: self.reset() def _is_standard_target(self): for t in self._rule.tables: if t.is_chain(self._name): return True return False def _final_check(self): self._xt.final_check_target(self._module) def _parse(self, argv, inv, entry): self._xt.parse_target(argv, inv, self._module, entry, ct.cast(self._ptrptr, ct.POINTER(ct.c_void_p)), self._orig_parse, self._orig_options) self._target_buf = ct.cast(self._module.t, ct.POINTER(ct.c_ubyte)) if self._buffer.buffer != self._target_buf: self._buffer.buffer = self._target_buf self._update_pointers() def _get_size(self): return xt_align(self._module.size + ct.sizeof(xt_entry_target)) size = property(_get_size) """This is the full size of the underlying C structure.""" def _get_user_size(self): return self._module.userspacesize + ct.sizeof(xt_entry_target) usersize = property(_get_user_size) """This is the size of the part of the underlying C structure that is used in userspace.""" def _get_standard_target(self): t = self._ptr[0] return t.u.user.name.decode() def _set_standard_target(self, name): t = self._ptr[0] if isinstance(name, str): name = name.encode() t.u.user.name = name if isinstance(name, bytes): name = name.decode() self._name = name standard_target = property(_get_standard_target, _set_standard_target) """This attribute is used for standard targets. It can be set to *ACCEPT*, *DROP*, *RETURN* or to a name of a chain the rule should jump into.""" def _update_pointers(self): self._ptr = ct.cast(self._target_buf, ct.POINTER(xt_entry_target)) self._ptrptr = ct.cast(ct.pointer(self._ptr), ct.POINTER(ct.POINTER(xt_entry_target))) self._module.t = self._ptr self._update_name() def _update_name(self): m = self._ptr[0] m.u.user.name = self.name.encode() def reset(self): """Reset the target. Parameters are set to their default values, any flags are cleared.""" ct.memset(self._target_buf, 0, self.size) self._update_pointers() t = self._ptr[0] t.u.target_size = self.size t.u.user.revision = self._revision if self._module.init: self._module.init(self._ptr) self._module.tflags = 0 udata_size = getattr(self._module, 'udata_size', 0) if udata_size > 0: udata_buf = (ct.c_ubyte * udata_size)() self._module.udata = ct.cast(ct.byref(udata_buf), ct.c_void_p) def _get_target(self): return self._ptr[0] target = property(_get_target) """This is the C structure used by the extension.""" def _get_goto(self): return self._goto goto = property(_get_goto) class Policy(object): """ If the end of a built-in chain is reached or a rule in a built-in chain with target RETURN is matched, the target specified by the chain policy determines the fate of the packet. """ ACCEPT = "ACCEPT" """If no matching rule has been found so far then accept the packet.""" DROP = "DROP" """If no matching rule has been found so far then drop the packet.""" QUEUE = "QUEUE" """If no matching rule has been found so far then queue the packet to userspace.""" RETURN = "RETURN" """Return to calling chain.""" _cache = weakref.WeakValueDictionary() def __new__(cls, name): obj = Policy._cache.get(name, None) if not obj: obj = object.__new__(cls) Policy._cache[name] = obj return obj def __init__(self, name): self.name = name def _a_to_i(addr): return struct.unpack("I", addr)[0] def _i_to_a(ip): return struct.pack("I", int(ip.s_addr)) class Rule(object): """Rules are entries in chains. Each rule has three parts: * An entry with protocol family attributes like source and destination address, transport protocol, etc. If the packet does not match the attributes set here, then processing continues with the next rule or the chain policy is applied at the end of the chain. * Any number of matches. They are optional, and make it possible to match for further packet attributes. * One target. This determines what happens with the packet if it is matched. """ protocols = {0: "all", socket.IPPROTO_AH: "ah", socket.IPPROTO_DSTOPTS: "dstopts", socket.IPPROTO_EGP: "egp", socket.IPPROTO_ESP: "esp", socket.IPPROTO_FRAGMENT: "fragment", socket.IPPROTO_GRE: "gre", socket.IPPROTO_HOPOPTS: "hopopts", socket.IPPROTO_ICMP: "icmp", socket.IPPROTO_ICMPV6: "icmpv6", socket.IPPROTO_IDP: "idp", socket.IPPROTO_IGMP: "igmp", socket.IPPROTO_IP: "ip", socket.IPPROTO_IPIP: "ipip", socket.IPPROTO_IPV6: "ipv6", socket.IPPROTO_NONE: "none", socket.IPPROTO_PIM: "pim", socket.IPPROTO_PUP: "pup", socket.IPPROTO_RAW: "raw", socket.IPPROTO_ROUTING: "routing", socket.IPPROTO_RSVP: "rsvp", socket.IPPROTO_SCTP: "sctp", socket.IPPROTO_TCP: "tcp", socket.IPPROTO_TP: "tp", socket.IPPROTO_UDP: "udp", } def __init__(self, entry=None, chain=None): """ *entry* is the ipt_entry buffer or None if the caller does not have it. *chain* is the chain object this rule belongs to. """ self.nfproto = NFPROTO_IPV4 self._matches = [] self._target = None self.chain = chain self.rule = entry def __eq__(self, rule): if self._target != rule._target: return False if len(self._matches) != len(rule._matches): return False if set(rule._matches) != set([x for x in rule._matches if x in self._matches]): return False if (self.src == rule.src and self.dst == rule.dst and self.protocol == rule.protocol and self.fragment == rule.fragment and self.in_interface == rule.in_interface and self.out_interface == rule.out_interface): return True return False def __ne__(self, rule): return not self.__eq__(rule) def _get_tables(self): return [Table(t) for t in Table.ALL if is_table_available(t)] tables = property(_get_tables) """This is the list of tables for our protocol.""" def final_check(self): """Do a final check on the target and the matches.""" if self.target: self.target.final_check() for match in self.matches: match.final_check() def create_match(self, name, revision=None): """Create a *match*, and add it to the list of matches in this rule. *name* is the name of the match extension, *revision* is the revision to use.""" match = Match(self, name=name, revision=revision) self.add_match(match) return match def create_target(self, name, revision=None, goto=False): """Create a new *target*, and set it as this rule's target. *name* is the name of the target extension, *revision* is the revision to use. *goto* determines if target uses '-j' (default) or '-g'.""" target = Target(self, name=name, revision=revision, goto=goto) self.target = target return target def add_match(self, match): """Adds a match to the rule. One can add any number of matches.""" match.rule = self self._matches.append(match) def remove_match(self, match): """Removes *match* from the list of matches.""" self._matches.remove(match) def get_ip(self): return self.entry.ip def _get_matches(self): return self._matches[:] # return a copy matches = property(_get_matches) """This is the list of matches held in this rule.""" def _get_target(self): return self._target def _set_target(self, target): target.rule = self self._target = target target = property(_get_target, _set_target) """This is the target of the rule.""" def get_src(self): src = "" if self.entry.ip.invflags & ipt_ip.IPT_INV_SRCIP: src = "".join([src, "!"]) paddr = _i_to_a(self.entry.ip.src) try: addr = socket.inet_ntop(socket.AF_INET, paddr) except socket.error: raise IPTCError("error in internal state: invalid address") src = "".join([src, addr, "/"]) paddr = _i_to_a(self.entry.ip.smsk) try: netmask = socket.inet_ntop(socket.AF_INET, paddr) except socket.error: raise IPTCError("error in internal state: invalid netmask") src = "".join([src, netmask]) return src def set_src(self, src): if src[0] == "!": self.entry.ip.invflags |= ipt_ip.IPT_INV_SRCIP src = src[1:] else: self.entry.ip.invflags &= (~ipt_ip.IPT_INV_SRCIP & ipt_ip.IPT_INV_MASK) slash = src.find("/") if slash == -1: addr = src netm = "255.255.255.255" else: addr = src[:slash] netm = src[slash + 1:] try: saddr = _a_to_i(socket.inet_pton(socket.AF_INET, addr)) except socket.error: raise ValueError("invalid address %s" % (addr)) if not netm.isdigit(): try: nmask = _a_to_i(socket.inet_pton(socket.AF_INET, netm)) except socket.error: raise ValueError("invalid netmask %s" % (netm)) else: imask = int(netm) if imask > 32 or imask < 0: raise ValueError("invalid netmask %s" % (netm)) nmask = socket.htonl((2 ** imask - 1) << (32 - imask)) neta = in_addr() neta.s_addr = ct.c_uint32(nmask) self.entry.ip.smsk = neta # Apply subnet mask to IP address ina = in_addr() ina.s_addr = ct.c_uint32(saddr & nmask) self.entry.ip.src = ina src = property(get_src, set_src) """This is the source network address with an optional network mask in string form.""" def get_dst(self): dst = "" if self.entry.ip.invflags & ipt_ip.IPT_INV_DSTIP: dst = "".join([dst, "!"]) paddr = _i_to_a(self.entry.ip.dst) try: addr = socket.inet_ntop(socket.AF_INET, paddr) except socket.error: raise IPTCError("error in internal state: invalid address") dst = "".join([dst, addr, "/"]) paddr = _i_to_a(self.entry.ip.dmsk) try: netmask = socket.inet_ntop(socket.AF_INET, paddr) except socket.error: raise IPTCError("error in internal state: invalid netmask") dst = "".join([dst, netmask]) return dst def set_dst(self, dst): if dst[0] == "!": self.entry.ip.invflags |= ipt_ip.IPT_INV_DSTIP dst = dst[1:] else: self.entry.ip.invflags &= (~ipt_ip.IPT_INV_DSTIP & ipt_ip.IPT_INV_MASK) slash = dst.find("/") if slash == -1: addr = dst netm = "255.255.255.255" else: addr = dst[:slash] netm = dst[slash + 1:] try: daddr = _a_to_i(socket.inet_pton(socket.AF_INET, addr)) except socket.error: raise ValueError("invalid address %s" % (addr)) if not netm.isdigit(): try: nmask = _a_to_i(socket.inet_pton(socket.AF_INET, netm)) except socket.error: raise ValueError("invalid netmask %s" % (netm)) else: imask = int(netm) if imask > 32 or imask < 0: raise ValueError("invalid netmask %s" % (netm)) nmask = socket.htonl((2 ** imask - 1) << (32 - imask)) neta = in_addr() neta.s_addr = ct.c_uint32(nmask) self.entry.ip.dmsk = neta # Apply subnet mask to IP address ina = in_addr() ina.s_addr = ct.c_uint32(daddr & nmask) self.entry.ip.dst = ina dst = property(get_dst, set_dst) """This is the destination network address with an optional network mask in string form.""" def get_in_interface(self): intf = "" if self.entry.ip.invflags & ipt_ip.IPT_INV_VIA_IN: intf = "!" iface = self.entry.ip.iniface.decode() mask = self.entry.ip.iniface_mask if len(mask) == 0: return None intf += iface if len(iface) == len(mask): intf += '+' intf = intf[:_IFNAMSIZ] return intf def set_in_interface(self, intf): if intf[0] == "!": self.entry.ip.invflags |= ipt_ip.IPT_INV_VIA_IN intf = intf[1:] else: self.entry.ip.invflags &= (~ipt_ip.IPT_INV_VIA_IN & ipt_ip.IPT_INV_MASK) if len(intf) >= _IFNAMSIZ: raise ValueError("interface name %s too long" % (intf)) masklen = len(intf) + 1 if intf[len(intf) - 1] == "+": intf = intf[:-1] masklen -= 2 self.entry.ip.iniface = b"".join([intf.encode(), b'\x00' * (_IFNAMSIZ - len(intf))]) self.entry.ip.iniface_mask = b"".join([b'\xff' * masklen, b'\x00' * (_IFNAMSIZ - masklen)]) in_interface = property(get_in_interface, set_in_interface) """This is the input network interface e.g. *eth0*. A wildcard match can be achieved via *+* e.g. *ppp+* matches any *ppp* interface.""" def get_out_interface(self): intf = "" if self.entry.ip.invflags & ipt_ip.IPT_INV_VIA_OUT: intf = "!" iface = self.entry.ip.outiface.decode() mask = self.entry.ip.outiface_mask if len(mask) == 0: return None intf += iface if len(iface) == len(mask): intf += '+' intf = intf[:_IFNAMSIZ] return intf def set_out_interface(self, intf): if intf[0] == "!": self.entry.ip.invflags |= ipt_ip.IPT_INV_VIA_OUT intf = intf[1:] else: self.entry.ip.invflags &= (~ipt_ip.IPT_INV_VIA_OUT & ipt_ip.IPT_INV_MASK) if len(intf) >= _IFNAMSIZ: raise ValueError("interface name %s too long" % (intf)) masklen = len(intf) + 1 if intf[len(intf) - 1] == "+": intf = intf[:-1] masklen -= 2 self.entry.ip.outiface = b"".join([intf.encode(), b'\x00' * (_IFNAMSIZ - len(intf))]) self.entry.ip.outiface_mask = b"".join([b'\xff' * masklen, b'\x00' * (_IFNAMSIZ - masklen)]) out_interface = property(get_out_interface, set_out_interface) """This is the output network interface e.g. *eth0*. A wildcard match can be achieved via *+* e.g. *ppp+* matches any *ppp* interface.""" def get_fragment(self): frag = bool(self.entry.ip.flags & ipt_ip.IPT_F_FRAG) if self.entry.ip.invflags & ipt_ip.IPT_INV_FRAG: frag = not frag return frag def set_fragment(self, frag): self.entry.ip.invflags &= ~ipt_ip.IPT_INV_FRAG & ipt_ip.IPT_INV_MASK if frag: self.entry.ip.flags |= ipt_ip.IPT_F_FRAG else: self.entry.ip.flags &= ~ipt_ip.IPT_F_FRAG fragment = property(get_fragment, set_fragment) """This means that the rule refers to the second and further fragments of fragmented packets. It can be *True* or *False*.""" def get_protocol(self): if self.entry.ip.invflags & ipt_ip.IPT_INV_PROTO: proto = "!" else: proto = "" proto = "".join([proto, self.protocols.get(self.entry.ip.proto, str(self.entry.ip.proto))]) return proto def set_protocol(self, proto): proto = str(proto) if proto[0] == "!": self.entry.ip.invflags |= ipt_ip.IPT_INV_PROTO proto = proto[1:] else: self.entry.ip.invflags &= (~ipt_ip.IPT_INV_PROTO & ipt_ip.IPT_INV_MASK) if proto.isdigit(): self.entry.ip.proto = int(proto) return for p in self.protocols.items(): if proto.lower() == p[1]: self.entry.ip.proto = p[0] return raise ValueError("invalid protocol %s" % (proto)) protocol = property(get_protocol, set_protocol) """This is the transport layer protocol.""" def get_counters(self): """This method returns a tuple pair of the packet and byte counters of the rule.""" counters = self.entry.counters return counters.pcnt, counters.bcnt def set_counters(self, counters): """This method set a tuple pair of the packet and byte counters of the rule.""" self.entry.counters.pcnt = counters[0] self.entry.counters.bcnt = counters[1] counters = property(get_counters, set_counters) """This is the packet and byte counters of the rule.""" # override the following three for the IPv6 subclass def _entry_size(self): return xt_align(ct.sizeof(ipt_entry)) def _entry_type(self): return ipt_entry def _new_entry(self): return ipt_entry() def _get_rule(self): if not self.entry or not self._target or not self._target.target: return None entrysz = self._entry_size() matchsz = 0 for m in self._matches: matchsz += xt_align(m.size) targetsz = xt_align(self._target.size) self.entry.target_offset = entrysz + matchsz self.entry.next_offset = entrysz + matchsz + targetsz # allocate array of full length (entry + matches + target) buf = (ct.c_ubyte * (entrysz + matchsz + targetsz))() # copy entry to buf ptr = ct.cast(ct.pointer(self.entry), ct.POINTER(ct.c_ubyte)) buf[:entrysz] = ptr[:entrysz] # copy matches to buf at offset of entrysz + match size offset = 0 for m in self._matches: sz = xt_align(m.size) buf[entrysz + offset:entrysz + offset + sz] = m.match_buf[:sz] offset += sz # copy target to buf at offset of entrysz + matchsz ptr = ct.cast(ct.pointer(self._target.target), ct.POINTER(ct.c_ubyte)) buf[entrysz + matchsz:entrysz + matchsz + targetsz] = ptr[:targetsz] return buf def _set_rule(self, entry): if not entry: self.entry = self._new_entry() return else: self.entry = ct.cast(ct.pointer(entry), ct.POINTER(self._entry_type()))[0] if not isinstance(entry, self._entry_type()): raise TypeError("Invalid rule type %s; expected %s" % (entry, self._entry_type())) entrysz = self._entry_size() matchsz = entry.target_offset - entrysz # targetsz = entry.next_offset - entry.target_offset # iterate over matches to create blob if matchsz: off = 0 while entrysz + off < entry.target_offset: match = ct.cast(ct.byref(entry.elems, off), ct.POINTER(xt_entry_match))[0] m = Match(self, match=match) self.add_match(m) off += m.size target = ct.cast(ct.byref(entry, entry.target_offset), ct.POINTER(xt_entry_target))[0] self.target = Target(self, target=target) jump = self.chain.table.get_target(entry) # standard target is special if jump: self._target.standard_target = jump rule = property(_get_rule, _set_rule) """This is the raw rule buffer as iptables expects and returns it.""" def _get_mask(self): if not self.entry: return None entrysz = self._entry_size() matchsz = self.entry.target_offset - entrysz targetsz = self.entry.next_offset - self.entry.target_offset # allocate array for mask mask = (ct.c_ubyte * (entrysz + matchsz + targetsz))() # fill it out pos = 0 for i in range(pos, pos + entrysz): mask[i] = 0xff pos += entrysz for m in self._matches: for i in range(pos, pos + m.usersize): mask[i] = 0xff pos += m.size for i in range(pos, pos + self._target.usersize): mask[i] = 0xff return mask mask = property(_get_mask) """This is the raw mask buffer as iptables uses it when removing rules.""" class Chain(object): """Rules are contained by chains. *iptables* has built-in chains for every table, and users can also create additional chains. Rule targets can specify to jump into another chain and continue processing its rules, or return to the caller chain. """ _cache = weakref.WeakValueDictionary() def __new__(cls, table, name): table_name = type(table).__name__ + "." + table.name obj = Chain._cache.get(table_name + "." + name, None) if not obj: obj = object.__new__(cls) Chain._cache[table_name + "." + name] = obj return obj def __init__(self, table, name): """*table* is the table this chain belongs to, *name* is the chain's name. If a chain already exists with *name* in *table* it is returned. """ self.name = name self.table = table def delete(self): """Delete chain from its table.""" self.table.delete_chain(self.name) def rename(self, new_name): """Rename chain to *new_name*.""" self.table.rename_chain(self.name, new_name) def flush(self): """Flush all rules from the chain.""" self.table.flush_entries(self.name) def get_counters(self): """This method returns a tuple pair of the packet and byte counters of the chain.""" policy, counters = self.table.get_policy(self.name) return counters def zero_counters(self): """This method zeroes the packet and byte counters of the chain.""" self.table.zero_entries(self.name) def set_policy(self, policy, counters=None): """Set the chain policy to *policy*, which should either be a string or a Policy object. If *counters* is not *None*, the chain counters are also adjusted. *Counters* is a list or tuple with two elements.""" if isinstance(policy, Policy): policy = policy.name self.table.set_policy(self.name, policy, counters) def get_policy(self): """Returns the policy of the chain as a Policy object.""" policy, counters = self.table.get_policy(self.name) return policy def is_builtin(self): """Returns whether the chain is a built-in one.""" return self.table.builtin_chain(self.name) def append_rule(self, rule): """Append *rule* to the end of the chain.""" rule.final_check() rbuf = rule.rule if not rbuf: raise ValueError("invalid rule") self.table.append_entry(self.name, rbuf) def insert_rule(self, rule, position=0): """Insert *rule* as the first entry in the chain if *position* is 0 or not specified, else *rule* is inserted in the given position.""" rule.final_check() rbuf = rule.rule if not rbuf: raise ValueError("invalid rule") self.table.insert_entry(self.name, rbuf, position) def replace_rule(self, rule, position=0): """Replace existing rule in the chain at *position* with given *rule*""" rbuf = rule.rule if not rbuf: raise ValueError("invalid rule") self.table.replace_entry(self.name, rbuf, position) def delete_rule(self, rule): """Removes *rule* from the chain.""" rule.final_check() rbuf = rule.rule if not rbuf: raise ValueError("invalid rule") self.table.delete_entry(self.name, rbuf, rule.mask) def get_target(self, rule): """This method returns the target of *rule* if it is a standard target, or *None* if it is not.""" rbuf = rule.rule if not rbuf: raise ValueError("invalid rule") return self.table.get_target(rbuf) def _get_rules(self): entries = [] entry = self.table.first_rule(self.name) while entry: entries.append(entry) entry = self.table.next_rule(entry) return [self.table.create_rule(e, self) for e in entries] rules = property(_get_rules) """This is the list of rules currently in the chain. The indexes of the Rule items produced from this list *should* correspond to the IPTables --line-numbers value minus one. Keeping in mind that iptables rules are 1-indexed whereas the Python list is 0-indexed """ def autocommit(fn): def new(*args): obj = args[0] ret = fn(*args) if obj.autocommit: obj.refresh() return ret return new class Table(object): """A table is the most basic building block in iptables. There are four fixed tables: * **Table.FILTER**, the filter table, * **Table.NAT**, the NAT table, * **Table.MANGLE**, the mangle table and * **Table.RAW**, the raw table. The four tables are cached, so if you create a new Table, and it has been instantiated before, then it will be reused. To get access to e.g. the filter table: >>> table = iptc.Table(iptc.Table.FILTER) The interface provided by *Table* is rather low-level, in fact it maps to *libiptc* API calls one by one, and take low-level iptables structs as parameters. It is encouraged to, when possible, use Chain, Rule, Match and Target to achieve what is wanted instead, since they hide the low-level details from the user. """ FILTER = "filter" """This is the constant for the filter table.""" MANGLE = "mangle" """This is the constant for the mangle table.""" RAW = "raw" """This is the constant for the raw table.""" NAT = "nat" """This is the constant for the nat table.""" SECURITY = "security" """This is the constant for the security table.""" ALL = ["filter", "mangle", "raw", "nat", "security"] """This is the constant for all tables.""" _cache = dict() def __new__(cls, name, autocommit=None): obj = Table._cache.get(name, None) if not obj: obj = object.__new__(cls) if autocommit is None: autocommit = True obj._init(name, autocommit) Table._cache[name] = obj elif autocommit is not None: obj.autocommit = autocommit return obj def _init(self, name, autocommit): """ *name* is the name of the table, if it already exists it is returned. *autocommit* specifies that any iptables operation that changes a rule, chain or table should be committed immediately. """ self.name = name self.autocommit = autocommit self._iptc = iptc() # to keep references to functions self._handle = None self.refresh() def __del__(self): self.close() def close(self): """Close the underlying connection handle to iptables.""" if self._handle: self._free() def commit(self): """Commit any pending operation.""" rv = self._iptc.iptc_commit(self._handle) if rv != 1: raise IPTCError("can't commit: %s" % (self.strerror())) def _free(self, ignore_exc=True): if self._handle is None: raise IPTCError("table is not initialized") try: if self.autocommit: self.commit() except IPTCError as e: if not ignore_exc: raise e finally: self._iptc.iptc_free(self._handle) self._handle = None def refresh(self): """Commit any pending operation and refresh the status of iptables.""" if self._handle: self._free() handle = self._iptc.iptc_init(self.name.encode()) if not handle: raise IPTCError("can't initialize %s: %s" % (self.name, self.strerror())) self._handle = handle def is_chain(self, chain): """Returns *True* if *chain* exists as a chain.""" if isinstance(chain, Chain): chain = chain.name if self._iptc.iptc_is_chain(chain.encode(), self._handle): return True else: return False def builtin_chain(self, chain): """Returns *True* if *chain* is a built-in chain.""" if isinstance(chain, Chain): chain = chain.name if self._iptc.iptc_builtin(chain.encode(), self._handle): return True else: return False def strerror(self): """Returns any pending iptables error from the previous operation.""" errno = _get_errno_loc()[0] if errno == 0: return "libiptc version error" return self._iptc.iptc_strerror(errno) @autocommit def create_chain(self, chain): """Create a new chain *chain*.""" if isinstance(chain, Chain): chain = chain.name rv = self._iptc.iptc_create_chain(chain.encode(), self._handle) if rv != 1: raise IPTCError("can't create chain %s: %s" % (chain, self.strerror())) return Chain(self, chain) @autocommit def delete_chain(self, chain): """Delete chain *chain* from the table.""" if isinstance(chain, Chain): chain = chain.name rv = self._iptc.iptc_delete_chain(chain.encode(), self._handle) if rv != 1: raise IPTCError("can't delete chain %s: %s" % (chain, self.strerror())) @autocommit def rename_chain(self, chain, new_name): """Rename chain *chain* to *new_name*.""" if isinstance(chain, Chain): chain = chain.name rv = self._iptc.iptc_rename_chain(chain.encode(), new_name.encode(), self._handle) if rv != 1: raise IPTCError("can't rename chain %s: %s" % (chain, self.strerror())) @autocommit def flush_entries(self, chain): """Flush all rules from *chain*.""" if isinstance(chain, Chain): chain = chain.name rv = self._iptc.iptc_flush_entries(chain.encode(), self._handle) if rv != 1: raise IPTCError("can't flush chain %s: %s" % (chain, self.strerror())) @autocommit def zero_entries(self, chain): """Zero the packet and byte counters of *chain*.""" if isinstance(chain, Chain): chain = chain.name rv = self._iptc.iptc_zero_entries(chain.encode(), self._handle) if rv != 1: raise IPTCError("can't zero chain %s counters: %s" % (chain, self.strerror())) @autocommit def set_policy(self, chain, policy, counters=None): """Set the policy of *chain* to *policy*, and also update chain counters if *counters* is specified.""" if isinstance(chain, Chain): chain = chain.name if isinstance(policy, Policy): policy = policy.name if counters: cntrs = xt_counters() cntrs.pcnt = counters[0] cntrs.bcnt = counters[1] cntrs = ct.pointer(cntrs) else: cntrs = None rv = self._iptc.iptc_set_policy(chain.encode(), policy.encode(), cntrs, self._handle) if rv != 1: raise IPTCError("can't set policy %s on chain %s: %s" % (policy, chain, self.strerror())) @autocommit def get_policy(self, chain): """Returns the policy of *chain* as a string.""" if isinstance(chain, Chain): chain = chain.name if not self.builtin_chain(chain): return None, None cntrs = xt_counters() pol = self._iptc.iptc_get_policy(chain.encode(), ct.pointer(cntrs), self._handle).decode() if not pol: raise IPTCError("can't get policy on chain %s: %s" % (chain, self.strerror())) return Policy(pol), (cntrs.pcnt, cntrs.bcnt) @autocommit def append_entry(self, chain, entry): """Appends rule *entry* to *chain*.""" rv = self._iptc.iptc_append_entry(chain.encode(), ct.cast(entry, ct.c_void_p), self._handle) if rv != 1: raise IPTCError("can't append entry to chain %s: %s" % (chain, self.strerror())) @autocommit def insert_entry(self, chain, entry, position): """Inserts rule *entry* into *chain* at position *position*.""" rv = self._iptc.iptc_insert_entry(chain.encode(), ct.cast(entry, ct.c_void_p), position, self._handle) if rv != 1: raise IPTCError("can't insert entry into chain %s: %s" % (chain, self.strerror())) @autocommit def replace_entry(self, chain, entry, position): """Replace existing rule in *chain* at *position* with given *rule*.""" rv = self._iptc.iptc_replace_entry(chain.encode(), ct.cast(entry, ct.c_void_p), position, self._handle) if rv != 1: raise IPTCError("can't replace entry in chain %s: %s" % (chain, self.strerror())) @autocommit def delete_entry(self, chain, entry, mask): """Removes rule *entry* with *mask* from *chain*.""" rv = self._iptc.iptc_delete_entry(chain.encode(), ct.cast(entry, ct.c_void_p), mask, self._handle) if rv != 1: raise IPTCError("can't delete entry from chain %s: %s" % (chain, self.strerror())) def first_rule(self, chain): """Returns the first rule in *chain* or *None* if it is empty.""" rule = self._iptc.iptc_first_rule(chain.encode(), self._handle) if rule: return rule[0] else: return rule def next_rule(self, prev_rule): """Returns the next rule after *prev_rule*.""" rule = self._iptc.iptc_next_rule(ct.pointer(prev_rule), self._handle) if rule: return rule[0] else: return rule def get_target(self, entry): """Returns the standard target in *entry*.""" t = self._iptc.iptc_get_target(ct.pointer(entry), self._handle) # t can be NULL if standard target has a "simple" verdict e.g. ACCEPT return t def _get_chains(self): chains = [] chain = self._iptc.iptc_first_chain(self._handle) while chain: chain = chain.decode() chains.append(Chain(self, chain)) chain = self._iptc.iptc_next_chain(self._handle) return chains chains = property(_get_chains) """List of chains in the table.""" def flush(self): """Flush and delete all non-builtin chains the table.""" for chain in self.chains: chain.flush() for chain in self.chains: if not self.builtin_chain(chain): self.delete_chain(chain) def create_rule(self, entry=None, chain=None): return Rule(entry, chain) 07070100000020000081A40000000000000000000000015EE6923E00005680000000000000000000000000000000000000002400000000python-iptables-1.0.0/iptc/ip6tc.py# -*- coding: utf-8 -*- import ctypes as ct import socket from .ip4tc import Rule, Table, IPTCError from .util import find_library, load_kernel from .xtables import (XT_INV_PROTO, NFPROTO_IPV6, xt_align, xt_counters) __all__ = ["Table6", "Rule6"] try: load_kernel("ip6_tables") except: pass _IFNAMSIZ = 16 def is_table6_available(name): try: Table6(name) return True except IPTCError: pass return False class in6_addr(ct.Structure): """This class is a representation of the C struct in6_addr.""" _fields_ = [("s6_addr", ct.c_uint8 * 16)] # IPv6 address class ip6t_ip6(ct.Structure): """This class is a representation of the C struct ip6t_ip6.""" _fields_ = [("src", in6_addr), # Source and destination IP6 addr ("dst", in6_addr), # Mask for src and dest IP6 addr ("smsk", in6_addr), ("dmsk", in6_addr), ("iniface", ct.c_char * _IFNAMSIZ), ("outiface", ct.c_char * _IFNAMSIZ), ("iniface_mask", ct.c_char * _IFNAMSIZ), ("outiface_mask", ct.c_char * _IFNAMSIZ), ("proto", ct.c_uint16), # Upper protocol number ("tos", ct.c_uint8), # TOS, match iff flags & IP6T_F_TOS ("flags", ct.c_uint8), # Flags word ("invflags", ct.c_uint8)] # Inverse flags # flags IP6T_F_PROTO = 0x01 # Set if rule cares about upper protocols IP6T_F_TOS = 0x02 # Match the TOS IP6T_F_GOTO = 0x04 # Set if jump is a goto IP6T_F_MASK = 0x07 # All possible flag bits mask # invflags IP6T_INV_VIA_IN = 0x01 # Invert the sense of IN IFACE IP6T_INV_VIA_OUT = 0x02 # Invert the sense of OUT IFACE IP6T_INV_TOS = 0x04 # Invert the sense of TOS IP6T_INV_SRCIP = 0x08 # Invert the sense of SRC IP IP6T_INV_DSTIP = 0x10 # Invert the sense of DST OP IP6T_INV_FRAG = 0x20 # Invert the sense of FRAG IP6T_INV_PROTO = XT_INV_PROTO IP6T_INV_MASK = 0x7F # All possible flag bits mask def __init__(self): # default: full netmask self.smsk.s6_addr = self.dmsk.s6_addr = 0xff * 16 class ip6t_entry(ct.Structure): """This class is a representation of the C struct ip6t_entry.""" _fields_ = [("ipv6", ip6t_ip6), ("nfcache", ct.c_uint), # fields that we care about ("target_offset", ct.c_uint16), # size of ip6t_entry + matches ("next_offset", ct.c_uint16), # size of e + matches + target ("comefrom", ct.c_uint), # back pointer ("counters", xt_counters), # packet and byte counters ("elems", ct.c_ubyte * 0)] # the matches then the target _libiptc, _ = find_library("ip6tc", "iptc") # old iptables versions use iptc class ip6tc(object): """This class contains all libip6tc API calls.""" iptc_init = _libiptc.ip6tc_init iptc_init.restype = ct.POINTER(ct.c_int) iptc_init.argstype = [ct.c_char_p] iptc_free = _libiptc.ip6tc_free iptc_free.restype = None iptc_free.argstype = [ct.c_void_p] iptc_commit = _libiptc.ip6tc_commit iptc_commit.restype = ct.c_int iptc_commit.argstype = [ct.c_void_p] iptc_builtin = _libiptc.ip6tc_builtin iptc_builtin.restype = ct.c_int iptc_builtin.argstype = [ct.c_char_p, ct.c_void_p] iptc_first_chain = _libiptc.ip6tc_first_chain iptc_first_chain.restype = ct.c_char_p iptc_first_chain.argstype = [ct.c_void_p] iptc_next_chain = _libiptc.ip6tc_next_chain iptc_next_chain.restype = ct.c_char_p iptc_next_chain.argstype = [ct.c_void_p] iptc_is_chain = _libiptc.ip6tc_is_chain iptc_is_chain.restype = ct.c_int iptc_is_chain.argstype = [ct.c_char_p, ct.c_void_p] iptc_create_chain = _libiptc.ip6tc_create_chain iptc_create_chain.restype = ct.c_int iptc_create_chain.argstype = [ct.c_char_p, ct.c_void_p] iptc_delete_chain = _libiptc.ip6tc_delete_chain iptc_delete_chain.restype = ct.c_int iptc_delete_chain.argstype = [ct.c_char_p, ct.c_void_p] iptc_rename_chain = _libiptc.ip6tc_rename_chain iptc_rename_chain.restype = ct.c_int iptc_rename_chain.argstype = [ct.c_char_p, ct.c_char_p, ct.c_void_p] iptc_flush_entries = _libiptc.ip6tc_flush_entries iptc_flush_entries.restype = ct.c_int iptc_flush_entries.argstype = [ct.c_char_p, ct.c_void_p] iptc_zero_entries = _libiptc.ip6tc_zero_entries iptc_zero_entries.restype = ct.c_int iptc_zero_entries.argstype = [ct.c_char_p, ct.c_void_p] # Get the policy of a given built-in chain iptc_get_policy = _libiptc.ip6tc_get_policy iptc_get_policy.restype = ct.c_char_p iptc_get_policy.argstype = [ct.c_char_p, ct.POINTER(xt_counters), ct.c_void_p] # Set the policy of a chain iptc_set_policy = _libiptc.ip6tc_set_policy iptc_set_policy.restype = ct.c_int iptc_set_policy.argstype = [ct.c_char_p, ct.c_char_p, ct.POINTER(xt_counters), ct.c_void_p] # Get first rule in the given chain: NULL for empty chain. iptc_first_rule = _libiptc.ip6tc_first_rule iptc_first_rule.restype = ct.POINTER(ip6t_entry) iptc_first_rule.argstype = [ct.c_char_p, ct.c_void_p] # Returns NULL when rules run out. iptc_next_rule = _libiptc.ip6tc_next_rule iptc_next_rule.restype = ct.POINTER(ip6t_entry) iptc_next_rule.argstype = [ct.POINTER(ip6t_entry), ct.c_void_p] # Returns a pointer to the target name of this entry. iptc_get_target = _libiptc.ip6tc_get_target iptc_get_target.restype = ct.c_char_p iptc_get_target.argstype = [ct.POINTER(ip6t_entry), ct.c_void_p] # These functions return TRUE for OK or 0 and set errno. If errno == # 0, it means there was a version error (ie. upgrade libiptc). # Rule numbers start at 1 for the first rule. # Insert the entry `e' in chain `chain' into position `rulenum'. iptc_insert_entry = _libiptc.ip6tc_insert_entry iptc_insert_entry.restype = ct.c_int iptc_insert_entry.argstype = [ct.c_char_p, ct.POINTER(ip6t_entry), ct.c_int, ct.c_void_p] # Atomically replace rule `rulenum' in `chain' with `e'. iptc_replace_entry = _libiptc.ip6tc_replace_entry iptc_replace_entry.restype = ct.c_int iptc_replace_entry.argstype = [ct.c_char_p, ct.POINTER(ip6t_entry), ct.c_int, ct.c_void_p] # Append entry `e' to chain `chain'. Equivalent to insert with # rulenum = length of chain. iptc_append_entry = _libiptc.ip6tc_append_entry iptc_append_entry.restype = ct.c_int iptc_append_entry.argstype = [ct.c_char_p, ct.POINTER(ip6t_entry), ct.c_void_p] # Delete the first rule in `chain' which matches `e', subject to # matchmask (array of length == origfw) iptc_delete_entry = _libiptc.ip6tc_delete_entry iptc_delete_entry.restype = ct.c_int iptc_delete_entry.argstype = [ct.c_char_p, ct.POINTER(ip6t_entry), ct.POINTER(ct.c_ubyte), ct.c_void_p] # Delete the rule in position `rulenum' in `chain'. iptc_delete_num_entry = _libiptc.ip6tc_delete_num_entry iptc_delete_num_entry.restype = ct.c_int iptc_delete_num_entry.argstype = [ct.c_char_p, ct.c_uint, ct.c_void_p] # Check the packet `e' on chain `chain'. Returns the verdict, or # NULL and sets errno. # iptc_check_packet = _libiptc.ip6tc_check_packet # iptc_check_packet.restype = ct.c_char_p # iptc_check_packet.argstype = [ct.c_char_p, ct.POINTER(ipt), ct.c_void_p] # Get the number of references to this chain iptc_get_references = _libiptc.ip6tc_get_references iptc_get_references.restype = ct.c_int iptc_get_references.argstype = [ct.c_uint, ct.c_char_p, ct.c_void_p] # read packet and byte counters for a specific rule iptc_read_counter = _libiptc.ip6tc_read_counter iptc_read_counter.restype = ct.POINTER(xt_counters) iptc_read_counter.argstype = [ct.c_char_p, ct.c_uint, ct.c_void_p] # zero packet and byte counters for a specific rule iptc_zero_counter = _libiptc.ip6tc_zero_counter iptc_zero_counter.restype = ct.c_int iptc_zero_counter.argstype = [ct.c_char_p, ct.c_uint, ct.c_void_p] # set packet and byte counters for a specific rule iptc_set_counter = _libiptc.ip6tc_set_counter iptc_set_counter.restype = ct.c_int iptc_set_counter.argstype = [ct.c_char_p, ct.c_uint, ct.POINTER(xt_counters), ct.c_void_p] # Translates errno numbers into more human-readable form than strerror. iptc_strerror = _libiptc.ip6tc_strerror iptc_strerror.restype = ct.c_char_p iptc_strerror.argstype = [ct.c_int] class Rule6(Rule): """This is an IPv6 rule.""" def __init__(self, entry=None, chain=None): self.nfproto = NFPROTO_IPV6 self._matches = [] self._target = None self.chain = chain self.rule = entry def __eq__(self, rule): if self._target != rule._target: return False if len(self._matches) != len(rule._matches): return False if set(rule._matches) != set([x for x in rule._matches if x in self._matches]): return False if (self.src == rule.src and self.dst == rule.dst and self.protocol == rule.protocol and self.in_interface == rule.in_interface and self.out_interface == rule.out_interface): return True return False def save(self, name): return self._save(name, self.entry.ipv6) def _get_tables(self): return [Table6(t) for t in Table6.ALL if is_table6_available(t)] tables = property(_get_tables) """This is the list of tables for our protocol.""" def _count_bits(self, n): bits = 0 while n > 0: if n & 1: bits += 1 n = n >> 1 return bits def _create_mask(self, plen): mask = [] for i in range(16): if plen >= 8: mask.append(0xff) elif plen > 0: mask.append(0xff>>(8-plen)<<(8-plen)) else: mask.append(0x00) plen -= 8 return mask def get_src(self): src = "" if self.entry.ipv6.invflags & ip6t_ip6.IP6T_INV_SRCIP: src = "".join([src, "!"]) try: addr = socket.inet_ntop(socket.AF_INET6, self.entry.ipv6.src.s6_addr) except socket.error: raise IPTCError("error in internal state: invalid address") src = "".join([src, addr, "/"]) # create prefix length from mask in smsk plen = 0 for x in self.entry.ipv6.smsk.s6_addr: if x == 0xff: plen += 8 else: plen += self._count_bits(x) break src = "".join([src, str(plen)]) return src def _get_address_netmask(self, a): slash = a.find("/") if slash == -1: addr = a netm = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff" else: addr = a[:slash] netm = a[slash + 1:] return addr, netm def _addr2in6addr(self, addr): arr = ct.c_uint8 * 16 ina = in6_addr() try: ina.s6_addr = arr.from_buffer_copy( socket.inet_pton(socket.AF_INET6, addr)) except socket.error: raise ValueError("invalid address %s" % (addr)) return arr, ina def set_src(self, src): if src[0] == "!": self.entry.ipv6.invflags |= ip6t_ip6.IP6T_INV_SRCIP src = src[1:] else: self.entry.ipv6.invflags &= (~ip6t_ip6.IP6T_INV_SRCIP & ip6t_ip6.IP6T_INV_MASK) addr, netm = self._get_address_netmask(src) arr, self.entry.ipv6.src = self._addr2in6addr(addr) # if we got a numeric prefix length if netm.isdigit(): plen = int(netm) if plen < 0 or plen > 128: raise ValueError("invalid prefix length %d" % (plen)) self.entry.ipv6.smsk.s6_addr = arr(*self._create_mask(plen)) return # nope, we got an IPv6 address-style prefix neta = in6_addr() try: neta.s6_addr = arr.from_buffer_copy( socket.inet_pton(socket.AF_INET6, netm)) except socket.error: raise ValueError("invalid netmask %s" % (netm)) self.entry.ipv6.smsk = neta src = property(get_src, set_src) """This is the source network address with an optional prefix length in string form.""" def get_dst(self): dst = "" if self.entry.ipv6.invflags & ip6t_ip6.IP6T_INV_DSTIP: dst = "".join([dst, "!"]) try: addr = socket.inet_ntop(socket.AF_INET6, self.entry.ipv6.dst.s6_addr) except socket.error: raise IPTCError("error in internal state: invalid address") dst = "".join([dst, addr, "/"]) # create prefix length from mask in dmsk plen = 0 for x in self.entry.ipv6.dmsk.s6_addr: if x & 0xff == 0xff: plen += 8 else: plen += self._count_bits(x) break dst = "".join([dst, str(plen)]) return dst def set_dst(self, dst): if dst[0] == "!": self.entry.ipv6.invflags |= ip6t_ip6.IP6T_INV_DSTIP dst = dst[1:] else: self.entry.ipv6.invflags &= (~ip6t_ip6.IP6T_INV_DSTIP & ip6t_ip6.IP6T_INV_MASK) addr, netm = self._get_address_netmask(dst) arr, self.entry.ipv6.dst = self._addr2in6addr(addr) # if we got a numeric prefix length if netm.isdigit(): plen = int(netm) if plen < 0 or plen > 128: raise ValueError("invalid prefix length %d" % (plen)) self.entry.ipv6.dmsk.s6_addr = arr(*self._create_mask(plen)) return # nope, we got an IPv6 address-style prefix neta = in6_addr() try: neta.s6_addr = arr.from_buffer_copy( socket.inet_pton(socket.AF_INET6, netm)) except socket.error: raise ValueError("invalid netmask %s" % (netm)) self.entry.ipv6.dmsk = neta dst = property(get_dst, set_dst) """This is the destination network address with an optional network mask in string form.""" def get_in_interface(self): intf = "" if self.entry.ipv6.invflags & ip6t_ip6.IP6T_INV_VIA_IN: intf = "".join(["!", intf]) iface = bytearray(_IFNAMSIZ) iface[:len(self.entry.ipv6.iniface)] = self.entry.ipv6.iniface mask = bytearray(_IFNAMSIZ) mask[:len(self.entry.ipv6.iniface_mask)] = self.entry.ipv6.iniface_mask if mask[0] == 0: return None for i in range(_IFNAMSIZ): if mask[i] != 0: intf = "".join([intf, chr(iface[i])]) else: if iface[i - 1] != 0: intf = "".join([intf, "+"]) else: intf = intf[:-1] break return intf def set_in_interface(self, intf): if intf[0] == "!": self.entry.ipv6.invflags |= ip6t_ip6.IP6T_INV_VIA_IN intf = intf[1:] else: self.entry.ipv6.invflags &= (~ip6t_ip6.IP6T_INV_VIA_IN & ip6t_ip6.IP6T_INV_MASK) if len(intf) >= _IFNAMSIZ: raise ValueError("interface name %s too long" % (intf)) masklen = len(intf) + 1 if intf[len(intf) - 1] == "+": intf = intf[:-1] masklen -= 2 self.entry.ipv6.iniface = ("".join( [intf, '\x00' * (_IFNAMSIZ - len(intf))])).encode() self.entry.ipv6.iniface_mask = ("".join( ['\x01' * masklen, '\x00' * (_IFNAMSIZ - masklen)])).encode() in_interface = property(get_in_interface, set_in_interface) """This is the input network interface e.g. *eth0*. A wildcard match can be achieved via *+* e.g. *ppp+* matches any *ppp* interface.""" def get_out_interface(self): intf = "" if self.entry.ipv6.invflags & ip6t_ip6.IP6T_INV_VIA_OUT: intf = "".join(["!", intf]) iface = bytearray(_IFNAMSIZ) iface[:len(self.entry.ipv6.outiface)] = self.entry.ipv6.outiface mask = bytearray(_IFNAMSIZ) mask[:len(self.entry.ipv6.outiface_mask)] = \ self.entry.ipv6.outiface_mask if mask[0] == 0: return None for i in range(_IFNAMSIZ): if mask[i] != 0: intf = "".join([intf, chr(iface[i])]) else: if iface[i - 1] != 0: intf = "".join([intf, "+"]) else: intf = intf[:-1] break return intf def set_out_interface(self, intf): if intf[0] == "!": self.entry.ipv6.invflags |= ip6t_ip6.IP6T_INV_VIA_OUT intf = intf[1:] else: self.entry.ipv6.invflags &= (~ip6t_ip6.IP6T_INV_VIA_OUT & ip6t_ip6.IP6T_INV_MASK) if len(intf) >= _IFNAMSIZ: raise ValueError("interface name %s too long" % (intf)) masklen = len(intf) + 1 if intf[len(intf) - 1] == "+": intf = intf[:-1] masklen -= 2 self.entry.ipv6.outiface = ("".join( [intf, '\x00' * (_IFNAMSIZ - len(intf))])).encode() self.entry.ipv6.outiface_mask = ("".join( ['\x01' * masklen, '\x00' * (_IFNAMSIZ - masklen)])).encode() out_interface = property(get_out_interface, set_out_interface) """This is the output network interface e.g. *eth0*. A wildcard match can be achieved via *+* e.g. *ppp+* matches any *ppp* interface.""" def get_protocol(self): if self.entry.ipv6.invflags & ip6t_ip6.IP6T_INV_PROTO: proto = "!" else: proto = "" proto = "".join([proto, self.protocols.get(self.entry.ipv6.proto, str(self.entry.ipv6.proto))]) return proto def set_protocol(self, proto): proto = str(proto) if proto[0] == "!": self.entry.ipv6.invflags |= ip6t_ip6.IP6T_INV_PROTO self.entry.ipv6.flags &= (~ip6t_ip6.IP6T_F_PROTO & ip6t_ip6.IP6T_F_MASK) proto = proto[1:] else: self.entry.ipv6.invflags &= (~ip6t_ip6.IP6T_INV_PROTO & ip6t_ip6.IP6T_INV_MASK) self.entry.ipv6.flags |= ip6t_ip6.IP6T_F_PROTO if proto.isdigit(): self.entry.ipv6.proto = int(proto) return for p in self.protocols.items(): if proto.lower() == p[1]: self.entry.ipv6.proto = p[0] return raise ValueError("invalid protocol %s" % (proto)) protocol = property(get_protocol, set_protocol) """This is the transport layer protocol.""" def get_ip(self): return self.entry.ipv6 def _entry_size(self): return xt_align(ct.sizeof(ip6t_entry)) def _entry_type(self): return ip6t_entry def _new_entry(self): return ip6t_entry() class Table6(Table): """The IPv6 version of Table. There are four fixed tables: * **Table.FILTER**, the filter table, * **Table.MANGLE**, the mangle table, * **Table.RAW**, the raw table and * **Table.SECURITY**, the security table. The four tables are cached, so if you create a new Table, and it has been instantiated before, then it will be reused. To get access to e.g. the filter table: >>> import iptc >>> table = iptc.Table6(iptc.Table6.FILTER) The interface provided by *Table* is rather low-level, in fact it maps to *libiptc* API calls one by one, and take low-level iptables structs as parameters. It is encouraged to, when possible, use Chain, Rule, Match and Target to achieve what is wanted instead, since they hide the low-level details from the user. """ FILTER = "filter" """This is the constant for the filter table.""" MANGLE = "mangle" """This is the constant for the mangle table.""" RAW = "raw" """This is the constant for the raw table.""" NAT = "nat" """This is the constant for the nat table.""" SECURITY = "security" """This is the constant for the security table.""" ALL = ["filter", "mangle", "raw", "nat", "security"] """This is the constant for all tables.""" _cache = dict() def __new__(cls, name, autocommit=None): obj = Table6._cache.get(name, None) if not obj: obj = object.__new__(cls) if autocommit is None: autocommit = True obj._init(name, autocommit) Table6._cache[name] = obj elif autocommit is not None: obj.autocommit = autocommit return obj def _init(self, name, autocommit): """ Here *name* is the name of the table to instantiate, if it has already been instantiated the existing cached object is returned. *Autocommit* specifies that any low-level iptables operation should be committed immediately, making changes visible in the kernel. """ self._iptc = ip6tc() # to keep references to functions self._handle = None self.name = name self.autocommit = autocommit self.refresh() def create_rule(self, entry=None, chain=None): return Rule6(entry, chain) 07070100000021000081A40000000000000000000000015EE6923E00000DCD000000000000000000000000000000000000002300000000python-iptables-1.0.0/iptc/util.pyimport re import os import sys import ctypes import ctypes.util from distutils.sysconfig import get_python_lib from itertools import product from subprocess import Popen, PIPE from sys import version_info try: from sysconfig import get_config_var except ImportError: def get_config_var(name): if name == 'SO': return '.so' raise Exception('Not implemented') def _insert_ko(modprobe, modname): p = Popen([modprobe, modname], stderr=PIPE) p.wait() return (p.returncode, p.stderr.read(1024)) def _load_ko(modname): # only try to load modules on kernels that support them if not os.path.exists("/proc/modules"): return (0, None) # this will return the full path for the modprobe binary modprobe = "/sbin/modprobe" try: proc = open("/proc/sys/kernel/modprobe") modprobe = proc.read(1024) except: pass if modprobe[-1] == '\n': modprobe = modprobe[:-1] return _insert_ko(modprobe, modname) # Load a kernel module. If it is already loaded modprobe will just return 0. def load_kernel(name, exc_if_failed=False): rc, err = _load_ko(name) if rc: if not err: err = "Failed to load the %s kernel module." % (name) if err[-1] == "\n": err = err[:-1] if exc_if_failed: raise Exception(err) def _do_find_library(name): if '/' in name: try: return ctypes.CDLL(name, mode=ctypes.RTLD_GLOBAL) except Exception: return None p = ctypes.util.find_library(name) if p: lib = ctypes.CDLL(p, mode=ctypes.RTLD_GLOBAL) return lib # probably we have been installed in a virtualenv try: lib = ctypes.CDLL(os.path.join(get_python_lib(), name), mode=ctypes.RTLD_GLOBAL) return lib except: pass for p in sys.path: try: lib = ctypes.CDLL(os.path.join(p, name), mode=ctypes.RTLD_GLOBAL) return lib except: pass return None def _find_library(*names): exts = [] if version_info >= (3, 3): exts.append(get_config_var("EXT_SUFFIX")) else: exts.append(get_config_var('SO')) if version_info >= (3, 5): exts.append('.so') for name in names: libnames = [name, "lib" + name] for ext in exts: libnames += [name + ext, "lib" + name + ext] libdir = os.environ.get('IPTABLES_LIBDIR', None) if libdir is not None: libdirs = libdir.split(':') libs = [os.path.join(*p) for p in product(libdirs, libnames)] libs.extend(libnames) else: libs = libnames for n in libs: while os.path.islink(n): n = os.path.realpath(n) lib = _do_find_library(n) if lib is not None: yield lib def find_library(*names): for lib in _find_library(*names): major = 0 m = re.search(r"\.so\.(\d+).?", lib._name) if m: major = int(m.group(1)) return lib, major return None, None def find_libc(): lib = ctypes.util.find_library('c') if lib is not None: return ctypes.CDLL(lib, mode=ctypes.RTLD_GLOBAL) libnames = ['libc.so.6', 'libc.so.0', 'libc.so'] for name in libnames: try: lib = ctypes.CDLL(name, mode=ctypes.RTLD_GLOBAL) return lib except: pass return None 07070100000022000081A40000000000000000000000015EE6923E0000004F000000000000000000000000000000000000002600000000python-iptables-1.0.0/iptc/version.py# -*- coding: utf-8 -*- __pkgname__ = "python-iptables" __version__ = "1.0.0" 07070100000023000081A40000000000000000000000015EE6923E0000CABE000000000000000000000000000000000000002600000000python-iptables-1.0.0/iptc/xtables.py# -*- coding: utf-8 -*- import ctypes as ct import os import sys import weakref from . import version from .util import find_library, find_libc from .errors import * XT_INV_PROTO = 0x40 # invert the sense of PROTO NFPROTO_UNSPEC = 0 NFPROTO_IPV4 = 2 NFPROTO_ARP = 3 NFPROTO_BRIDGE = 7 NFPROTO_IPV6 = 10 NFPROTO_DECNET = 12 NFPROTO_NUMPROTO = 6 XTF_DONT_LOAD = 0x00 XTF_DURING_LOAD = 0x01 XTF_TRY_LOAD = 0x02 XTF_LOAD_MUST_SUCCEED = 0x03 XTOPT_INVERT = 1 << 0 XTOPT_MAND = 1 << 1 XTOPT_MULTI = 1 << 2 XTOPT_PUT = 1 << 3 XTOPT_NBO = 1 << 4 _WORDLEN = ct.sizeof(ct.c_long) _XT_FUNCTION_MAXNAMELEN = 30 def xt_align(sz): return ((sz + (_WORDLEN - 1)) & ~(_WORDLEN - 1)) class xt_counters(ct.Structure): """This class is a representation of the C struct xt_counters.""" _fields_ = [("pcnt", ct.c_uint64), # packet counter ("bcnt", ct.c_uint64)] # byte counter class xt_entry_target_user(ct.Structure): _fields_ = [("target_size", ct.c_uint16), ("name", ct.c_char * (_XT_FUNCTION_MAXNAMELEN - 1)), ("revision", ct.c_uint8)] class xt_entry_target_u(ct.Union): _fields_ = [("user", xt_entry_target_user), ("target_size", ct.c_uint16)] # full length class xt_entry_target(ct.Structure): """This class is a representation of the C struct xt_entry_target.""" _fields_ = [("u", xt_entry_target_u), ("data", ct.c_ubyte * 0)] class xt_entry_match_user(ct.Structure): _fields_ = [("match_size", ct.c_uint16), ("name", ct.c_char * (_XT_FUNCTION_MAXNAMELEN - 1)), ("revision", ct.c_uint8)] class xt_entry_match_u(ct.Union): _fields_ = [("user", xt_entry_match_user), ("match_size", ct.c_uint16)] # full length class xt_entry_match(ct.Structure): """This class is a representation of the C struct xt_entry_match.""" _fields_ = [("u", xt_entry_match_u), ("data", ct.c_ubyte * 0)] class xtables_globals(ct.Structure): _fields_ = [("option_offset", ct.c_uint), ("program_name", ct.c_char_p), ("program_version", ct.c_char_p), ("orig_opts", ct.c_void_p), ("opts", ct.c_void_p), ("exit_err", ct.CFUNCTYPE(None, ct.c_int, ct.c_char_p)), ("compat_rev", ct.CFUNCTYPE(ct.c_int, ct.c_char_p, ct.c_uint8, ct.c_int))] # struct used by getopt() class option(ct.Structure): _fields_ = [("name", ct.c_char_p), ("has_arg", ct.c_int), ("flag", ct.POINTER(ct.c_int)), ("val", ct.c_int)] class xt_option_entry(ct.Structure): _fields_ = [("name", ct.c_char_p), ("type", ct.c_int), ("id", ct.c_uint), ("excl", ct.c_uint), ("also", ct.c_uint), ("flags", ct.c_uint), ("ptroff", ct.c_uint), ("size", ct.c_size_t), ("min", ct.c_uint), ("max", ct.c_uint)] class _U1(ct.Union): _fields_ = [("match", ct.POINTER(ct.POINTER(xt_entry_match))), ("target", ct.POINTER(ct.POINTER(xt_entry_target)))] class nf_inet_addr(ct.Union): _fields_ = [("all", ct.c_uint32 * 4), ("ip", ct.c_uint32), ("ip6", ct.c_uint32 * 4), ("in", ct.c_uint32), ("in6", ct.c_uint8 * 16)] class _S1(ct.Structure): _fields_ = [("haddr", nf_inet_addr), ("hmask", nf_inet_addr), ("hlen", ct.c_uint8)] class _S2(ct.Structure): _fields_ = [("tos_value", ct.c_uint8), ("tos_mask", ct.c_uint8)] class _S3(ct.Structure): _fields_ = [("mark", ct.c_uint32), ("mask", ct.c_uint32)] class _U_val(ct.Union): _anonymous_ = ("s1", "s2", "s3") _fields_ = [("u8", ct.c_uint8), ("u8_range", ct.c_uint8 * 2), ("syslog_level", ct.c_uint8), ("protocol", ct.c_uint8), ("u16", ct.c_uint16), ("u16_range", ct.c_uint16 * 2), ("port", ct.c_uint16), ("port_range", ct.c_uint16 * 2), ("u32", ct.c_uint32), ("u32_range", ct.c_uint32 * 2), ("u64", ct.c_uint64), ("u64_range", ct.c_uint64 * 2), ("double", ct.c_double), ("s1", _S1), ("s2", _S2), ("s3", _S3), ("ethermac", ct.c_uint8 * 6)] class xt_option_call(ct.Structure): _anonymous_ = ("u",) _fields_ = [("arg", ct.c_char_p), ("ext_name", ct.c_char_p), ("entry", ct.POINTER(xt_option_entry)), ("data", ct.c_void_p), ("xflags", ct.c_uint), ("invert", ct.c_uint8), ("nvals", ct.c_uint8), ("val", _U_val), ("u", _U1), ("xt_entry", ct.c_void_p), ("udata", ct.c_void_p)] class xt_fcheck_call(ct.Structure): _fields_ = [("ext_name", ct.c_char_p), ("data", ct.c_void_p), ("udata", ct.c_void_p), ("xflags", ct.c_uint)] class _xtables_match_v1(ct.Structure): _fields_ = [("version", ct.c_char_p), ("next", ct.c_void_p), ("name", ct.c_char_p), ("revision", ct.c_uint8), ("family", ct.c_uint16), ("size", ct.c_size_t), ("userspacesize", ct.c_size_t), ("help", ct.CFUNCTYPE(None)), ("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_match))), # fourth parameter entry is struct ipt_entry for example # int (*parse)(int c, char **argv, int invert, unsigned int # *flags, const void *entry, struct xt_entry_match **match) ("parse", ct.CFUNCTYPE(ct.c_int, ct.c_int, ct.POINTER(ct.c_char_p), ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p, ct.POINTER(ct.POINTER( xt_entry_match)))), ("final_check", ct.CFUNCTYPE(None, ct.c_uint)), # prints out the match iff non-NULL: put space at end # first parameter ip is struct ipt_ip * for example ("print", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_match), ct.c_int)), # saves the match info in parsable form to stdout. # first parameter ip is struct ipt_ip * for example ("save", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_match))), # pointer to list of extra command-line options ("extra_opts", ct.POINTER(option)), ("option_offset", ct.c_uint), ("m", ct.POINTER(xt_entry_match)), ("mflags", ct.c_uint), ("loaded", ct.c_uint)] x6_parse = None x6_fcheck = None x6_options = None _xtables_match_v2 = _xtables_match_v1 _xtables_match_v4 = _xtables_match_v1 _xtables_match_v5 = _xtables_match_v1 class _xtables_match_v6(ct.Structure): _fields_ = [("version", ct.c_char_p), ("next", ct.c_void_p), ("name", ct.c_char_p), ("revision", ct.c_uint8), ("family", ct.c_uint16), ("size", ct.c_size_t), ("userspacesize", ct.c_size_t), ("help", ct.CFUNCTYPE(None)), ("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_match))), # fourth parameter entry is struct ipt_entry for example # int (*parse)(int c, char **argv, int invert, unsigned int # *flags, const void *entry, struct xt_entry_match **match) ("parse", ct.CFUNCTYPE(ct.c_int, ct.c_int, ct.POINTER(ct.c_char_p), ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p, ct.POINTER(ct.POINTER( xt_entry_match)))), ("final_check", ct.CFUNCTYPE(None, ct.c_uint)), # prints out the match iff non-NULL: put space at end # first parameter ip is struct ipt_ip * for example ("print", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_match), ct.c_int)), # saves the match info in parsable form to stdout. # first parameter ip is struct ipt_ip * for example ("save", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_match))), # pointer to list of extra command-line options ("extra_opts", ct.POINTER(option)), # introduced with the new iptables API ("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))), ("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))), ("x6_options", ct.POINTER(xt_option_entry)), ("option_offset", ct.c_uint), ("m", ct.POINTER(xt_entry_match)), ("mflags", ct.c_uint), ("loaded", ct.c_uint)] class _xtables_match_v7(ct.Structure): _fields_ = [("version", ct.c_char_p), ("next", ct.c_void_p), ("name", ct.c_char_p), ("revision", ct.c_uint8), ("family", ct.c_uint16), ("size", ct.c_size_t), ("userspacesize", ct.c_size_t), ("help", ct.CFUNCTYPE(None)), ("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_match))), # fourth parameter entry is struct ipt_entry for example # int (*parse)(int c, char **argv, int invert, unsigned int # *flags, const void *entry, struct xt_entry_match **match) ("parse", ct.CFUNCTYPE(ct.c_int, ct.c_int, ct.POINTER(ct.c_char_p), ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p, ct.POINTER(ct.POINTER( xt_entry_match)))), ("final_check", ct.CFUNCTYPE(None, ct.c_uint)), # prints out the match iff non-NULL: put space at end # first parameter ip is struct ipt_ip * for example ("print", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_match), ct.c_int)), # saves the match info in parsable form to stdout. # first parameter ip is struct ipt_ip * for example ("save", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_match))), # pointer to list of extra command-line options ("extra_opts", ct.POINTER(option)), # introduced with the new iptables API ("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))), ("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))), ("x6_options", ct.POINTER(xt_option_entry)), # size of per-extension instance extra "global" scratch space ("udata_size", ct.c_size_t), # ignore these men behind the curtain: ("udata", ct.c_void_p), ("option_offset", ct.c_uint), ("m", ct.POINTER(xt_entry_match)), ("mflags", ct.c_uint), ("loaded", ct.c_uint)] class _xtables_match_v9(ct.Structure): _fields_ = [("version", ct.c_char_p), ("next", ct.c_void_p), ("name", ct.c_char_p), ("real_name", ct.c_char_p), ("revision", ct.c_uint8), ("family", ct.c_uint16), ("size", ct.c_size_t), ("userspacesize", ct.c_size_t), ("help", ct.CFUNCTYPE(None)), ("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_match))), # fourth parameter entry is struct ipt_entry for example # int (*parse)(int c, char **argv, int invert, unsigned int # *flags, const void *entry, struct xt_entry_match **match) ("parse", ct.CFUNCTYPE(ct.c_int, ct.c_int, ct.POINTER(ct.c_char_p), ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p, ct.POINTER(ct.POINTER( xt_entry_match)))), ("final_check", ct.CFUNCTYPE(None, ct.c_uint)), # prints out the match iff non-NULL: put space at end # first parameter ip is struct ipt_ip * for example ("print", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_match), ct.c_int)), # saves the match info in parsable form to stdout. # first parameter ip is struct ipt_ip * for example ("save", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_match))), # pointer to list of extra command-line options ("extra_opts", ct.POINTER(option)), # introduced with the new iptables API ("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))), ("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))), ("x6_options", ct.POINTER(xt_option_entry)), # size of per-extension instance extra "global" scratch space ("udata_size", ct.c_size_t), # ignore these men behind the curtain: ("udata", ct.c_void_p), ("option_offset", ct.c_uint), ("m", ct.POINTER(xt_entry_match)), ("mflags", ct.c_uint), ("loaded", ct.c_uint)] class _xtables_match_v10(ct.Structure): _fields_ = [("version", ct.c_char_p), ("next", ct.c_void_p), ("name", ct.c_char_p), ("real_name", ct.c_char_p), ("revision", ct.c_uint8), ("ext_flags", ct.c_uint8), ("family", ct.c_uint16), ("size", ct.c_size_t), ("userspacesize", ct.c_size_t), ("help", ct.CFUNCTYPE(None)), ("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_match))), # fourth parameter entry is struct ipt_entry for example # int (*parse)(int c, char **argv, int invert, unsigned int # *flags, const void *entry, struct xt_entry_match **match) ("parse", ct.CFUNCTYPE(ct.c_int, ct.c_int, ct.POINTER(ct.c_char_p), ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p, ct.POINTER(ct.POINTER( xt_entry_match)))), ("final_check", ct.CFUNCTYPE(None, ct.c_uint)), # prints out the match iff non-NULL: put space at end # first parameter ip is struct ipt_ip * for example ("print", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_match), ct.c_int)), # saves the match info in parsable form to stdout. # first parameter ip is struct ipt_ip * for example ("save", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_match))), # Print match name or alias ("alias", ct.CFUNCTYPE(ct.c_char_p, ct.POINTER(xt_entry_match))), # pointer to list of extra command-line options ("extra_opts", ct.POINTER(option)), # introduced with the new iptables API ("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))), ("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))), ("x6_options", ct.POINTER(xt_option_entry)), # size of per-extension instance extra "global" scratch space ("udata_size", ct.c_size_t), # ignore these men behind the curtain: ("udata", ct.c_void_p), ("option_offset", ct.c_uint), ("m", ct.POINTER(xt_entry_match)), ("mflags", ct.c_uint), ("loaded", ct.c_uint)] _xtables_match_v11 = _xtables_match_v10 class _xtables_match_v12(ct.Structure): _fields_ = [("version", ct.c_char_p), ("next", ct.c_void_p), ("name", ct.c_char_p), ("real_name", ct.c_char_p), ("revision", ct.c_uint8), ("ext_flags", ct.c_uint8), ("family", ct.c_uint16), ("size", ct.c_size_t), ("userspacesize", ct.c_size_t), ("help", ct.CFUNCTYPE(None)), ("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_match))), # fourth parameter entry is struct ipt_entry for example # int (*parse)(int c, char **argv, int invert, unsigned int # *flags, const void *entry, struct xt_entry_match **match) ("parse", ct.CFUNCTYPE(ct.c_int, ct.c_int, ct.POINTER(ct.c_char_p), ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p, ct.POINTER(ct.POINTER( xt_entry_match)))), ("final_check", ct.CFUNCTYPE(None, ct.c_uint)), # prints out the match iff non-NULL: put space at end # first parameter ip is struct ipt_ip * for example ("print", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_match), ct.c_int)), # saves the match info in parsable form to stdout. # first parameter ip is struct ipt_ip * for example ("save", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_match))), # Print match name or alias ("alias", ct.CFUNCTYPE(ct.c_char_p, ct.POINTER(xt_entry_match))), # pointer to list of extra command-line options ("extra_opts", ct.POINTER(option)), # introduced with the new iptables API ("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))), ("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))), ("x6_options", ct.POINTER(xt_option_entry)), ('xt_xlate', ct.c_int), # size of per-extension instance extra "global" scratch space ("udata_size", ct.c_size_t), # ignore these men behind the curtain: ("udata", ct.c_void_p), ("option_offset", ct.c_uint), ("m", ct.POINTER(xt_entry_match)), ("mflags", ct.c_uint), ("loaded", ct.c_uint)] class xtables_match(ct.Union): _fields_ = [("v1", _xtables_match_v1), ("v2", _xtables_match_v2), # Apparently v3 was skipped ("v4", _xtables_match_v4), ("v5", _xtables_match_v5), ("v6", _xtables_match_v6), ("v7", _xtables_match_v7), # Apparently v8 was skipped ("v9", _xtables_match_v9), ("v10", _xtables_match_v10), ("v11", _xtables_match_v11), ("v12", _xtables_match_v12)] class _xtables_target_v1(ct.Structure): _fields_ = [("version", ct.c_char_p), ("next", ct.c_void_p), ("name", ct.c_char_p), ("revision", ct.c_uint8), ("family", ct.c_uint16), ("size", ct.c_size_t), ("userspacesize", ct.c_size_t), ("help", ct.CFUNCTYPE(None)), ("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_target))), # fourth parameter entry is struct ipt_entry for example # int (*parse)(int c, char **argv, int invert, # unsigned int *flags, const void *entry, # struct xt_entry_target **target) ("parse", ct.CFUNCTYPE(ct.c_int, ct.POINTER(ct.c_char_p), ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p, ct.POINTER(ct.POINTER( xt_entry_target)))), ("final_check", ct.CFUNCTYPE(None, ct.c_uint)), # prints out the target iff non-NULL: put space at end # first parameter ip is struct ipt_ip * for example ("print", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_target), ct.c_int)), # saves the target info in parsable form to stdout. # first parameter ip is struct ipt_ip * for example ("save", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_target))), # pointer to list of extra command-line options ("extra_opts", ct.POINTER(option)), ("option_offset", ct.c_uint), ("t", ct.POINTER(xt_entry_target)), ("tflags", ct.c_uint), ("used", ct.c_uint), ("loaded", ct.c_uint)] x6_parse = None x6_fcheck = None x6_options = None _xtables_target_v2 = _xtables_target_v1 _xtables_target_v4 = _xtables_target_v1 _xtables_target_v5 = _xtables_target_v1 class _xtables_target_v6(ct.Structure): _fields_ = [("version", ct.c_char_p), ("next", ct.c_void_p), ("name", ct.c_char_p), ("revision", ct.c_uint8), ("family", ct.c_uint16), ("size", ct.c_size_t), ("userspacesize", ct.c_size_t), ("help", ct.CFUNCTYPE(None)), ("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_target))), # fourth parameter entry is struct ipt_entry for example # int (*parse)(int c, char **argv, int invert, # unsigned int *flags, const void *entry, # struct xt_entry_target **target) ("parse", ct.CFUNCTYPE(ct.c_int, ct.POINTER(ct.c_char_p), ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p, ct.POINTER(ct.POINTER( xt_entry_target)))), ("final_check", ct.CFUNCTYPE(None, ct.c_uint)), # prints out the target iff non-NULL: put space at end # first parameter ip is struct ipt_ip * for example ("print", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_target), ct.c_int)), # saves the target info in parsable form to stdout. # first parameter ip is struct ipt_ip * for example ("save", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_target))), # pointer to list of extra command-line options ("extra_opts", ct.POINTER(option)), # introduced with the new iptables API ("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))), ("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))), ("x6_options", ct.POINTER(xt_option_entry)), ("option_offset", ct.c_uint), ("t", ct.POINTER(xt_entry_target)), ("tflags", ct.c_uint), ("used", ct.c_uint), ("loaded", ct.c_uint)] class _xtables_target_v7(ct.Structure): _fields_ = [("version", ct.c_char_p), ("next", ct.c_void_p), ("name", ct.c_char_p), ("revision", ct.c_uint8), ("family", ct.c_uint16), ("size", ct.c_size_t), ("userspacesize", ct.c_size_t), ("help", ct.CFUNCTYPE(None)), ("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_target))), # fourth parameter entry is struct ipt_entry for example # int (*parse)(int c, char **argv, int invert, # unsigned int *flags, const void *entry, # struct xt_entry_target **target) ("parse", ct.CFUNCTYPE(ct.c_int, ct.POINTER(ct.c_char_p), ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p, ct.POINTER(ct.POINTER( xt_entry_target)))), ("final_check", ct.CFUNCTYPE(None, ct.c_uint)), # prints out the target iff non-NULL: put space at end # first parameter ip is struct ipt_ip * for example ("print", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_target), ct.c_int)), # saves the target info in parsable form to stdout. # first parameter ip is struct ipt_ip * for example ("save", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_target))), # pointer to list of extra command-line options ("extra_opts", ct.POINTER(option)), # introduced with the new iptables API ("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))), ("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))), ("x6_options", ct.POINTER(xt_option_entry)), # size of per-extension instance extra "global" scratch space ("udata_size", ct.c_size_t), # ignore these men behind the curtain: ("udata", ct.c_void_p), ("option_offset", ct.c_uint), ("t", ct.POINTER(xt_entry_target)), ("tflags", ct.c_uint), ("used", ct.c_uint), ("loaded", ct.c_uint)] class _xtables_target_v9(ct.Structure): _fields_ = [("version", ct.c_char_p), ("next", ct.c_void_p), ("name", ct.c_char_p), ("real_name", ct.c_char_p), ("revision", ct.c_uint8), ("family", ct.c_uint16), ("size", ct.c_size_t), ("userspacesize", ct.c_size_t), ("help", ct.CFUNCTYPE(None)), ("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_target))), # fourth parameter entry is struct ipt_entry for example # int (*parse)(int c, char **argv, int invert, # unsigned int *flags, const void *entry, # struct xt_entry_target **target) ("parse", ct.CFUNCTYPE(ct.c_int, ct.POINTER(ct.c_char_p), ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p, ct.POINTER(ct.POINTER( xt_entry_target)))), ("final_check", ct.CFUNCTYPE(None, ct.c_uint)), # prints out the target iff non-NULL: put space at end # first parameter ip is struct ipt_ip * for example ("print", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_target), ct.c_int)), # saves the target info in parsable form to stdout. # first parameter ip is struct ipt_ip * for example ("save", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_target))), # pointer to list of extra command-line options ("extra_opts", ct.POINTER(option)), # introduced with the new iptables API ("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))), ("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))), ("x6_options", ct.POINTER(xt_option_entry)), # size of per-extension instance extra "global" scratch space ("udata_size", ct.c_size_t), # ignore these men behind the curtain: ("udata", ct.c_void_p), ("option_offset", ct.c_uint), ("t", ct.POINTER(xt_entry_target)), ("tflags", ct.c_uint), ("used", ct.c_uint), ("loaded", ct.c_uint)] class _xtables_target_v10(ct.Structure): _fields_ = [("version", ct.c_char_p), ("next", ct.c_void_p), ("name", ct.c_char_p), ("real_name", ct.c_char_p), ("revision", ct.c_uint8), ("ext_flags", ct.c_uint8), ("family", ct.c_uint16), ("size", ct.c_size_t), ("userspacesize", ct.c_size_t), ("help", ct.CFUNCTYPE(None)), ("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_target))), # fourth parameter entry is struct ipt_entry for example # int (*parse)(int c, char **argv, int invert, # unsigned int *flags, const void *entry, # struct xt_entry_target **target) ("parse", ct.CFUNCTYPE(ct.c_int, ct.POINTER(ct.c_char_p), ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p, ct.POINTER(ct.POINTER( xt_entry_target)))), ("final_check", ct.CFUNCTYPE(None, ct.c_uint)), # prints out the target iff non-NULL: put space at end # first parameter ip is struct ipt_ip * for example ("print", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_target), ct.c_int)), # saves the target info in parsable form to stdout. # first parameter ip is struct ipt_ip * for example ("save", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_target))), # Print target name or alias ("alias", ct.CFUNCTYPE(ct.c_char_p, ct.POINTER(xt_entry_target))), # pointer to list of extra command-line options ("extra_opts", ct.POINTER(option)), # introduced with the new iptables API ("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))), ("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))), ("x6_options", ct.POINTER(xt_option_entry)), # size of per-extension instance extra "global" scratch space ("udata_size", ct.c_size_t), # ignore these men behind the curtain: ("udata", ct.c_void_p), ("option_offset", ct.c_uint), ("t", ct.POINTER(xt_entry_target)), ("tflags", ct.c_uint), ("used", ct.c_uint), ("loaded", ct.c_uint)] _xtables_target_v11 = _xtables_target_v10 class _xtables_target_v12(ct.Structure): _fields_ = [("version", ct.c_char_p), ("next", ct.c_void_p), ("name", ct.c_char_p), ("real_name", ct.c_char_p), ("revision", ct.c_uint8), ("ext_flags", ct.c_uint8), ("family", ct.c_uint16), ("size", ct.c_size_t), ("userspacesize", ct.c_size_t), ("help", ct.CFUNCTYPE(None)), ("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_target))), # fourth parameter entry is struct ipt_entry for example # int (*parse)(int c, char **argv, int invert, # unsigned int *flags, const void *entry, # struct xt_entry_target **target) ("parse", ct.CFUNCTYPE(ct.c_int, ct.POINTER(ct.c_char_p), ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p, ct.POINTER(ct.POINTER( xt_entry_target)))), ("final_check", ct.CFUNCTYPE(None, ct.c_uint)), # prints out the target iff non-NULL: put space at end # first parameter ip is struct ipt_ip * for example ("print", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_target), ct.c_int)), # saves the target info in parsable form to stdout. # first parameter ip is struct ipt_ip * for example ("save", ct.CFUNCTYPE(None, ct.c_void_p, ct.POINTER(xt_entry_target))), # Print target name or alias ("alias", ct.CFUNCTYPE(ct.c_char_p, ct.POINTER(xt_entry_target))), # pointer to list of extra command-line options ("extra_opts", ct.POINTER(option)), # introduced with the new iptables API ("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))), ("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))), ("x6_options", ct.POINTER(xt_option_entry)), ('xt_xlate', ct.c_int), # size of per-extension instance extra "global" scratch space ("udata_size", ct.c_size_t), # ignore these men behind the curtain: ("udata", ct.c_void_p), ("option_offset", ct.c_uint), ("t", ct.POINTER(xt_entry_target)), ("tflags", ct.c_uint), ("used", ct.c_uint), ("loaded", ct.c_uint)] class xtables_target(ct.Union): _fields_ = [("v1", _xtables_target_v1), ("v2", _xtables_target_v2), # Apparently v3 was skipped ("v4", _xtables_target_v4), ("v5", _xtables_target_v5), ("v6", _xtables_target_v6), ("v7", _xtables_target_v7), # Apparently v8 was skipped ("v9", _xtables_target_v9), ("v10", _xtables_target_v10), ("v11", _xtables_target_v11), ("v12", _xtables_target_v12)] _libc = find_libc() _optind = ct.c_long.in_dll(_libc, "optind") _optarg = ct.c_char_p.in_dll(_libc, "optarg") xtables_version = os.getenv("PYTHON_IPTABLES_XTABLES_VERSION") if xtables_version: _searchlib = "libxtables.so.%s" % (xtables_version,) else: _searchlib = "xtables" _lib_xtables, xtables_version = find_library(_searchlib) _xtables_libdir = os.getenv("XTABLES_LIBDIR") if _xtables_libdir is None: import re ldconfig_path_regex = re.compile('^(/.*):$') import subprocess ldconfig = subprocess.Popen( ('/sbin/ldconfig', '-N', '-v'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True ) ldconfig_out, ldconfig_err = ldconfig.communicate() if ldconfig.returncode != 0: raise XTablesError("ldconfig failed, please set XTABLES_LIBDIR") for ldconfig_out_line in ldconfig_out.splitlines(): ldconfig_path_regex_match = ldconfig_path_regex.match(ldconfig_out_line) if ldconfig_path_regex_match is not None: ldconfig_path = os.path.join(ldconfig_path_regex_match.group(1), 'xtables') if os.path.isdir(ldconfig_path): _xtables_libdir = ldconfig_path break if _xtables_libdir is None: raise XTablesError("can't find directory with extensions; " "please set XTABLES_LIBDIR") _lib_xtwrapper, _ = find_library("xtwrapper") _throw = _lib_xtwrapper.throw_exception _wrap_parse = _lib_xtwrapper.wrap_parse _wrap_parse.restype = ct.c_int _wrap_parse.argtypes = [ct.c_void_p, ct.c_int, ct.POINTER(ct.c_char_p), ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p, ct.POINTER(ct.c_void_p)] _wrap_save = _lib_xtwrapper.wrap_save _wrap_save.restype = ct.c_void_p _wrap_save.argtypes = [ct.c_void_p, ct.c_void_p, ct.c_void_p] _wrap_uintfn = _lib_xtwrapper.wrap_uintfn _wrap_uintfn.restype = ct.c_int _wrap_uintfn.argtypes = [ct.c_void_p, ct.c_uint] _wrap_voidfn = _lib_xtwrapper.wrap_voidfn _wrap_voidfn.restype = ct.c_int _wrap_voidfn.argtypes = [ct.c_void_p] _wrap_x6fn = _lib_xtwrapper.wrap_x6fn _wrap_x6fn.restype = ct.c_int _wrap_x6fn.argtypes = [ct.c_void_p, ct.c_void_p] _kernel_version = ct.c_int.in_dll(_lib_xtwrapper, 'kernel_version') _get_kernel_version = _lib_xtwrapper.get_kernel_version _get_kernel_version() def _xt_exit(status, *args): _throw(status) _EXIT_FN = ct.CFUNCTYPE(None, ct.c_int, ct.c_char_p) _xt_exit = _EXIT_FN(_xt_exit) def set_nfproto(fn): def new(*args): xtobj = args[0] xtables._xtables_set_nfproto(xtobj.proto) return fn(*args) return new _xt_globals = xtables_globals() _xt_globals.option_offset = 0 _xt_globals.program_name = version.__pkgname__.encode() _xt_globals.program_version = version.__version__.encode() _xt_globals.orig_opts = None _xt_globals.opts = None _xt_globals.exit_err = _xt_exit if xtables_version > 10: _COMPAT_REV_FN = ct.CFUNCTYPE(ct.c_int, ct.c_char_p, ct.c_uint8, ct.c_int) _xt_compat_rev = _COMPAT_REV_FN(_lib_xtables.xtables_compatible_revision) _xt_globals.compat_rev = _xt_compat_rev _loaded_exts = {} class xtables(object): _xtables_init_all = _lib_xtables.xtables_init_all _xtables_init_all.restype = ct.c_int _xtables_init_all.argtypes = [ct.POINTER(xtables_globals), ct.c_uint8] _xtables_find_match = _lib_xtables.xtables_find_match _xtables_find_match.restype = ct.POINTER(xtables_match) _xtables_find_match.argtypes = [ct.c_char_p, ct.c_int, ct.c_void_p] _xtables_find_target = _lib_xtables.xtables_find_target _xtables_find_target.restype = ct.POINTER(xtables_target) _xtables_find_target.argtypes = [ct.c_char_p, ct.c_int] _xtables_set_nfproto = _lib_xtables.xtables_set_nfproto _xtables_set_nfproto.restype = None _xtables_set_nfproto.argtypes = [ct.c_uint8] _xtables_xt_params = ct.c_void_p.in_dll(_lib_xtables, "xt_params") _xtables_matches = (ct.c_void_p.in_dll(_lib_xtables, "xtables_matches")) try: _xtables_pending_matches = (ct.c_void_p.in_dll( _lib_xtables, "xtables_pending_matches")) except ValueError: _xtables_pending_matches = ct.POINTER(None) _xtables_targets = (ct.c_void_p.in_dll(_lib_xtables, "xtables_targets")) try: _xtables_pending_targets = (ct.c_void_p.in_dll( _lib_xtables, "xtables_pending_targets")) except ValueError: _xtables_pending_targets = ct.POINTER(None) _real_name = { 'state': 'conntrack', 'NOTRACK': 'CT' } _cache = weakref.WeakValueDictionary() def __new__(cls, proto): obj = xtables._cache.get(proto, None) if not obj: obj = object.__new__(cls) xtables._cache[proto] = obj obj._xtinit(proto) return obj def _xtinit(self, proto, no_alias_check=False): self.proto = proto self.no_alias_check = no_alias_check thismodule = sys.modules[__name__] matchname = "_xtables_match_v%d" % (xtables_version) targetname = "_xtables_target_v%d" % (xtables_version) try: self._match_struct = getattr(thismodule, matchname) self._target_struct = getattr(thismodule, targetname) except: raise XTablesError("unknown xtables version %d" % (xtables_version)) rv = xtables._xtables_init_all(ct.pointer(_xt_globals), proto) if rv: raise XTablesError("xtables_init_all() failed: %d" % (rv)) def __repr__(self): return "XTables for protocol %d" % (self.proto) def _check_extname(self, name): if name in [b"", b"ACCEPT", b"DROP", b"QUEUE", b"RETURN"]: name = b"standard" return name def _loaded(self, name, ext): _loaded_exts['%s___%s' % (self.proto, name)] = ext def _get_initfn_from_lib(self, name, lib): try: initfn = getattr(lib, "libxt_%s_init" % (name)) except AttributeError: prefix = self._get_prefix() initfn = getattr(lib, "%s%s_init" % (prefix, name), None) if initfn is None and not self.no_alias_check: if name in xtables._real_name: name = xtables._real_name[name] initfn = self._get_initfn_from_lib(name, lib) return initfn def _try_extinit(self, name, lib): try: if type(lib) != ct.CDLL: lib = ct.CDLL(lib) fn = self._get_initfn_from_lib(name, lib) if fn: _wrap_voidfn(fn) return True except: pass return False def _get_prefix(self): if self.proto == NFPROTO_IPV4: return "libipt_" elif self.proto == NFPROTO_IPV6: return "libip6t_" else: raise XTablesError("Unknown protocol %d" % (self.proto)) def _try_register(self, name): if isinstance(name, bytes): name = name.decode() if self._try_extinit(name, _lib_xtables): return prefix = self._get_prefix() libs = [os.path.join(_xtables_libdir, "libxt_" + name + ".so"), os.path.join(_xtables_libdir, prefix + name + ".so")] for lib in libs: if self._try_extinit(name, lib): return def _get_loaded_ext(self, name): ext = _loaded_exts.get('%s___%s' % (self.proto, name), None) return ext @set_nfproto def find_match(self, name): if isinstance(name, str): name = name.encode() name = self._check_extname(name) ext = self._get_loaded_ext(name) if ext is not None: return ext match = xtables._xtables_find_match(name, XTF_TRY_LOAD, None) if not match: self._try_register(name) match = xtables._xtables_find_match(name, XTF_DONT_LOAD, None) if not match: return match m = ct.cast(match, ct.POINTER(self._match_struct)) self._loaded(m[0].name, m) return m @set_nfproto def find_target(self, name): if isinstance(name, str): name = name.encode() name = self._check_extname(name) ext = self._get_loaded_ext(name) if ext is not None: return ext target = xtables._xtables_find_target(name, XTF_TRY_LOAD) if not target: self._try_register(name) target = xtables._xtables_find_target(name, XTF_DONT_LOAD) if not target: return target t = ct.cast(target, ct.POINTER(self._target_struct)) self._loaded(t[0].name, t) return t @set_nfproto def save(self, module, ip, ptr): _wrap_save(module.save, ct.cast(ct.pointer(ip), ct.c_void_p), ptr) def _option_lookup(self, entries, name): for e in entries: if not e.name: break if e.name == name: return e return None def _parse(self, module, argv, inv, flags, entry, ptr): for opt in module.extra_opts: if opt.name == argv[0]: rv = _wrap_parse(module.parse, opt.val, argv, inv, flags, entry, ptr) if rv != 1: raise ValueError("invalid value %s" % (argv[1])) return elif not opt.name: break raise AttributeError("invalid parameter %s" % (argv[0])) # Dispatch arguments to the appropriate parse function, based upon the # extension's choice of API. @set_nfproto def parse_target(self, argv, invert, t, fw, ptr, x6_parse, x6_options): _optarg.value = len(argv) > 1 and argv[1] or None _optind.value = len(argv) - 1 try: # new API? if x6_options is None: x6_options = t.x6_options if x6_parse is None: x6_parse = t.x6_parse except AttributeError: pass if x6_options and x6_parse: # new API entry = self._option_lookup(x6_options, argv[0]) if not entry: raise XTablesError("%s: no such parameter %s" % (t.name, argv[0])) cb = xt_option_call() cb.entry = ct.pointer(entry) cb.arg = _optarg cb.invert = ct.c_uint8(invert.value) cb.ext_name = t.name cb.data = ct.cast(t.t[0].data, ct.c_void_p) cb.xflags = 0 cb.target = ct.pointer(t.t) cb.xt_entry = ct.cast(fw, ct.c_void_p) cb.udata = t.udata rv = _wrap_x6fn(x6_parse, ct.pointer(cb)) if rv != 0: raise XTablesError("%s: parameter error %d (%s)" % (t.name, rv, argv[1])) t.tflags |= cb.xflags return # old API flags = ct.pointer(ct.c_uint(0)) self._parse(t, argv, invert, flags, fw, ptr) t.tflags |= flags[0] # Dispatch arguments to the appropriate parse function, based upon the # extension's choice of API. @set_nfproto def parse_match(self, argv, invert, m, fw, ptr, x6_parse, x6_options): _optarg.value = len(argv) > 1 and argv[1] or None _optind.value = len(argv) - 1 try: # new API? if x6_options is None: x6_options = m.x6_options if x6_parse is None: x6_parse = m.x6_parse except AttributeError: pass if x6_options and x6_parse: # new API entry = self._option_lookup(x6_options, argv[0]) if not entry: raise XTablesError("%s: no such parameter %s" % (m.name, argv[0])) cb = xt_option_call() cb.entry = ct.pointer(entry) cb.arg = _optarg cb.invert = ct.c_uint8(invert.value) cb.ext_name = m.name cb.data = ct.cast(m.m[0].data, ct.c_void_p) cb.xflags = 0 cb.match = ct.pointer(m.m) cb.xt_entry = ct.cast(fw, ct.c_void_p) cb.udata = m.udata rv = _wrap_x6fn(x6_parse, ct.pointer(cb)) if rv != 0: raise XTablesError("%s: parameter '%s' error %d" % ( m.name, len(argv) > 1 and argv[1] or "", rv)) m.mflags |= cb.xflags return # old API flags = ct.pointer(ct.c_uint(0)) self._parse(m, argv, invert, flags, fw, ptr) m.mflags |= flags[0] # Check that all option constraints have been met. This effectively # replaces ->final_check of the older API. def _options_fcheck(self, name, xflags, table): for entry in table: if entry.name is None: break if entry.flags & XTOPT_MAND and not xflags & (1 << entry.id): raise XTablesError("%s: --%s must be specified" % (name, entry.name)) if not xflags & (1 << entry.id): continue # XXX: check for conflicting options def _fcheck_target_old(self, target): # old API if not target.final_check: return rv = _wrap_uintfn(target.final_check, target.tflags) if rv: raise XTablesError("%s.final_check() has failed" % (target.name)) def _fcheck_target_new(self, target): # new API cb = xt_fcheck_call() cb.ext_name = target.name cb.data = ct.cast(target.t[0].data, ct.c_void_p) cb.xflags = target.tflags cb.udata = target.udata rv = _wrap_x6fn(target.x6_fcheck, ct.pointer(cb)) if rv: raise XTablesError("%s.x6_fcheck has failed" % (target.name)) if target.x6_options: self._options_fcheck(target.name, target.tflags, target.x6_options) # Dispatch arguments to the appropriate final_check function, based upon # the extension's choice of API. @set_nfproto def final_check_target(self, target): x6_fcheck = None try: # new API? x6_fcheck = target.x6_fcheck except AttributeError: # old API pass if x6_fcheck: self._fcheck_target_new(target) else: self._fcheck_target_old(target) def _fcheck_match_old(self, match): # old API if not match.final_check: return rv = _wrap_uintfn(match.final_check, match.mflags) if rv: raise XTablesError("%s.final_check() has failed" % (match.name)) def _fcheck_match_new(self, match): # new API cb = xt_fcheck_call() cb.ext_name = match.name cb.data = ct.cast(match.m[0].data, ct.c_void_p) cb.xflags = match.mflags cb.udata = match.udata rv = _wrap_x6fn(match.x6_fcheck, ct.pointer(cb)) if rv: raise XTablesError("%s.x6_fcheck has failed" % (match.name)) if match.x6_options: self._options_fcheck(match.name, match.mflags, match.x6_options) # Dispatch arguments to the appropriate final_check function, based upon # the extension's choice of API. @set_nfproto def final_check_match(self, match): x6_fcheck = None try: # new API? x6_fcheck = match.x6_fcheck except AttributeError: # old API pass if x6_fcheck: self._fcheck_match_new(match) else: self._fcheck_match_old(match) 07070100000024000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000002300000000python-iptables-1.0.0/libxtwrapper07070100000025000081A40000000000000000000000015EE6923E000006B1000000000000000000000000000000000000002D00000000python-iptables-1.0.0/libxtwrapper/wrapper.c#include <errno.h> #include <setjmp.h> #include <stdio.h> #include <stdlib.h> #include <sys/utsname.h> int kernel_version; #define LINUX_VERSION(x,y,z) (0x10000*(x) + 0x100*(y) + z) void get_kernel_version(void) { static struct utsname uts; int x = 0, y = 0, z = 0; if (uname(&uts) == -1) { fprintf(stderr, "Unable to retrieve kernel version.\n"); return; } sscanf(uts.release, "%d.%d.%d", &x, &y, &z); kernel_version = LINUX_VERSION(x, y, z); } static jmp_buf env; void throw_exception(int err) { longjmp(env, err); } int wrap_parse(int (*fn)(int, char **, int, unsigned int *, void *, void **), int i, char **argv, int inv, unsigned int *flags, void *p, void **mptr) { int rv = -1; int err; if ((err = setjmp(env)) == 0) { rv = fn(i, argv, inv, flags, p, mptr); } else { errno = err; } return rv; } struct ipt_ip; void wrap_save(int (*fn)(const void *, const void *), const void *ip, const void *m) { fn(ip, m); fprintf(stdout, "\n"); /* make sure something is written to stdout */ fflush(stdout); } int wrap_x6fn(void (*fn)(void *), void *data) { int err; if ((err = setjmp(env)) == 0) { fn(data); } else { errno = err; return -err; } return 0; } int wrap_uintfn(void (*fn)(unsigned int), unsigned int data) { int err; if ((err = setjmp(env)) == 0) { fn(data); } else { errno = err; return -err; } return 0; } int wrap_voidfn(void (*fn)(void)) { int err; if ((err = setjmp(env)) == 0) { fn(); } else { errno = err; return -err; } return 0; } 07070100000026000081A40000000000000000000000015EE6923E000006AE000000000000000000000000000000000000001F00000000python-iptables-1.0.0/setup.py#!/usr/bin/env python """python-iptables setup script""" from setuptools import setup, Extension #from distutils.core import setup, Extension # make pyflakes happy __pkgname__ = None __version__ = None exec(open("iptc/version.py").read()) # build/install python-iptables setup( name=__pkgname__, version=__version__, description="Python bindings for iptables", author="Vilmos Nebehaj", author_email="v.nebehaj@gmail.com", url="https://github.com/ldx/python-iptables", packages=["iptc"], package_dir={"iptc": "iptc"}, ext_modules=[Extension("libxtwrapper", ["libxtwrapper/wrapper.c"])], test_suite="tests", classifiers=[ "Development Status :: 5 - Production/Stable", "Environment :: Console", "Intended Audience :: Developers", "Intended Audience :: Information Technology", "Intended Audience :: System Administrators", "Intended Audience :: Telecommunications Industry", "License :: OSI Approved :: Apache Software License", "Natural Language :: English", "Operating System :: POSIX :: Linux", "Programming Language :: Python", "Topic :: Software Development :: Libraries", "Topic :: System :: Networking :: Firewalls", "Topic :: System :: Systems Administration", "Programming Language :: Python :: 2", "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.3", "Programming Language :: Python :: 3.4", "Programming Language :: Python :: Implementation :: CPython", ], license="Apache License, Version 2.0", ) 07070100000027000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000001C00000000python-iptables-1.0.0/tests07070100000028000081A40000000000000000000000015EE6923E00000000000000000000000000000000000000000000002800000000python-iptables-1.0.0/tests/__init__.py07070100000029000081ED0000000000000000000000015EE6923E000089DF000000000000000000000000000000000000002900000000python-iptables-1.0.0/tests/test_iptc.py# -*- coding: utf-8 -*- import unittest import iptc is_table_available = iptc.is_table_available is_table6_available = iptc.is_table6_available def _check_chains(testcase, *chains): for chain in chains: if chain is None: continue for ch in [c for c in chains if c != chain and c is not None]: testcase.assertNotEquals(id(chain), id(ch)) class TestTable6(unittest.TestCase): def setUp(self): self.autocommit = iptc.Table(iptc.Table.FILTER).autocommit def tearDown(self): iptc.Table(iptc.Table.FILTER, self.autocommit) def test_table6(self): filt = None if is_table6_available(iptc.Table6.FILTER): filt = iptc.Table6("filter") self.assertEquals(id(filt), id(iptc.Table6(iptc.Table6.FILTER))) security = None if is_table6_available(iptc.Table6.SECURITY): security = iptc.Table6("security") self.assertEquals(id(security), id(iptc.Table6(iptc.Table6.SECURITY))) mangle = None if is_table6_available(iptc.Table6.MANGLE): mangle = iptc.Table6("mangle") self.assertEquals(id(mangle), id(iptc.Table6(iptc.Table6.MANGLE))) raw = None if is_table6_available(iptc.Table6.RAW): raw = iptc.Table6("raw") self.assertEquals(id(raw), id(iptc.Table6(iptc.Table6.RAW))) _check_chains(self, filt, security, mangle, raw) def test_table6_autocommit(self): table = iptc.Table(iptc.Table.FILTER, False) self.assertEquals(table.autocommit, False) rule = iptc.Rule() rule.src = "1.2.3.4" rule.dst = "2.3.4.5" rule.protocol = "tcp" self.assertEquals(table.autocommit, False) rule.create_target('DROP') self.assertEquals(table.autocommit, False) match = rule.create_match('tcp') match.dport = "80:90" self.assertEquals(table.autocommit, False) class TestTable(unittest.TestCase): def setUp(self): self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_chain") iptc.Table(iptc.Table.FILTER).create_chain(self.chain) def tearDown(self): iptc.Table(iptc.Table.FILTER).flush() def test_table(self): filt = None if is_table_available(iptc.Table.FILTER): filt = iptc.Table("filter") self.assertEquals(id(filt), id(iptc.Table(iptc.Table.FILTER))) nat = None if is_table_available(iptc.Table.NAT): nat = iptc.Table("nat") self.assertEquals(id(nat), id(iptc.Table(iptc.Table.NAT))) mangle = None if is_table_available(iptc.Table.MANGLE): mangle = iptc.Table("mangle") self.assertEquals(id(mangle), id(iptc.Table(iptc.Table.MANGLE))) raw = None if is_table_available(iptc.Table.RAW): raw = iptc.Table("raw") self.assertEquals(id(raw), id(iptc.Table(iptc.Table.RAW))) _check_chains(self, filt, nat, mangle, raw) def test_refresh(self): rule = iptc.Rule() match = iptc.Match(rule, "tcp") match.dport = "1234" rule.add_match(match) try: self.chain.insert_rule(rule) iptc.Table(iptc.Table.FILTER).delete_chain(self.chain) self.fail("inserted invalid rule") except: pass iptc.Table(iptc.Table.FILTER).refresh() target = iptc.Target(rule, "ACCEPT") rule.target = target rule.protocol = "tcp" self.chain.insert_rule(rule) self.chain.delete_rule(rule) def test_flush_user_chains(self): chain1 = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_flush_chain1") chain2 = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_flush_chain2") iptc.Table(iptc.Table.FILTER).create_chain(chain1) iptc.Table(iptc.Table.FILTER).create_chain(chain2) rule = iptc.Rule() rule.target = iptc.Target(rule, chain2.name) chain1.append_rule(rule) rule = iptc.Rule() rule.target = iptc.Target(rule, chain1.name) chain2.append_rule(rule) self.assertEquals(len(chain1.rules), 1) self.assertEquals(len(chain2.rules), 1) filter_table = iptc.Table(iptc.Table.FILTER) filter_table.flush() self.assertTrue(not filter_table.is_chain(chain1.name)) self.assertTrue(not filter_table.is_chain(chain2.name)) def test_flush_builtin(self): filter_table = iptc.Table(iptc.Table.FILTER) output_rule_count = len(iptc.Chain(filter_table, "OUTPUT").rules) rule = iptc.Rule() rule.target = iptc.Target(rule, "ACCEPT") iptc.Chain(filter_table, "OUTPUT").append_rule(rule) self.assertEquals(len(iptc.Chain(filter_table, "OUTPUT").rules), output_rule_count + 1) filter_table.flush() self.assertEquals(len(iptc.Chain(filter_table, "OUTPUT").rules), 0) class TestChain(unittest.TestCase): def setUp(self): pass def tearDown(self): pass def test_chain(self): table = iptc.Table(iptc.Table.FILTER) input1 = iptc.Chain(table, "INPUT") input2 = iptc.Chain(table, "INPUT") forward1 = iptc.Chain(table, "FORWARD") forward2 = iptc.Chain(table, "FORWARD") output1 = iptc.Chain(table, "OUTPUT") output2 = iptc.Chain(table, "OUTPUT") self.assertEquals(id(input1), id(input2)) self.assertEquals(id(output1), id(output2)) self.assertEquals(id(forward1), id(forward2)) self.assertNotEquals(id(input1), id(output1)) self.assertNotEquals(id(input1), id(output2)) self.assertNotEquals(id(input1), id(forward1)) self.assertNotEquals(id(input1), id(forward2)) self.assertNotEquals(id(input2), id(output1)) self.assertNotEquals(id(input2), id(output2)) self.assertNotEquals(id(input2), id(forward1)) self.assertNotEquals(id(input2), id(forward2)) self.assertNotEquals(id(output1), id(forward1)) self.assertNotEquals(id(output1), id(forward2)) self.assertNotEquals(id(output2), id(forward1)) self.assertNotEquals(id(output2), id(forward2)) def test_is_chain(self): if is_table_available(iptc.Table.FILTER): table = iptc.Table(iptc.Table.FILTER) self.assertTrue(table.is_chain("INPUT")) self.assertTrue(table.is_chain("FORWARD")) self.assertTrue(table.is_chain("OUTPUT")) if is_table_available(iptc.Table.NAT): table = iptc.Table(iptc.Table.NAT) self.assertTrue(table.is_chain("PREROUTING")) self.assertTrue(table.is_chain("POSTROUTING")) self.assertTrue(table.is_chain("OUTPUT")) if is_table_available(iptc.Table.MANGLE): table = iptc.Table(iptc.Table.MANGLE) self.assertTrue(table.is_chain("INPUT")) self.assertTrue(table.is_chain("PREROUTING")) self.assertTrue(table.is_chain("FORWARD")) self.assertTrue(table.is_chain("POSTROUTING")) self.assertTrue(table.is_chain("OUTPUT")) if is_table_available(iptc.Table.RAW): table = iptc.Table(iptc.Table.RAW) self.assertTrue(table.is_chain("PREROUTING")) self.assertTrue(table.is_chain("OUTPUT")) def test_builtin_chain(self): if is_table_available(iptc.Table.FILTER): table = iptc.Table(iptc.Table.FILTER) self.assertTrue(table.builtin_chain("INPUT")) self.assertTrue(table.builtin_chain("FORWARD")) self.assertTrue(table.builtin_chain("OUTPUT")) if is_table_available(iptc.Table.NAT): table = iptc.Table(iptc.Table.NAT) self.assertTrue(table.builtin_chain("PREROUTING")) self.assertTrue(table.builtin_chain("POSTROUTING")) self.assertTrue(table.builtin_chain("OUTPUT")) if is_table_available(iptc.Table.MANGLE): table = iptc.Table(iptc.Table.MANGLE) self.assertTrue(table.builtin_chain("INPUT")) self.assertTrue(table.builtin_chain("PREROUTING")) self.assertTrue(table.builtin_chain("FORWARD")) self.assertTrue(table.builtin_chain("POSTROUTING")) self.assertTrue(table.builtin_chain("OUTPUT")) if is_table_available(iptc.Table.RAW): table = iptc.Table(iptc.Table.RAW) self.assertTrue(table.builtin_chain("PREROUTING")) self.assertTrue(table.builtin_chain("OUTPUT")) def test_chain_filter(self): if is_table_available(iptc.Table.FILTER): table = iptc.Table(iptc.Table.FILTER) table.autocommit = True self.assertTrue(len(table.chains) >= 3) for chain in table.chains: if chain.name not in ["INPUT", "FORWARD", "OUTPUT"]: self.failIf(chain.is_builtin()) def test_chain_nat(self): if is_table_available(iptc.Table.NAT): table = iptc.Table(iptc.Table.NAT) table.autocommit = True self.assertTrue(len(table.chains) >= 3) for chain in table.chains: if chain.name not in ["INPUT", "PREROUTING", "POSTROUTING", "OUTPUT"]: self.failIf(chain.is_builtin()) def test_chain_mangle(self): if is_table_available(iptc.Table.MANGLE): table = iptc.Table(iptc.Table.MANGLE) table.autocommit = True self.assertTrue(len(table.chains) >= 5) for chain in table.chains: if chain.name not in ["PREROUTING", "POSTROUTING", "INPUT", "FORWARD", "OUTPUT"]: self.failIf(chain.is_builtin()) def test_chain_raw(self): if is_table_available(iptc.Table.RAW): table = iptc.Table(iptc.Table.RAW) table.autocommit = True self.assertTrue(len(table.chains) >= 2) for chain in table.chains: if chain.name not in ["PREROUTING", "OUTPUT"]: self.failIf(chain.is_builtin()) def _get_tables(self): tables = [] if is_table_available(iptc.Table.FILTER): tables.append(iptc.Table(iptc.Table.FILTER)) if is_table_available(iptc.Table.NAT): tables.append(iptc.Table(iptc.Table.NAT)) if is_table_available(iptc.Table.MANGLE): tables.append(iptc.Table(iptc.Table.MANGLE)) if is_table_available(iptc.Table.RAW): tables.append(iptc.Table(iptc.Table.RAW)) return tables def test_chain_counters(self): tables = self._get_tables() for chain in (chain for table in tables for chain in table.chains): counters = chain.get_counters() fails = 0 for x in range(3): # try 3 times chain.zero_counters() counters = chain.get_counters() if counters: # only built-in chains if counters[0] != 0 or counters[1] != 0: fails += 1 self.failIf(fails > 2) def test_create_chain(self): chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_chain") iptc.Table(iptc.Table.FILTER).create_chain(chain) self.failUnless(iptc.Table(iptc.Table.FILTER).is_chain(chain)) iptc.Table(iptc.Table.FILTER).delete_chain(chain) self.failIf(iptc.Table(iptc.Table.FILTER).is_chain(chain)) def test_filter_policy(self): if is_table_available(iptc.Table.FILTER): table = iptc.Table(iptc.Table.FILTER) input_chain = iptc.Chain(table, "INPUT") pol = iptc.Policy("DROP") input_chain.set_policy(pol) rpol = input_chain.get_policy() self.assertEquals(id(pol), id(rpol)) pol = iptc.Policy("ACCEPT") input_chain.set_policy(pol) rpol = input_chain.get_policy() self.assertEquals(id(pol), id(rpol)) pol = iptc.Policy("RETURN") try: input_chain.set_policy(pol) except iptc.IPTCError: pass else: self.fail("managed to set INPUT policy to RETURN") def test_nat_policy(self): if is_table_available(iptc.Table.NAT): table = iptc.Table(iptc.Table.NAT) prerouting_chain = iptc.Chain(table, "PREROUTING") pol = iptc.Policy("DROP") prerouting_chain.set_policy(pol) rpol = prerouting_chain.get_policy() self.assertEquals(id(pol), id(rpol)) pol = iptc.Policy("ACCEPT") prerouting_chain.set_policy(pol) rpol = prerouting_chain.get_policy() self.assertEquals(id(pol), id(rpol)) pol = iptc.Policy("RETURN") try: prerouting_chain.set_policy(pol) except iptc.IPTCError: pass else: self.fail("managed to set PREROUTING policy to RETURN") if is_table_available(iptc.Table.MANGLE): table = iptc.Table(iptc.Table.MANGLE) forward_chain = iptc.Chain(table, "FORWARD") pol = iptc.Policy("DROP") forward_chain.set_policy(pol) rpol = forward_chain.get_policy() self.assertEquals(id(pol), id(rpol)) pol = iptc.Policy("ACCEPT") forward_chain.set_policy(pol) rpol = forward_chain.get_policy() self.assertEquals(id(pol), id(rpol)) pol = iptc.Policy("RETURN") try: forward_chain.set_policy(pol) except iptc.IPTCError: pass else: self.fail("managed to set FORWARD policy to RETURN") class TestRule6(unittest.TestCase): def setUp(self): self.chain = iptc.Chain(iptc.Table6(iptc.Table6.FILTER), "iptc_test_chain") iptc.Table6(iptc.Table6.FILTER).create_chain(self.chain) def tearDown(self): self.chain.flush() self.chain.delete() def test_create_mask(self): rule = iptc.Rule6() # Mask /10 should return \xff\xc0\x00... mask = rule._create_mask(10) self.assertEquals(mask[0], 0xff) self.assertEquals(mask[1], 0xc0) self.assertEquals(mask[2:], [0x00]*14) # Mask /27 should return \xff\xff\xff\xe0... mask = rule._create_mask(27) self.assertEquals(mask[:3], [0xff, 0xff, 0xff]) self.assertEquals(mask[3], 0xe0) self.assertEquals(mask[4:], [0x00]*12) def test_rule_address(self): # valid addresses rule = iptc.Rule6() for addr in ["::/128", "!2000::1/16", "2001::/64", "!2001::1/48"]: rule.src = addr self.assertEquals(rule.src, addr) rule.dst = addr self.assertEquals(rule.dst, addr) addr = "::1" rule.src = addr self.assertEquals("::1/128", rule.src) rule.dst = addr self.assertEquals("::1/128", rule.dst) # invalid addresses for addr in ["2001:fg::/::", "2001/ffff::", "2001::/-1", "2001::/129", "::1/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", "::1/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff:"]: try: rule.src = addr except ValueError: pass else: self.fail("rule accepted invalid address %s" % (addr)) try: rule.dst = addr except ValueError: pass else: self.fail("rule accepted invalid address %s" % (addr)) def test_rule_interface(self): # valid interfaces rule = iptc.Rule6() for intf in ["eth0", "eth+", "ip6tnl1", "ip6tnl+", "!ppp0", "!ppp+"]: rule.in_interface = intf self.assertEquals(intf, rule.in_interface) rule.out_interface = intf self.assertEquals(intf, rule.out_interface) # invalid interfaces for intf in ["itsaverylonginterfacename"]: try: rule.out_interface = intf except ValueError: pass else: self.fail("rule accepted invalid interface name %s" % (intf)) try: rule.in_interface = intf except ValueError: pass else: self.fail("rule accepted invalid interface name %s" % (intf)) def test_rule_protocol(self): rule = iptc.Rule6() for proto in ["tcp", "udp", "icmp", "AH", "ESP", "!TCP", "!UDP", "!ICMP", "!ah", "!esp", "sctp", "!SCTP"]: rule.protocol = proto self.assertEquals(proto.lower(), rule.protocol) for proto in ["", "asdf", "!"]: try: rule.protocol = proto except ValueError: pass except IndexError: pass else: self.fail("rule accepted invalid protocol %s" % (proto)) def test_rule_protocol_numeric(self): rule = iptc.Rule6() rule.protocol = 33 self.assertEquals(rule.protocol, '33') rule.protocol = '!33' self.assertEquals(rule.protocol, '!33') def test_rule_compare(self): r1 = iptc.Rule6() r1.src = "::1/128" r1.dst = "2001::/8" r1.protocol = "tcp" r1.in_interface = "wlan+" r1.out_interface = "eth1" r2 = iptc.Rule6() r2.src = "::1/128" r2.dst = "2001::/8" r2.protocol = "tcp" r2.in_interface = "wlan+" r2.out_interface = "eth1" self.failUnless(r1 == r2) r1.src = "::1/ffff::" self.failIf(r1 == r2) def test_rule_standard_target(self): try: target = iptc.Target(iptc.Rule(), "jump_to_chain") except: pass else: self.fail("target accepted invalid name jump_to_chain") rule = iptc.Rule6() rule.protocol = "tcp" rule.src = "::1" target = iptc.Target(rule, "RETURN") self.assertEquals(target.name, "RETURN") target = iptc.Target(rule, "ACCEPT") self.assertEquals(target.name, "ACCEPT") target = iptc.Target(rule, "") self.assertEquals(target.name, "") target.standard_target = "ACCEPT" self.assertEquals(target.name, "ACCEPT") self.assertEquals(target.standard_target, "ACCEPT") target = iptc.Target(rule, self.chain.name) rule.target = target self.chain.insert_rule(rule) self.chain.delete_rule(rule) def test_rule_iterate_filter(self): if is_table6_available(iptc.Table6.FILTER): for r in (rule for chain in iptc.Table6(iptc.Table6.FILTER).chains for rule in chain.rules if rule): pass def test_rule_iterate_raw(self): if is_table6_available(iptc.Table6.RAW): for r in (rule for chain in iptc.Table6(iptc.Table6.RAW).chains for rule in chain.rules if rule): pass def test_rule_iterate_mangle(self): if is_table6_available(iptc.Table6.MANGLE): for r in (rule for chain in iptc.Table6(iptc.Table6.MANGLE).chains for rule in chain.rules if rule): pass def test_rule_iterate_security(self): if is_table6_available(iptc.Table6.SECURITY): for r in (rule for chain in iptc.Table6(iptc.Table6.SECURITY).chains for rule in chain.rules if rule): pass def test_rule_insert(self): rules = [] rule = iptc.Rule6() rule.protocol = "tcp" rule.src = "::1" target = iptc.Target(rule, "ACCEPT") rule.target = target self.chain.insert_rule(rule) rules.append(rule) rule = iptc.Rule6() rule.protocol = "udp" rule.src = "::1" target = iptc.Target(rule, "REJECT") target.reject_with = "addr-unreach" rule.target = target self.chain.insert_rule(rule) rules.append(rule) rule = iptc.Rule6() rule.protocol = "tcp" rule.dst = "2001::/16" target = iptc.Target(rule, "RETURN") rule.target = target self.chain.insert_rule(rule) rules.append(rule) crules = self.chain.rules self.failUnless(len(rules) == len(crules)) for rule in rules: self.failUnless(rule in crules) crules.remove(rule) def test_rule_to_dict(self): rule = iptc.Rule6() rule.protocol = "tcp" rule.src = "::1/128" target = iptc.Target(rule, "ACCEPT") rule.target = target rule_d = iptc.easy.decode_iptc_rule(rule, ipv6=True) # Remove counters when comparing rules rule_d.pop('counters', None) self.assertEqual(rule_d, {"protocol": "tcp", "src": "::1/128", "target": "ACCEPT"}) def test_rule_from_dict(self): rule = iptc.Rule6() rule.protocol = "tcp" rule.src = "::1/128" target = iptc.Target(rule, "ACCEPT") rule.target = target rule2 = iptc.easy.encode_iptc_rule({"protocol": "tcp", "src": "::1/128", "target": "ACCEPT"}, ipv6=True) self.assertEqual(rule, rule2) class TestRule(unittest.TestCase): def setUp(self): self.table = iptc.Table(iptc.Table.FILTER) self.chain = iptc.Chain(self.table, "iptc_test_chain") try: self.table.create_chain(self.chain) except: self.chain.flush() if is_table_available(iptc.Table.NAT): self.table_nat = iptc.Table(iptc.Table.NAT) self.chain_nat = iptc.Chain(self.table_nat, "iptc_test_nat_chain") try: self.table_nat.create_chain(self.chain_nat) except: self.chain_nat.flush() def tearDown(self): self.table.autocommit = True self.chain.flush() self.chain.delete() if is_table_available(iptc.Table.NAT): self.table_nat.autocommit = True self.chain_nat.flush() self.chain_nat.delete() def test_rule_address(self): # valid addresses rule = iptc.Rule() for addr in [("127.0.0.1/255.255.255.0", "127.0.0.0/255.255.255.0"), ("!127.0.0.1/255.255.255.0", "!127.0.0.0/255.255.255.0"), ("127.0.0.1/255.255.128.0", "127.0.0.0/255.255.128.0"), ("127.0.0.1/16", "127.0.0.0/255.255.0.0"), ("127.0.0.1/24", "127.0.0.0/255.255.255.0"), ("127.0.0.1/17", "127.0.0.0/255.255.128.0"), ("!127.0.0.1/17", "!127.0.0.0/255.255.128.0")]: rule.src = addr[0] self.assertEquals(rule.src, addr[1]) rule.dst = addr[0] self.assertEquals(rule.dst, addr[1]) addr = "127.0.0.1" rule.src = addr self.assertEquals("127.0.0.1/255.255.255.255", rule.src) rule.dst = addr self.assertEquals("127.0.0.1/255.255.255.255", rule.dst) # invalid addresses for addr in ["127.256.0.1/255.255.255.0", "127.0.1/255.255.255.0", "127.0.0.1/255.255.255.", "127.0.0.1 255.255.255.0", "127.0.0.1/33", "127.0.0.1/-5", "127.0.0.1/255.5"]: try: rule.src = addr except ValueError: pass else: self.fail("rule accepted invalid address %s" % (addr)) try: rule.dst = addr except ValueError: pass else: self.fail("rule accepted invalid address %s" % (addr)) def test_rule_interface(self): # valid interfaces rule = iptc.Rule() for intf in ["eth0", "eth+", "ip6tnl1", "ip6tnl+", "!ppp0", "!ppp+"]: rule.in_interface = intf self.assertEquals(intf, rule.in_interface) rule.out_interface = intf self.assertEquals(intf, rule.out_interface) rule.create_target("ACCEPT") self.chain.insert_rule(rule) r = self.chain.rules[0] eq = r == rule self.chain.flush() self.assertTrue(eq) # invalid interfaces for intf in ["itsaverylonginterfacename"]: try: rule.out_interface = intf except ValueError: pass else: self.fail("rule accepted invalid interface name %s" % (intf)) try: rule.in_interface = intf except ValueError: pass else: self.fail("rule accepted invalid interface name %s" % (intf)) def test_rule_fragment(self): rule = iptc.Rule() for frag in [("1", True), ("true", True), ("asdf", True), (1, True), (0, False), ("", False), (None, False)]: rule.fragment = frag[0] self.assertEquals(frag[1], rule.fragment) def test_rule_protocol(self): rule = iptc.Rule() for proto in ["tcp", "udp", "icmp", "AH", "ESP", "!TCP", "!UDP", "!ICMP", "!ah", "!esp"]: rule.protocol = proto self.assertEquals(proto.lower(), rule.protocol) for proto in ["", "asdf", "!"]: try: rule.protocol = proto except ValueError: pass except IndexError: pass else: self.fail("rule accepted invalid protocol %s" % (proto)) def test_rule_protocol_numeric(self): rule = iptc.Rule() rule.protocol = 33 self.assertEquals(rule.protocol, '33') rule.protocol = '!33' self.assertEquals(rule.protocol, '!33') def test_rule_compare(self): r1 = iptc.Rule() r1.src = "127.0.0.2/255.255.255.0" r1.dst = "224.1.2.3/255.255.0.0" r1.protocol = "tcp" r1.fragment = False r1.in_interface = "wlan+" r1.out_interface = "eth1" r2 = iptc.Rule() r2.src = "127.0.0.2/255.255.255.0" r2.dst = "224.1.2.3/255.255.0.0" r2.protocol = "tcp" r2.fragment = False r2.in_interface = "wlan+" r2.out_interface = "eth1" self.failUnless(r1 == r2) r1.src = "127.0.0.1" self.failIf(r1 == r2) def test_rule_standard_target(self): try: target = iptc.Target(iptc.Rule(), "jump_to_chain") except: pass else: self.fail("target accepted invalid name jump_to_chain") rule = iptc.Rule() rule.protocol = "tcp" rule.src = "127.0.0.1" target = iptc.Target(rule, "RETURN") self.assertEquals(target.name, "RETURN") target = iptc.Target(rule, "ACCEPT") self.assertEquals(target.name, "ACCEPT") target = iptc.Target(rule, "") self.assertEquals(target.name, "") target.standard_target = "ACCEPT" self.assertEquals(target.name, "ACCEPT") self.assertEquals(target.standard_target, "ACCEPT") target = iptc.Target(rule, self.chain.name) rule.target = target self.chain.insert_rule(rule) self.chain.delete_rule(rule) def test_rule_iterate_filter(self): if is_table_available(iptc.Table.FILTER): for r in (rule for chain in iptc.Table(iptc.Table.FILTER).chains for rule in chain.rules if rule): pass def test_rule_iterate_nat(self): if is_table_available(iptc.Table.NAT): for r in (rule for chain in iptc.Table(iptc.Table.NAT).chains for rule in chain.rules if rule): pass def test_rule_iterate_mangle(self): if is_table_available(iptc.Table.MANGLE): for r in (rule for chain in iptc.Table(iptc.Table.MANGLE).chains for rule in chain.rules if rule): pass def test_rule_iterate_rulenum(self): """Ensure rule numbers are always returned in order""" insert_rule_count = 3 append_rule_count = 3 for rule_num in range(insert_rule_count, 0, -1): rule = iptc.Rule() match = rule.create_match("comment") match.comment = "rule{rule_num}".format(rule_num=rule_num) rule.create_target("ACCEPT") self.chain.insert_rule(rule) append_rulenum_start = insert_rule_count + 1 append_rulenum_end = append_rulenum_start + 3 for rule_num in range(append_rulenum_start, append_rulenum_end): rule = iptc.Rule() match = rule.create_match("comment") match.comment = "rule{rule_num}".format(rule_num=rule_num) rule.create_target("ACCEPT") self.chain.append_rule(rule) rules = self.chain.rules assert len(rules) == (insert_rule_count + append_rule_count) for rule_num, rule in enumerate(rules, start=1): assert len(rule.matches) == 1 assert rule.matches[0].comment == "rule{rule_num}".format( rule_num=rule_num), \ "rule[{left_num}] is not new {right_num}".format( left_num=rule_num, right_num=rule.matches[0].comment ) def test_rule_insert(self): rules = [] rule = iptc.Rule() rule.protocol = "tcp" rule.src = "127.0.0.1" target = iptc.Target(rule, "ACCEPT") rule.target = target self.chain.insert_rule(rule) rules.append(rule) rule = iptc.Rule() rule.protocol = "udp" rule.src = "127.0.0.1" target = iptc.Target(rule, "REJECT") target.reject_with = "host-unreach" rule.target = target self.chain.insert_rule(rule) rules.append(rule) rule = iptc.Rule() rule.protocol = "tcp" rule.dst = "10.1.1.0/255.255.255.0" target = iptc.Target(rule, "RETURN") rule.target = target self.chain.insert_rule(rule) rules.append(rule) crules = self.chain.rules self.failUnless(len(rules) == len(crules)) for rule in rules: self.failUnless(rule in crules) crules.remove(rule) def test_rule_replace(self): rule = iptc.Rule() rule.protocol = "tcp" rule.src = "127.0.0.1" target = iptc.Target(rule, "ACCEPT") rule.target = target self.chain.insert_rule(rule, 0) rule = iptc.Rule() rule.protocol = "udp" rule.src = "127.0.0.1" target = iptc.Target(rule, "ACCEPT") rule.target = target self.chain.replace_rule(rule, 0) self.failUnless(self.chain.rules[0] == rule) def test_rule_multiple_parameters(self): self.table.autocommit = False self.table.refresh() rule = iptc.Rule() rule.dst = "127.0.0.1" rule.protocol = "tcp" match = rule.create_match('tcp') match.sport = "1234" match.dport = "8080" target = rule.create_target("REJECT") target.reject_with = "icmp-host-unreachable" self.chain.insert_rule(rule) self.table.commit() self.table.refresh() self.assertEquals(len(self.chain.rules), 1) r = self.chain.rules[0] self.assertEquals(r.src, '0.0.0.0/0.0.0.0') self.assertEquals(r.dst, '127.0.0.1/255.255.255.255') self.assertEquals(r.protocol, 'tcp') self.assertEquals(len(r.matches), 1) m = r.matches[0] self.assertEquals(m.name, 'tcp') self.assertEquals(m.sport, '1234') self.assertEquals(m.dport, '8080') def test_rule_delete(self): self.table.autocommit = False self.table.refresh() for p in ['8001', '8002', '8003']: rule = iptc.Rule() rule.dst = "127.0.0.1" rule.protocol = "tcp" rule.dport = "8080" target = rule.create_target("REJECT") target.reject_with = "icmp-host-unreachable" self.chain.insert_rule(rule) self.table.commit() self.table.refresh() rules = self.chain.rules for rule in rules: self.chain.delete_rule(rule) self.table.commit() self.table.refresh() def test_rule_delete_nat(self): if not is_table_available(iptc.Table.NAT): return self.table_nat.autocommit = False self.table_nat.refresh() for p in ['8001', '8002', '8003']: rule = iptc.Rule() rule.dst = "127.0.0.1" rule.protocol = "udp" rule.dport = "8080" target = rule.create_target("DNAT") target.to_destination = '127.0.0.0:' + p self.chain_nat.insert_rule(rule) self.table_nat.commit() self.table_nat.refresh() rules = self.chain_nat.rules for rule in rules: self.chain_nat.delete_rule(rule) self.table_nat.commit() self.table_nat.refresh() def test_rule_to_dict(self): rule = iptc.Rule() rule.protocol = "tcp" rule.src = "127.0.0.1/32" target = iptc.Target(rule, "ACCEPT") rule.target = target rule_d = iptc.easy.decode_iptc_rule(rule) # Remove counters when comparing rules rule_d.pop('counters', None) self.assertEqual(rule_d, {"protocol": "tcp", "src": "127.0.0.1/32", "target": "ACCEPT"}) def test_rule_from_dict(self): rule = iptc.Rule() rule.protocol = "tcp" rule.src = "127.0.0.1/32" target = iptc.Target(rule, "ACCEPT") rule.target = target rule2 = iptc.easy.encode_iptc_rule({"protocol": "tcp", "src": "127.0.0.1/32", "target": "ACCEPT"}) self.assertEqual(rule, rule2) def suite(): suite_table6 = unittest.TestLoader().loadTestsFromTestCase(TestTable6) suite_table = unittest.TestLoader().loadTestsFromTestCase(TestTable) suite_chain = unittest.TestLoader().loadTestsFromTestCase(TestChain) suite_rule6 = unittest.TestLoader().loadTestsFromTestCase(TestRule6) suite_rule = unittest.TestLoader().loadTestsFromTestCase(TestRule) return unittest.TestSuite([suite_table6, suite_table, suite_chain, suite_rule6, suite_rule]) def run_tests(): result = unittest.TextTestRunner(verbosity=2).run(suite()) if result.errors or result.failures: return 1 return 0 if __name__ == "__main__": unittest.main() 0707010000002A000081ED0000000000000000000000015EE6923E0000437F000000000000000000000000000000000000002C00000000python-iptables-1.0.0/tests/test_matches.py# -*- coding: utf-8 -*- import unittest import iptc is_table6_available = iptc.is_table6_available class TestMatch(unittest.TestCase): def setUp(self): pass def tearDown(self): pass def test_match_create(self): rule = iptc.Rule() match = rule.create_match("udp") for m in rule.matches: self.assertEqual(m, match) # check that we can change match parameters after creation match.sport = "12345:55555" match.dport = "!33333" m = iptc.Match(iptc.Rule(), "udp") m.sport = "12345:55555" m.dport = "!33333" self.assertEqual(m, match) def test_match_compare(self): m1 = iptc.Match(iptc.Rule(), "udp") m1.sport = "12345:55555" m1.dport = "!33333" m2 = iptc.Match(iptc.Rule(), "udp") m2.sport = "12345:55555" m2.dport = "!33333" self.assertEqual(m1, m2) m2.reset() m2.sport = "12345:55555" m2.dport = "33333" self.assertNotEqual(m1, m2) def test_match_parameters(self): m = iptc.Match(iptc.Rule(), "udp") m.sport = "12345:55555" m.dport = "!33333" self.assertEqual(len(m.parameters), 2) for p in m.parameters: self.assertTrue(p == "sport" or p == "dport") self.assertEqual(m.parameters["sport"], "12345:55555") self.assertEqual(m.parameters["dport"], "!33333") m.reset() self.assertEqual(len(m.parameters), 0) def test_get_all_parameters(self): m = iptc.Match(iptc.Rule(), "udp") m.sport = "12345:55555" m.dport = "!33333" params = m.get_all_parameters() self.assertEqual(set(params['sport']), set(['12345:55555'])) self.assertEqual(set(params['dport']), set(['!', '33333'])) class TestMultiportMatch(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.src = "127.0.0.1" self.rule.protocol = "udp" self.rule.create_target("ACCEPT") self.match = self.rule.create_match("multiport") table = iptc.Table(iptc.Table.FILTER) self.chain = iptc.Chain(table, "iptc_test_udp") try: self.chain.flush() self.chain.delete() except: pass iptc.Table(iptc.Table.FILTER).create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_multiport(self): self.match.dports = '1111,2222' self.chain.insert_rule(self.rule) rule = self.chain.rules[0] match = rule.matches[0] self.assertEqual(match.dports, '1111,2222') def test_unicode_multiport(self): self.match.dports = u'1111,2222' self.chain.insert_rule(self.rule) rule = self.chain.rules[0] match = rule.matches[0] self.assertEqual(match.dports, '1111,2222') class TestXTUdpMatch(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.src = "127.0.0.1" self.rule.protocol = "udp" self.rule.target = iptc.Target(self.rule, "ACCEPT") self.match = iptc.Match(self.rule, "udp") self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_udp") iptc.Table(iptc.Table.FILTER).create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_udp_port(self): for port in ["12345", "12345:65535", "!12345", "12345:12346", "!12345:12346", "0:1234", "! 1234", "!0:12345", "!1234:65535"]: self.match.sport = port self.assertEqual(self.match.sport, port.replace(" ", "")) self.match.dport = port self.assertEqual(self.match.dport, port.replace(" ", "")) self.match.reset() for port in ["-1", "asdf", "!asdf"]: try: self.match.sport = port except Exception: pass else: self.fail("udp accepted invalid source port %s" % (port)) try: self.match.dport = port except Exception: pass else: self.fail("udp accepted invalid destination port %s" % (port)) self.match.reset() def test_udp_insert(self): self.match.reset() self.match.dport = "12345" self.rule.add_match(self.match) self.chain.insert_rule(self.rule) for r in self.chain.rules: if r != self.rule: self.fail("inserted rule does not match original") class TestXTMarkMatch(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.src = "127.0.0.1" self.rule.protocol = "tcp" self.rule.target = iptc.Target(self.rule, "ACCEPT") self.match = iptc.Match(self.rule, "mark") self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_mark") iptc.Table(iptc.Table.FILTER).create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_mark(self): for mark in ["0x7b", "! 0x7b", "0x7b/0xfffefffe", "!0x7b/0xff00ff00"]: self.match.mark = mark self.assertEqual(self.match.mark, mark.replace(" ", "")) self.match.reset() for mark in ["0xffffffffff", "123/0xffffffff1", "!asdf", "1234:1233"]: try: self.match.mark = mark except Exception: pass else: self.fail("mark accepted invalid value %s" % (mark)) self.match.reset() def test_mark_insert(self): self.match.reset() self.match.mark = "0x123" self.rule.add_match(self.match) self.chain.insert_rule(self.rule) for r in self.chain.rules: if r != self.rule: self.fail("inserted rule does not match original") class TestXTLimitMatch(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.src = "127.0.0.1" self.rule.protocol = "tcp" self.rule.target = iptc.Target(self.rule, "ACCEPT") self.match = iptc.Match(self.rule, "limit") self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_limit") iptc.Table(iptc.Table.FILTER).create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_limit(self): for limit in ["1/sec", "5/min", "3/hour"]: self.match.limit = limit self.assertEqual(self.match.limit, limit) self.match.reset() for limit in ["asdf", "123/1", "!1", "!1/second"]: try: self.match.limit = limit except Exception: pass else: self.fail("limit accepted invalid value %s" % (limit)) self.match.reset() def test_limit_insert(self): self.match.reset() self.match.limit = "1/min" self.rule.add_match(self.match) self.chain.insert_rule(self.rule) for r in self.chain.rules: if r != self.rule: self.fail("inserted rule does not match original") class TestIcmpv6Match(unittest.TestCase): def setUp(self): self.rule = iptc.Rule6() self.rule.protocol = "icmpv6" self.rule.in_interface = "eth0" self.target = self.rule.create_target("ACCEPT") self.match = self.rule.create_match("icmp6") self.match.icmpv6_type = "echo-request" self.table = iptc.Table6(iptc.Table6.FILTER) self.chain = iptc.Chain(self.table, "ip6tc_test_icmpv6") try: self.table.delete_chain(self.chain) except: pass self.table.create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_icmpv6(self): self.chain.insert_rule(self.rule) rule = self.chain.rules[0] self.assertEqual(self.rule, rule) class TestCommentMatch(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.src = "127.0.0.1" self.rule.protocol = "udp" self.rule.target = iptc.Target(self.rule, "ACCEPT") self.match = iptc.Match(self.rule, "comment") self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_comment") iptc.Table(iptc.Table.FILTER).create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_comment(self): comment = "comment test" self.match.reset() self.match.comment = comment self.chain.insert_rule(self.rule) self.assertEqual(self.match.comment, comment) class TestIprangeMatch(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.protocol = "tcp" self.rule.target = iptc.Target(self.rule, "ACCEPT") self.match = iptc.Match(self.rule, "iprange") self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_iprange") iptc.Table(iptc.Table.FILTER).create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_iprange(self): self.match.src_range = "192.168.1.100-192.168.1.200" self.match.dst_range = "172.22.33.106" self.rule.add_match(self.match) self.chain.insert_rule(self.rule) for r in self.chain.rules: if r != self.rule: self.fail("inserted rule does not match original") def test_iprange_tcpdport(self): self.match.src_range = "192.168.1.100-192.168.1.200" self.match.dst_range = "172.22.33.106" self.rule.add_match(self.match) match = iptc.Match(self.rule, "tcp") match.dport = "22" self.rule.add_match(match) self.chain.insert_rule(self.rule) for r in self.chain.rules: if r != self.rule: self.fail("inserted rule does not match original") class TestXTStateMatch(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.src = "127.0.0.1" self.rule.protocol = "tcp" self.rule.target = iptc.Target(self.rule, "ACCEPT") self.match = iptc.Match(self.rule, "state") self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_state") self.table = iptc.Table(iptc.Table.FILTER) try: self.chain.flush() self.chain.delete() except: pass self.table.create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_state(self): self.match.state = "RELATED,ESTABLISHED" self.rule.add_match(self.match) self.chain.insert_rule(self.rule) rule = self.chain.rules[0] m = rule.matches[0] self.assertEqual(m.name, "state") self.assertEqual(m.state, "RELATED,ESTABLISHED") self.assertEqual(rule.matches[0].name, self.rule.matches[0].name) self.assertEqual(rule, self.rule) class TestXTConntrackMatch(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.src = "127.0.0.1" self.rule.protocol = "tcp" self.rule.target = iptc.Target(self.rule, "ACCEPT") self.match = iptc.Match(self.rule, "conntrack") self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_conntrack") self.table = iptc.Table(iptc.Table.FILTER) try: self.chain.flush() self.chain.delete() except: pass self.table.create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_state(self): self.match.ctstate = "NEW,RELATED" self.rule.add_match(self.match) self.chain.insert_rule(self.rule) rule = self.chain.rules[0] m = rule.matches[0] self.assertTrue(m.name, ["conntrack"]) self.assertEqual(m.ctstate, "NEW,RELATED") class TestHashlimitMatch(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.src = "127.0.0.1" self.rule.protocol = "udp" self.rule.target = iptc.Target(self.rule, "ACCEPT") self.match = iptc.Match(self.rule, "hashlimit") self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_hashlimit") self.table = iptc.Table(iptc.Table.FILTER) try: self.chain.flush() self.chain.delete() except: pass self.table.create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_hashlimit(self): self.match.hashlimit_name = 'foo' self.match.hashlimit_mode = 'srcip' self.match.hashlimit_upto = '200/sec' self.match.hashlimit = '200' self.match.hashlimit_htable_expire = '100' self.rule.add_match(self.match) self.chain.insert_rule(self.rule) rule = self.chain.rules[0] m = rule.matches[0] self.assertTrue(m.name, ["hashlimit"]) self.assertEqual(m.hashlimit_name, "foo") self.assertEqual(m.hashlimit_mode, "srcip") self.assertEqual(m.hashlimit_upto, "200/sec") self.assertEqual(m.hashlimit_burst, "5") class TestRecentMatch(unittest.TestCase): def setUp(self): self.table = 'filter' self.chain = 'iptc_test_recent' iptc.easy.delete_chain(self.table, self.chain, ipv6=False, flush=True, raise_exc=False) iptc.easy.add_chain(self.table, self.chain, ipv6=False, raise_exc=True) def tearDown(self): iptc.easy.delete_chain(self.table, self.chain, ipv6=False, flush=True, raise_exc=False) def test_recent(self): rule_d = { 'protocol': 'udp', 'recent': { 'mask': '255.255.255.255', 'update': '', 'seconds': '60', 'rsource': '', 'name': 'UDP-PORTSCAN', }, 'target': { 'REJECT':{ 'reject-with': 'icmp-port-unreachable' } } } iptc.easy.add_rule(self.table, self.chain, rule_d) rule2_d = iptc.easy.get_rule(self.table, self.chain, -1) # Remove counters when comparing rules rule2_d.pop('counters', None) self.assertEqual(rule_d, rule2_d) def suite(): suite_match = unittest.TestLoader().loadTestsFromTestCase(TestMatch) suite_udp = unittest.TestLoader().loadTestsFromTestCase(TestXTUdpMatch) suite_mark = unittest.TestLoader().loadTestsFromTestCase(TestXTMarkMatch) suite_limit = unittest.TestLoader().loadTestsFromTestCase(TestXTLimitMatch) suite_mport = unittest.TestLoader().loadTestsFromTestCase( TestMultiportMatch) suite_comment = unittest.TestLoader().loadTestsFromTestCase( TestCommentMatch) suite_iprange = unittest.TestLoader().loadTestsFromTestCase( TestIprangeMatch) suite_state = unittest.TestLoader().loadTestsFromTestCase(TestXTStateMatch) suite_conntrack = unittest.TestLoader().loadTestsFromTestCase( TestXTConntrackMatch) suite_hashlimit = unittest.TestLoader().loadTestsFromTestCase( TestHashlimitMatch) suite_recent = unittest.TestLoader().loadTestsFromTestCase( TestRecentMatch) extra_suites = [] if is_table6_available(iptc.Table6.FILTER): extra_suites += unittest.TestLoader().loadTestsFromTestCase( TestIcmpv6Match) return unittest.TestSuite([suite_match, suite_udp, suite_mark, suite_limit, suite_mport, suite_comment, suite_iprange, suite_state, suite_conntrack, suite_hashlimit, suite_recent] + extra_suites) def run_tests(): result = unittest.TextTestRunner(verbosity=2).run(suite()) if result.errors or result.failures: return 1 return 0 if __name__ == "__main__": unittest.main() 0707010000002B000081ED0000000000000000000000015EE6923E0000375D000000000000000000000000000000000000002C00000000python-iptables-1.0.0/tests/test_targets.py# -*- coding: utf-8 -*- import unittest import iptc from iptc.xtables import xtables_version is_table_available = iptc.is_table_available class TestTarget(unittest.TestCase): def setUp(self): pass def tearDown(self): pass def test_target_create(self): rule = iptc.Rule() target = rule.create_target("MARK") self.failUnless(rule.target == target) target.set_mark = "0x123" t = iptc.Target(iptc.Rule(), "MARK") t.set_mark = "0x123" self.failUnless(t == target) def test_target_compare(self): t1 = iptc.Target(iptc.Rule(), "MARK") t1.set_mark = "0x123" t2 = iptc.Target(iptc.Rule(), "MARK") t2.set_mark = "0x123" self.failUnless(t1 == t2) t2.reset() t2.set_mark = "0x124" self.failIf(t1 == t2) def test_target_parameters(self): t = iptc.Target(iptc.Rule(), "CONNMARK") t.nfmask = "0xdeadbeef" t.ctmask = "0xfefefefe" t.save_mark = "" self.failUnless(len(t.parameters) == 3) for p in t.parameters: self.failUnless(p == "ctmask" or p == "nfmask" or p == "save_mark") self.failUnless(t.parameters["save_mark"] == "") self.failUnless(t.parameters["nfmask"] == "0xdeadbeef") self.failUnless(t.parameters["ctmask"] == "0xfefefefe") t.reset() self.failUnless(len(t.parameters) == 1) class TestXTClusteripTarget(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.dst = "127.0.0.2" self.rule.protocol = "tcp" self.rule.in_interface = "eth0" self.match = iptc.Match(self.rule, "tcp") self.rule.add_match(self.match) self.target = iptc.Target(self.rule, "CLUSTERIP") self.rule.target = self.target self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_clusterip") iptc.Table(iptc.Table.FILTER).create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_mode(self): for hashmode in ["sourceip", "sourceip-sourceport", "sourceip-sourceport-destport"]: self.target.new = "" self.target.hashmode = hashmode self.assertEquals(self.target.hashmode, hashmode) self.target.reset() for hashmode in ["asdf", "1234"]: self.target.new = "" try: self.target.hashmode = hashmode except Exception: pass else: self.fail("CLUSTERIP accepted invalid value %s" % (hashmode)) self.target.reset() def test_insert(self): self.target.reset() self.target.new = "" self.target.hashmode = "sourceip" self.target.clustermac = "01:02:03:04:05:06" self.target.local_node = "1" self.target.total_nodes = "2" self.rule.target = self.target self.chain.insert_rule(self.rule) for r in self.chain.rules: if r != self.rule: self.fail("inserted rule does not match original") class TestIPTRedirectTarget(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.dst = "127.0.0.2" self.rule.protocol = "tcp" self.rule.in_interface = "eth0" self.match = iptc.Match(self.rule, "tcp") self.rule.add_match(self.match) self.target = iptc.Target(self.rule, "REDIRECT") self.rule.target = self.target self.chain = iptc.Chain(iptc.Table(iptc.Table.NAT), "iptc_test_redirect") iptc.Table(iptc.Table.NAT).create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_mode(self): for port in ["1234", "1234-2345", "65534-65535"]: self.target.to_ports = port self.assertEquals(self.target.to_ports, port) self.target.reset() self.target.random = "" self.target.reset() for port in ["1234567", "2345-1234"]: # ipt bug: it accepts strings try: self.target.to_ports = port except Exception: pass else: self.fail("REDIRECT accepted invalid value %s" % (port)) self.target.reset() def test_insert(self): self.target.reset() self.target.to_ports = "1234-1235" self.rule.target = self.target self.chain.insert_rule(self.rule) for r in self.chain.rules: if r != self.rule: self.fail("inserted rule does not match original") class TestXTTosTarget(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.dst = "127.0.0.2" self.rule.protocol = "tcp" self.rule.in_interface = "eth0" self.match = iptc.Match(self.rule, "tcp") self.rule.add_match(self.match) self.target = iptc.Target(self.rule, "TOS") self.rule.target = self.target self.chain = iptc.Chain(iptc.Table(iptc.Table.MANGLE), "iptc_test_tos") iptc.Table(iptc.Table.MANGLE).create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_set_tos(self): for tos in ["0x12/0xff", "0x12/0x0f"]: self.target.set_tos = tos self.assertEquals(self.target.set_tos, tos) self.target.reset() for tos in [("Minimize-Delay", "0x10/0x3f"), ("Maximize-Throughput", "0x08/0x3f"), ("Maximize-Reliability", "0x04/0x3f"), ("Minimize-Cost", "0x02/0x3f"), ("Normal-Service", "0x00/0x3f")]: self.target.set_tos = tos[0] self.assertEquals(self.target.set_tos, tos[1]) self.target.reset() def test_tos_mode(self): for tos in ["0x04"]: self.target.and_tos = tos self.assertEquals(self.target.set_tos, "0x00/0xfb") self.target.reset() self.target.or_tos = tos self.assertEquals(self.target.set_tos, "0x04/0x04") self.target.reset() self.target.xor_tos = tos self.assertEquals(self.target.set_tos, "0x04/0x00") self.target.reset() for tos in ["0x1234", "0x12/0xfff", "asdf", "Minimize-Bullshit"]: try: self.target.and_tos = tos except Exception: pass else: self.fail("TOS accepted invalid value %s" % (tos)) self.target.reset() try: self.target.or_tos = tos except Exception: pass else: self.fail("TOS accepted invalid value %s" % (tos)) self.target.reset() try: self.target.xor_tos = tos except Exception: pass else: self.fail("TOS accepted invalid value %s" % (tos)) self.target.reset() def test_insert(self): self.target.reset() self.target.set_tos = "0x12/0xff" self.rule.target = self.target self.chain.insert_rule(self.rule) for r in self.chain.rules: if r != self.rule: self.fail("inserted rule does not match original") class TestDnatTarget(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.dst = "127.0.0.2" self.rule.protocol = "tcp" self.rule.in_interface = "eth0" self.match = iptc.Match(self.rule, "tcp") self.rule.add_match(self.match) self.target = iptc.Target(self.rule, "DNAT") self.rule.target = self.target self.chain = iptc.Chain(iptc.Table(iptc.Table.MANGLE), "iptc_test_dnat") iptc.Table(iptc.Table.MANGLE).create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_mode(self): for dst in ["1.2.3.4", "199.199.199.199-199.199.199.255", "1.2.3.4:5678", "1.2.3.4:5678-5688"]: self.target.to_destination = dst self.assertEquals(self.target.to_destination, dst) self.target.reset() self.target.to_destination = dst self.target.random = "1" self.assertEquals(self.target.to_destination, dst) self.target.reset() self.target.to_destination = dst self.target.persistent = "1" self.assertEquals(self.target.to_destination, dst) self.target.reset() def test_insert(self): self.target.reset() self.target.to_destination = "1.2.3.4" self.rule.target = self.target self.chain.insert_rule(self.rule) for r in self.chain.rules: if r != self.rule: self.fail("inserted rule does not match original") class TestIPTMasqueradeTarget(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.dst = "127.0.0.2" self.rule.protocol = "tcp" self.rule.out_interface = "eth0" self.target = iptc.Target(self.rule, "MASQUERADE") self.rule.target = self.target self.chain = iptc.Chain(iptc.Table(iptc.Table.NAT), "iptc_test_masquerade") iptc.Table(iptc.Table.NAT).create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_mode(self): for port in ["1234", "1234-2345"]: self.target.to_ports = port self.assertEquals(self.target.to_ports, port) self.target.reset() self.target.random = "" self.target.reset() for port in ["123456", "1234-1233", "asdf"]: try: self.target.to_ports = port except Exception: pass else: self.fail("MASQUERADE accepted invalid value %s" % (port)) self.target.reset() def test_insert(self): self.target.reset() self.target.to_ports = "1234" self.rule.target = self.target self.chain.insert_rule(self.rule) found = False for r in self.chain.rules: if r == self.rule: found = True break if not found: self.fail("inserted rule does not match original") class TestXTNotrackTarget(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.dst = "127.0.0.2" self.rule.protocol = "tcp" self.rule.out_interface = "eth0" self.target = iptc.Target(self.rule, "NOTRACK") self.rule.target = self.target self.chain = iptc.Chain(iptc.Table(iptc.Table.RAW), "iptc_test_notrack") try: self.chain.flush() self.chain.delete() except: pass iptc.Table(iptc.Table.RAW).create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_notrack(self): self.chain.insert_rule(self.rule) t = self.chain.rules[0].target self.assertTrue(t.name in ["NOTRACK", "CT"]) class TestXTCtTarget(unittest.TestCase): def setUp(self): self.rule = iptc.Rule() self.rule.dst = "127.0.0.2" self.rule.protocol = "tcp" self.rule.out_interface = "eth0" self.target = iptc.Target(self.rule, "CT") self.target.notrack = "true" self.rule.target = self.target self.chain = iptc.Chain(iptc.Table(iptc.Table.RAW), "iptc_test_ct") try: self.chain.flush() self.chain.delete() except: pass iptc.Table(iptc.Table.RAW).create_chain(self.chain) def tearDown(self): for r in self.chain.rules: self.chain.delete_rule(r) self.chain.flush() self.chain.delete() def test_ct(self): self.chain.insert_rule(self.rule) t = self.chain.rules[0].target self.assertEquals(t.name, "CT") self.assertTrue(t.notrack is not None) def suite(): suites = [] suite_target = unittest.TestLoader().loadTestsFromTestCase(TestTarget) suite_tos = unittest.TestLoader().loadTestsFromTestCase(TestXTTosTarget) suite_cluster = unittest.TestLoader().loadTestsFromTestCase( TestXTClusteripTarget) suite_redir = unittest.TestLoader().loadTestsFromTestCase( TestIPTRedirectTarget) suite_masq = unittest.TestLoader().loadTestsFromTestCase( TestIPTMasqueradeTarget) suite_dnat = unittest.TestLoader().loadTestsFromTestCase( TestDnatTarget) suite_notrack = unittest.TestLoader().loadTestsFromTestCase( TestXTNotrackTarget) suite_ct = unittest.TestLoader().loadTestsFromTestCase(TestXTCtTarget) suites.extend([suite_target, suite_cluster, suite_tos]) if is_table_available(iptc.Table.NAT): suites.extend([suite_redir, suite_masq, suite_dnat]) if is_table_available(iptc.Table.RAW) and xtables_version >= 10: suites.extend([suite_notrack, suite_ct]) return unittest.TestSuite(suites) def run_tests(): result = unittest.TextTestRunner(verbosity=2).run(suite()) if result.errors or result.failures: return 1 return 0 if __name__ == "__main__": unittest.main() 07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!611 blocks
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor