Issue #83 and bug fixes

This commit is contained in:
secynic 2015-12-14 18:39:27 -06:00
parent e3690cd8bf
commit fd4826e10c
22 changed files with 656 additions and 174 deletions

View File

@ -1,6 +1,7 @@
# Build file
language: python
python:
- 3.5
- 3.4
- 3.3
- 2.7

View File

@ -1,6 +1,63 @@
Changelog (Archive)
===================
0.9.1 (2014-10-14)
------------------
- Added ignore_referral_errors parameter to lookup().
- Fixed ipaddress import conflicts with alternate ipaddress module.
- Tuned import exception in ipwhois.utils.
- Fixed retry handling in get_whois().
- Fixed CIDR regex parsing bug where some nets were excluded from the results.
0.9.0 (2014-07-27)
------------------
- Fixed order on REST email fields
- Fixed setup error for initial install when dependencies don't exist.
- Added RWhois support.
- Added server and port parameters to IPWhois.get_whois().
- Added unique_addresses() to ipwhois.utils and unit tests.
- Added some unit tests to test_lookup().
- Replaced dict.copy() with copy.deepcopy(dict).
- Fixed bug in abuse emails parsing.
- Added handle and range values to returned nets dictionary.
0.8.2 (2014-05-12)
------------------
- Fixed multi-line field parsing (Issue #36).
- Added unique_everseen() to ipwhois.utils to fix multi-line field order.
- Re-added support for RIPE RWS now that their API is fixed.
0.8.1 (2014-03-05)
------------------
- Fixed encoding error in IPWhois.get_whois().
0.8.0 (2014-02-18)
------------------
- Added ASNRegistryError to handle unknown ASN registry return values.
- Added ASN registry lookup third tier fallback to ARIN.
- Fixed variable naming to avoid shadows built-in confusion.
- Fixed some type errors: Expected type 'str', got 'dict[str, dict]' instead.
- Fixed RIPE RWS links, since they changed their API.
- Temporarily removed RIPE RWS functionality until they fix their API.
- Removed RADB fallback, since RIPE removed it.
0.7.0 (2014-01-14)
------------------
- Added Python 2.6+ support.
- The country field in net dicts is now forced uppercase.
0.6.0 (2014-01-13)
------------------
- Added APNIC RWS support for IPWhois.lookup_rws().
- Fixed issue in IPWhois.lookup_rws() for radb-grs fallback.
0.5.2 (2013-12-07)
------------------

View File

@ -1,10 +1,25 @@
Changelog
=========
0.11.1 (TBD)
------------
0.11.1 (2015-12-21)
-------------------
- Added CIDR parsing for RDAP lookups.
- Re-added CIDR parsing for RDAP lookups.
- Improved tests - core code coverage now 100%. See '# pragma: no cover' for
exclusions. A few bugs were identified in the process, detailed below.
- Moved IP zero stripping from rdap._RDAPNetwork.parse() to new helper function
utils.ipv4_lstrip_zeros().
- Moved CIDR calculation from rdap._RDAPNetwork.parse() to new helper function
utils.calculate_cidr().
- Fixed utils.ipv6_is_defined() if statement ordering for Unspecified and
Loopback (conflict with Reserved).
- Added is_offline parameter to whois.Whois.lookup() primarily for testing.
- Fixed bug in whois.Whois._parse_fields() that attempted to parse 'val2' of
regex, which is no longer used. Also fixed the expected Exception to be
IndexError.
- Fixed bug in ipwhois.IPWhois.lookup() where the argument order was mixed up,
causing referral lookups to be skipped when get_referral=True.
- Fixed bug in rdap._RDAPCommon.summarize_notices() output for links.
0.11.0 (2015-11-02)
-------------------
@ -53,60 +68,3 @@ Changelog
- Fixed file open encoding bug. Moved from open to io.open.
- Fixed parameter in IPWhois ip defined checks.
- Fixed TestIPWhois.test_ip_invalid() assertions.
0.9.1 (2014-10-14)
------------------
- Added ignore_referral_errors parameter to lookup().
- Fixed ipaddress import conflicts with alternate ipaddress module.
- Tuned import exception in ipwhois.utils.
- Fixed retry handling in get_whois().
- Fixed CIDR regex parsing bug where some nets were excluded from the results.
0.9.0 (2014-07-27)
------------------
- Fixed order on REST email fields
- Fixed setup error for initial install when dependencies don't exist.
- Added RWhois support.
- Added server and port parameters to IPWhois.get_whois().
- Added unique_addresses() to ipwhois.utils and unit tests.
- Added some unit tests to test_lookup().
- Replaced dict.copy() with copy.deepcopy(dict).
- Fixed bug in abuse emails parsing.
- Added handle and range values to returned nets dictionary.
0.8.2 (2014-05-12)
------------------
- Fixed multi-line field parsing (Issue #36).
- Added unique_everseen() to ipwhois.utils to fix multi-line field order.
- Re-added support for RIPE RWS now that their API is fixed.
0.8.1 (2014-03-05)
------------------
- Fixed encoding error in IPWhois.get_whois().
0.8.0 (2014-02-18)
------------------
- Added ASNRegistryError to handle unknown ASN registry return values.
- Added ASN registry lookup third tier fallback to ARIN.
- Fixed variable naming to avoid shadows built-in confusion.
- Fixed some type errors: Expected type 'str', got 'dict[str, dict]' instead.
- Fixed RIPE RWS links, since they changed their API.
- Temporarily removed RIPE RWS functionality until they fix their API.
- Removed RADB fallback, since RIPE removed it.
0.7.0 (2014-01-14)
------------------
- Added Python 2.6+ support.
- The country field in net dicts is now forced uppercase.
0.6.0 (2014-01-13)
------------------
- Added APNIC RWS support for IPWhois.lookup_rws().
- Fixed issue in IPWhois.lookup_rws() for radb-grs fallback.

View File

@ -5,7 +5,7 @@ ipwhois
ipwhois is a Python package focused on retrieving and parsing whois data
for IPv4 and IPv6 addresses.
RDAP is now the recommended query method. Please see the
RDAP is the recommended query method as of v0.11.0. Please see the
`upgrade info <#upgrading-from-0-10-to-0-11>`_.
Features
@ -20,6 +20,7 @@ Features
* Python 2.6+ and 3.3+ supported
* Useful set of utilities
* BSD license
* 100% core code coverage (See '# pragma: no cover' for exclusions)
Links
=====

View File

@ -22,7 +22,7 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
__version__ = '0.11.0'
__version__ = '0.11.1'
from .exceptions import *
from .net import Net

View File

@ -1,8 +1,3 @@
.. ipwhois documentation master file, created by
sphinx-quickstart on Mon Jul 28 19:55:48 2014.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
=======
ipwhois
=======

View File

@ -18,3 +18,6 @@ Library Structure
.. automodule:: ipwhois.utils
:members:
.. automodule:: ipwhois.exceptions
:members:

View File

@ -118,8 +118,8 @@ class IPWhois:
whois = Whois(self.net)
log.debug('WHOIS lookup for {0}'.format(self.address_str))
whois_data = whois.lookup(
inc_raw, retry_count, get_referral, extra_blacklist,
ignore_referral_errors, response, asn_data, field_list
inc_raw, retry_count, response, get_referral, extra_blacklist,
ignore_referral_errors, asn_data, field_list
)
# Add the RDAP information to the return dictionary.

View File

@ -38,14 +38,14 @@ from .exceptions import (IPDefinedError, ASNRegistryError, ASNLookupError,
from .whois import RIR_WHOIS
from .utils import ipv4_is_defined, ipv6_is_defined
if sys.version_info >= (3, 3):
if sys.version_info >= (3, 3): # pragma: no cover
from ipaddress import (ip_address,
IPv4Address,
IPv6Address,
ip_network,
summarize_address_range,
collapse_addresses)
else:
else: # pragma: no cover
from ipaddr import (IPAddress as ip_address,
IPv4Address,
IPv6Address,
@ -53,12 +53,12 @@ else:
summarize_address_range,
collapse_address_list as collapse_addresses)
try:
try: # pragma: no cover
from urllib.request import (OpenerDirector,
ProxyHandler,
build_opener,
Request)
except ImportError:
except ImportError: # pragma: no cover
from urllib2 import (OpenerDirector,
ProxyHandler,
build_opener,
@ -318,7 +318,7 @@ class Net:
return ret
except (socket.timeout, socket.error) as e:
except (socket.timeout, socket.error) as e: # pragma: no cover
log.debug('ASN query socket error: {0}'.format(e))
if retry_count > 0:
@ -372,7 +372,7 @@ class Net:
extra_bl = extra_blacklist if extra_blacklist else []
if server in (BLACKLIST, extra_bl):
if any(server in srv for srv in (BLACKLIST, extra_bl)):
raise BlacklistError(
'The server %r is blacklisted.' % server
)
@ -409,14 +409,15 @@ class Net:
conn.close()
if 'Query rate limit exceeded' in response:
if 'Query rate limit exceeded' in response: # pragma: no cover
log.debug('WHOIS query rate limit exceeded. Waiting...')
sleep(1)
return self.get_whois(asn_registry, retry_count, server, port,
extra_blacklist)
elif 'error 501' in response or 'error 230' in response:
elif ('error 501' in response or 'error 230' in response
): # pragma: no cover
log.debug('WHOIS query error: {0}'.format(response))
raise ValueError
@ -439,7 +440,11 @@ class Net:
'WHOIS lookup failed for %r.' % self.address_str
)
except:
except BlacklistError:
raise
except: # pragma: no cover
raise WhoisLookupError(
'WHOIS lookup failed for %r.' % self.address_str
@ -470,7 +475,7 @@ class Net:
data = self.opener.open(conn, timeout=self.timeout)
try:
d = json.loads(data.readall().decode())
except AttributeError:
except AttributeError: # pragma: no cover
d = json.loads(data.read().decode('ascii', 'ignore'))
return d
@ -489,7 +494,7 @@ class Net:
raise HTTPLookupError('HTTP lookup failed for %r.' % url)
except:
except: # pragma: no cover
raise HTTPLookupError('HTTP lookup failed for %r.' % url)
@ -519,7 +524,7 @@ class Net:
log.debug('Host query for {0}'.format(self.address_str))
ret = socket.gethostbyaddr(self.address_str)
if default_timeout_set:
if default_timeout_set: # pragma: no cover
socket.setdefaulttimeout(None)
@ -541,7 +546,7 @@ class Net:
'Host lookup failed for %r.' % self.address_str
)
except:
except: # pragma: no cover
raise HostLookupError(
'Host lookup failed for %r.' % self.address_str
@ -565,6 +570,7 @@ class Net:
Raises:
ASNRegistryError: ASN registry does not match.
HTTPLookupError: The HTTP lookup failed.
"""
# Initialize the response.
@ -582,7 +588,7 @@ class Net:
log.debug('ASN DNS lookup failed, trying ASN WHOIS')
asn_data = self.get_asn_whois(retry_count)
except (ASNLookupError, ASNRegistryError):
except (ASNLookupError, ASNRegistryError): # pragma: no cover
# Lets attempt to get the ASN registry information from ARIN.
log.debug('ASN WHOIS lookup failed, trying ASN via HTTP')

View File

@ -24,9 +24,8 @@
from . import (Net, NetError, InvalidEntityContactObject, InvalidNetworkObject,
InvalidEntityObject, HTTPLookupError)
from .utils import unique_everseen
from .net import (ip_address, ip_network, summarize_address_range,
collapse_addresses)
from .utils import ipv4_lstrip_zeros, calculate_cidr, unique_everseen
from .net import ip_address
import logging
log = logging.getLogger(__name__)
@ -163,7 +162,7 @@ class _RDAPContact:
ret['type'] = val[1]['type']
except (KeyError, ValueError, TypeError):
except (IndexError, KeyError, ValueError, TypeError):
pass
@ -324,19 +323,22 @@ class _RDAPCommon:
for notices_dict in notices_json:
ret.append({
tmp = {
'title': notices_dict['title'],
'description': '\n'.join(notices_dict['description'])
})
'description': '\n'.join(notices_dict['description']),
'links': None
}
try:
ret['links'] = self.summarize_links(notices_dict['links'])
tmp['links'] = self.summarize_links(notices_dict['links'])
except (KeyError, ValueError, TypeError):
pass
ret.append(tmp)
return ret
def summarize_events(self, events_json):
@ -468,22 +470,13 @@ class _RDAPNetwork(_RDAPCommon):
# the leading 0's.
if self.vars['ip_version'] == 'v4':
obj = self.json['startAddress'].strip().split('.')
for x, y in enumerate(obj):
obj[x] = y.split('/')[0].lstrip('0')
if obj[x] in ['', None]:
obj[x] = '0'
self.vars['start_address'] = ip_address('.'.join(obj)
self.vars['start_address'] = ip_address(
ipv4_lstrip_zeros(self.json['startAddress'])
).__str__()
obj = self.json['endAddress'].strip().split('.')
for x, y in enumerate(obj):
obj[x] = y.split('/')[0].lstrip('0')
if obj[x] in ['', None]:
obj[x] = '0'
self.vars['end_address'] = ip_address('.'.join(obj)).__str__()
self.vars['end_address'] = ip_address(
ipv4_lstrip_zeros(self.json['endAddress'])
).__str__()
# No bugs found for IPv6 yet, proceed as normal.
else:
@ -500,31 +493,12 @@ class _RDAPNetwork(_RDAPCommon):
try:
tmp_addrs = []
self.vars['cidr'] = ', '.join(calculate_cidr(
self.vars['start_address'], self.vars['end_address']
))
try:
tmp_addrs.extend(summarize_address_range(
ip_address(self.vars['start_address']),
ip_address(self.vars['end_address'])))
except (KeyError, ValueError, TypeError):
tmp_addrs.extend(summarize_address_range(
ip_network(self.vars['start_address']).network_address,
ip_network(self.vars['end_address']).network_address))
except AttributeError:
tmp_addrs.extend(summarize_address_range(
ip_network(self.vars['start_address']).ip,
ip_network(self.vars['end_address']).ip))
self.vars['cidr'] = ', '.join(
[i.__str__() for i in collapse_addresses(tmp_addrs)]
)
except (KeyError, ValueError, TypeError, AttributeError) as e:
except (KeyError, ValueError, TypeError, AttributeError) as \
e: # pragma: no cover
log.debug('CIDR calculation failed: {0}'.format(e))
pass

View File

@ -59,7 +59,10 @@ class TestIPWhois(TestCommon):
result = IPWhois(ip)
try:
self.assertIsInstance(result.lookup(get_referral=True), dict)
self.assertIsInstance(result.lookup(
get_referral=True,
ignore_referral_errors=True,
inc_raw=True), dict)
except (ASNLookupError, ASNRegistryError, WhoisLookupError):
pass
except AssertionError as e:
@ -67,6 +70,41 @@ class TestIPWhois(TestCommon):
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)
for ip in rwhois_ips:
result = IPWhois(ip)
try:
self.assertIsInstance(result.lookup(
get_referral=True,
ignore_referral_errors=True,
inc_raw=True,
extra_blacklist=['rwhois.cogentco.com']), dict)
except (ASNLookupError, ASNRegistryError, WhoisLookupError):
pass
except AssertionError as e:
raise e
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)
break
for ip in rwhois_ips:
result = IPWhois(ip)
try:
self.assertIsInstance(result.lookup(
get_referral=True,
ignore_referral_errors=False,
inc_raw=False), dict)
except (ASNLookupError, ASNRegistryError, WhoisLookupError):
pass
except AssertionError as e:
raise e
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)
break
def test_lookup_rdap(self):
try:
from urllib.request import ProxyHandler, build_opener

View File

@ -1,6 +1,6 @@
import unittest
import logging
from ipwhois import (Net, ASNLookupError, ASNRegistryError,
from ipwhois import (Net, ASNLookupError, ASNRegistryError, BlacklistError,
WhoisLookupError, HTTPLookupError, HostLookupError)
LOG_FORMAT = ('[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)s] '
@ -43,6 +43,9 @@ class TestNet(TestCommon):
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)
result = Net('74.125.225.229')
self.assertRaises(ASNLookupError, result.get_asn_whois, 3, 'a')
def test_get_whois(self):
result = Net('74.125.225.229')
try:
@ -54,6 +57,16 @@ class TestNet(TestCommon):
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)
self.assertRaises(WhoisLookupError, result.get_whois, **dict(
retry_count=0, server='arin.net'))
self.assertRaises(BlacklistError, result.get_whois, **dict(
server='whois.arin.net', extra_blacklist=['whois.arin.net']))
result = Net('74.125.225.229', 0)
self.assertRaises(WhoisLookupError, result.get_whois, **dict(
retry_count=1))
def test_get_http_json(self):
from ipwhois.rdap import RIR_RDAP
result = Net('74.125.225.229')
@ -67,13 +80,41 @@ class TestNet(TestCommon):
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)
self.assertRaises(HTTPLookupError, result.get_http_json, **dict(
url='http://255.255.255.255', retry_count=0))
result = Net('74.125.225.229', 0)
self.assertRaises(HTTPLookupError, result.get_http_json, **dict(
retry_count=1))
def test_get_host(self):
result = Net('74.125.225.229')
ips = [
'74.125.225.229', # ARIN
'2001:4860:4860::8888'
]
for ip in ips:
result = Net(ip)
try:
self.assertIsInstance(result.get_host(), tuple)
self.assertIsInstance(result.get_host(0), tuple)
except HostLookupError:
pass
except AssertionError as e:
raise e
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)
result = Net('74.125.225.229', 0)
self.assertRaises(HostLookupError, result.get_host, **dict(
retry_count=1))
def test_lookup_asn(self):
result = Net('74.125.225.229')
try:
self.assertIsInstance(result.lookup_asn(), tuple)
except (HTTPLookupError, ASNRegistryError):
pass
except AssertionError as e:
raise e
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)

View File

@ -54,3 +54,28 @@ class TestRDAP(TestCommon):
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)
for key, val in data.items():
log.debug('Testing bootstrap and raw: {0}'.format(key))
net = Net(key)
obj = RDAP(net)
try:
self.assertIsInstance(obj.lookup(asn_data=val['asn_data'],
depth=3,
bootstrap=True,
inc_raw=True), dict)
except HTTPLookupError:
pass
except AssertionError as e:
raise e
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)

View File

@ -0,0 +1,20 @@
import unittest
import logging
from ipwhois import IPWhois
LOG_FORMAT = ('[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)s] '
'[%(funcName)s()] %(message)s')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
log = logging.getLogger(__name__)
class TestIPWhois(unittest.TestCase):
def test_repr(self):
# basic str test
log.debug('Basic str test: {0}'.format('74.125.225.229'))
obj = IPWhois('74.125.225.229')
self.assertIsInstance(repr(obj), str)
# add more specific tests

View File

@ -28,11 +28,9 @@ class TestNet(TestCommon):
def test_ip_defined(self):
if sys.version_info >= (3, 3):
from ipaddress import (IPv4Address,
IPv6Address)
from ipaddress import (IPv4Address, IPv6Address)
else:
from ipaddr import (IPv4Address,
IPv6Address)
from ipaddr import (IPv4Address, IPv6Address)
self.assertRaises(IPDefinedError, Net, '192.168.0.1')
self.assertRaises(IPDefinedError, Net, 'fe80::')
@ -51,13 +49,22 @@ class TestNet(TestCommon):
def test_proxy_opener(self):
try:
from urllib.request import OpenerDirector
from urllib.request import (OpenerDirector,
ProxyHandler,
build_opener)
except ImportError:
from urllib2 import OpenerDirector
from urllib2 import (OpenerDirector,
ProxyHandler,
build_opener)
result = Net('74.125.225.229')
self.assertIsInstance(result.opener, OpenerDirector)
handler = ProxyHandler()
opener = build_opener(handler)
result = Net(address='74.125.225.229', proxy_opener=opener)
self.assertIsInstance(result.opener, OpenerDirector)
def test_get_asn_dns(self):
data = ['"15169 ', ' 74.125.225.0/24 ', ' US ', ' arin ',
' 2007-03-13"']
@ -69,6 +76,11 @@ class TestNet(TestCommon):
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)
data = ['"15169 ', ' 74.125.225.0/24 ', ' US ', ' random ',
' 2007-03-13"']
result = Net('74.125.225.229')
self.assertRaises(ASNRegistryError, result.get_asn_dns, data)
def test_get_asn_whois(self):
data = ('15169 | 74.125.225.229 | 74.125.225.0/24 | US | arin'
' | 2007-03-13')
@ -79,3 +91,8 @@ class TestNet(TestCommon):
raise e
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)
data = ('15169 | 74.125.225.229 | 74.125.225.0/24 | US | rdm'
' | 2007-03-13')
result = Net('74.125.225.229')
self.assertRaises(ASNRegistryError, result.get_asn_whois, 3, data)

View File

@ -3,8 +3,9 @@ import json
import io
from os import path
import logging
from ipwhois.rdap import (RDAP, _RDAPEntity, InvalidEntityObject,
InvalidEntityContactObject, Net)
from ipwhois.rdap import (RDAP, _RDAPEntity, _RDAPContact, _RDAPNetwork, Net,
InvalidEntityObject, InvalidEntityContactObject,
InvalidNetworkObject, NetError)
LOG_FORMAT = ('[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)s] '
'[%(funcName)s()] %(message)s')
@ -52,6 +53,77 @@ class TestRDAP(TestCommon):
self.fail('Unexpected exception raised: %r' % e)
self.assertRaises(NetError, RDAP, 'a')
for key, val in data.items():
log.debug('Testing bootstrap and raw: {0}'.format(key))
net = Net(key)
obj = RDAP(net)
try:
self.assertIsInstance(obj.lookup(response=val['response'],
asn_data=val['asn_data'],
depth=0,
bootstrap=True,
inc_raw=True), dict)
except AssertionError as e:
raise e
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)
break
class TestRDAPContact(TestCommon):
def test__RDAPContact(self):
self.assertRaises(InvalidEntityContactObject, _RDAPContact, 'a')
data_dir = path.dirname(__file__)
with io.open(str(data_dir) + '/entity.json', 'r') as data_file:
data = json.load(data_file)
contact = _RDAPContact(data['vcardArray'][1])
contact.parse()
self.assertRaises(IndexError, contact._parse_phone, [])
self.assertRaises(IndexError, contact._parse_role, [])
self.assertRaises(IndexError, contact._parse_title, [])
class TestRDAPNetwork(TestCommon):
def test__RDAPNetwork(self):
self.assertRaises(InvalidNetworkObject, _RDAPNetwork, 'a')
data_dir = path.dirname(__file__)
with io.open(str(data_dir) + '/rdap.json', 'r') as data_file:
data = json.load(data_file)
for key, val in data.items():
network = _RDAPNetwork(val['response'])
network.parse()
tmp = val['response']
del tmp['startAddress']
network = _RDAPNetwork(tmp)
self.assertRaises(InvalidNetworkObject, network.parse)
network = _RDAPNetwork({})
self.assertRaises(InvalidNetworkObject, network.parse)
break
class TestRDAPEntity(TestCommon):
@ -69,3 +141,8 @@ class TestRDAPEntity(TestCommon):
ent = _RDAPEntity(data)
ent.parse()
tmp = data
del tmp['vcardArray']
ent = _RDAPEntity(tmp)
ent.parse()

View File

@ -1,9 +1,13 @@
import unittest
import sys
from os import path
import logging
from ipwhois.utils import (get_countries,
from ipwhois.utils import (ipv4_lstrip_zeros,
calculate_cidr,
get_countries,
ipv4_is_defined,
ipv6_is_defined,
unique_everseen,
unique_addresses)
LOG_FORMAT = ('[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)s] '
@ -21,6 +25,25 @@ class TestFunctions(unittest.TestCase):
'%s is not an instance of %r' % (repr(obj), cls)
))
def test_ipv4_lstrip_zeros(self):
if sys.version_info >= (3, 3):
from ipaddress import ip_address
else:
from ipaddr import IPAddress as ip_address
self.assertIsInstance(ipv4_lstrip_zeros('074.125.000.000'), str)
tmp = ip_address(ipv4_lstrip_zeros('074.125.000.000')).__str__()
def test_calculate_cidr(self):
start_addr = '74.125.0.0'
end_addr = '74.125.255.255'
self.assertIsInstance(calculate_cidr(start_addr, end_addr), list)
start_addr_6 = '2001:240::'
end_addr_6 = '2001:240:ffff:ffff:ffff:ffff:ffff:ffff'
self.assertIsInstance(calculate_cidr(start_addr_6, end_addr_6), list)
def test_get_countries(self):
# Legacy
@ -41,9 +64,35 @@ class TestFunctions(unittest.TestCase):
self.assertRaises(ValueError, ipv4_is_defined, '192.168.0.256')
self.assertRaises(AddressValueError, ipv4_is_defined, 1234)
self.assertEquals(ipv4_is_defined('74.125.225.229'), (False, '', ''))
self.assertEquals(ipv4_is_defined('0.0.0.0'),
(True, 'This Network', 'RFC 1122, Section 3.2.1.3'))
self.assertEquals(ipv4_is_defined('192.168.0.1'),
(True, 'Private-Use Networks', 'RFC 1918'))
self.assertEquals(ipv4_is_defined('74.125.225.229'), (False, '', ''))
self.assertEquals(ipv4_is_defined('127.0.0.0'),
(True, 'Loopback', 'RFC 1122, Section 3.2.1.3'))
self.assertEquals(ipv4_is_defined('169.254.0.0'),
(True, 'Link Local', 'RFC 3927'))
self.assertEquals(ipv4_is_defined('192.0.0.0'),
(True, 'IETF Protocol Assignments', 'RFC 5736'))
self.assertEquals(ipv4_is_defined('192.0.2.0'),
(True, 'TEST-NET-1', 'RFC 5737'))
self.assertEquals(ipv4_is_defined('192.88.99.0'),
(True, '6to4 Relay Anycast', 'RFC 3068'))
self.assertEquals(ipv4_is_defined('198.18.0.0'),
(True,
'Network Interconnect Device Benchmark Testing',
'RFC 2544'))
self.assertEquals(ipv4_is_defined('198.51.100.0'),
(True, 'TEST-NET-2', 'RFC 5737'))
self.assertEquals(ipv4_is_defined('203.0.113.0'),
(True, 'TEST-NET-3', 'RFC 5737'))
self.assertEquals(ipv4_is_defined('224.0.0.0'),
(True, 'Multicast', 'RFC 3171'))
self.assertEquals(ipv4_is_defined('255.255.255.255'),
(True, 'Limited Broadcast', 'RFC 919, Section 7'))
def test_ipv6_is_defined(self):
if sys.version_info >= (3, 3):
@ -54,12 +103,38 @@ class TestFunctions(unittest.TestCase):
self.assertRaises(ValueError, ipv6_is_defined,
'2001:4860:4860::8888::1234')
self.assertRaises(AddressValueError, ipv6_is_defined, 1234)
self.assertEquals(ipv6_is_defined('fe80::'),
(True, 'Link-Local', 'RFC 4291, Section 2.5.6'))
self.assertEquals(ipv6_is_defined('2001:4860:4860::8888'),
(False, '', ''))
self.assertEquals(ipv6_is_defined('ff00::'),
(True, 'Multicast', 'RFC 4291, Section 2.7'))
self.assertEquals(ipv6_is_defined('0:0:0:0:0:0:0:0'),
(True, 'Unspecified', 'RFC 4291, Section 2.5.2'))
self.assertEquals(ipv6_is_defined('0:0:0:0:0:0:0:1'),
(True, 'Loopback', 'RFC 4291, Section 2.5.3'))
self.assertEquals(ipv6_is_defined('100::'),
(True, 'Reserved', 'RFC 4291'))
self.assertEquals(ipv6_is_defined('fe80::'),
(True, 'Link-Local', 'RFC 4291, Section 2.5.6'))
self.assertEquals(ipv6_is_defined('fec0::'),
(True, 'Site-Local', 'RFC 4291, Section 2.5.7'))
self.assertEquals(ipv6_is_defined('fc00::'),
(True, 'Unique Local Unicast', 'RFC 4193'))
def test_unique_everseen(self):
input_list = ['b', 'a', 'c', 'a', 'b', 'x', 'a']
self.assertEquals(list(unique_everseen(input_list)),
['b', 'a', 'c', 'x'])
self.assertEquals(list(unique_everseen(input_list, str.lower)),
['b', 'a', 'c', 'x'])
def test_unique_addresses(self):
self.assertRaises(ValueError, unique_addresses)
input_data = (
'You can have IPs like 74.125.225.229, or 2001:4860:4860::8888'
'Put a port on the end 74.125.225.229:80 or for IPv6: '
@ -75,3 +150,52 @@ class TestFunctions(unittest.TestCase):
}
self.assertEquals(unique_addresses(input_data), expected_result)
data_dir = path.dirname(__file__)
fp = str(data_dir) + '/rdap.json'
fp_expected_result = {
'74.125.225.0/24': {'count': 1, 'ports': {}},
'62.239.0.0/16': {'count': 1, 'ports': {}},
'2001:43f8:7b0:ffff:ffff:ffff:ffff:ffff':
{'count': 1, 'ports': {}},
'210.0.0.0': {'count': 1, 'ports': {}},
'196.11.240.0/23': {'count': 1, 'ports': {}},
'2001:240:10c:1::ca20:9d1d': {'count': 2, 'ports': {}},
'196.11.240.215': {'count': 2, 'ports': {}},
'62.239.237.0/32': {'count': 1, 'ports': {}},
'210.107.0.0/17': {'count': 6, 'ports': {}},
'2001:4860::/32': {'count': 1, 'ports': {}},
'210.107.73.73': {'count': 2, 'ports': {}},
'210.107.0.0': {'count': 2, 'ports': {}},
'2001:200::/23': {'count': 2, 'ports': {}},
'2001:240:ffff:ffff:ffff:ffff:ffff:ffff':
{'count': 1, 'ports': {}},
'210.255.255.255': {'count': 1, 'ports': {}},
'2001:43f8:7b0::': {'count': 3, 'ports': {}},
'196.255.255.255': {'count': 1, 'ports': {}},
'2001:240::/32': {'count': 6, 'ports': {}},
'196.0.0.0': {'count': 1, 'ports': {}},
'2001:240::': {'count': 1, 'ports': {}},
'196.11.246.255': {'count': 2, 'ports': {}},
'196.11.239.0': {'count': 2, 'ports': {}},
'2001:4200::/23': {'count': 1, 'ports': {}},
'2a00:2380::/25': {'count': 1, 'ports': {}},
'200.57.128.0/20': {'count': 1, 'ports': {}},
'62.239.237.255': {'count': 1, 'ports': {}},
'2001:4860:4860::8888': {'count': 10, 'ports': {}},
'2001:4860::': {'count': 2, 'ports': {}},
'2001:4860:ffff:ffff:ffff:ffff:ffff:ffff':
{'count': 1, 'ports': {}},
'74.125.225.229': {'count': 8, 'ports': {}},
'210.107.127.255': {'count': 2, 'ports': {}},
'200.57.141.161': {'count': 7, 'ports': {}},
'62.239.237.255/32': {'count': 1, 'ports': {}},
'2801:10:c000::': {'count': 7, 'ports': {}},
'2a00:2381:ffff::1': {'count': 4, 'ports': {}},
'62.239.237.0': {'count': 1, 'ports': {}},
'62.239.237.1': {'count': 4, 'ports': {}},
'210.0.0.0/8': {'count': 1, 'ports': {}}
}
self.assertEquals(unique_addresses(file_path=fp), fp_expected_result)

View File

@ -4,7 +4,7 @@ import io
from os import path
import logging
from ipwhois.net import Net
from ipwhois.whois import Whois
from ipwhois.whois import (Whois, RIR_WHOIS, NetError)
LOG_FORMAT = ('[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)s] '
'[%(funcName)s()] %(message)s')
@ -41,7 +41,9 @@ class TestWhois(TestCommon):
try:
self.assertIsInstance(obj.lookup(response=val['response'],
asn_data=val['asn_data']),
asn_data=val['asn_data'],
is_offline=True,
inc_raw=True),
dict)
except AssertionError as e:
@ -51,3 +53,76 @@ class TestWhois(TestCommon):
except Exception as e:
self.fail('Unexpected exception raised: %r' % e)
self.assertRaises(NetError, Whois, 'a')
def test__parse_fields(self):
net = Net('74.125.225.229')
obj = Whois(net)
# No exception raised, but should provide code coverage for if regex
# groups are messed up.
tmp_dict = RIR_WHOIS['arin']['fields']
tmp_dict['name'] = r'(NetName):[^\S\n]+(?P<val1>.+?)\n'
obj._parse_fields(
response="\nNetName: TEST\n",
fields_dict=tmp_dict,
dt_format=RIR_WHOIS['arin']['dt_format']
)
obj._parse_fields(
response="\nUpdated: 2012-02-24\n",
fields_dict=RIR_WHOIS['arin']['fields'],
dt_format=RIR_WHOIS['arin']['dt_format']
)
log.debug('Testing field parse error. This should be followed by a '
'debug log.')
obj._parse_fields(
response="\nUpdated: 2012-02-244\n",
fields_dict=RIR_WHOIS['arin']['fields'],
dt_format=RIR_WHOIS['arin']['dt_format']
)
def test__get_nets_arin(self):
net = Net('74.125.225.229')
obj = Whois(net)
# No exception raised, but should provide code coverage for multiple
# network scenarios and CIDR invalid IP ValueError.
multi_net_response = (
"\n#\n\nNetRange: 74.125.0.0 - 74.125.255.255"
"\nCIDR: 74.125.0.0/16\nNetName: TEST"
"\nCIDR: 74.125.1.256/24\nNetName: TEST2"
"\nNetRange: 74.125.1.0 - 74.125.1.0"
"\n"
)
obj._get_nets_arin(multi_net_response)
def test__get_nets_lacnic(self):
net = Net('200.57.141.161')
obj = Whois(net)
# No exception raised, but should provide code coverage for inetnum
# invalid IP ValueError.
multi_net_response = (
"\ninetnum: 200.57.256/19\r\n"
"\n"
)
obj._get_nets_lacnic(multi_net_response)
def test__get_nets_other(self):
net = Net('210.107.73.73')
obj = Whois(net)
# No exception raised, but should provide code coverage for inetnum
# invalid IP ValueError.
multi_net_response = (
"\ninetnum: 210.107.0.0 - 210.107.127.256\n"
"\n"
)
obj._get_nets_other(multi_net_response)

View File

@ -31,23 +31,27 @@ import io
import csv
import logging
if sys.version_info >= (3, 3):
if sys.version_info >= (3, 3): # pragma: no cover
from ipaddress import (ip_address,
ip_network,
IPv4Address,
IPv4Network,
IPv6Address)
else:
IPv6Address,
summarize_address_range,
collapse_addresses)
else: # pragma: no cover
from ipaddr import (IPAddress as ip_address,
IPNetwork as ip_network,
IPv4Address,
IPv4Network,
IPv6Address)
IPv6Address,
summarize_address_range,
collapse_address_list as collapse_addresses)
try:
try: # pragma: no cover
from itertools import filterfalse
except ImportError:
except ImportError: # pragma: no cover
from itertools import ifilterfalse as filterfalse
log = logging.getLogger(__name__)
@ -108,6 +112,67 @@ IP_REGEX = (
)
def ipv4_lstrip_zeros(address):
"""
The function to strip leading zeros in each octet of an IPv4 address.
Args:
address: An IPv4 address in string format.
Returns:
String: The modified IPv4 address string.
"""
# Split the octets.
obj = address.strip().split('.')
for x, y in enumerate(obj):
# Strip leading zeros. Split / here in case CIDR is attached.
obj[x] = y.split('/')[0].lstrip('0')
if obj[x] in ['', None]:
obj[x] = '0'
return '.'.join(obj)
def calculate_cidr(start_address, end_address):
"""
The function to calculate a CIDR range(s) from a start and end IP address.
Args:
start_address: The starting IP address in string format.
end_address: The ending IP address in string format.
Returns:
List: A list of calculated CIDR ranges.
"""
tmp_addrs = []
try:
tmp_addrs.extend(summarize_address_range(
ip_address(start_address),
ip_address(end_address)))
except (KeyError, ValueError, TypeError): # pragma: no cover
try:
tmp_addrs.extend(summarize_address_range(
ip_network(start_address).network_address,
ip_network(end_address).network_address))
except AttributeError: # pragma: no cover
tmp_addrs.extend(summarize_address_range(
ip_network(start_address).ip,
ip_network(end_address).ip))
return [i.__str__() for i in collapse_addresses(tmp_addrs)]
def get_countries(is_legacy_xml=False):
"""
The function to generate a dictionary containing ISO_3166-1 country codes
@ -128,7 +193,7 @@ def get_countries(is_legacy_xml=False):
# Set the data directory based on if the script is a frozen executable.
if sys.platform == 'win32' and getattr(sys, 'frozen', False):
data_dir = path.dirname(sys.executable)
data_dir = path.dirname(sys.executable) # pragma: no cover
else:
@ -147,7 +212,7 @@ def get_countries(is_legacy_xml=False):
data = f.read()
# Check if there is data.
if not data:
if not data: # pragma: no cover
return {}
@ -302,11 +367,6 @@ def ipv6_is_defined(address):
return True, 'Multicast', 'RFC 4291, Section 2.7'
# Reserved
elif query_ip.is_reserved:
return True, 'Reserved', 'RFC 4291'
# Unspecified
elif query_ip.is_unspecified:
@ -317,6 +377,11 @@ def ipv6_is_defined(address):
return True, 'Loopback', 'RFC 4291, Section 2.5.3'
# Reserved
elif query_ip.is_reserved:
return True, 'Reserved', 'RFC 4291'
# Link-Local
elif query_ip.is_link_local:

View File

@ -30,12 +30,12 @@ import logging
from .utils import unique_everseen
from . import (BlacklistError, WhoisLookupError, NetError)
if sys.version_info >= (3, 3):
if sys.version_info >= (3, 3): # pragma: no cover
from ipaddress import (ip_address,
ip_network,
summarize_address_range,
collapse_addresses)
else:
else: # pragma: no cover
from ipaddr import (IPAddress as ip_address,
IPNetwork as ip_network,
summarize_address_range,
@ -277,9 +277,9 @@ class Whois:
values.append(m.group('val').strip())
except AttributeError:
except IndexError:
values.append(m.group('val2').strip())
pass
sub_section_end = m.end()
@ -483,7 +483,7 @@ class Whois:
def lookup(self, inc_raw=False, retry_count=3, response=None,
get_referral=False, extra_blacklist=None,
ignore_referral_errors=False, asn_data=None,
field_list=None):
field_list=None, is_offline=False):
"""
The function for retrieving and parsing whois information for an IP
address via port 43 (WHOIS).
@ -504,6 +504,9 @@ class Whois:
field_list: If provided, a list of fields to parse:
['name', 'handle', 'description', 'country', 'state', 'city',
'address', 'postal_code', 'emails', 'created', 'updated']
is_offline: Boolean for whether to perform lookups offline. If
True, response and asn_data must be provided. Primarily used
for testing.
Returns:
Dictionary:
@ -539,7 +542,8 @@ class Whois:
referral_port = 0
# Only fetch the response if we haven't already.
if response is None or asn_data['asn_registry'] is not 'arin':
if response is None or (not is_offline and
asn_data['asn_registry'] is not 'arin'):
log.debug('Response not given, perform WHOIS lookup for {0}'
.format(self._net.address_str))
@ -561,18 +565,18 @@ class Whois:
try:
temp = match.group(1)
if 'rwhois://' not in temp:
if 'rwhois://' not in temp: # pragma: no cover
raise ValueError
temp = temp.replace('rwhois://', '').split(':')
if int(temp[1]) > 65535:
if int(temp[1]) > 65535: # pragma: no cover
raise ValueError
referral_server = temp[0]
referral_port = int(temp[1])
except (ValueError, KeyError):
except (ValueError, KeyError): # pragma: no cover
continue

View File

@ -5,7 +5,7 @@ import sys
import io
NAME = 'ipwhois'
VERSION = '0.11.0'
VERSION = '0.11.1'
AUTHOR = "Philip Hane"
AUTHOR_EMAIL = "secynic AT gmail DOT com"
DESCRIPTION = "Retrieve and parse whois data for IPv4 and IPv6 addresses."
@ -53,6 +53,7 @@ CLASSIFIERS = [
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Topic :: Internet",
"Topic :: Software Development",
]