Commit 478dae41 by William Tisäter

Add support for NETSPEED_EDITION

parent 2df4b7b2
......@@ -116,4 +116,5 @@ For more information, [check out the full API documentation](http://packages.pyt
* Organization IPv4
* ISP IPv4
* Region IPv4
* ASNum IPv4 and IPv6
* ASN IPv4 and IPv6
* Netspeed IPv4
......@@ -155,6 +155,7 @@ class GeoIP(object):
* ISP_EDITION
* ASNUM_EDITION
* ASNUM_EDITION_V6
* NETSPEED_EDITION
"""
self._databaseType = const.COUNTRY_EDITION
......@@ -420,10 +421,23 @@ class GeoIP(object):
else:
return socket.gethostbyname(hostname)
def _id_by_addr(self, addr):
def id_by_name(self, hostname):
"""
Looks up the index for the country which is the key for the
code and name.
Returns the database id for specified hostname.
The id might be useful as array index. 0 is unknown.
@param hostname: Hostname
@type hostname: str
@return: network byte order 32-bit integer
@rtype: int
"""
addr = self._gethostbyname(hostname)
return self.id_by_addr(addr)
def id_by_addr(self, addr):
"""
Returns the database id for specified address.
The id might be useful as array index. 0 is unknown.
@param addr: IPv4 or IPv6 address
@type addr: str
......@@ -431,7 +445,7 @@ class GeoIP(object):
@rtype: int
"""
ipv = 6 if addr.find(':') >= 0 else 4
if ipv == 4 and self._databaseType != const.COUNTRY_EDITION:
if ipv == 4 and self._databaseType not in (const.COUNTRY_EDITION, const.NETSPEED_EDITION):
raise GeoIPError('Invalid database type; expected IPv6 address')
if ipv == 6 and self._databaseType != const.COUNTRY_EDITION_V6:
raise GeoIPError('Invalid database type; expected IPv4 address')
......@@ -460,7 +474,7 @@ class GeoIP(object):
"""
VALID_EDITIONS = (const.COUNTRY_EDITION, const.COUNTRY_EDITION_V6)
if self._databaseType in VALID_EDITIONS:
country_id = self._id_by_addr(addr)
country_id = self.id_by_addr(addr)
return const.COUNTRY_CODES[country_id]
elif self._databaseType in const.REGION_CITY_EDITIONS:
return self.region_by_addr(addr).get('country_code')
......@@ -480,6 +494,29 @@ class GeoIP(object):
addr = self._gethostbyname(hostname)
return self.country_code_by_addr(addr)
def netspeed_by_addr(self, addr):
"""
Returns NetSpeed name from address.
@param hostname: IP address
@type hostname: str
@return: netspeed name
@rtype: str
"""
return const.NETSPEED_NAMES[self.id_by_addr(addr)]
def netspeed_by_name(self, hostname):
"""
Returns NetSpeed name from hostname.
@param hostname: Hostname
@type hostname: str
@return: netspeed name
@rtype: str
"""
addr = self._gethostbyname(hostname)
return self.netspeed_by_addr(addr)
def country_name_by_addr(self, addr):
"""
Returns full country name for specified IP address.
......@@ -492,7 +529,7 @@ class GeoIP(object):
"""
VALID_EDITIONS = (const.COUNTRY_EDITION, const.COUNTRY_EDITION_V6)
if self._databaseType in VALID_EDITIONS:
country_id = self._id_by_addr(addr)
country_id = self.id_by_addr(addr)
return const.COUNTRY_NAMES[country_id]
elif self._databaseType in const.CITY_EDITIONS:
return self.record_by_addr(addr).get('country_name')
......
......@@ -383,6 +383,10 @@ CONTINENT_NAMES = (
'NA', 'NA', 'AF'
)
NETSPEED_NAMES = (
'Unknown', 'Dial-up', 'Cable', 'Corporate'
)
# storage / caching flags
STANDARD = 0
MEMORY_CACHE = 1
......@@ -408,9 +412,11 @@ ORG_EDITION = 5
ISP_EDITION = 4
ASNUM_EDITION = 9
ASNUM_EDITION_V6 = 21
NETSPEED_EDITION = 10
# Not yet supported databases
PROXY_EDITION = 8
NETSPEED_EDITION = 11
NETSPEED_EDITION_REV1 = 32
NETSPEED_EDITION_REV1_V6 = 33
# Collection of databases
IPV6_EDITIONS = (COUNTRY_EDITION_V6, ASNUM_EDITION_V6, CITY_EDITION_REV1_V6)
......
......@@ -9,4 +9,5 @@ CITY_DB_PATH = path.join(DATA_DIR, 'GeoLiteCity.dat')
ORG_DB_PATH = path.join(DATA_DIR, 'GeoIPOrg.dat')
ASNUM_DB_PATH = path.join(DATA_DIR, 'GeoIPASNum.dat')
ISP_DB_PATH = path.join(DATA_DIR, 'GeoIPISP.dat')
NETSPEED_DB_PATH = path.join(DATA_DIR, 'GeoIPNetSpeedRev0.dat')
CORRUPT_DB_PATH = path.join(DATA_DIR, '../config.py')
# -*- coding: utf-8 -*-
import unittest
import pygeoip
from tests.config import NETSPEED_DB_PATH
class TestGeoIPOrgFunctions(unittest.TestCase):
def setUp(self):
self.ip = '17.172.224.47'
self.hostname = 'apple.com'
self.gi = pygeoip.GeoIP(NETSPEED_DB_PATH)
def testNetSpeedByAddr(self):
netspeed = self.gi.id_by_addr(self.ip)
self.assertEqual(netspeed, 3)
def testNetSpeedByAddrWrapper(self):
netspeed = self.gi.netspeed_by_addr(self.ip)
self.assertEqual(netspeed, 'Corporate')
def testNetSpeedByNameWrapper(self):
netspeed = self.gi.netspeed_by_name(self.hostname)
self.assertEqual(netspeed, 'Corporate')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment