EVOLUTION-MANAGER
Edit File: check_rfio_activity
#!/usr/bin/env python ############################################################################## # Copyright (c) Members of the EGEE Collaboration. 2011. # See http://www.eu-egee.org/partners/ for details on the copyright # holders. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS # OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ############################################################################## # # NAME : check_rfio_activity # # DESCRIPTION : Process the rfio log to retrieve activity informations. # # AUTHORS : Alexandre.beche@cern.ch # ############################################################################## import os import re import subprocess import time import copy from datetime import datetime, timedelta from lcgdmcommon import * class check_rfio_activity: __version__ = "0.0.1" __nagios_id__ = "DM-RFIO-ACTIVITY" # Defaults DEFAULT_MEDIUM = "100M" DEFAULT_LARGE = "1G" DEFAULT_LOG = "/var/log/rfio/log" DEFAULT_INTERVAL = 10 # Specific parameters, where key = short, value = long (i.e. {"h":"help", "C:":"command="}) # getopt format. The long version will be the one passed even when the short is specified __additional_opts__ = {"m:" : "medium=", "l:" : "large=", "f:": "logfile=", "i:": "interval="} # Specific usage information __usage__ = """ \t-f, --log\tThe dpm-gsiftp log file.Default %s \t-i, --interval\tDefault interval to use. In seconds. Default: %d s. \t-m, --medium\tLimit for minimum size of medium file. Default: %sB. \t-l, --large\tLimit for minimum size of large file. Defautl: %sB. """ % (DEFAULT_LOG, DEFAULT_INTERVAL, DEFAULT_MEDIUM, DEFAULT_LARGE) # Methods def __init__(self, opt = {}, args = []): """ Constructor @param opt Contains a dictionary with the long option name as the key, and the argument as value @param args Contains the arguments not associated with any option """ # range definition for file size self.range = {} if "medium" in opt: self.range["medium"] = opt["medium"] else: self.range["medium"] = self.DEFAULT_MEDIUM if "large" in opt: self.range["large"] = opt["large"] else: self.range["large"] = self.DEFAULT_LARGE # convert human readable file size into filesize in byte for range, limit in self.range.iteritems(): self.range[range] = real_bytes(limit) self.range["small"] = None # RFIO logfile to use if "log" in opt: self.log = opt["log"] else: self.log = self.DEFAULT_LOG # Interval if "interval" in opt: self.interval = int(opt["interval"]) else: self.interval = self.DEFAULT_INTERVAL def get_timestamp(self, date): date = date.split() day, date = int(date[0]), date[1] date = date.split(":") hour, minute, second = int(date[0]), int(date[1]), int(date[2]) return second + (minute * 60) + (hour * 3600) + (day * 24 * 3600) def find_range(self, file_size): if file_size <= self.range["medium"]: return "small" elif file_size <= self.range["large"]: return "medium" else: return "large" def get_type(self, bytes_read, bytes_written): if bytes_read > 0: type = "read" elif bytes_written > 0: type = "write" else: type = "unknown" return type, int(bytes_read + bytes_written) def get_buffer_data(self, buffer): timestamp = int(buffer[0]) transfer_mode = buffer[1] bytes_transfered = buffer[2] return timestamp, transfer_mode, bytes_transfered def add_data_in_global_dictionary(self, global_transfer_dictionary, transfer_mode, ellapsed_time, bytes_transfered): file_range = self.find_range(bytes_transfered) if ellapsed_time == 0: ellapsed_time = 1 for index in [file_range, "total"]: global_transfer_dictionary[index][transfer_mode]["transfer-count"] += 1 global_transfer_dictionary[index][transfer_mode]["transfered-bytes"] += bytes_transfered global_transfer_dictionary[index][transfer_mode]["total-time"] += ellapsed_time return global_transfer_dictionary def transfer_dictionary_init(self): global_transfer_dictionary = {} data = {"transfer-count":0,"transfered-bytes":0,"total-time":0} for category in self.range.keys(): global_transfer_dictionary[category] = {"read":copy.copy(data),"write":copy.copy(data)} global_transfer_dictionary["total"] = {"read":copy.copy(data),"write":copy.copy(data)} return global_transfer_dictionary def parse_logfile(self): global_transfer_dictionary = self.transfer_dictionary_init() rfio = open(self.log, "r") begin_transfer_pattern = re.compile("^.* (\d\d \d*:\d*:\d*) rfiod\[(\d*)\].*request.*$") end_transfer_pattern = re.compile("^.* (\d\d \d*:\d*:\d*) rfiod\[(\d*)\].*: (\d*) bytes read and (\d*) bytes written.*$") buffer = {} current_time = datetime.now().strftime("%d %H:%M:%S") current_time = self.get_timestamp(current_time) for line in reversed_lines(rfio): begin_transfer = begin_transfer_pattern.search(line) end_transfer = end_transfer_pattern.search(line) line = " ".join(line.split()[1:3]) if self.get_timestamp(line) < (current_time - (60*self.interval)): return global_transfer_dictionary if end_transfer: time, thread_id = end_transfer.group(1), end_transfer.group(2) type, bytes_transfered = self.get_type(int(end_transfer.group(3)), int(end_transfer.group(4))) buffer[thread_id] = [self.get_timestamp(time), type, bytes_transfered] elif begin_transfer and buffer.has_key(begin_transfer.group(2)): request_timestamp, thread_id = self.get_timestamp(begin_transfer.group(1)), begin_transfer.group(2) return_timestamp, type, bytes_transfered = self.get_buffer_data(buffer[thread_id]) ellapsed_time = return_timestamp - request_timestamp if type != "unknown": global_transfer_dictionary = self.add_data_in_global_dictionary(global_transfer_dictionary, type, ellapsed_time, bytes_transfered) del buffer[thread_id] rfio.close() return global_transfer_dictionary def main(self): """ Test code itself. May raise exceptions. @return A tuple (exit code, message, performance) """ transfer_dictionary = self.parse_logfile() return_result, performance_data = "", "" for size in self.range: for operation in transfer_dictionary[size].keys(): transfered_bytes = float(transfer_dictionary[size][operation]["transfered-bytes"]) performance_data += " %s_%s_transfer-count=%s" % (size, operation, str(transfer_dictionary[size][operation]["transfer-count"])) performance_data += " %s_%s_transfered-bytes=%.2fB" % (size, operation, transfer_dictionary[size][operation]["transfered-bytes"]) performance_data += " %s_%s_link-throughput=%.2fB/s" % (size, operation, (transfered_bytes / (60 * self.interval))) for operation in transfer_dictionary["total"].keys(): total_transfer = int(transfer_dictionary["total"][operation]["transfer-count"]) transfered_bytes = float(transfer_dictionary["total"][operation]["transfered-bytes"]) time_to_transfer = float(transfer_dictionary["total"][operation]["total-time"]) if time_to_transfer == 0: transfer_rate = 0 else: transfer_rate = transfered_bytes / time_to_transfer return_result += " %d %s operation (%.2fB/s for %.2fB), " %(total_transfer, operation, transfer_rate, transfered_bytes) performance_data += " total_%s_transfer-rate=%.2fB/s" % (operation, transfer_rate) return (EX_OK, return_result, performance_data) # When called directly if __name__ == "__main__": run(check_rfio_activity)