Archive for the 'scapy' Category

20
Mar
14

Spoofing DNS Packets

edit: scripts on github https://github.com/b4ldr/spoof

For part of our monitoring we send DNS packets to a dummy interface with a spoofed source, we then see if the packets make it back to the spoofed source address, a monitoring server. This is fairly trivial to do in perl using something like the following

use Net::RawIP;
my $sock = new Net::RawIP({udp=>{}});
$sock->set({
  ip => {
    saddr => $src_ip,
    daddr => $dst_ip,
    frag_off => 0,
    tos => 0,
    id => 6969,
  },
  udp => {
    source => 6969,
    dest => 53,
    data => $dnsdata,
  }
});
$sock->send;

I had assumed this would be trivial to do in python, my first choice was to use scapy. its a powerful tool and should be able to do the job pretty easily

import scapy
send(IP(src=src, dst=dst)/UDP(sport=src_port,dport=dst_port)/DNS(qd=DNSQR(qname=qname))

There was one change i wanted to add an nsid, scapy probably supports this but i couldn’t find it easily so a quick change

import scapy
import dns.message
import dns.rdatatype
import dns.rdataclass
query = dns.message.make_query(qname, dns.rdatatype.SOA, dns.rdataclass.IN)
query.use_edns(payload=4096, options=[dns.edns.GenericOption(dns.edns.NSID, '')])
send(IP(src=src, dst=dst)/UDP(sport=src_port,dport=dst_port)/Raw(load=query.to_wire()))

Now that should have been the end of it, job well done time to put my feat up. however when i tested the script i could see the query go out but my server wasn’t responding. the queries being set all looked like DNS queries but still no response. In desperation i got tshark out yes this was indeed a valid dns packet everything was as it should, so where was my response.

I went for a coffee and when i returned i decided to test this on a different server. To my surprise i got a response on the monitoring server, however it was from a completely different server. We the destination address we are testing is an anycasted address, so this meant that scapy sent the packet straight out of the primary interface completely bypassing the dummy interface.

Thats a pain, back to the drawing board. So i start to look aback at the socket library and raw sockets. ip and udp headers are pretty simple so i could have just writing a simple struct like raw packet and sent that. although i figured there must be a library out there that already does this with perhaps some nicer functions.

after a bit of googling i cam across the impacket library, its written by the corelabs folk i figured it would be pretty good. Unfortunately there is not much documentation or examples around that use the library in the manner i wanted to use it. Eventually i went scouring through the source code and with a bit of trial and error and a bit of hacking i managed to come up with what i wanted.

#!/usr/bin/env python
import argparse
import socket
import dns.message
import dns.rdatatype
import dns.rdataclass
from impacket import ImpactPacket
def get_args():
    '''return argpars opject'''
    parser = argparse.ArgumentParser(description='dns spoof monitoring server')
    parser.add_argument('-s', '--source', help='Source address', required=True)
    parser.add_argument('-p', '--source-port', help='Source port', default=6969, type=int)
    parser.add_argument('-d', '--destination', help='Destination address', required=True)
    parser.add_argument('-P', '--destination-port', help='Destination port', default=53, type=int)
    parser.add_argument('-Q', '--qname', help='query name', required=True)
    parser.add_argument('-T', '--qtype', help='query type', default='SOA')
    parser.add_argument('-C', '--qclass', help='query class', default='IN')
    parser.add_argument('-n', '--nsid', help='set the NSID OPT bit', action='store_true')
    return parser.parse_args()
def main():
    '''main function for using as cli'''
    args = get_args()
    query = dns.message.make_query(args.qname,
        dns.rdatatype.from_text(args.qtype), dns.rdataclass.from_text(args.qclass))
    if args.nsid:
        query.use_edns(payload=4096, options=[dns.edns.GenericOption(dns.edns.NSID, '')])
    data = ImpactPacket.Data(query.to_wire())
    udp = ImpactPacket.UDP()
    udp.set_uh_sport(args.source_port)
    udp.set_uh_dport(args.destination_port)
    udp.contains(data)
    ip = ImpactPacket.IP()
    ip.set_ip_src(args.source)
    ip.set_ip_dst(args.destination)
    ip.contains(udp)
    s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP)
    s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
    s.sendto(ip.get_packet(), (args.destination,args.destination_port))

if __name__ == "__main__":
    main()

The entire time there was a voice in the back of my mind saying, this is just going to have the same problem, luckily it didn’t, it hit the dummy interface just as planned and things are working again

Advertisements



Advertisements