Source code for salo.events.zeek.dns

#!/usr/bin/env python3

#  Copyright 2021 Splunk Inc.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from salo import fake
import random
from typing import List, Optional, Union

from pydantic import Field, validator, root_validator

from .base import ZeekModel

MAX_TRANS_ID = 65535
MAX_RTT = 2
MAX_TTL = 28800

Q_TYPES = {
    1: "A",
    2: "NS",
    5: "CNAME",
    15: "MX",
    16: "TXT",
    28: "AAAA",
    33: "SRV",
    65: "HTTPS",
    255: "*",
}

Q_CLASSES = {1: "C_INTERNET"}

RCODES = {0: "NOERROR", 2: "SERVFAIL", 3: "NXDOMAIN"}


[docs]class DNSModel(ZeekModel): _refs: List[str] = [ "https://docs.zeek.org/en/master/scripts/base/protocols/dns/main.zeek.html#type-DNS::Info", "https://docs.zeek.org/en/master/logs/dns.html", ] proto: Optional[str] = None dns_id: Optional[int] = None dns_rtt: Optional[float] = None dns_query: str = Field(default_factory=fake.hostname) dns_qclass: int dns_qclass_name: str dns_qtype: int dns_qtype_name: str dns_rcode: int dns_rcode_name: str dns_aa: Optional[bool] = None dns_tc: Optional[bool] = None dns_rd: Optional[bool] = None dns_ra: Optional[bool] = None dns_z: int = Field(default=0) dns_rdata: Optional[Union[str, List[str]]] = None dns_ttl: Optional[Union[int, List[int]]] = None dns_rejected: Optional[bool] = None dns_auth: Optional[List[str]] = None dns_addl: Optional[List[str]] = None dns_original_query: Optional[str] = None
[docs] class Config: fields = { "dns_id": "trans_id", "dns_rtt": "rtt", "dns_query": "query", "dns_aa": "AA", "dns_tc": "TC", "dns_rd": "RD", "dns_ra": "RA", "dns_z": "Z", "dns_qclass": "qclass", "dns_qclass_name": "qclass_name", "dns_rcode": "rcode", "dns_rcode_name": "rcode_name", "dns_qtype": "qtype", "dns_qtype_name": "qtype_name", "dns_ttl": "TTLs", "dns_rdata": "answers", "dns_rejected": "rejected", "dns_auth": "auth", "dns_addl": "addl", "dns_original_query": "original_query" }
[docs] @validator("dest_port", pre=True, always=True, check_fields=False) def set_dest_port(cls, v): return v or 53
[docs] @validator("proto", pre=True, always=True) def set_proto(cls, v): return v or random.choice(["tcp", "udp"])
[docs] @validator("dns_id", pre=True, always=True) def set_dns_id(cls, v): return v or fake.pyint(max_value=MAX_TRANS_ID)
[docs] @validator("dns_rtt", pre=True, always=True) def set_dns_rtt(cls, v): return v or fake.pydecimal(right_digits=14, positive=True, max_value=MAX_RTT)
[docs] @validator("dns_aa", pre=True, always=True) def set_dns_aa(cls, v): return v or fake.boolean(chance_of_getting_true=90)
[docs] @validator("dns_tc", pre=True, always=True) def set_dns_tc(cls, v): return v or fake.boolean(chance_of_getting_true=1)
[docs] @validator("dns_rd", pre=True, always=True) def set_rd(cls, v): return v or fake.boolean(chance_of_getting_true=10)
[docs] @validator("dns_ra", pre=True, always=True) def set_ra(cls, v): return v or fake.boolean(chance_of_getting_true=10)
[docs] @validator("dns_rejected", pre=True, always=True) def set_dns_rejected(cls, v): return v or fake.boolean(chance_of_getting_true=1)
[docs] @validator("dns_rdata", pre=True, always=True) def set_dns_rdata(cls, v): if isinstance(v, str): return [v] return v
[docs] @validator("dns_ttl", pre=True, always=True) def set_dns_ttl(cls, v, *, values): if isinstance(v, int): return [v] elif values.get('dns_rdata'): return [ fake.pyint(max_value=MAX_TTL) for _ in range(len(values["dns_rdata"])) ] else: return v
[docs] @root_validator(pre=True) def set_values(cls, values) -> None: values["dns_qclass"] = values.get("dns_qclass", random.choice(list(Q_CLASSES.keys()))) values["dns_qclass_name"] = values.get("dns_qclass_name", Q_CLASSES[values["dns_qclass"]]) values["dns_qtype"] = values.get("dns_qtype", random.choice(list(Q_TYPES.keys()))) values["dns_qtype_name"] = values.get("dns_qtype_name", Q_TYPES[values["dns_qtype"]]) values["dns_rcode"] = values.get("dns_rcode", random.choice(list(RCODES.keys()))) values["dns_rcode_name"] = values.get("dns_rcode_name", RCODES[values["dns_rcode"]]) if values["dns_rcode"] == 0 and not values.get("dns_rdata"): answers = [] if values["dns_qtype"] in (1, 2): answers = [fake.ipv4_public() for _ in range(fake.pyint(max_value=10))] elif values["dns_qtype"] == 15: answers = [ f"mx.{fake.domain_name()}" for _ in range(fake.pyint(max_value=4)) ] elif values["dns_qtype"] == 28: answers = [fake.ipv6() for _ in range(fake.pyint(max_value=10))] values["dns_rdata"] = answers return values