EVOLUTION-MANAGER
Edit File: ts_min_dump_utils.lua
--
-- (C) 2019-24 - ntop.org
--
-- Per-minute timeseries dump helpers.
--
-- Conventions shared by the functions below (as visible in this file):
--   * `when` is forwarded unchanged as the third argument of
--     ts_utils.append (presumably the sample timestamp; confirm against
--     ts_utils_core).
--   * `ifstats` is the interface statistics table; `ifstats.id` is used as
--     the `ifid` tag of every point written.
--   * `verbose`, when truthy, enables debug prints in the functions that
--     support it; several functions accept it only for signature symmetry.
--
local dirs = ntop.getDirs()
package.path = dirs.installdir .. "/scripts/lua/modules/timeseries/schemas/?.lua;" .. package.path

local profiling = require "profiling"

-- ########################################################

require "ntop_utils"
require "ts_minute"
require "check_redis_prefs"
local ts_utils = require("ts_utils_core")

-- Optional user-provided module; loaded only when the file exists
local ts_custom
if ntop.exists(dirs.installdir .. "/scripts/lua/modules/timeseries/custom/ts_5min_custom.lua") then
    package.path = dirs.installdir .. "/scripts/lua/modules/timeseries/custom/?.lua;" .. package.path
    ts_custom = require "ts_5min_custom"
end

local ts_dump = {}

-- ########################################################

-- Writes one "iface:ndpi" point (sent + received bytes) for every entry of
-- ifstats["ndpi"], plus an "iface:ndpi_flows" point when
-- config.ndpi_flows_timeseries_creation == "1".
-- @param when     forwarded to ts_utils.append
-- @param _ifname  interface name, used only in the verbose print
-- @param ifstats  interface stats; reads ifstats.id and ifstats["ndpi"]
-- @param verbose  when truthy, prints each protocol/value pair
-- @param config   table; reads config.ndpi_flows_timeseries_creation
function ts_dump.iface_update_ndpi_rrds(when, _ifname, ifstats, verbose, config)
    for proto, proto_stats in pairs(ifstats["ndpi"]) do
        local bytes = proto_stats["bytes.sent"] + proto_stats["bytes.rcvd"]

        if (verbose) then
            print("[" .. __FILE__() .. ":" .. __LINE__() .. "] " .. _ifname .. ": " .. proto .. "=" .. bytes .. "\n")
        end

        ts_utils.append("iface:ndpi", {
            ifid = ifstats.id,
            protocol = proto,
            bytes = bytes
        }, when)

        if config.ndpi_flows_timeseries_creation == "1" then
            ts_utils.append("iface:ndpi_flows", {
                ifid = ifstats.id,
                protocol = proto,
                num_flows = proto_stats["num_flows"]
            }, when)
        end
    end
end

-- ########################################################

-- Writes one "iface:ndpi_categories" point for every entry of
-- ifstats["ndpi_categories"], using the entry's "bytes" field.
-- @param when     forwarded to ts_utils.append
-- @param _ifname  interface name, used only in the verbose print
-- @param ifstats  interface stats; reads ifstats.id and ifstats["ndpi_categories"]
-- @param verbose  when truthy, prints each category/value pair
function ts_dump.iface_update_categories_rrds(when, _ifname, ifstats, verbose)
    for category, category_stats in pairs(ifstats["ndpi_categories"]) do
        local bytes = category_stats["bytes"]

        if (verbose) then
            print("[" .. __FILE__() .. ":" .. __LINE__() .. "] " .. _ifname .. ": " .. category .. "=" .. bytes .. "\n")
        end

        ts_utils.append("iface:ndpi_categories", {
            ifid = ifstats.id,
            category = category,
            bytes = bytes
        }, when)
    end
end

-- ########################################################

-- Writes the local2remote / remote2local byte counters of the interface.
-- A point is written only when the corresponding counter is > 0.
-- @param when     forwarded to ts_utils.append
-- @param _ifname  unused here
-- @param ifstats  interface stats; reads ifstats.id and ifstats["localstats"]["bytes"]
-- @param verbose  unused here
function ts_dump.iface_update_stats_rrds(when, _ifname, ifstats, verbose)
    -- IN/OUT counters
    local local_bytes = ifstats["localstats"]["bytes"]

    if (local_bytes["local2remote"] > 0) then
        ts_utils.append("iface:local2remote", {
            ifid = ifstats.id,
            bytes = local_bytes["local2remote"]
        }, when)
    end

    if (local_bytes["remote2local"] > 0) then
        ts_utils.append("iface:remote2local", {
            ifid = ifstats.id,
            bytes = local_bytes["remote2local"]
        }, when)
    end
end

-- ########################################################

-- Writes the per-subnet timeseries (intranet traffic, RTT, traffic,
-- broadcast traffic, score, TCP packet stats, engaged alerts) for every
-- entry returned by interface.getNetworksStats().
-- @param when     forwarded to ts_utils.append
-- @param ifstats  interface stats; reads id, isViewed, isView, isSampledTraffic
-- @param verbose  unused here
function ts_dump.subnet_update_rrds(when, ifstats, verbose)
    local subnet_stats = interface.getNetworksStats()

    -- The edition/preference check does not depend on the subnet, so it is
    -- evaluated once instead of once per subnet.
    local dump_intranet_traffic = ntop.isPro() and
        not isEmptyString(ntop.getPref("ntopng.prefs.intranet_traffic_rrd_creation") or "")

    for subnet, sstats in pairs(subnet_stats or {}) do
        if dump_intranet_traffic then
            -- `or {}`: tolerate subnets without an intranet traffic table
            for second_subnet, traffic in pairs(sstats["intranet_traffic"] or {}) do
                if traffic.bytes_sent ~= 0 or traffic.bytes_rcvd ~= 0 then
                    ts_utils.append("subnet:intranet_traffic_min", {
                        ifid = ifstats.id,
                        subnet = subnet,
                        subnet_2 = second_subnet,
                        bytes_sent = traffic.bytes_sent,
                        bytes_rcvd = traffic.bytes_rcvd
                    }, when)
                end

                -- NOTE(review): select + reset run once per peer-subnet entry
                -- (kept as in the original); confirm whether once per subnet
                -- would be enough.
                network.select(sstats.network_id)
                network.resetTrafficBetweenNets()
            end
        end

        -- Save subnet RTT stats (skipped for view and viewed interfaces)
        if not ifstats.isViewed and not ifstats.isView then
            ts_utils.append("subnet:rtt", {
                ifid = ifstats.id,
                subnet = subnet,
                millis_rtt = sstats["round_trip_time"]
            }, when)
        end

        ts_utils.append("subnet:traffic", {
            ifid = ifstats.id,
            subnet = subnet,
            bytes_ingress = sstats["ingress"],
            bytes_egress = sstats["egress"],
            bytes_inner = sstats["inner"]
        }, when)

        ts_utils.append("subnet:broadcast_traffic", {
            ifid = ifstats.id,
            subnet = subnet,
            bytes_ingress = sstats["broadcast"]["ingress"],
            bytes_egress = sstats["broadcast"]["egress"],
            bytes_inner = sstats["broadcast"]["inner"]
        }, when)

        ts_utils.append("subnet:score", {
            ifid = ifstats.id,
            subnet = subnet,
            score = sstats["score"],
            scoreAsClient = sstats["score.as_client"],
            scoreAsServer = sstats["score.as_server"]
        }, when)

        -- TCP packet stats are written only when traffic is not sampled
        if not ifstats.isSampledTraffic then
            local tcp_ingress = sstats["tcpPacketStats.ingress"]
            local tcp_egress = sstats["tcpPacketStats.egress"]
            local tcp_inner = sstats["tcpPacketStats.inner"]

            ts_utils.append("subnet:tcp_retransmissions", {
                ifid = ifstats.id,
                subnet = subnet,
                packets_ingress = tcp_ingress["retransmissions"],
                packets_egress = tcp_egress["retransmissions"],
                packets_inner = tcp_inner["retransmissions"]
            }, when)

            ts_utils.append("subnet:tcp_out_of_order", {
                ifid = ifstats.id,
                subnet = subnet,
                packets_ingress = tcp_ingress["out_of_order"],
                packets_egress = tcp_egress["out_of_order"],
                packets_inner = tcp_inner["out_of_order"]
            }, when)

            ts_utils.append("subnet:tcp_lost", {
                ifid = ifstats.id,
                subnet = subnet,
                packets_ingress = tcp_ingress["lost"],
                packets_egress = tcp_egress["lost"],
                packets_inner = tcp_inner["lost"]
            }, when)

            ts_utils.append("subnet:tcp_keep_alive", {
                ifid = ifstats.id,
                subnet = subnet,
                packets_ingress = tcp_ingress["keep_alive"],
                packets_egress = tcp_egress["keep_alive"],
                packets_inner = tcp_inner["keep_alive"]
            }, when)
        end

        if areAlertsEnabled() then
            ts_utils.append("subnet:engaged_alerts", {
                ifid = ifstats.id,
                subnet = subnet,
                alerts = sstats["engaged_alerts"]
            }, when)
        end
    end
end

-- ########################################################

-- Writes the general interface timeseries: score, flow counters, host and
-- device counters, alert counters and (Enterprise M only) behaviour/anomaly
-- series.
-- @param when     forwarded to ts_utils.append
-- @param ifstats  interface stats table
-- @param verbose  unused here
function ts_dump.iface_update_general_stats(when, ifstats, verbose)
    -- Score
    ts_utils.append("iface:score", {
        ifid = ifstats.id,
        srv_score = ifstats.score.score_as_srv,
        cli_score = ifstats.score.score_as_cli
    }, when)

    -- General stats
    ts_utils.append("iface:flows", {
        ifid = ifstats.id,
        num_flows = ifstats.stats.flows
    }, when)

    ts_utils.append("iface:alerted_flows", {
        ifid = ifstats.id,
        num_flows = ifstats.num_alerted_flows
    }, when)

    ts_utils.append("iface:new_flows", {
        ifid = ifstats.id,
        new_flows = ifstats.stats.new_flows
    }, when)

    if not ifstats.isViewed then
        -- Viewed interfaces don't have hosts, their hosts stay in the view
        ts_utils.append("iface:hosts", {
            ifid = ifstats.id,
            num_hosts = ifstats.stats.hosts
        }, when)

        ts_utils.append("iface:local_hosts", {
            ifid = ifstats.id,
            num_hosts = ifstats.stats.local_hosts
        }, when)

        ts_utils.append("iface:http_hosts", {
            ifid = ifstats.id,
            num_hosts = ifstats.stats.http_hosts
        }, when)

        if not ifstats.isView then
            ts_utils.append("iface:devices", {
                ifid = ifstats.id,
                num_devices = ifstats.stats.devices
            }, when)
        end
    end

    -- Alert stats
    if areAlertsEnabled() then
        ts_utils.append("iface:engaged_alerts", {
            ifid = ifstats.id,
            engaged_alerts = ifstats.num_alerts_engaged
        }, when)

        ts_utils.append("iface:dropped_alerts", {
            ifid = ifstats.id,
            dropped_alerts = ifstats.num_dropped_alerts
        }, when)
    end

    if ntop.isEnterpriseM() then
        local score_behavior = ifstats["score_behavior"]
        local rx_behavior = ifstats["traffic_rx_behavior"]
        local tx_behavior = ifstats["traffic_tx_behavior"]

        -- All three behaviour tables must be present; otherwise none of the
        -- behaviour/anomaly series is written.
        if score_behavior and rx_behavior and tx_behavior then
            -- Score Behaviour
            ts_utils.append("iface:score_behavior_v2", {
                ifid = ifstats.id,
                value = score_behavior["value"],
                lower_bound = score_behavior["lower_bound"],
                upper_bound = score_behavior["upper_bound"]
            }, when)

            -- Score Anomalies (boolean flag stored as 0/1)
            local score_anomaly = 0
            if score_behavior["anomaly"] == true then
                score_anomaly = 1
            end

            ts_utils.append("iface:score_anomalies_v2", {
                ifid = ifstats.id,
                anomaly = score_anomaly
            }, when)

            -- Traffic Behaviour
            ts_utils.append("iface:traffic_rx_behavior_v5", {
                ifid = ifstats.id,
                value = rx_behavior["value"],
                lower_bound = rx_behavior["lower_bound"],
                upper_bound = rx_behavior["upper_bound"]
            }, when)

            ts_utils.append("iface:traffic_tx_behavior_v5", {
                ifid = ifstats.id,
                value = tx_behavior["value"],
                lower_bound = tx_behavior["lower_bound"],
                upper_bound = tx_behavior["upper_bound"]
            }, when)

            -- Traffic Anomalies: 1 when either direction reports an anomaly
            local traffic_anomaly = 0
            if tx_behavior["anomaly"] == true or rx_behavior["anomaly"] == true then
                traffic_anomaly = 1
            end

            ts_utils.append("iface:traffic_anomalies_v2", {
                ifid = ifstats.id,
                anomaly = traffic_anomaly
            }, when)
        end
    end
end

-- Writes one "iface:l4protos" point per L4 protocol for which both the
-- "<proto>.bytes.sent" and "<proto>.bytes.rcvd" counters exist in
-- ifstats.stats.
-- @param when     forwarded to ts_utils.append
-- @param ifstats  interface stats; reads ifstats.id and ifstats.stats
-- @param verbose  unused here
function ts_dump.iface_update_l4_stats(when, ifstats, verbose)
    local l4_protocol_list = require "l4_protocol_list"

    for _, l4_key in pairs(l4_protocol_list.l4_keys) do
        -- The second element of each l4_keys entry is used as the counter
        -- prefix. Declared local: the original assignment leaked a global `k`.
        local k = l4_key[2]
        local bytes_sent = ifstats.stats[k .. ".bytes.sent"]
        local bytes_rcvd = ifstats.stats[k .. ".bytes.rcvd"]

        if ((bytes_sent ~= nil) and (bytes_rcvd ~= nil)) then
            ts_utils.append("iface:l4protos", {
                ifid = ifstats.id,
                -- NOTE: direction may not be correct for PCAP interfaces, so it cannot be split
                l4proto = tostring(k),
                bytes = bytes_sent + bytes_rcvd
            }, when)
        end
    end
end

-- Writes the flow export counters; missing counters are written as 0.
-- @param when     forwarded to ts_utils.append
-- @param ifstats  interface stats; reads ifstats.id and ifstats.stats
-- @param verbose  unused here
function ts_dump.iface_update_flow_dump_stats(when, ifstats, verbose)
    ts_utils.append("iface:dumped_flows", {
        ifid = ifstats.id,
        dumped_flows = ifstats.stats.flow_export_count or 0,
        dropped_flows = ifstats.stats.flow_export_drops or 0
    }, when)
end

-- Writes the interface-level TCP packet counters read from
-- ifstats.tcpPacketStats.
-- @param when     forwarded to ts_utils.append
-- @param ifstats  interface stats; reads ifstats.id and ifstats.tcpPacketStats
-- @param verbose  unused here
function ts_dump.iface_update_tcp_stats(when, ifstats, verbose)
    local tcp_stats = ifstats.tcpPacketStats

    ts_utils.append("iface:tcp_retransmissions", {
        ifid = ifstats.id,
        packets = tcp_stats.retransmissions
    }, when)

    ts_utils.append("iface:tcp_out_of_order", {
        ifid = ifstats.id,
        packets = tcp_stats.out_of_order
    }, when)

    ts_utils.append("iface:tcp_lost", {
        ifid = ifstats.id,
        packets = tcp_stats.lost
    }, when)

    ts_utils.append("iface:tcp_keep_alive", {
        ifid = ifstats.id,
        packets = tcp_stats.keep_alive
    }, when)
end

-- Writes the TCP flag counters read from
-- ifstats.pktSizeDistribution.tcp_flags.
-- @param when     forwarded to ts_utils.append
-- @param ifstats  interface stats; reads ifstats.id and ifstats.pktSizeDistribution
-- @param verbose  unused here
function ts_dump.iface_update_tcp_flags(when, ifstats, verbose)
    local tcp_flags = ifstats.pktSizeDistribution.tcp_flags

    ts_utils.append("iface:tcp_syn", {
        ifid = ifstats.id,
        packets = tcp_flags.syn
    }, when)

    ts_utils.append("iface:tcp_synack", {
        ifid = ifstats.id,
        packets = tcp_flags.synack
    }, when)

    ts_utils.append("iface:tcp_finack", {
        ifid = ifstats.id,
        packets = tcp_flags.finack
    }, when)

    ts_utils.append("iface:tcp_rst", {
        ifid = ifstats.id,
        packets = tcp_flags.rst
    }, when)
end

-- ########################################################

-- Writes one "profile:traffic" point per entry of ifstats.profiles.
-- The caller is expected to check that ifstats.profiles is set.
-- @param when     forwarded to ts_utils.append
-- @param ifstats  interface stats; reads ifstats.id and ifstats.profiles
-- @param verbose  unused here
function ts_dump.profiles_update_stats(when, ifstats, verbose)
    for pname, ptraffic in pairs(ifstats.profiles) do
        ts_utils.append("profile:traffic", {
            ifid = ifstats.id,
            profile = pname,
            bytes = ptraffic
        }, when)
    end
end

-- ########################################################

-- Writes one "ht:state" point (idle and active entry counts) for every hash
-- table returned by interface.getHashTablesStats(). Missing counters are
-- written as 0.
-- @param when     forwarded to ts_utils.append
-- @param ifstats  interface stats; reads ifstats.id
-- @param verbose  unused here
local function update_internals_hash_tables_stats(when, ifstats, verbose)
    -- `or {}`: do not abort the whole dump when no stats are returned
    local hash_tables_stats = interface.getHashTablesStats() or {}

    for ht_name, ht_stats in pairs(hash_tables_stats) do
        local num_idle = 0
        local num_active = 0
        local entry_states = ht_stats["hash_entry_states"]

        if entry_states then
            if entry_states["hash_entry_state_idle"] then
                num_idle = entry_states["hash_entry_state_idle"]
            end

            if entry_states["hash_entry_state_active"] then
                num_active = entry_states["hash_entry_state_active"]
            end
        end

        ts_utils.append("ht:state", {
            ifid = ifstats.id,
            hash_table = ht_name,
            num_idle = num_idle,
            num_active = num_active
        }, when)
    end
end

-- ########################################################

-- For every periodic script returned by
-- interface.getPeriodicActivitiesStats():
--   * optionally traces its timeseries write stats to the console (when the
--     "ntopng.prefs.periodic_activities_stats_to_stdout" pref equals "1");
--   * writes its last duration to "periodic_script:duration";
--   * with the "rrd" driver only, writes total writes/drops to
--     "periodic_script:timeseries_writes" when at least one is non-zero.
-- @param when     forwarded to ts_utils.append
-- @param ifstats  interface stats; reads ifstats.id
-- @param verbose  unused here
function ts_dump.update_internals_periodic_activities_stats(when, ifstats, verbose)
    -- `or {}`: do not abort the whole dump when no stats are returned
    local periodic_scripts_stats = interface.getPeriodicActivitiesStats() or {}
    local to_stdout = ntop.getPref("ntopng.prefs.periodic_activities_stats_to_stdout") == "1"

    for ps_name, ps_stats in pairs(periodic_scripts_stats) do
        local ts_stats = ps_stats["timeseries"]
        local ts_write = ts_stats and ts_stats["write"]

        if to_stdout then
            local cur_ifid = interface.getId()
            local cur_ifname
            local rrd_out

            if tostring(cur_ifid) == getSystemInterfaceId() then
                cur_ifname = getSystemInterfaceName()
            else
                cur_ifname = getInterfaceName(cur_ifid)
            end

            if ts_write and ts_write["last"] then
                rrd_out = string.format(
                    "[timeseries.write.tot_calls: %i][last_avg_call_duration_ms: %.2f][last_max_call_duration_ms: %.2f][timeseries.write.tot_drops: %u][last_is_slow: %s]",
                    ts_write["tot_calls"] or 0,
                    ts_write["last"]["avg_call_duration_ms"] or 0,
                    ts_write["last"]["max_call_duration_ms"] or 0,
                    ts_write["tot_drops"] or 0,
                    tostring(ts_write["last"]["is_slow"]))
            end

            if rrd_out then
                traceError(TRACE_NORMAL, TRACE_CONSOLE,
                    string.format("[ifname: %s][ifid: %i][%s]%s", cur_ifname, cur_ifid, ps_name, rrd_out))
            end
        end

        -- Write info on the script duration
        local num_ms_last = 0

        if ps_stats["duration"] then
            if ps_stats["duration"]["last_duration_ms"] then
                num_ms_last = ps_stats["duration"]["last_duration_ms"]
            end
        end

        ts_utils.append("periodic_script:duration", {
            ifid = ifstats.id,
            periodic_script = ps_name,
            num_ms_last = num_ms_last
        }, when)

        -- Only if RRD is enabled, also total number of writes and dropped points are written
        if ts_utils.getDriverName() == "rrd" then
            if ts_write then
                local tot_calls = ts_write["tot_calls"] or 0
                local tot_drops = ts_write["tot_drops"] or 0

                -- Do not generate nor update the timeseries if no point has been written or dropped
                -- to prevent generation of empty files and empty timeseries
                if tot_calls + tot_drops > 0 then
                    ts_utils.append("periodic_script:timeseries_writes", {
                        ifid = ifstats.id,
                        periodic_script = ps_name,
                        writes = tot_calls,
                        drops = tot_drops
                    }, when)
                end
            end
        end
    end
end

-- ########################################################

-- Writes flow count, RTT and RTT variance series for every container
-- returned by interface.getContainersStats().
-- @param when     forwarded to ts_utils.append
-- @param ifstats  interface stats; reads ifstats.id
-- @param verbose  unused here
function ts_dump.containers_update_stats(when, ifstats, verbose)
    -- `or {}`: do not abort the whole dump when no stats are returned
    local containers_stats = interface.getContainersStats() or {}

    for container_id, container in pairs(containers_stats) do
        ts_utils.append("container:num_flows", {
            ifid = ifstats.id,
            container = container_id,
            as_client = container["num_flows.as_client"],
            as_server = container["num_flows.as_server"]
        }, when)

        ts_utils.append("container:rtt", {
            ifid = ifstats.id,
            container = container_id,
            as_client = container["rtt_as_client"],
            as_server = container["rtt_as_server"]
        }, when)

        ts_utils.append("container:rtt_variance", {
            ifid = ifstats.id,
            container = container_id,
            as_client = container["rtt_variance_as_client"],
            as_server = container["rtt_variance_as_server"]
        }, when)
    end
end

-- ########################################################

-- Writes container count, flow count, RTT and RTT variance series for every
-- pod returned by interface.getPodsStats().
-- @param when     forwarded to ts_utils.append
-- @param ifstats  interface stats; reads ifstats.id
-- @param verbose  unused here
function ts_dump.pods_update_stats(when, ifstats, verbose)
    -- `or {}`: do not abort the whole dump when no stats are returned
    local pods_stats = interface.getPodsStats() or {}

    for pod_id, pod in pairs(pods_stats) do
        ts_utils.append("pod:num_containers", {
            ifid = ifstats.id,
            pod = pod_id,
            num_containers = pod["num_containers"]
        }, when)

        ts_utils.append("pod:num_flows", {
            ifid = ifstats.id,
            pod = pod_id,
            as_client = pod["num_flows.as_client"],
            as_server = pod["num_flows.as_server"]
        }, when)

        ts_utils.append("pod:rtt", {
            ifid = ifstats.id,
            pod = pod_id,
            as_client = pod["rtt_as_client"],
            as_server = pod["rtt_as_server"]
        }, when)

        ts_utils.append("pod:rtt_variance", {
            ifid = ifstats.id,
            pod = pod_id,
            as_client = pod["rtt_variance_as_client"],
            as_server = pod["rtt_variance_as_server"]
        }, when)
    end
end

-- ########################################################

-- Builds the top talkers JSON for the interface and, when one is produced,
-- stores it through ntop.insertMinuteSampling.
-- @param _ifname  interface name passed to top_talkers_utils.makeTopJson
-- @param ifstats  interface stats; reads ifstats.id and ifstats.name
-- @param verbose  when truthy, prints the computed talkers
local function dumpTopTalkers(_ifname, ifstats, verbose)
    local top_talkers_utils = require "top_talkers_utils"

    -- Dump topTalkers every minute
    local talkers = top_talkers_utils.makeTopJson(_ifname)

    if talkers then
        if (verbose) then
            print("Computed talkers for interfaceId " .. ifstats.id .. "/" .. ifstats.name .. "\n")
            print(talkers)
        end

        ntop.insertMinuteSampling(ifstats.id, talkers)
    end
end

-- ########################################################

-- Writes the local/remote host anomaly counters; skipped for viewed
-- interfaces.
-- @param when     forwarded to ts_utils.append
-- @param ifstats  interface stats; reads id, isViewed and anomalies
-- @param verbose  unused here
function ts_dump.iface_update_anomalies(when, ifstats, verbose)
    if not ifstats.isViewed then
        ts_utils.append("iface:hosts_anomalies", {
            ifid = ifstats.id,
            num_loc_hosts_anom = ifstats.anomalies.num_local_hosts_anomalies,
            num_rem_hosts_anom = ifstats.anomalies.num_remote_hosts_anomalies
        }, when)
    end
end

-- ########################################################

-- Entry point of the minute dump for one interface.
-- Top talkers are always dumped; every timeseries below is written only
-- when areInterfaceTimeseriesEnabled(ifstats.id) is truthy.
-- @param _ifname  interface name
-- @param ifstats  interface stats table
-- @param when     forwarded to every ts_utils.append call
-- @param verbose  forwarded to the helpers
function ts_dump.run_min_dump(_ifname, ifstats, when, verbose)
    local config = getMinTSConfig()

    dumpTopTalkers(_ifname, ifstats, verbose)

    local iface_rrd_creation_enabled = areInterfaceTimeseriesEnabled(ifstats.id)

    if not iface_rrd_creation_enabled then
        return
    end

    ts_dump.subnet_update_rrds(when, ifstats, verbose)
    ts_dump.iface_update_stats_rrds(when, _ifname, ifstats, verbose)
    ts_dump.iface_update_general_stats(when, ifstats, verbose)
    ts_dump.iface_update_l4_stats(when, ifstats, verbose)
    ts_dump.iface_update_anomalies(when, ifstats, verbose)

    -- Check both if global flows dump is enabled (config.is_dump_flows_enabled)
    -- and also if flows dump is enabled for the current interface (ifstats.isFlowDumpDisabled)
    if config.is_dump_flows_enabled and ifstats.isFlowDumpDisabled == false then
        ts_dump.iface_update_flow_dump_stats(when, ifstats, verbose)
    end

    if not ifstats.has_seen_ebpf_events and not ifstats.isSampledTraffic then
        ts_dump.iface_update_tcp_flags(when, ifstats, verbose)
        ts_dump.iface_update_tcp_stats(when, ifstats, verbose)
    end

    local ndpi_mode = config.interface_ndpi_timeseries_creation

    if ndpi_mode == "per_protocol" or ndpi_mode == "both" then
        ts_dump.iface_update_ndpi_rrds(when, _ifname, ifstats, verbose, config)
    end

    -- create custom rrds
    if ts_custom and ts_custom.iface_update_stats then
        ts_custom.iface_update_stats(when, _ifname, ifstats, verbose)
    end

    if ndpi_mode == "per_category" or ndpi_mode == "both" then
        ts_dump.iface_update_categories_rrds(when, _ifname, ifstats, verbose)
    end

    -- Only if the internal telemetry preference is enabled...
    if config.internals_rrd_creation then
        -- Save internal hash tables states every minute
        update_internals_hash_tables_stats(when, ifstats, verbose)

        -- Save duration of periodic activities
        ts_dump.update_internals_periodic_activities_stats(when, ifstats, verbose)
    end

    -- Save Profile stats every minute
    if ntop.isPro() and ifstats.profiles then -- profiles are only available in the Pro version
        ts_dump.profiles_update_stats(when, ifstats, verbose)
    end

    -- Containers/Pods stats
    if ifstats.has_seen_containers then
        ts_dump.containers_update_stats(when, ifstats, verbose)
    end

    if ifstats.has_seen_pods then
        ts_dump.pods_update_stats(when, ifstats, verbose)
    end

    if ntop.isnEdge() and ifstats.type == "netfilter" and ifstats.netfilter then
        -- nfq may be missing; queue_pct is then nil
        local st = ifstats.netfilter.nfq or {}

        ts_utils.append("iface:nfq_pct", {
            ifid = ifstats.id,
            num_nfq_pct = st.queue_pct
        }, when)
    end
end

-- ########################################################

return ts_dump