aboutsummaryrefslogtreecommitdiffstats
path: root/tools/dhcpns/pdns.py
blob: 0e3b3abd3ef371c3da603e5382506feebf2e5ca9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import requests
import ipaddress


class PowerDNS:
    def __init__(self, base_url, apikey, dryrun=False):
        self.base_url = base_url
        self.apikey = apikey
        self.dryrun = dryrun
        self._rdns_v4_zones = None
        self._rdns_v6_zones = None


    def _query(self, uri, method, kwargs={}):
        if self.dryrun:
            return None

        headers = {
            'X-API-Key': self.apikey,
            'Accept': 'application/json'
        }

        if method == "GET":
            request = requests.get(self.base_url + uri, headers=headers)
        elif method == "POST":
            request = requests.post(
                self.base_url + uri,
                headers=headers,
                json=kwargs
            )
        elif method == "PUT":
            request = requests.put(
                self.base_url + uri,
                headers=headers,
                json=kwargs
            )
        elif method == "PATCH":
            request = requests.patch(
                self.base_url + uri, headers=headers, json=kwargs
            )
        elif method == "DELETE":
            request = requests.delete(self.base_url + uri, headers=headers)

        if request.headers.get('content-type') == 'application/json':
            return request.json()
        return None


    def list_zones(self):
        if self.dryrun:
            return []
        return self._query("/servers/localhost/zones", "GET")


    def get_zone(self, domain):
        return self._query("/servers/localhost/zones/%s." % domain, "GET")


    def set_records(self, domain, rrsets):
        return self._query("/servers/localhost/zones/%s" % domain, "PATCH", {
            'rrsets': rrsets
        })


    def search(self, q, max=100, object_type="all"):
        if self.dryrun:
            return []
        return self._query(
            "/servers/localhost/search-data?q={0}&max={1}&object_type={2}".format(
                q, max, object_type), "GET")


    def get_zone_metadata(self, domain):
        return self._query("/servers/localhost/zones/%s/metadata" % (domain), "GET")


    def create_zone_metadata(self, domain, kind, content):
        return self._query("/servers/localhost/zones/%s/metadata" % (domain), "POST", {
            'kind': kind,
            'metadata': [content]
        })


    def create_zone(self, domain, nameservers, kind='Master'):
        return self._query("/servers/localhost/zones", "POST", {
            'kind': kind,
            'nameservers': nameservers,
            'name': domain
        })

    def delete_zone(self, domain):
        return self._query("/servers/localhost/zones/%s." % (domain), "DELETE")


    def get_rdns_zone_from_ip(self, ip):
        if self._rdns_v4_zones is None:
            self._rdns_v4_zones = self.search("*.in-addr.arpa", 2000, "zone")
        if self._rdns_v6_zones is None:
            self._rdns_v6_zones = self.search("*.ip6.arpa", 2000, "zone")

        address = ipaddress.ip_address(ip)
        rdns_zones = self._rdns_v4_zones if address.version == 4 else self._rdns_v6_zones

        ptr = address.reverse_pointer
        rdns_zone = None
        rdns_zone_accuracy = 30
        for zone in [sub['name'] for sub in rdns_zones]:
            test = str(ptr).split('.')
            for i in range(len(test) - 3):
                x = len(test) - i
                if '.'.join(test[-x:]) + '.' in zone and rdns_zone_accuracy > i:
                    rdns_zone = zone
                    rdns_zone_accuracy = i
                    break

        return rdns_zone