EVOLUTION-MANAGER
Edit File: recipients.lua
-- -- (C) 2017-24 - ntop.org -- local dirs = ntop.getDirs() package.path = dirs.installdir .. "/scripts/lua/modules/pools/?.lua;" .. package.path require "check_redis_prefs" require "locales_utils" require "label_utils" local alert_consts = require "alert_consts" local json = require "dkjson" local alert_entity_builders = require "alert_entity_builders" -- ################################# -- These are structs local alert_severities = require "alert_severities" local alert_entities = require "alert_entities" local alert_categories = require "alert_categories" -- ################################# local last_error_notification = 0 local MIN_ERROR_DELAY = 60 -- 1 minute local ERROR_KEY = "ntopng.cache.%s.error_time" local do_trace = false -- Enable debug with: -- redis-cli set "ntopng.prefs.vs.notifications_debug_enabled" "1" -- systemctl restart ntopng local debug_vs = ntop.getCache("ntopng.prefs.vs.notifications_debug_enabled") == "1" -- ############################################## local recipients = {} -- ############################################## recipients.MAX_NUM_RECIPIENTS = 64 -- Keep in sync with ntop_defines.h MAX_NUM_RECIPIENTS -- ############################################## recipients.FIRST_RECIPIENT_CREATED_CACHE_KEY = "ntopng.prefs.endpoint_hints.recipient_created" local default_builtin_minimum_severity = alert_severities.notice.severity_id -- minimum severity is notice (to avoid flooding) (*****) -- ############################################## function recipients.get_notification_types() local notification_types = { alerts = { title = i18n('endpoint_notifications.alerts'), icon = 'fas fa-lg fa-exclamation-triangle text-warning' }, reports = { title = i18n('report.reports'), icon = 'fa-regular fa-newspaper' }, vulnerability_scans = { title = i18n('hosts_stats.page_scan_hosts.vulnerability_scan_reports'), icon = 'fa-solid fa-clipboard' } } return notification_types end -- ############################################## local function debug_print(msg) if not do_trace then return end traceError(TRACE_NORMAL, TRACE_CONSOLE, msg) end -- ############################################## -- Processes queued alerts and returns the information necessary to store them. -- Alerts are only enqueued by AlertsQueue in C. From lua, the alerts_api -- can be called directly as slow operations will be postponed local function processStoreAlertFromQueue(alert) require "lua_utils_get" local entity_info = nil local type_info = nil interface.select(tostring(alert.ifid)) if (alert.alert_id == "misconfigured_dhcp_range") then local router_info = { host = alert.router_ip, vlan = alert.vlan_id } entity_info = alert_entity_builders.hostAlertEntity(alert.client_ip, alert.vlan_id) type_info = alert_consts.alert_types.alert_ip_outsite_dhcp_range.new(router_info, alert.mac_address, alert.client_mac, alert.sender_mac) type_info:set_score_warning() type_info:set_subtype(string.format("%s_%s_%s", hostinfo2hostkey(router_info), alert.client_mac, alert.sender_mac)) elseif (alert.alert_id == "mac_ip_association_change") then local name = getDeviceName(alert.new_mac) entity_info = alert_entity_builders.macEntity(alert.new_mac) type_info = alert_consts.alert_types.alert_mac_ip_association_change.new(name, alert.ip, alert.old_mac, alert.new_mac) type_info:set_score(100) type_info:set_subtype(string.format("%s_%s_%s", alert.ip, alert.old_mac, alert.new_mac)) elseif (alert.alert_id == "login_failed") then entity_info = alert_entity_builders.userEntity(alert.user) type_info = alert_consts.alert_types.alert_login_failed.new() type_info:set_score_warning() elseif (alert.alert_id == "broadcast_domain_too_large") then entity_info = alert_entity_builders.macEntity(alert.src_mac) type_info = alert_consts.alert_types.alert_broadcast_domain_too_large.new(alert.src_mac, alert.dst_mac, alert.vlan_id, alert.spa, alert.tpa) type_info:set_score_warning() type_info:set_subtype(string.format("%u_%s_%s_%s_%s", alert.vlan_id, alert.src_mac, alert.spa, alert.dst_mac, alert.tpa)) elseif ((alert.alert_id == "user_activity") and (alert.scope == "login")) then entity_info = alert_entity_builders.userEntity(alert.user) type_info = alert_consts.alert_types.alert_user_activity.new("login", nil, nil, nil, "authorized") type_info:set_score_notice() type_info:set_subtype("login//") elseif (alert.alert_id == "nfq_flushed") then entity_info = alert_entity_builders.interfaceAlertEntity(alert.ifid) type_info = alert_consts.alert_types.alert_nfq_flushed.new(getInterfaceName(alert.ifid), alert.pct, alert.tot, alert.dropped) type_info:set_score_error() elseif (alert.alert_id == "cloud_disconnected") then entity_info = alert_entity_builders.systemEntity("cloud") type_info = alert_consts.alert_types.alert_cloud_disconnected.new(alert.description) type_info:set_score_warning() elseif (alert.alert_id == "cloud_reconnected") then entity_info = alert_entity_builders.systemEntity("cloud") type_info = alert_consts.alert_types.alert_cloud_reconnected.new(alert.description) type_info:set_score_notice() elseif (alert.alert_id == "snmp_trap") then entity_info = alert_entity_builders.snmpDeviceEntity(alert.device) type_info = alert_consts.alert_types.alert_snmp_trap.new(alert.device, alert.description) type_info:set_score_warning() else traceError(TRACE_ERROR, TRACE_CONSOLE, "Unknown alert type " .. (alert.alert_id or "")) end local category = alert_consts.get_category_by_id(alert.alert_category or 0) type_info:set_category(category) return entity_info, type_info end -- ############################################## -- @brief Process notifications arriving from the internal C queue -- Such notifications are transformed into stored alerts local function process_notifications_from_c_queue() local budget = 1024 -- maximum 1024 alerts per call local budget_used = 0 -- Check for alerts pushed by the datapath to an internal queue (from C) -- and store them (push them to the SQLite and Notification queues). -- NOTE: this is executed in a system VM, with no interfaces references while budget_used <= budget do local alert = ntop.popInternalAlerts() if alert == nil then break end if (verbose) then tprint(alert) end local entity_info, type_info = processStoreAlertFromQueue(alert) if type_info and entity_info then type_info:store(entity_info) end budget_used = budget_used + 1 end end -- ############################################## -- @brief Performs Initialization operations performed during startup function recipients.initialize() local endpoints = require "endpoints" -- Initialize builtin recipients, that is, recipients always existing an not editable from the UI -- For each builtin configuration type, a configuration and a recipient is created -- Add categories local all_categories = {} for _, category in pairs(alert_categories) do all_categories[#all_categories + 1] = category.id end -- Add entities local all_entities = {} for _, entity_info in pairs(alert_entities) do all_entities[#all_entities + 1] = entity_info.entity_id end local host_pools = require "host_pools":create() -- Add host pools local all_host_pools = {} local pools = host_pools:get_all_pools() for _, pool in pairs(pools) do all_host_pools[#all_host_pools + 1] = pool.pool_id end -- Add active monitoring hosts local all_am_hosts = {} -- No hosts by default for endpoint_key, endpoint in pairs(endpoints.get_types()) do if endpoint.builtin then -- Delete (if existing) the old, string-keyed endpoint configuration endpoints.delete_config("builtin_config_" .. endpoint_key) -- Add the configuration local res = endpoints.add_config(endpoint_key --[[ the type of the endpoint--]] , "builtin_endpoint_" .. endpoint_key --[[ the name of the endpoint configuration --]] , {} --[[ no default params --]] ) -- Endpoint successfully created (or existing) if res and res.endpoint_id then -- And the recipient local recipient_res = recipients.add_recipient(res.endpoint_id --[[ the id of the endpoint --]] , "builtin_recipient_" .. endpoint_key --[[ the name of the endpoint recipient --]] , all_categories, all_entities, default_builtin_minimum_severity, all_host_pools, -- host pools all_am_hosts, -- active monitoring hosts {} --[[ no recipient params --]] ) end end end -- Delete (if existing) the old, string-keyed recipient and endpoint local sqlite_recipient = recipients.get_recipient_by_name("builtin_recipient_sqlite") if sqlite_recipient then recipients.delete_recipient(sqlite_recipient.recipient_id) end endpoints.delete_config("builtin_config_sqlite") -- Register all existing recipients in C to make sure ntopng can start with all the -- existing recipients properly loaded and ready for notification enqueues/dequeues local alert_store_db_recipient = recipients.get_recipient_by_name("builtin_recipient_alert_store_db") if (alert_store_db_recipient.recipient_id ~= 0) then print("WARNING ntopng found some inconsistencies in your recipient configuration\n") print("WARNING Please factory reset the recipient and endpoint configuration\n") alert_store_db_recipient.recipient_id = 0 -- setting it to the default value end for _, recipient in pairs(recipients.get_all_recipients()) do local flow_alert_types = nil local host_alert_types = nil local other_alert_types = nil local notifications_type = recipient.notifications_type or "alerts" if recipient.checks and table.len(recipient.checks) > 0 then for family, alerts in pairs(recipient.checks) do if #alerts > 0 then if family == "flow" then flow_alert_types = table.concat(alerts, ",") elseif family == "host" then host_alert_types = table.concat(alert, ",") else -- other if not isEmptyString(other_alert_types) then other_alert_types = other_alert_types .. "," .. table.concat(alerts, ",") else other_alert_types = table.concat(alerts, ",") end end end end end ntop.recipient_register(recipient.recipient_id, recipient.minimum_severity, table.concat(recipient.check_categories, ','), table.concat(recipient.host_pools, ','), table.concat(recipient.check_entities, ','), flow_alert_types, host_alert_types, other_alert_types, ternary((notifications_type ~= "alerts"), true --[[skip alerts]] , false --[[dont skip alerts]] )) end end -- ############################################## local function _get_recipients_lock_key() local key = string.format("ntopng.cache.recipients.recipients_lock") return key end -- ############################################## local function _lock() local max_lock_duration = 5 -- seconds local max_lock_attempts = 5 -- give up after at most this number of attempts local lock_key = _get_recipients_lock_key() for i = 1, max_lock_attempts do local value_set = ntop.setnxCache(lock_key, "1", max_lock_duration) if value_set then return true -- lock acquired end ntop.msleep(1000) end return false -- lock not acquired end -- ############################################## local function _unlock() ntop.delCache(_get_recipients_lock_key()) end -- ############################################## local function _get_recipients_prefix_key() local key = string.format("ntopng.prefs.recipients") return key end -- ############################################## local function _get_recipient_ids_key() local key = string.format("%s.recipient_ids", _get_recipients_prefix_key()) return key end -- ############################################## local function _get_recipient_details_key(recipient_id) recipient_id = tonumber(recipient_id) if not recipient_id then -- A recipient id is always needed return nil end local key = string.format("%s.recipient_id_%d.details", _get_recipients_prefix_key(), recipient_id) return key end -- ############################################## -- @brief Returns an array with all the currently assigned recipient ids local function _get_assigned_recipient_ids() local res = {} local cur_recipient_ids = ntop.getMembersCache(_get_recipient_ids_key()) for _, cur_recipient_id in pairs(cur_recipient_ids) do cur_recipient_id = tonumber(cur_recipient_id) res[#res + 1] = cur_recipient_id end return res end -- ############################################## local function _assign_recipient_id() local cur_recipient_ids = _get_assigned_recipient_ids() local next_recipient_id -- Create a Lua table with currently assigned recipient ids as keys -- to ease the lookup local ids_by_key = {} for _, recipient_id in pairs(cur_recipient_ids) do ids_by_key[recipient_id] = true end -- Lookup for the first (smallest) available recipient id. -- This is to effectively recycle recipient ids no longer used, that is, -- belonging to deleted recipients for i = 0, recipients.MAX_NUM_RECIPIENTS - 1 do if not ids_by_key[i] then next_recipient_id = i break end end if next_recipient_id then -- Add the atomically assigned recipient id to the set of current recipient ids (set wants a string) ntop.setMembersCache(_get_recipient_ids_key(), string.format("%d", next_recipient_id)) else -- All recipient ids exhausted end return next_recipient_id end -- ############################################## -- @brief Coherence checks for the endpoint configuration parameters -- @param endpoint_key A string with the notification endpoint key -- @param recipient_params A table with endpoint recipient params that will be possibly sanitized -- @return false with a description of the error, or true, with a table containing sanitized configuration params. local function check_endpoint_recipient_params(endpoint_key, recipient_params) local endpoints = require "endpoints" if not recipient_params or not type(recipient_params) == "table" then return false, { status = "failed", error = { type = "invalid_recipient_params" } } end -- Create a safe_params table with only expected params local safe_params = {} -- So iterate across all expected params of the current endpoint for _, param in ipairs(endpoints.get_types()[endpoint_key].recipient_params) do -- param is a lua table so we access its elements local param_name = param["param_name"] if param_name then local optional = param["optional"] if recipient_params and recipient_params[param_name] and not safe_params[param_name] then safe_params[param_name] = recipient_params[param_name] elseif not optional then return false, { status = "failed", error = { type = "missing_mandatory_param", missing_param = param_name } } end end end return true, { status = "OK", safe_params = safe_params } end -- ############################################## -- @brief Set a configuration along with its params. Configuration name and params must be already sanitized -- @param endpoint_id An integer identifier of the endpoint -- @param endpoint_recipient_name A string with the recipient name -- @param check_categories A Lua array with already-validated ids as found in `checks.check_categories` or nil to indicate all categories -- @param check_entities A Lua array with already-validated ids as found in `checks.check_entities` or nil to indicate all entities -- @param minimum_severity An already-validated integer alert severity id as found in `alert_severities` or nil to indicate no minimum severity -- @param checks An already-validated integer alert id or nil to indicate filter on the alert ids -- @param safe_params A table with endpoint recipient params already sanitized -- @return nil local function _set_endpoint_recipient_params(recipient_data, safe_params) -- Write the endpoint recipient config into another hash local k = _get_recipient_details_key(recipient_data.recipient_id) -- Add the preference to silence the same alerts for a specific recipient, by default is set to true (1) ntop.setCache("ntopng.prefs.silence_multiple_alerts." .. recipient_data.recipient_id, ternary(recipient_data.silence_alerts and recipient_data.silence_alerts == "false", '0', '1')) ntop.setCache(k, json.encode({ endpoint_id = recipient_data.endpoint_id, recipient_name = recipient_data.endpoint_recipient_name, check_categories = recipient_data.check_categories, check_entities = recipient_data.check_entities, minimum_severity = recipient_data.minimum_severity, host_pools = recipient_data.host_pools_ids, am_hosts = recipient_data.am_hosts_ids, checks = recipient_data.checks, silence_alerts = recipient_data.silence_alerts, notifications_type = recipient_data.notifications_type, recipient_params = safe_params })) return recipient_data.recipient_id end -- ############################################## local function format_recipient_checks(checks_list) if isEmptyString(checks_list) then return nil end local list = checks_list:split(",") or {checks_list} local formatted_list = {} local num_alerts = 0 for _, check in pairs(list or {}) do local alert_info = check:split("_") if table.len(alert_info) == 2 then local alert_id = alert_info[1] local entity_id = alert_info[2] local entity = alert_consts.alertEntityRaw(entity_id) if not formatted_list[entity] then formatted_list[entity] = {} end formatted_list[entity][#formatted_list[entity] + 1] = alert_id num_alerts = num_alerts + 1 end end if num_alerts == 0 then return nil end return formatted_list end -- ############################################## -- @brief Add a new recipient of an existing endpoint configuration and returns its id -- @param endpoint_id An integer identifier of the endpoint -- @param endpoint_recipient_name A string with the recipient name -- @param check_categories A Lua array with already-validated ids as found in `checks.check_categories` or nil to indicate all categories -- @param check_entities A Lua array with already-validated ids as found in `checks.check_entities` or nil to indicate all entities -- @param minimum_severity An already-validated integer alert severity id as found in `alert_severities` or nil to indicate no minimum severity -- @param recipient_params A table with endpoint recipient params that will be possibly sanitized -- @return A table with a key status which is either "OK" or "failed", and the recipient id assigned to the newly added recipient. When "failed", the table contains another key "error" with an indication of the issue function recipients.add_recipient(endpoint_id, endpoint_recipient_name, check_categories, check_entities, minimum_severity, host_pools_ids, am_hosts_ids, recipient_params) local endpoints = require "endpoints" local locked = _lock() local res = { status = "failed", error = { type = "internal_error" } } if locked then local ec = endpoints.get_endpoint_config(endpoint_id) if ec["status"] == "OK" and endpoint_recipient_name then -- Is the endpoint already existing? local same_recipient = recipients.get_recipient_by_name(endpoint_recipient_name) if same_recipient then res = { status = "failed", error = { type = "endpoint_recipient_already_existing", endpoint_recipient_name = endpoint_recipient_name } } else local endpoint_key = ec["endpoint_key"] local ok, status = check_endpoint_recipient_params(endpoint_key, recipient_params) if ok then local safe_params = status["safe_params"] -- Get the list of checks to deliver the alerts local checks = format_recipient_checks(recipient_params["recipient_checks"] or "") local silence_alerts = recipient_params["recipient_silence_multiple_alerts"] local notifications_type = recipient_params["recipient_notifications_type"] or "alerts" local flow_alert_types = nil local host_alert_types = nil local other_alert_types = nil if checks and table.len(checks) > 0 then for family, alerts in pairs(checks) do if #alerts > 0 then if family == "flow" then flow_alert_types = table.concat(alerts, ",") elseif family == "host" then host_alert_types = table.concat(alert, ",") else -- other if not isEmptyString(other_alert_types) then other_alert_types = other_alert_types .. "," .. table.concat(alerts, ",") else other_alert_types = table.concat(alerts, ",") end end end end end -- Assign the recipient id local recipient_id = _assign_recipient_id() -- Persist the configuration _set_endpoint_recipient_params({ endpoint_id = endpoint_id, recipient_id = recipient_id, endpoint_recipient_name = endpoint_recipient_name, check_categories = check_categories, check_entities = check_entities, minimum_severity = minimum_severity, host_pools_ids = host_pools_ids, am_hosts_ids = am_hosts_ids, silence_alerts = silence_alerts, checks = checks, notifications_type = notifications_type }, safe_params) -- Finally, register the recipient in C so we can start enqueuing/dequeuing notifications ntop.recipient_register(recipient_id, minimum_severity, table.concat(check_categories, ','), table.concat(host_pools_ids, ','), table.concat(check_entities, ','), flow_alert_types, -- Flow Alerts bitmap host_alert_types, -- Host Alerts bitmap other_alert_types, -- Other Alerts bitmap ternary((notifications_type ~= "alerts"), true --[[skip alerts]] , false --[[dont skip alerts]] )) -- Set a flag to indicate that a recipient has been created if not ec.endpoint_conf.builtin and isEmptyString(ntop.getPref(recipients.FIRST_RECIPIENT_CREATED_CACHE_KEY)) then ntop.setPref(recipients.FIRST_RECIPIENT_CREATED_CACHE_KEY, "1") end res = { status = "OK", recipient_id = recipient_id } else res = status end end else res = { status = "failed", error = { type = "bad_endpoint" } } end _unlock() end return res end -- ############################################## -- @brief Edit the recipient parameters of an existing endpoint configuration -- @param recipient_id The integer recipient identificator -- @param endpoint_recipient_name A string with the recipient name -- @param check_categories A Lua array with already-validated ids as found in `checks.check_categories` or nil to indicate all categories -- @param minimum_severity An already-validated integer alert severity id as found in `alert_severities` or nil to indicate no minimum severity -- @param recipient_params A table with endpoint recipient params that will be possibly sanitized -- @return A table with a key status which is either "OK" or "failed". When "failed", the table contains another key "error" with an indication of the issue function recipients.edit_recipient(recipient_id, endpoint_recipient_name, check_categories, check_entities, minimum_severity, host_pools_ids, am_hosts_ids, recipient_params) local endpoints = require "endpoints" local locked = _lock() local res = { status = "failed" } if locked then local rc = recipients.get_recipient(recipient_id) if not rc then res = { status = "failed", error = { type = "endpoint_recipient_not_existing", endpoint_recipient_name = endpoint_recipient_name } } else local ec = endpoints.get_endpoint_config(rc["endpoint_id"]) if ec["status"] ~= "OK" then res = ec else -- Are the submitted params those expected by the endpoint? local ok, status = check_endpoint_recipient_params(ec["endpoint_key"], recipient_params) if not ok then res = status else local safe_params = status["safe_params"] local checks = format_recipient_checks(recipient_params["recipient_checks"] or "") local silence_alerts = recipient_params["recipient_silence_multiple_alerts"] local notifications_type = recipient_params["recipient_notifications_type"] or "alerts" local flow_alert_types = nil local host_alert_types = nil local other_alert_types = nil if checks and table.len(checks) > 0 then for family, alerts in pairs(checks) do if #alerts > 0 then if family == "flow" then flow_alert_types = table.concat(alerts, ",") elseif family == "host" then host_alert_types = table.concat(alert, ",") else -- other if not isEmptyString(other_alert_types) then other_alert_types = other_alert_types .. "," .. table.concat(alerts, ",") else other_alert_types = table.concat(alerts, ",") end end end end end _set_endpoint_recipient_params({ endpoint_id = rc["endpoint_id"], recipient_id = recipient_id, endpoint_recipient_name = endpoint_recipient_name, check_categories = check_categories, check_entities = check_entities, minimum_severity = minimum_severity, host_pools_ids = host_pools_ids, am_hosts_ids = am_hosts_ids, silence_alerts = silence_alerts, checks = checks, notifications_type = notifications_type }, safe_params) -- Finally, register the recipient in C to make sure also the C knows about this edit -- and periodic scripts can be reloaded ntop.recipient_register(tonumber(recipient_id), minimum_severity, table.concat(check_categories, ','), table.concat(host_pools_ids, ','), table.concat(check_entities, ','), flow_alert_types, host_alert_types, other_alert_types, ternary((notifications_type ~= "alerts"), true --[[skip alerts]] , false --[[dont skip alerts]] )) res = { status = "OK" } end end end _unlock() end return res end -- ############################################## function recipients.delete_recipient(recipient_id) recipient_id = tonumber(recipient_id) local ret = false local locked = _lock() if locked then -- Make sure the recipient exists local cur_recipient_details = recipients.get_recipient(recipient_id) if cur_recipient_details then -- Remove the key with all the recipient details (e.g., with members) ntop.delCache(_get_recipient_details_key(recipient_id)) -- Remove the recipient_id from the set of all currently existing recipient ids ntop.delMembersCache(_get_recipient_ids_key(), string.format("%d", recipient_id)) -- Finally, remove the recipient from C ntop.recipient_delete(recipient_id) ret = true end _unlock() end return ret end -- ############################################## -- @brief Delete all recipients having the given `endpoint_id` -- @param endpoint_id An integer identifier of the endpoint -- @return nil function recipients.delete_recipients_by_conf(endpoint_id) local ret = false local all_recipients = recipients.get_all_recipients() for _, recipient in pairs(all_recipients) do -- Use tostring for backwards compatibility if tostring(recipient.endpoint_id) == tostring(endpoint_id) then recipients.delete_recipient(recipient.recipient_id) end end end -- ############################################## -- @brief Get all recipients having the given `endpoint_conf_name` -- @param endpoint_id An integer identifier of the endpoint -- @return A lua array with recipients function recipients.get_recipients_by_conf(endpoint_id, include_stats) local res = {} local all_recipients = recipients.get_all_recipients(false, include_stats) for _, recipient in pairs(all_recipients) do -- Use tostring for backward compatibility, to handle -- both integer and string endpoint_id if tostring(recipient.endpoint_id) == tostring(endpoint_id) then res[#res + 1] = recipient end end return res end -- ################################################################# function recipients.test_recipient(endpoint_id, recipient_params) local endpoints = require "endpoints" -- Get endpoint config local ec = endpoints.get_endpoint_config(endpoint_id) if ec["status"] ~= "OK" then return ec end -- Check recipient parameters local endpoint_key = ec["endpoint_key"] ok, status = check_endpoint_recipient_params(endpoint_key, recipient_params) if not ok then return status end local safe_params = status["safe_params"] -- Create test recipient local recipient = { endpoint_id = ec["endpoint_id"], endpoint_conf_name = ec["endpoint_conf_name"], endpoint_conf = ec["endpoint_conf"], endpoint_key = ec["endpoint_key"], recipient_params = safe_params } -- Get endpoint module local modules_by_name = endpoints.get_types() local module_name = recipient.endpoint_key local m = modules_by_name[module_name] if not m then return { status = "failed", error = { type = "endpoint_module_not_existing", endpoint_recipient_name = recipient.endpoint_conf.endpoint_key } } end -- Run test if not m.runTest then return { status = "failed", error = { type = "endpoint_test_not_available", endpoint_recipient_name = recipient.endpoint_conf.endpoint_key } } end local success, message = m.runTest(recipient) if success then return { status = "OK" } else return { status = "failed", error = { type = "endpoint_test_failure", message = message } } end end -- ############################################## function recipients.get_recipient(recipient_id, include_stats) local endpoints = require "endpoints" local recipient_details local recipient_details_key = _get_recipient_details_key(recipient_id) -- Attempt at retrieving the recipient details key and at decoding it from JSON if recipient_details_key then local recipient_details_str = ntop.getCache(recipient_details_key) recipient_details = json.decode(recipient_details_str) if recipient_details then -- Add the integer recipient id recipient_details["recipient_id"] = tonumber(recipient_id) -- Add also the endpoint configuration name -- Use the endpoint id to get the endpoint configuration (use endpoint_conf_name for the old endpoints) local ec = endpoints.get_endpoint_config(recipient_details["endpoint_id"] or recipient_details["endpoint_conf_name"]) recipient_details["endpoint_conf_name"] = ec["endpoint_conf_name"] recipient_details["endpoint_id"] = ec["endpoint_id"] -- Add check categories. nil or empty check categories read from the JSON imply ANY AVAILABLE category if not recipient_details["check_categories"] or #recipient_details["check_categories"] == 0 then if not recipient_details["check_categories"] then recipient_details["check_categories"] = {} end for _, category in pairs(alert_categories) do recipient_details["check_categories"][#recipient_details["check_categories"] + 1] = category.id end end -- Add check entities. nil or empty check entities read from the JSON imply ANY AVAILABLE entity if not recipient_details["check_entities"] or #recipient_details["check_entities"] == 0 then if not recipient_details["check_entities"] then recipient_details["check_entities"] = {} end for _, entity_info in pairs(alert_entities) do recipient_details["check_entities"][#recipient_details["check_entities"] + 1] = entity_info.entity_id end end -- Add host pools if not recipient_details["host_pools"] then local host_pools = require "host_pools":create() local pools = host_pools:get_all_pools() if (recipient_details["host_pools"] == nil) then recipient_details["host_pools"] = {} end for _, pool in pairs(pools) do recipient_details["host_pools"][#recipient_details["host_pools"] + 1] = pool.pool_id end end -- Add active monitoring hosts if not recipient_details["am_hosts"] then -- No hosts by default recipient_details["am_hosts"] = {} end -- Add minimum alert severity. nil or empty minimum severity assumes a minimum severity of notice if not tonumber(recipient_details["minimum_severity"]) then recipient_details["minimum_severity"] = default_builtin_minimum_severity end if ec then recipient_details["endpoint_conf"] = ec["endpoint_conf"] recipient_details["endpoint_key"] = ec["endpoint_key"] local modules_by_name = endpoints.get_types() local cur_module = modules_by_name[recipient_details["endpoint_key"]] if cur_module and cur_module.format_recipient_params then -- Add a formatted output of recipient params recipient_details["recipient_params_fmt"] = cur_module.format_recipient_params(recipient_details["recipient_params"]) else -- A default recipient_details["recipient_params_fmt"] = "" end end if include_stats then -- Read stats from C recipient_details["stats"] = ntop.recipient_stats(recipient_details["recipient_id"]) end end end -- Upon success, recipient details are returned, otherwise nil return recipient_details end -- ############################################## function recipients.get_all_recipients(exclude_builtin, include_stats) local res = {} local cur_recipient_ids = _get_assigned_recipient_ids() for _, recipient_id in pairs(cur_recipient_ids) do local recipient_details = recipients.get_recipient(recipient_id, include_stats) if recipient_details and (not exclude_builtin or not recipient_details.endpoint_conf.builtin) then res[#res + 1] = recipient_details end end return res end -- ############################################## function recipients.get_recipient_by_name(name) local cur_recipient_ids = _get_assigned_recipient_ids() for _, recipient_id in pairs(cur_recipient_ids) do local recipient_details = recipients.get_recipient(recipient_id) if recipient_details and recipient_details["recipient_name"] and recipient_details["recipient_name"] == name then return recipient_details end end return nil end -- ############################################## local builtin_recipients_cache function recipients.get_builtin_recipients() -- Currently, only sqlite is the builtin recipient -- created in startup.lua calling recipients.initialize() if not builtin_recipients_cache then local alert_store_db_recipient = recipients.get_recipient_by_name("builtin_recipient_alert_store_db") builtin_recipients_cache = {alert_store_db_recipient.recipient_id} end return builtin_recipients_cache end -- ############################################## local function get_notification_category(notification, current_script) -- Category is first read from the current_script. If no current_script is found (e.g., for -- alerts generated from the C++ core such as start after anomalous termination), the category -- is guessed from the alert entity. local entity_id = notification.entity_id local cur_category_id if current_script and current_script.category and current_script.category.id then -- Found in the script cur_category_id = current_script.category.id else --- Determined from the entity if entity_id == alert_entities.system.entity_id then -- System alert entity becomes system cur_category_id = alert_categories.system.id else -- All other entities fall into other category cur_category_id = alert_categories.other.id end end return cur_category_id or alert_categories.other.id end function recipients.format_checks_list(recipients) for _, recipient in pairs(recipients or {}) do local check_list = {} for entity, alert_list in pairs(recipient.checks or {}) do for _, alert in pairs(alert_list or {}) do check_list[#check_list + 1] = string.format("%d_%d", alert, alert_entities[entity].entity_id) end end recipient.checks = check_list end return recipients end -- ############################################## -- @brief This function deliver a notification to a specific recipient id -- without checking for filters, ecc. -- @param notification An notification in table format -- @param name The recipient name to which send the notification -- @return boolean function recipients.sendMessageByRecipientName(notification, name) local recipient = recipients.get_recipient_by_name(name) if recipient and recipient.recipient_id then recipients.dispatch_notification(notification, nil, nil, recipient.recipient_id) return true end traceError(TRACE_NORMAL, TRACE_CONSOLE, "Trying to deliver a notification to a non-existing recipient " .. name) return false end -- ############################################## -- @brief This function deliver a notification to all recipients with -- the same notification_type configured -- @param notification An notification in table format -- @param notification_type The notification type -- @return boolean function recipients.sendMessageByNotificationType(notification, notification_type) notification.notification_type = notification_type return recipients.dispatch_notification(notification, nil, notification_type, nil) end -- ############################################# -- @brief Dispatches a `notification` to all the interested recipients -- Note: this is similar to RecipientQueue::enqueue does in C++) -- @param notification An alert notification -- @param current_script The user script which has triggered this notification - can be nil if the script is unknown or not available -- @return nil function recipients.dispatch_notification(notification, current_script, notification_type, recipient_id) local is_vs = (notification_type == 'vulnerability_scans') if not notification then -- traceError(TRACE_ERROR, TRACE_CONSOLE, "Internal error. Empty notification") -- tprint(debug.traceback()) end if debug_vs and is_vs then traceError(TRACE_NORMAL, TRACE_CONSOLE, "VS: dispatching notification") end if not notification.score then notification.score = 0 end local notification_category = get_notification_category(notification, current_script) local recipients = recipients.get_all_recipients() if #recipients > 0 then -- Use pcall to catch possible exceptions, e.g., (string expected, got light userdata) local status, json_notification = pcall(function() return json.encode(notification) end) -- If an exception occurred, print the notification and exit if not status then traceError(TRACE_ERROR, TRACE_CONSOLE, "Failure encoding notification") tprint(notification) return end for _, recipient in ipairs(recipients) do local recipient_ok = true if debug_vs and is_vs then traceError(TRACE_NORMAL, TRACE_CONSOLE, "VS: evaluating recipient") tprint(recipient) end -- If recipient_id is not nil, it means that notification has to be -- dispatched only to the specific recipient if recipient_ok and recipient_id then if recipient_id == recipient.recipient_id then goto skip_filters end recipient_ok = false end -- Checking if a specific notification type is requested -- otherwise go to the alerts filters if recipient_ok and notification_type and notification_type ~= "alerts" then if recipient.notifications_type and recipient.notifications_type ~= "alerts" then if notification_type == recipient.notifications_type then if debug_vs and is_vs then traceError(TRACE_NORMAL, TRACE_CONSOLE, "VS: recipient match!") end goto skip_filters end end recipient_ok = false end -- Check Category if recipient_ok and notification_category and recipient.check_categories ~= nil then -- Make sure the user script category belongs to the recipient check categories recipient_ok = false for _, check_category in pairs(recipient.check_categories) do if check_category == notification_category then recipient_ok = true end end if not recipient_ok then debug_print("X Discarding " .. notification.entity_val .. " alert for recipient " .. recipient.recipient_name .. " due to category selection") end end -- Check Entity if recipient_ok and notification.entity_id and recipient.check_entities ~= nil then -- Make sure the user script entity belongs to the recipient check entities recipient_ok = false for _, check_entity_id in pairs(recipient.check_entities) do if check_entity_id == notification.entity_id then recipient_ok = true end end if not recipient_ok then debug_print("X Discarding " .. notification.entity_val .. " alert for recipient " .. recipient.recipient_name .. " due to entity selection") end end -- Check Severity if recipient_ok then if notification.severity and recipient.minimum_severity ~= nil and notification.severity < recipient.minimum_severity then -- If the current alert severity is less than the minimum requested severity exclude the recipient debug_print("X Discarding " .. notification.entity_val .. " alert for recipient " .. recipient.recipient_name .. " due to severity") recipient_ok = false end end -- Check Alerts List if recipient_ok and notification.alert_id and notification.entity_id ~= nil then -- Apply the filters if available if table.len(recipient["checks"] or {}) > 0 then recipient_ok = false for entity, alert_list in pairs(recipient["checks"] or {}) do -- First of all check if the entity_id of the alert is the same of the one to filter if alert_entities[entity].entity_id == notification.entity_id then -- Then check the alert_id for _, alert_id in pairs(alert_list) do -- Same ID, break the loop and continue, the alert is OK if tonumber(alert_id) == notification.alert_id then recipient_ok = true break end end end end if not recipient_ok then debug_print("X Discarding " .. notification.entity_val .. " alert for recipient " .. recipient.recipient_name .. " due to entity selection") end end end -- Check Pool if recipient_ok then if notification.host_pool_id then if recipient.recipient_name ~= "builtin_recipient_alert_store_db" and recipient.host_pools then local host_pools_map = swapKeysValues(recipient.host_pools) if not host_pools_map[notification.host_pool_id] then debug_print("X Discarding " .. notification.entity_val .. " alert for recipient " .. recipient.recipient_name .. " due to host pool selection (" .. notification.host_pool_id .. ")") recipient_ok = false end end end end if recipient_ok then if notification.entity_id == alert_entities.am_host.entity_id and notification.entity_val then if recipient.recipient_name ~= "builtin_recipient_alert_store_db" then local am_measurement local am_host local parts = split(notification.entity_val, "@") if #parts == 2 then am_measurement = parts[1] am_host = parts[2] end if am_measurement == "vs" then -- Vulnerability scan - enabled for any hosts else -- Active Monitoring measurements if recipient.am_hosts then local am_hosts_map = swapKeysValues(recipient.am_hosts) if not am_hosts_map[notification.entity_val] then recipient_ok = false debug_print("X Discarding " .. notification.entity_val .. " alert for recipient " .. recipient.recipient_name .. " due to AM selection") end end end end end end ::skip_filters:: if recipient_ok then -- Enqueue alert --debug_print("Delivering alert for entity id " .. notification.entity_id .. " to recipient " .. recipient.recipient_name) if debug_vs and is_vs then traceError(TRACE_NORMAL, TRACE_CONSOLE, "VS: enqueueing notification to recipient") end ntop.recipient_enqueue(recipient.recipient_id, json_notification --[[ alert --]] , notification.score, notification.alert_id, notification.entity_id, notification_category) end end ::continue:: end end -- ############################################## -- @brief Processes notifications dispatched to recipients -- @param ready_recipients A table with recipients ready to export. Recipients who completed their work are removed from the table -- @param now An epoch of the current time -- @param periodic_frequency The frequency, in seconds, of this call -- @param force_export A boolean telling to forcefully export dispatched notifications -- @return nil local function process_notifications(ready_recipients, now, deadline, periodic_frequency, force_export) -- Total budget available, which is a multiple of the periodic_frequency -- Budget in this case is the maximum number of notifications which can -- be processed during this call. local total_budget = 1000 * periodic_frequency -- To avoid having one recipient jeopardizing all the resources, the total -- budget is consumed in chunks, that is, recipients are iterated multiple times -- and, each time any recipient has a maximum budget for every iteration. local budget_per_iter = total_budget / #ready_recipients -- Put a cap of 1000 messages/iteration if (budget_per_iter > 1000) then budget_per_iter = 1000 end -- Cycle until there are ready_recipients and total_budget left local cur_time = os.time() while #ready_recipients > 0 and total_budget >= 0 and cur_time <= deadline and (force_export or not ntop.isDeadlineApproaching()) do for i = #ready_recipients, 1, -1 do local ready_recipient = ready_recipients[i] local recipient = ready_recipient.recipient local m = ready_recipient.mod debug_print("Dequeuing alerts for ready recipient: " .. recipient.recipient_name .. " recipient_id: " .. recipient.recipient_id) if last_error_notification == 0 then last_error_notification = tonumber(ntop.getCache(string.format(ERROR_KEY, recipient.recipient_name))) or 0 end if m.dequeueRecipientAlerts and (now > MIN_ERROR_DELAY + last_error_notification) then local rv = m.dequeueRecipientAlerts(recipient, budget_per_iter) -- If the recipient has failed (not rv.success) or -- if it has no more work to do (not rv.more_available) -- it can be removed from the array of ready recipients. if not rv.success or not rv.more_available then table.remove(ready_recipients, i) debug_print("Ready recipient done: " .. recipient.recipient_name) if not rv.success then last_error_notification = now ntop.setCache(string.format(ERROR_KEY, recipient.recipient_name), now) local msg = rv.error_message or "Unknown Error" traceError(TRACE_ERROR, TRACE_CONSOLE, "Error while sending notifications via " .. recipient.recipient_name .. " " .. msg) end end end end -- Update the total budget total_budget = total_budget - budget_per_iter cur_time = os.time() end if do_trace then if #ready_recipients > 0 then debug_print("Deadline approaching: " .. tostring(deadline < cur_time)) debug_print("Budget left: " .. total_budget) debug_print("The following recipients were unable to dequeue all their notifications") for _, ready_recipient in pairs(ready_recipients) do debug_print(" " .. ready_recipient.recipient.recipient_name) end end end end -- ################################################################# -- @brief Check if it time to export notifications towards recipient identified with `recipient_id`, depending on its `xport_frequency` -- @param recipient_id The integer recipient identifier -- @param export_frequency The recipient export frequency in seconds -- @param now The current epoch -- @return True if it is time to export notifications towards the recipient, or False otherwise local function check_endpoint_export(recipient_id, export_frequency, now) -- Read the epoch of the last time the recipient was used local last_use = ntop.recipient_last_use(recipient_id) local res = last_use + export_frequency <= now return res end -- ################################################################# -- @brief Processes notifications dispatched to recipients -- @param now An epoch of the current time -- @param periodic_frequency The frequency, in seconds, of this call -- @param force_export A boolean telling to forcefully export dispatched notifications -- @return nil local cached_recipients function recipients.process_notifications(now, deadline, periodic_frequency, force_export) local endpoints = require "endpoints" if not areAlertsEnabled() then return end -- Dequeue alerts from the internal C queue process_notifications_from_c_queue() -- Dequeue alerts enqueued into per-recipient queues from checks if not cached_recipients then -- Cache recipients to avoid re-reading them constantly -- NOTE: in case of recipient add/edit/delete, the vm executing this -- function is reloaded and thus, recipients, are re-cached automatically cached_recipients = recipients.get_all_recipients() end local modules_by_name = endpoints.get_types() local ready_recipients = {} -- Check, among all available recipients, those that are ready to export, depending on -- their EXPORT_FREQUENCY for _, recipient in pairs(cached_recipients) do local module_name = recipient.endpoint_key if modules_by_name[module_name] then local m = modules_by_name[module_name] if force_export or check_endpoint_export(recipient.recipient_id, m.EXPORT_FREQUENCY, now) then -- This recipient is ready for export... local ready_recipient = { recipient = recipient, recipient_id = recipient.recipient_id, mod = m } ready_recipients[#ready_recipients + 1] = ready_recipient end end end process_notifications(ready_recipients, now, deadline, periodic_frequency, force_export) end -- ############################################## -- @brief Cleanup all but builtin recipients function recipients.cleanup() local all_recipients = recipients.get_all_recipients() for _, recipient in pairs(all_recipients) do recipients.delete_recipient(recipient.recipient_id) end recipients.initialize() end -- ############################################## function recipients.isAlertsRecipient(recipient) return recipient.notifications_type == "alerts" end -- ############################################## function recipients.isBuiltinRecipient(recipient) return recipient.recipient_name == "builtin_recipient_alert_store_db" end -- ############################################## return recipients