This commit is contained in:
secynic 2014-01-13 12:28:32 -06:00
parent ca13612363
commit f59ac7ccdf
6 changed files with 342 additions and 168 deletions

View File

@ -1,6 +1,12 @@
Changelog
=========
0.6.0 (2014-01-13)
------------------
- Added APNIC RWS support for IPWhois.lookup_rws().
- Fixed issue in IPWhois.lookup_rws() for radb-grs fallback.
0.5.2 (2013-12-07)
------------------

View File

@ -1,4 +1,4 @@
Copyright (c) 2013, Philip Hane
Copyright (c) 2013, 2014 Philip Hane
All rights reserved.
Redistribution and use in source and binary forms, with or without

View File

@ -139,7 +139,7 @@ REST (HTTP)
===========
IPWhois.lookup_rws() should be faster than IPWhois.lookup(), but may not be as
reliable. APNIC, and AFRINIC do not have a Whois-RWS service yet. We
have to rely on the Ripe RWS service, which does not contain all of the data
we need. The LACNIC RWS service is supported, but is in beta v2. This may
result in availability or performance issues.
reliable. AFRINIC does not have a Whois-RWS service yet. We have to rely on the
Ripe RWS service, which does not contain all of the data we need. The LACNIC
RWS service is supported, but is in beta v2. This may result in availability
or performance issues.

View File

@ -1,4 +1,4 @@
# Copyright (c) 2013, Philip Hane
# Copyright (c) 2013, 2014 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -22,7 +22,7 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
__version__ = '0.5.2'
__version__ = '0.6.0'
from .ipwhois import (IPWhois, IPDefinedError, ASNLookupError,
WhoisLookupError, HostLookupError)

View File

@ -1,4 +1,4 @@
# Copyright (c) 2013, Philip Hane
# Copyright (c) 2013, 2014 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -108,10 +108,7 @@ NIC_WHOIS = {
},
'apnic': {
'server': 'whois.apnic.net',
'url': (
'http://apps.db.ripe.net/whois/grs-search?'
'query-string={0}&source=apnic-grs'
),
'url': 'http://rdap.apnic.net/ip/{0}',
'fields': {
'name': r'^(netname):[^\S\n]+(?P<val>.+)$',
'description': r'^(descr):[^\S\n]+(?P<val>.+)$',
@ -128,7 +125,8 @@ NIC_WHOIS = {
),
'updated': r'^(changed):[^\S\n]+.*?(?P<val>[0-9]{8})$'
},
'dt_format': '%Y%m%d'
'dt_format': '%Y%m%d',
'dt_rws_format': '%Y-%m-%dT%H:%M:%S%z'
},
'lacnic': {
'server': 'whois.lacnic.net',
@ -1052,153 +1050,6 @@ class IPWhois():
return nets
def _lookup_rws_lacnic(self, response=None):
"""
The function for retrieving and parsing whois information for a LACNIC
IP address via HTTP (Whois-RWS).
Args:
response: The dictionary containing whois information to parse.
Returns:
List: Dictionaries containing network information which consists
of the fields listed in the NIC_WHOIS dictionary. Certain IPs
have more granular network listings, hence the need for a list
object.
"""
addrs = []
net = BASE_NET.copy()
try:
addrs.extend(ipaddress.summarize_address_range(
ipaddress.ip_address(response['startAddress'].strip()),
ipaddress.ip_address(response['endAddress'].strip())))
net['cidr'] = ', '.join(
[i.__str__()
for i in ipaddress.collapse_addresses(addrs)]
)
except (KeyError, ValueError, TypeError):
pass
try:
net['country'] = response['country'].strip()
except KeyError:
pass
try:
events = response['events']
if not isinstance(events, list):
events = [events]
except KeyError:
events = []
for ev in events:
try:
if ev['eventAction'] == 'registration':
tmp = ev['eventDate'].strip()
value = datetime.strptime(
tmp,
NIC_WHOIS['lacnic']['dt_rws_format']
).isoformat('T')
net['created'] = value
elif ev['eventAction'] == 'last changed':
tmp = ev['eventDate'].strip()
value = datetime.strptime(
tmp,
NIC_WHOIS['lacnic']['dt_rws_format']
).isoformat('T')
net['updated'] = value
except (KeyError, ValueError):
pass
try:
entities = response['entities']
if not isinstance(entities, list):
entities = [entities]
except KeyError:
entities = []
for en in entities:
try:
if en['roles'][0] == 'registrant':
temp = en['vcardArray'][1]
for t in temp:
if t[0] == 'fn':
net['name'] = t[3].strip()
elif t[0] == 'org':
net['description'] = t[3][0].strip()
elif t[0] == 'adr':
net['address'] = t[1]['label'].strip()
elif t[0] == 'email':
net['misc_emails'] = t[3].strip()
elif en['roles'][0] == 'abuse':
temp = en['vcardArray'][1]
for t in temp:
if t[0] == 'email':
net['abuse_emails'] = t[3].strip()
elif en['roles'][0] == 'tech':
temp = en['vcardArray'][1]
for t in temp:
if t[0] == 'email':
net['tech_emails'] = t[3].strip()
except (KeyError, IndexError):
pass
return [net]
def _lookup_rws_ripe(self, response=None):
"""
The function for retrieving and parsing whois information for a RIPE
@ -1364,15 +1215,322 @@ class IPWhois():
return nets
def _lookup_rws_apnic(self, response=None):
"""
The function for retrieving and parsing whois information for a APNIC
IP address via HTTP (Whois-RWS).
Args:
response: The dictionary containing whois information to parse.
Returns:
List: Dictionaries containing network information which consists
of the fields listed in the NIC_WHOIS dictionary. Certain IPs
have more granular network listings, hence the need for a list
object.
"""
addrs = []
net = BASE_NET.copy()
try:
addrs.extend(ipaddress.summarize_address_range(
ipaddress.ip_address(response['startAddress'].strip()),
ipaddress.ip_address(response['endAddress'].strip())))
net['cidr'] = ', '.join(
[i.__str__()
for i in ipaddress.collapse_addresses(addrs)]
)
except (KeyError, ValueError, TypeError):
pass
try:
net['country'] = response['country'].strip()
except KeyError:
pass
try:
events = response['events']
if not isinstance(events, list):
events = [events]
except KeyError:
events = []
for ev in events:
try:
if ev['eventAction'] == 'registration':
net['created'] = ev['eventDate'].strip()
elif ev['eventAction'] == 'last changed':
net['updated'] = ev['eventDate'].strip()
except (KeyError, ValueError):
pass
try:
entities = response['entities']
if not isinstance(entities, list):
entities = [entities]
except KeyError:
entities = []
for en in entities:
try:
temp = en['vcardArray'][1]
for t in temp:
if 'administrative' in en['roles'] and t[0] == 'fn':
net['name'] = t[3].strip()
elif 'administrative' in en['roles'] and t[0] == 'adr':
try:
net['address'] = t[1]['label'].strip()
except KeyError:
pass
elif t[0] == 'email':
key = None
if (len(en['roles']) > 1 or
en['roles'][0] == 'administrative'):
key = 'misc_emails'
elif en['roles'][0] == 'abuse':
key = 'abuse_emails'
elif en['roles'][0] == 'technical':
key = 'tech_emails'
if key is not None:
if net[key] is not None:
net[key] += '\n%s' % t[3].strip()
else:
net[key] = t[3].strip()
except (KeyError, IndexError):
pass
try:
remarks = response['remarks']
if not isinstance(remarks, list):
remarks = [remarks]
except KeyError:
remarks = []
for rem in remarks:
try:
if rem['title'] == 'description':
net['description'] = '\n'.join(rem['description'])
except (KeyError, IndexError):
pass
return [net]
def _lookup_rws_lacnic(self, response=None):
"""
The function for retrieving and parsing whois information for a LACNIC
IP address via HTTP (Whois-RWS).
Args:
response: The dictionary containing whois information to parse.
Returns:
List: Dictionaries containing network information which consists
of the fields listed in the NIC_WHOIS dictionary. Certain IPs
have more granular network listings, hence the need for a list
object.
"""
addrs = []
net = BASE_NET.copy()
try:
addrs.extend(ipaddress.summarize_address_range(
ipaddress.ip_address(response['startAddress'].strip()),
ipaddress.ip_address(response['endAddress'].strip())))
net['cidr'] = ', '.join(
[i.__str__()
for i in ipaddress.collapse_addresses(addrs)]
)
except (KeyError, ValueError, TypeError):
pass
try:
net['country'] = response['country'].strip()
except KeyError:
pass
try:
events = response['events']
if not isinstance(events, list):
events = [events]
except KeyError:
events = []
for ev in events:
try:
if ev['eventAction'] == 'registration':
tmp = ev['eventDate'].strip()
value = datetime.strptime(
tmp,
NIC_WHOIS['lacnic']['dt_rws_format']
).isoformat('T')
net['created'] = value
elif ev['eventAction'] == 'last changed':
tmp = ev['eventDate'].strip()
value = datetime.strptime(
tmp,
NIC_WHOIS['lacnic']['dt_rws_format']
).isoformat('T')
net['updated'] = value
except (KeyError, ValueError):
pass
try:
entities = response['entities']
if not isinstance(entities, list):
entities = [entities]
except KeyError:
entities = []
for en in entities:
try:
if en['roles'][0] == 'registrant':
temp = en['vcardArray'][1]
for t in temp:
if t[0] == 'fn':
net['name'] = t[3].strip()
elif t[0] == 'org':
net['description'] = t[3][0].strip()
elif t[0] == 'adr':
net['address'] = t[1]['label'].strip()
elif t[0] == 'email':
net['misc_emails'] = t[3].strip()
elif en['roles'][0] == 'abuse':
temp = en['vcardArray'][1]
for t in temp:
if t[0] == 'email':
net['abuse_emails'] = t[3].strip()
elif en['roles'][0] == 'tech':
temp = en['vcardArray'][1]
for t in temp:
if t[0] == 'email':
net['tech_emails'] = t[3].strip()
except (KeyError, IndexError):
pass
return [net]
def lookup_rws(self, inc_raw=False, retry_count=3):
"""
The function for retrieving and parsing whois information for an IP
address via HTTP (Whois-RWS).
NOTE: This should be faster than IPWhois.lookup(), but may not be as
reliable. APNIC and AFRINIC do not have a Whois-RWS
service yet. We have to rely on the Ripe RWS service, which does
not contain all of the data we need. LACNIC RWS is in beta v2.
reliable. AFRINIC does not have a Whois-RWS service yet. We have
to rely on the Ripe RWS service, which does not contain all of the
data we need. LACNIC RWS is in beta v2.
Args:
inc_raw: Boolean for whether to include the raw whois results in
@ -1415,6 +1573,9 @@ class IPWhois():
#Add the ASN information to the return dictionary.
results.update(asn_data)
#Create the boolean for if the response is a radb-grs search.
is_radb = False
#Retrieve the whois data.
try:
@ -1427,6 +1588,8 @@ class IPWhois():
#If the query failed, try the radb-grs source.
except WhoisLookupError:
is_radb = True
response = self.get_rws((
'http://apps.db.ripe.net/whois/grs-search'
'?query-string={0}&source=radb-grs').format(self.address_str),
@ -1438,17 +1601,22 @@ class IPWhois():
results['raw'] = response
if results['asn_registry'] == 'arin':
if (results['asn_registry'] in ('ripencc', 'afrinic') or
is_radb is True):
nets = self._lookup_rws_ripe(response)
elif results['asn_registry'] == 'arin':
nets = self._lookup_rws_arin(response, retry_count)
elif results['asn_registry'] == 'lacnic':
elif results['asn_registry'] == 'apnic':
nets = self._lookup_rws_lacnic(response)
nets = self._lookup_rws_apnic(response)
else:
nets = self._lookup_rws_ripe(response)
nets = self._lookup_rws_lacnic(response)
#Add the networks to the return dictionary.
results['nets'] = nets

View File

@ -1,4 +1,4 @@
# Copyright (c) 2013, Philip Hane
# Copyright (c) 2013, 2014 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without