"""Upload performance/metric CSV files found in zipped log bundles.

The zip files are expected to be laid out as ``<source>/<pod>/<host>/...``.
For every (source, pod, host) combination one worker task is started that
walks the matching zip members and uploads the CSV files it finds.
"""

import re
import os
from os.path import join
import sys
import getopt
from zipfile import ZipFile
from multiprocessing import Pool, cpu_count
import time
import socket
import uuid
import json
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
import shutil

# Endpoint used by send_csv() unless the caller passes another URL.
CSV_UPLOAD_URL = 'http://localhost:17333/api/data?waitUntilFinished=true'

# Extracts the build id (everything between BUILD=' and the first '(').
# Example header line:
# 2017-09-28 00:00:00,041 [...] INFO ... - HOSTNAME='host (ip)', BUILD='AXC_5.12_442(348cf5a...)@AXC_5.12_BRANCH/fixes/AXC-40211', JAVA=
_BUILD_PATTERN = re.compile(r".*BUILD='(?P<build>[^(]+)")

# Matches "<date> ... - Executed <method> in <duration> ms <status>".
_EXECUTED_PATTERN = re.compile(
    r"(?P<date>\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d,\d\d\d)"
    r".*- Executed (?P<method>[\S]+) in (?P<duration>\d+) ms (?P<status>[\w]+)"
)

# Matches an optional "project=<name>" anywhere in a log line.
_PROJECT_PATTERN = re.compile(r".*project=(?P<project>[^\s,\]]+)")


def print_pod_host_and_source(inputZips):
    """Print and return the pod -> host -> {sources} mapping of the zips.

    Only zip members with at least three path components and a non-empty
    third component (the host) are considered.

    Returns:
        dict mapping pod name -> dict mapping host name -> set of sources.
    """
    pods = {}
    for input_zip in inputZips:
        with ZipFile(input_zip, 'r') as archive:
            for path in archive.namelist():
                dirs = path.split("/")
                if len(dirs) < 3:
                    continue
                source, pod, host = dirs[0], dirs[1], dirs[2]
                if host == '':
                    continue
                pods.setdefault(pod, {}).setdefault(host, set()).add(source)

    for pod, hosts in pods.items():
        for host, sources in hosts.items():
            print("%s -> %s - %s" % (pod, host, sources))
    return pods


def walk(input_zip, pods2hosts2sources, dest_host, dest_port, additional_tags):
    """Process one zip file: create the tasks, run them and wait for them."""
    # tasks is [(path_in_zip, source, pod, host), ...]
    tasks = split_into_tasks(pods2hosts2sources)
    futures = start_tasks(input_zip, tasks, dest_host, dest_port, additional_tags)
    wait_for_futures(futures)


def get_files(root_dir):
    """Return (root_dir, directory, file name) for every file below root_dir."""
    return [
        (root_dir, dir_name, fname)
        for dir_name, _subdirs, file_list in os.walk(root_dir)
        for fname in file_list
    ]


def split_into_tasks(pods2hosts2sources):
    """Flatten the pod -> host -> sources mapping into a list of tasks.

    Returns:
        list of (relative_path_in_zip, source, pod, host) tuples, where the
        relative path is "<source>/<pod>/<host>".
    """
    result = []
    for pod, hosts in pods2hosts2sources.items():
        for host, sources in hosts.items():
            for source in sources:
                relative_path_in_zip = "%s/%s/%s" % (source, pod, host)
                result.append((relative_path_in_zip, source, pod, host))
    return result


def start_tasks(input_zip, tasks, dest_host, dest_port, additional_tags):
    """Submit one transform_and_send_logs job per task to a process pool.

    Returns:
        list of AsyncResult objects, one per task, in task order.
    """
    # Leave a few cores for the receiving side, but always use at least one.
    pool = Pool(processes=max(cpu_count() - 3, 1))
    futures = []
    for (path_in_zip, source, pod, host) in tasks:
        future = pool.apply_async(
            transform_and_send_logs,
            (input_zip, path_in_zip, source, pod, host,
             dest_host, dest_port, additional_tags))
        futures.append(future)
    # No further tasks will be submitted; close() lets the worker processes
    # exit once the queued work is done instead of lingering per zip file.
    pool.close()
    return futures


def wait_for_futures(futures):
    """Block until every future has finished (at most one hour each)."""
    for future in futures:
        description = future.get(timeout=3600)
        print("done: %s" % description)


def to_iso(date):
    """Convert "YYYY-MM-DD HH:MM:SS,mmm" into "YYYY-MM-DDTHH:MM:SS.mmmZ"."""
    return date.replace(',', '.').replace(' ', 'T') + "Z"


def get_build_from_file_in_zip(zip, file_in_zip):
    """Return the build id found in the first line of a zip member, or None."""
    with zip.open(file_in_zip, "r") as f:
        first_line = f.readline().decode('utf-8')
        return get_build(first_line)


def get_build(first_log_line):
    """Extract the build id from a header line.

    Returns the text between ``BUILD='`` and the following ``(``, or None
    when the line contains no such marker.
    """
    match = _BUILD_PATTERN.match(first_log_line)
    return match.group("build") if match else None


def _is_below(path_in_zip, member):
    """Return True if member is path_in_zip itself or located beneath it.

    A plain startswith() would also match sibling directories that merely
    share a prefix (e.g. ".../host1" and ".../host10").
    """
    return member == path_in_zip or member.startswith(path_in_zip + "/")


def transform_and_send_logs(input_zip, path_in_zip, source, pod, host, dest_host, dest_port, additional_tags):
    """Upload all CSV files of one source/pod/host directory of a zip file.

    Returns:
        A short human readable description of the finished task.
    """
    print("start %s" % path_in_zip)

    # The connection is opened up front (as before) although the socket based
    # log ingestion is currently disabled; the with-block guarantees that the
    # socket is closed even when an upload raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((dest_host, dest_port))

        with ZipFile(input_zip, 'r') as zip:
            for file_in_zip in zip.namelist():
                if not _is_below(path_in_zip, file_in_zip):
                    continue
                if file_in_zip.endswith(".log"):
                    # TODO enable again (was only disabled to test metrics csv sending)
                    # parse_and_send_performance_log_files(s, zip, file_in_zip, source, pod, host, additional_tags)
                    print("skipping ingestion of log files for debugging reasons")
                elif file_in_zip.endswith(".csv") and "performance." in file_in_zip:
                    send_performance_csv(zip, file_in_zip, source, pod, host, additional_tags)
                elif file_in_zip.endswith(".csv") and "metric" in file_in_zip:
                    send_metric_csv(zip, file_in_zip, source, pod, host, additional_tags)

    print("done %s" % path_in_zip)
    return "pod: %s host: %s -> %s:%d" % (pod, host, dest_host, dest_port)


def _csv_tags(zip, file_in_zip, source, pod, host, value_type, additional_tags):
    """Build the tag dict for a CSV upload without touching additional_tags."""
    tags = dict(additional_tags)
    tags['pod'] = pod
    tags['host'] = host
    tags['source'] = source
    tags['type'] = value_type
    tags['build'] = get_build_from_file_in_zip(zip, file_in_zip)
    return tags


def _csv_settings(value_column, columns, tags):
    """Serialize the CSV reader settings that accompany an upload."""
    return json.dumps({
        "timeColumn": "time",
        "valueColumn": value_column,
        "comment": '#',
        "separator": ';',
        "columnDefinitions": {
            "columns": columns
        },
        "additionalTags": tags
    })


def send_metric_csv(zip, file_in_zip, source, pod, host, additional_tags):
    """Upload a metric CSV file from the zip.

    Expected file format::

        #<date> HOSTNAME='...', BUILD='AXC_5.17_1109(...)@AXC_5.17_BRANCH', ...
        time;metric;value;project;trace_id
        2020-04-21 00:03:18,951;assignments.batches.totalBatches;0;axcelerate.app_server_monitoring;k6m10054.esf4.3fsx7
    """
    tags = _csv_tags(zip, file_in_zip, source, pod, host, "number", additional_tags)
    columns = {
        "time": {},
        "metric": {},
        "value": {},
        "project": {"postProcessors": ["LOWER_CASE"]},
        "trace_id": {"ignore": True}
    }
    send_csv(zip, file_in_zip, _csv_settings("value", columns, tags))


def send_performance_csv(zip, file_in_zip, source, pod, host, additional_tags):
    """Upload a performance CSV file from the zip.

    Expected file format::

        #HOSTNAME='...', BUILD='AXC_5.17_1109(...)@AXC_5.17_BRANCH', ...
        time;duration;method;project;user;trace_id
        2020-04-21 00:02:11,329;0;HealthCheckService.isOperable;;;k6ufq4tg.8ert.110y8
    """
    tags = _csv_tags(zip, file_in_zip, source, pod, host, "duration", additional_tags)
    columns = {
        "time": {},
        "duration": {},
        "method": {},
        "project": {"postProcessors": ["LOWER_CASE"]},
        "user": {"ignore": True},
        "trace_id": {"ignore": True}
    }
    send_csv(zip, file_in_zip, _csv_settings("duration", columns, tags))


def send_csv(zip, file_in_zip, csvSettings, url=CSV_UPLOAD_URL):
    """POST one zip member plus its reader settings as a multipart request.

    Args:
        zip: open ZipFile containing the member.
        file_in_zip: name of the member to upload.
        csvSettings: JSON string with the CSV reader settings.
        url: upload endpoint; defaults to CSV_UPLOAD_URL.
    """
    print(csvSettings)
    os.makedirs('tmp', exist_ok=True)
    tmp_folder = join('tmp', str(uuid.uuid4()))
    try:
        # zip file does not support seeking - this forces us to extract the file
        csv_file_to_send = zip.extract(file_in_zip, tmp_folder)
        with open(csv_file_to_send, 'rb') as data:
            m = MultipartEncoder(
                fields={
                    'file': ('0.csv', data, 'text/csv'),
                    'settings': ('csvReaderSettings.json', csvSettings, 'application/json')
                })

            r = requests.post(url, data=m,
                              headers={
                                  'Accept': 'text/plain, application/json, application/*+json, */*',
                                  'Content-Type': m.content_type.replace("form-data", "mixed;charset=UTF-8")
                              })
            print("response:")
            print(r)
    finally:
        # The folder may not exist if the extraction itself failed; do not
        # let the cleanup hide the original exception.
        shutil.rmtree(tmp_folder, ignore_errors=True)


def _send_event(socket, event):
    """Send one event as a JSON document followed by a newline."""
    # json.dumps instead of str(dict): produces valid JSON even for values
    # that contain quotes or are None.
    socket.sendall(json.dumps(event, ensure_ascii=False).encode('utf-8'))
    socket.sendall(b'\n')


def parse_and_send_metric_csv_files(socket, zip, file_in_zip, source, pod, host, additional_tags):
    """Parse a metric CSV file and send one JSON event per row over the socket.

    The first line is the header line containing the build, the second line
    names the columns, e.g.::

        time;metric;value;project;trace_id
        2018-10-31 09:36:12,428;applicationSync.applications;75;;jnwyp2jc.2f

    Raises:
        ValueError: if a required column is missing from the column line.
    """
    with zip.open(file_in_zip, "r") as f:
        first_line = f.readline().decode('utf-8')
        build = get_build(first_line)

        # Strip the line ending, otherwise the last column name keeps it.
        header_columns = f.readline().decode('utf-8').rstrip("\r\n").split(";")
        time_index = header_columns.index("time")
        metric_index = header_columns.index("metric")
        value_index = header_columns.index("value")
        project_index = header_columns.index("project")

        for raw_line in f:
            line = raw_line.decode('utf-8').rstrip("\r\n")
            if not line or line.startswith("#"):
                continue
            cols = line.split(";")
            event = {
                "@timestamp": to_iso(cols[time_index]),
                "pod": pod,
                "host": host,
                "source": source,
                "type": "number",
                "metric": cols[metric_index],
                "duration": int(cols[value_index]),
            }
            if cols[project_index]:
                event["project"] = cols[project_index].lower()
            if build:
                event["build"] = build
            event.update(additional_tags)
            _send_event(socket, event)


def parse_and_send_performance_log_files(socket, zip, file_in_zip, source, pod, host, additional_tags):
    """Parse a performance log and send one JSON event per "Executed" line.

    Lines that do not match "<date> ... - Executed <method> in <n> ms
    <status>" are ignored.
    """
    with zip.open(file_in_zip, "r") as f:
        first_line = f.readline().decode('utf-8')
        build = get_build(first_line)

        for raw_line in f:
            line = raw_line.decode('utf-8')
            match = _EXECUTED_PATTERN.match(line)
            if not match:
                continue
            event = {
                "@timestamp": to_iso(match.group("date")),
                "pod": pod,
                "host": host,
                "source": source,
                "type": "time",
                "method": match.group("method"),
                "duration": int(match.group("duration")),
                "status": match.group("status"),
            }
            match_project = _PROJECT_PATTERN.match(line)
            if match_project:
                event["project"] = match_project.group("project").lower()
            if build:
                event["build"] = build
            event.update(additional_tags)
            _send_event(socket, event)


def to_map(serialized_map):
    """Parse "key=value;key2=value2" into a dict.

    Entries that do not consist of exactly one key and one value separated
    by a single "=" are skipped.
    """
    if serialized_map == "":
        return {}
    result = {}
    for tag in serialized_map.split(";"):
        key_value = tag.split("=")
        if len(key_value) == 2:
            result[key_value[0]] = key_value[1]
    return result


def help(returnValue):
    """Print the usage message and exit with returnValue."""
    print(sys.argv[0] + ' -i <input> -h <host> -p <port>')
    print('Examples: ')
    print(sys.argv[0] + ' -i "/path/to/logs" -h "localhost" -p 17333')
    sys.exit(returnValue)


def main(argv):
    """Parse the command line, show what was found and start the upload."""
    input_zips = []
    host = 'localhost'
    port = 17333

    try:
        # Long options need a trailing "=" to accept a value; "help" has to
        # be listed, otherwise --help is rejected as unknown option.
        opts, args = getopt.getopt(argv, "i:h:p:", ["help", "input=", "host=", "port="])
    except getopt.GetoptError:
        help(2)
    for opt, arg in opts:
        if opt == '--help':
            help(0)
        elif opt in ("-i", "--input"):
            if os.path.isdir(arg):
                input_zips = [join(arg, f) for f in os.listdir(arg)
                              if os.path.isfile(join(arg, f)) and f.endswith(".zip")]
            else:
                input_zips = arg.split(' ')
        elif opt in ("-h", "--host"):
            host = arg
        elif opt in ("-p", "--port"):
            print(arg)
            try:
                port = int(arg)
            except ValueError:
                print("port must be a number, but was %s" % arg)
                help(2)
    print('Input file(s) is "' + str(input_zips) + '"')

    for input_zip in input_zips:
        if not os.path.isfile(input_zip) or not input_zip.endswith(".zip"):
            print("input file is not a zip file: %s" % (input_zip))
            help(2)
    if not host:
        print("host must not be empty, but was %s" % host)
        help(2)

    pods2hosts2sources = print_pod_host_and_source(input_zips)

    answer = input("continue y or n ?")

    if answer == 'y':
        additional_tags = to_map(input("additional tags (e.g. test=lt_axc_5.16.02;keyword=RiC): "))
        print("using additional tags: %s" % additional_tags)
        start = time.time()
        for input_zip in input_zips:
            print("start Zip: %s" % input_zip)
            walk(input_zip, pods2hosts2sources, host, port, additional_tags)
        end = time.time()
        print("duration %s seconds" % (end - start))


if __name__ == "__main__":
    main(sys.argv[1:])