#!/usr/bin/env python3
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import zip_longest
from typing import Dict, List, Optional, Union
from pydantic import Field, root_validator, validator
from salo import SaloEventModel, fake
from .base import SuricataModel
MAX_DNS_ID = 65535
MAX_TTL = 28800
[docs]class DNSModelFull(SaloEventModel):
dns_type: Optional[str] = Field(defaults="query")
dns_id: Optional[int] = None
dns_version: Optional[str] = None
dns_qr: Optional[bool] = None
dns_aa: Optional[bool] = None
dns_tc: Optional[bool] = None
dns_rd: Optional[bool] = None
dns_ra: Optional[bool] = None
dns_rcode_name: Optional[str] = None
dns_query: Optional[str] = None
dns_qtype_name: Optional[str] = None
dns_rdata: Optional[Union[str, List[str]]] = None
dns_ttl: Optional[Union[List[int], int]] = None
dns_answers: Optional[List[Dict]] = None
dns_grouped: Optional[Dict[str, List]] = None
[docs] class Config:
fields = {
"dns_type": "type",
"dns_id": "id",
"dns_version": "version",
"dns_qr": "qr",
"dns_aa": "aa",
"dns_tc": "tc",
"dns_rd": "rd",
"dns_ra": "ra",
"dns_rcode_name": "rcode",
"dns_query": "rrname",
"dns_qtype_name": "rrtype",
"dns_rdata": "rdata",
"dns_ttl": "ttl",
"dns_answers": "answers",
"dns_grouped": "grouped",
}
[docs] @validator("dns_id", pre=True, always=True)
def set_dns_id(cls, v):
return v or fake.pyint(max_value=MAX_DNS_ID)
[docs] @validator("dns_qr", pre=True, always=True)
def set_dns_qr(cls, v):
return v or fake.boolean(chance_of_getting_true=90)
[docs] @validator("dns_aa", pre=True, always=True)
def set_aa(cls, v):
return v or fake.boolean(chance_of_getting_true=90)
[docs] @validator("dns_tc", pre=True, always=True)
def set_tc(cls, v):
return v or fake.boolean(chance_of_getting_true=1)
[docs] @validator("dns_rd", pre=True, always=True)
def set_rd(cls, v):
return v or fake.boolean(chance_of_getting_true=10)
[docs] @validator("dns_ra", pre=True, always=True)
def set_ra(cls, v):
return v or fake.boolean(chance_of_getting_true=10)
[docs] @root_validator(pre=True)
def set_values(cls, values):
if not "dns_answers" in values:
answers = []
if values.get("dns_type") == "answer":
responses = values.get("dns_rdata", [])
ttls = values.get("dns_ttl", [])
if isinstance(responses, str):
responses = [responses]
if isinstance(ttls, int):
ttls = [ttls]
for response, ttl in zip_longest(responses, ttls):
answer = {
"rrname": values.get("dns_query", fake.hostname()),
"rrtype": values.get("dns_qtype_name", "CNAME"),
"ttl": ttl or fake.pyint(max_value=MAX_TTL),
"rdata": response,
}
answers.append(answer)
values["dns_answers"] = answers
# Ensure these are None so they don't show up in results
values["dns_rdata"] = None
values["dns_ttl"] = None
return values
[docs]class DNSModel(SuricataModel):
_refs: List[str] = [
"https://docs.zeek.org/en/master/logs/dns.html",
]
event_type: str = Field(default="dns")
dest_port: int = Field(default=53)
dns: Optional[Union[DNSModelFull, Dict]] = None
[docs] @root_validator(pre=True)
def set_values(cls, values):
values["dns"] = DNSModelFull(**values)
return values
[docs] def get_options(self, *args, **kwargs) -> Dict:
data: Dict = super().dict(*args, **kwargs)
new_data: Dict = data.copy()
new_data.update(data.pop("dns"))
answers: List[Dict] = new_data.get("dns_answers", [])
if answers:
new_data["dns_query"] = new_data.get("dns_query", answers[0].get("rrname"))
new_data["dns_qtype_name"] = new_data.get(
"dns_qtype_name", answers[0].get("rrtype")
)
new_data["dns_ttl"] = new_data.get("dns_ttl", answers[0].get("ttl"))
new_data["dns_rdata"] = new_data.get("dns_rdata", answers[0].get("rdata"))
return new_data