ipupdate.py, ou comment rester sur la carte

Some people, when confronted with a problem, think, "I know, I’ll use regular expressions." Now they have two problems.

— Jamie Zawinski

Comme tous ces simples mortels qui hébergent leur site web derrière un routeur DSL, surveiller et publier les changements de l'adresse IP externe de ce dernier constitue un problème intéressant.

Comme toujours, les scripts shell sont les premiers appelés en renfort. Quelques minutes de programmation et DNS-O-Matic reçoit des mises à jour. Tout est bien.

Enfin, jusqu'au jour où myip.dnsomatic.com retourne une erreur. Une expression régulière s'ajoute alors à la mixture. La confiance est restaurée.

Enfin, jusqu'au moment où DNS-O-Matic refuse une mise à jour et que votre site se retrouve rayé de la carte pour 6 heures. Une nouvelle expression régulière s'ajoute alors à la mixture. N'empêche, la confiance est ébranlée.

Et puis, un jour, vous repensez à ce livre que vous avez lu, Tipping Point, et vous vous dites que l'heure est venue de passer aux choses sérieuses. Le script shell prend alors la direction des vidanges et vous écrivez un démon en python.

Satisfait de vous-même, vous présentez votre solution dans un texte que vous publiez sur votre site web.

ExternalIpLookupService

Avant de pouvoir publier l'adresse IP vers DNS-O-Matic, encore faut-il connaître cette dernière. Pour ce faire, l'emploi d'un service externe est (malheureusement et généralement) inévitable. Étonnement, les whatismyip.com de se monde ne sont pas légion:

Chacun de ces services présente une interface sensiblement identique: une requête http est effectuée et le serveur retourne un string contenant la valeur du REMOTE_ADDR.

$ip = $_SERVER['REMOTE_ADDR'];
echo "$ip";

Toutefois, certains services place une limite sur la fréquence des requêtes.

La classe ExternalIpLookupService implante la logique requise afin d'effectuer des requêtes sur un service particulier tout en respectant la limite imposée par ce dernier quant à la fréquence des requêtes.

class ExternalIpLookupService:

    # Over-simplified regular expression to match ip address.
    ip_regex = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")

    def __init__(self, name, url, min_delay_between_query = 0):
        self.name = name
        self.url = url
        self.min_delay_between_query = min_delay_between_query
        self.last_query_time = 0
        self.logger = logging.getLogger("ipupdate.ExternalIpLookupService")

    def ready(self):
        # Check if the elapsed time since the last query is
        # greater than the maximal query rate for this service.
        delay_since_last_query = int(time.time()) - self.last_query_time
        return self.min_delay_between_query < delay_since_last_query

    def query(self):
        if not self.ready():
            return None

        self.logger.debug("Querying %s." % self.name)
        self.last_query_time = int(time.time())

        try:
            response = urllib.request.urlopen(self.url, None, 30)
            content = response.read().decode('utf-8')
            ip = self.ip_regex.search(content)
            if ip is not None:
                ip = ip.group(0)
                self.logger.debug("%s." % ip)
                return ip
        except urllib.error.HTTPError as err:
            error = "HTTPError (%s) querying %s." % (err.code, self.name)
            self.logger.error(error)
            return None
        except urllib.error.URLError as err:
            error = "URLError (%s) querying %s." % (err.reason, self.name)
            self.logger.error(error)
            return None

        self.logger.warning("No IP address in %s response." % self.name)
        return None

Pour utiliser un des services listés précédemment, il suffit simplement de construire un objet de type ExternalIpLookupService en fournissant le nom du service, son url ainsi que le délai minimal devant être respecté entre deux requêtes.

icanhazip = ExternalIpLookupService("icanhazip", "http://icanhazip.com/", 300)
ip = icanhazip.query()

ExternalIpLookupServicePool

Évidemment, placer tous ses œufs dans le même panier n'est pas une très bonne idée. Le service peut connaître des problèmes techniques le rendant inutilisable pendant une certaine période de temps. Par ailleurs, utiliser un seul service limite la fréquence des requêtes à celle imposée par le service en question.

D'où l'idée de regrouper plusieurs services dans un pool. Lorsqu'une requête est effectuée contre le pool, celui-ci choisit aléatoirement un service auquel transmettre la requête. Si ce service ne répond pas, le pool est en mesure d'en essayer d'autres jusqu'à ce qu'une requête soit complétée ou que tous les services disponibles aient été essayés.

class ExternalIpLookupServicePool:

    def __init__(self, services):
        self.services = services
        self.logger = logging.getLogger("ipupdate.ExternalIpLookupServicePool")

    def add(self, service):
        self.services.append(service)

    # Returns true if at least one external
    # ip lookup service is ready to handle
    # a request.
    def ready(self):
        for service in self.services:
            if service.ready():
                return True

        return False

    def query(self):
        # Shuffle the services list so we don't
        # call them in the same order every time.
        random.shuffle(self.services)

        for service in self.services:
            ip = service.query()
            if ip is not None:
                return ip

        self.logger.warning("Cant' determine the external IP address.")
        return None

Un objet de type ExternalIpLookupServicePool est construit à partir d'une liste d'ExternalIpLookupService(s). Par ailleurs, l'interface demeure la même qu'un ExternalIpLookupService, ce qui rend ces deux classes interchangeables.

ip_lookup_service_pool = ExternalIpLookupServicePool([
    ExternalIpLookupService("dyndns", "http://checkip.dyndns.org/", 300),
    ExternalIpLookupService("ifconfig", "http://ifconfig.me/ip", 300),
    ExternalIpLookupService("codebrainz", "http://showip.codebrainz.ca/", 300),
    ExternalIpLookupService("icanhazip", "http://icanhazip.com/", 300),
    ExternalIpLookupService("externalip", "http://api.externalip.net/ip/", 300),
    ExternalIpLookupService("dnsomatic", "http://myip.dnsomatic.com/", 300)
])
ip = ip_lookup_service_pool.query()

DNSUpdateService

Une fois l'adresse IP externe déterminée, il s'agit maintenant de publier celle-ci du côté de DNS-O-Matic ou d'un autre service du même genre.

La classe DNSUpdateService, configurée avec un nom d'usager et un mot de passe s'acquitte justement de cette tâche.

class DNSUpdateService:

    # API datails can be found at https://www.dnsomatic.com/wiki/api

    realm = "DNSOMATIC"
    base_url = "https://updates.dnsomatic.com"
    update_url = "https://updates.dnsomatic.com/nic/update"
    messages = {
        "good": "The update was scheduled successfully.",
        "nochg": "No change.",
        "badauth": "The DNS-O-Matic username or password specified are incorrect.",
        "notfqdn": "The hostname specified is not a fully-qualified domain name.",
        "nohost": "The hostname passed could not be matched to any services configured.",
        "numhost": "You may update up to 20 hosts.",
        "abuse": "The hostname is blocked for update abuse.",
        "badagent": "The user-agent is blocked.",
        "dnserr": "DNS error encountered. Stop updating for 30 minutes.",
        "911": "There is a problem or scheduled maintenance on DNS-O-Matic."
    }

    def __init__(self, username, password, dry):

        self.dry = dry

        pswd_manager = urllib.request.HTTPPasswordMgr()
        pswd_manager.add_password(user = username,
                                  passwd = password,
                                  realm = self.realm,
                                  uri = self.base_url)

        auth_handler = urllib.request.HTTPBasicAuthHandler(pswd_manager)

        # Call self.opener instead of urlopen in order
        # to use the previously configured auth handler.
        self.opener = urllib.request.build_opener(auth_handler)

        self.logger = logging.getLogger("ipupdate.DNSUpdateService")

    # This method should return true on success,
    # false otherwise, and swallow exceptions.
    def update(self, ip):
        self.logger.info("Updating external IP address to %s" % ip)

        if self.dry:
            return True

        try:
            return_code = None
            params = urllib.parse.urlencode({"myip": ip})
            response = self.opener.open("%s?%s" % (self.update_url, params))
            response = response.read().decode("utf-8")
            return_code = response.split()[0]
        # Handlers raise an URLError when they run into a
        # problem. HTTPError is a subclass of URLError. It's
        # useful for dealing with HTTP errors.
        except urllib.error.HTTPError as err:
            err_msg = "HTTPError (%s)." % err.code
            self.logger.error(err_msg)
        except urllib.error.URLError as err:
            err_msg = "URLError (%s)." % err.reason
            self.logger.error(err_msg)

        if return_code in self.messages:
            self.logger.info(self.messages[return_code])

        # If DNS-O-Matic returned either good or nochg, we
        # can consider the update operation as successful.
        # Otherwise, we'll return false and the caller will
        # have the responsability of retrying the update at
        # a later time. 5 minutes seems to be a reasonable delay.
        return return_code == "good" or return_code == "nochg"

IpWatchDog

Avec toute cette machinerie en place, il ne manque plus qu'un chef d'orchestre. La classe IpWatchDog remplit ce rôle. À intervalles réguliers, cette dernière vérifie la valeur de l'adresse IP externe, la compare avec la valeur précédente et, selon le cas, publie celle-ci du côté de DNS-O-Matic.

class IpWatchDog:
    PING_HOSTS = ['www.google.com', 'www.yahoo.com',
                  'www.facebook.com', 'www.bing.com',
                  'www.youtube.com', 'www.stackoverflow.com']

    def __init__(self, ip_lookup_service, dns_update_service, check_interval):
        self.logger = logging.getLogger("ipupdate.IpWatchDog")
        self.ip_lookup_service = ip_lookup_service
        self.dns_update_service = dns_update_service
        self.check_interval = check_interval
        self.current_ip = None

    def watch(self):
        while True:
            try: self.update()
            except Exception:
                self.logger.exception("Unhandled exception during update.")

            time.sleep(self.check_interval)

    def update(self):
        if not self.connected():
            self.logger.warning("No connection, skipping update.")
            return

        ip = self.ip_lookup_service.query()
        if ip is not None and ip != self.current_ip:
            if self.dns_update_service.update(ip):
                # Since the update was successful, we update
                # the current_ip. Otherwise, we simply do
                # nothing and it should still be unequal to
                # the new ip address on the next iteration.
                self.current_ip = ip

    def connected(self):
        random.shuffle(self.PING_HOSTS)
        for host in self.PING_HOSTS:
            if ping(host):
                return True
        return False

Conclusion

Avec un srcipt shell aux vidanges et quelques lignes de python, nous avons maintenant un logiciel compréhensible, maintenable et testable. En fait, tellement compréhensible que nous ne saurions résister à la tentation irrépressible d'en faire un joli diagramme UML.

Diagramme de classes.

Le code source complet peut être consulté en ligne ainsi qu'un fichier de configuration typique.