diff --git a/plogdownloader/.gitignore b/plogdownloader/.gitignore new file mode 100644 index 0000000..71a79b2 --- /dev/null +++ b/plogdownloader/.gitignore @@ -0,0 +1,4 @@ +out +download +tmp +*.pyc \ No newline at end of file diff --git a/plogdownloader/.project b/plogdownloader/.project new file mode 100644 index 0000000..791287b --- /dev/null +++ b/plogdownloader/.project @@ -0,0 +1,11 @@ + + + plogdownloader + + + + + + + + diff --git a/plogdownloader/download.bat b/plogdownloader/download.bat new file mode 100644 index 0000000..2e8567d --- /dev/null +++ b/plogdownloader/download.bat @@ -0,0 +1,8 @@ +@echo off + +set /p from_date=from date (yyyy-mm-dd): +set /p to_date=from date (yyyy-mm-dd): + +C:\Tools\Python36\python download.py --from %from_date% --to %to_date% --output "..\tmp" --pods vapbrewe01,vapglask01,vapjazzp01,vapnorto01,vapondem01,vapondem02,vapondem03,vapondem04 --tmpdir "..\tmp" + +pause \ No newline at end of file diff --git a/plogdownloader/download.py b/plogdownloader/download.py new file mode 100644 index 0000000..8a6dbc9 --- /dev/null +++ b/plogdownloader/download.py @@ -0,0 +1,135 @@ +import re +import gzip +import os +import sys +import getopt +from os.path import getsize, isdir, isfile, join, dirname, abspath, exists +from multiprocessing import Pool, cpu_count +import time +import shutil +from datetime import datetime +from datetime import timedelta +from random import random +from subprocess import call,check_call +import time + + +def download(fromDate, to_date, pod_list, outputFile, tmpDir): + dates = getDateRange(fromDate, to_date) + print(dates) + + pods = pod_list.replace(' ','').split(',') + + + tmpExtractDir = join(tmpDir, "extracted") + os.makedirs(tmpExtractDir) + + for pod in pods: + for date in dates: + tmpLogDir = join(tmpDir, 'logs', pod); + os.makedirs(tmpLogDir, exist_ok=True) + tmpExtractedPodDir = join(tmpExtractDir, pod) + os.makedirs(tmpExtractedPodDir, exist_ok=True) + cmd = ["aws", "s3", "cp", "s3://recommind-logs/"+pod.upper()+"/log/app_asg/"+date, tmpLogDir+"/"+date, "--recursive", "--exclude", "*", "--include", "axcng-service*"] + print(cmd) + check_call(cmd) + + #cmd = [sys.executable, "extractAppAsgLogs.py", "-e", ".*/performance.*log.*", "-i", tmpLogDir, "-o", tmpExtractedPodDir] + cmd = [sys.executable, "extractAppAsgLogsWith7z.py", "-e", "*/performance*log*", "-i", tmpLogDir, "-o", tmpExtractedPodDir] + print(cmd) + check_call(cmd) + + shutil.rmtree(tmpLogDir, ignore_errors=True) + + # zip the performance logs + #shutil.make_archive(outputFile, 'zip', tmpExtractDir) + remove_file(outputFile) + print("sleep 20 seconds to give the file share time to update directory entries") + time.sleep(20) + cmd = ["C:\\Program Files\\7-Zip\\7z.exe", "a", "-tzip", outputFile, join(tmpExtractDir, "*")] + print(cmd) + check_call(cmd) + + shutil.rmtree(tmpDir, ignore_errors=True) + + +def remove_file(file): + if os.path.isfile(file): + os.remove(file) + +def getDateRange(from_date, to_date): + pattern = "%Y-%m-%d" + from_datetime = datetime.strptime(from_date, pattern) + to_datetime = datetime.strptime(to_date, pattern) + print(from_datetime) + print(to_datetime) + + if from_datetime > to_datetime: + raise Exception("invalid date range %s - %s" % (from_datetime, to_datetime)) + + result = [] + + current = from_datetime + while current <= to_datetime: + result.append(datetime.strftime(current, pattern)) + current += timedelta(days=1) + + return result + +def help(returnValue): + print(sys.argv[0] + ' --from --to --output --pods ') + print('Examples: ') + print(sys.argv[0] + ' --from 2018-01-01 --to 2018-01-31 --output /path/to/outputdir --pods vapondem01') + sys.exit(returnValue) + + +def main(argv): + tmpBaseDir = 'tmp123' + outputDir = 'out'; + pods = 'vapbrewe01,vapglask01,vapjazzp01,vapnorto01,vapondem01,vapondem02,vapondem03,vapondem04' + from_date = '' + to_date = '' + try: + print ("args: %s" % (argv)) + opts, args = getopt.getopt(argv,"f:ho:p:t:x:",["from=", "help","output=", "pods=", "tmpdir=", "to=",]) + except getopt.GetoptError: + help(2) + print ("opts: %s" % opts) + for opt, arg in opts: + + print ("-- %s, %s" % (opt, arg)) + if opt in ("-h", "--help"): + help(0) + elif opt in ("-f", "--from"): + from_date = arg + elif opt in ("-o", "--output"): + outputDir = arg + elif opt in ("-p", "--pods"): + pods = arg + elif opt in ("-x", "--tmpdir"): + tmpBaseDir = arg + elif opt in ("-t", "--to"): + to_date = arg + outputFile = join(outputDir, "logs_"+from_date+"_"+to_date+"_"+pods.replace(',', '_')) + tmpDir = join(tmpBaseDir, str(random())[2:]) + print('tmpBaseDir directory is "' + tmpBaseDir + '"') + print('tmpDir directory is "' + tmpDir + '"') + print('Ouput file is "' + outputFile + '"') + + if not os.path.isdir(tmpBaseDir): + print("tmpBaseDir is not a directory") + help(2) + if not os.path.isdir(dirname(abspath(outputFile))): + print("'%s' is not a directory" % (dirname(abspath(outputFile)))) + help(2) + if os.path.exists(abspath(outputFile)): + print("'%s' already exists" % (abspath(outputFile))) + help(2) + + start = time.time() + download(from_date, to_date, pods, outputFile, tmpDir) + end = time.time() + print("duration %s seconds" % (end - start)) + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/plogdownloader/download_jumphost.bat b/plogdownloader/download_jumphost.bat new file mode 100644 index 0000000..bf16e7a --- /dev/null +++ b/plogdownloader/download_jumphost.bat @@ -0,0 +1,17 @@ +@echo off + +set default_pods=vapbrewe01,vapfinra01,vapglask01,vapjazzp01,vapnorto01,vapnyse001,vapondem01,vapondem02,vapondem03,vapondem04,vappilby01,vaprjrey01,vapfacbk01,vapbdcom01,vaprayja01 + +set /p from_date=from date (yyyy-mm-dd): +set /p to_date=to date (yyyy-mm-dd): +echo default pods: %default_pods% +set /p pods=pods (lowercase, comma separated, or leave empty for default): +if "%pods%" == "" set pods=%default_pods% + + +S:\Working\ahr\Python36\python download.py --from %from_date% --to %to_date% --output "..\tmp" --pods "%pods%" --tmpdir "..\tmp" + + + + +pause \ No newline at end of file diff --git a/plogdownloader/extractAppAsgLogsWith7z.py b/plogdownloader/extractAppAsgLogsWith7z.py new file mode 100644 index 0000000..bd86ffd --- /dev/null +++ b/plogdownloader/extractAppAsgLogsWith7z.py @@ -0,0 +1,225 @@ +from zipfile import ZipFile +import os +from os import listdir +from os.path import getsize, isdir, isfile, join, dirname +import sys +import getopt +import re +import shutil +from subprocess import call,check_call +import time + +# S:\Working\ahr\Python36\python S:\Working\ahr\giru\plog-downloader\extractAppAsgLogsWith7z.py -e "*/performance*log*" -o S:\Working\ahr\giru\tmp\9323982100316752\extracted\vaprjrey01 -i S:\Working\ahr\giru\tmp\9323982100316752\logs\vaprjrey01 + +def extract(inputDir, outputDir, pattern): + zips = listZips(inputDir) + zips.sort() + print("zips: %s" % zips); + zipToOutputDir = mapZipToOutputDir(zips, outputDir) + print("zipToOutputDir: %s" % zipToOutputDir); + extractFiles(zipToOutputDir, pattern) + + +def extractFiles(zipToOutputDir, pattern): + for zipFile, outputDir in zipToOutputDir: + # 7z e archive.zip -o outputdir *.xml *.dll + cmd = ["C:\\Program Files\\7-Zip\\7z.exe", "e", zipFile, "-y", "-o%s"%outputDir, pattern] + print(cmd) + check_call(cmd) + extractZippedLogs(outputDir) + removeLogsCreatedAfterRolling(outputDir) + #with ZipFile(zipFile, 'r') as myzip: + # members = myzip.namelist() + # filteredMembers = filterMembers(members, pattern) + # filteredMembers = filterExisting(filteredMembers, outputDir, myzip) + # filteredMembers.sort() + # if len(filteredMembers) == 0: + # continue + # print("extracting ",filteredMembers, "-> " +outputDir) + # myzip.extractall(outputDir, filteredMembers) + # extractZippedLogs(filteredMembers, outputDir) + # removeLogsCreatedAfterRolling(filteredMembers, outputDir) + +def extractZippedLogs(outputDir): + + from os import walk + zips = [] + for (dirpath, dirnames, filenames) in walk(outputDir): + for name in filenames: + if name.endswith(".zip"): + zips.append(join(dirpath, name)) + + + print ("extract2: %s" % zips) + for zipFile in zips: + extractFile(zipFile, outputDir) + remove_file(zipFile) + +def extractFile(zipFile, outputDir): + attempt = 0 + while attempt < 10: + if not os.path.isfile(zipFile): + print("extractFile: %s is not a file" % zipFile) + return + try: + attempt += 1 + cmd = ["C:\\Program Files\\7-Zip\\7z.exe", "e", zipFile, "-y", "-o%s"% outputDir] + print("attempt %d: %s" % (attempt,cmd)) + check_call(cmd) + return + except: + print("extraction failed") + time.sleep(1) + if attempt >= 10: + print("aborted after 10 failed attempts") + sys.exit(2) + +def removeLogsCreatedAfterRolling(outputDir): + if not os.path.isdir(outputDir): + return + logfiles = [f for f in listdir(outputDir) if isfile(join(outputDir, f))] + for member in logfiles: + file = join(outputDir, member) + print("check if %s has just been rolled" % file) + if member.endswith('performance.log'): + lastLine = readLastLine(file) + print("lastLine: %s)" % lastLine) + if re.match(".*\d\d\d\d-\d\d-\d\d 00:1.*", lastLine.decode('utf8')): + attempt = 0 + while os.path.isfile(file): + attempt += 1 + print("attempt %d removing %s, because it was just rolled" % (attempt, file)) + try: + os.remove(file) + except: + print("failed to delete") + time.sleep(1) + +def readLastLine(file): + with open(file, 'rb') as f: + for line in f: + f.seek(-2, os.SEEK_END) # Jump to the second last byte. + while f.read(1) != b"\n": # Until EOL is found... + #print f.tell() + if f.tell() <= 2: + break; + f.seek(-2, os.SEEK_CUR) # ...jump back the read byte plus one more. + last = f.readline() # Read last line. + #print "%s -> %s" % (file, last) + return last + + +def filterExisting(filteredMembers, outputDir, myzip): + result = [] + for member in filteredMembers: + file = join(outputDir, member) + if not isfile(file): + result.append(member) + continue + zipInfo = myzip.getinfo(member) + zipFileSize = zipInfo.file_size + fileSize = getsize(file) + if zipFileSize != fileSize: + result.append(member) + else: + print("skip: " + member + " (file exists and has the same size)") + return result + + +def filterMembers(members, pattern): + result = [] + for member in members: + if pattern.match(member): + result.append(member) + return result + + +def mapZipToOutputDir(zips, outputDir): + result = [] + + for zipFileName in zips: + date = extractDate(zipFileName) + instanceId = extractInstanceId(zipFileName) + if date and instanceId: + # why did I have a version with date AND instanceId ??? + # result.append((zipFileName, join(outputDir, date, instanceId))) + result.append((zipFileName, join(outputDir, instanceId))) + + return result + +def extractInstanceId(zipFileName): + # pre 5.14 pattern: axcng-service_i-0ccd20213cffb9fc3_001201.zip + # post 5.14 pattern: axcng-service_i-09c26757fd0b61c12_172_19_113_219_VAPFINRA01AA001_2018-04-14_090701.zip + pattern = re.compile(".*axcng-.*_i-([a-zA-Z0-9]+)_.*.zip") + match = pattern.match(zipFileName) + if match: + instanceId = match.group(1) + else: + instanceId = False + return instanceId + +def extractDate(zipFileName): + # axcng-service_i-0376ad122c7fa2bbc_172_28_1_153_VADTRANS01AA001_2018-08-17_095022.zip + #pattern = re.compile(".*(/|\\\\)([0-9]{4}-[0-9]{2}-[0-9]{2})(/|\\\\).*zip") + pattern = re.compile(".*_([0-9]{4}-[0-9]{2}-[0-9]{2})_[0-9]{6}\.zip") + match = pattern.match(zipFileName) + if match: + date = match.group(1) + else: + print("no date" , zipFileName) + date = False + return date + +def listZips(directory): + zips = [] + for root, dirs, files in os.walk(directory): + for name in files: + print(join(root, name)) + zips.append(join(root,name)) + return zips + +def remove_file(file): + if os.path.isfile(file): + os.remove(file) + + + +def help(returnValue): + print(sys.argv[0] + ' -i -o [-e ]') + print('Examples: ') + print(sys.argv[0] + ' -e ".*/performance.log.*" -i logs\\vapaccen01 -o extracted\\vapaccen01') + print(sys.argv[0] + ' -e ".*/performance.log.*" -i logs\\vapaccen01 -o d:\\ws\\pdb\\logs\\vapaccen01') + sys.exit(returnValue) + +def main(argv): + expression = '.*' + inputDir = '' + outputDir = '' + try: + opts, args = getopt.getopt(argv,"he:i:o:",["expression", "input=", "output="]) + except getopt.GetoptError: + help(2) + for opt, arg in opts: + if opt == '-h': + help(0) + elif opt in ("-e", "--expression"): + expression = arg + elif opt in ("-i", "--input"): + inputDir = arg + elif opt in ("-o", "--output"): + outputDir = arg + print('Input directory is "' + inputDir + '"') + print('Ouput directory is "' + outputDir + '"') + + if not os.path.isdir(inputDir): + print("input dir is not a directory") + help(2) + if not isdir(outputDir): + print("output dir is not a directory") + help(2) + print("extract") + extract(inputDir, outputDir, expression) + sys.exit(0) + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/plogdownloader/performance_to_json.py b/plogdownloader/performance_to_json.py new file mode 100644 index 0000000..5ef20c3 --- /dev/null +++ b/plogdownloader/performance_to_json.py @@ -0,0 +1,404 @@ +import re +import os +from os.path import join +import sys +import getopt +from zipfile import ZipFile +from multiprocessing import Pool, cpu_count +import time +import socket + +import uuid +import json +import requests +from requests_toolbelt.multipart.encoder import MultipartEncoder +import shutil + + +def print_pod_host_and_source(inputZips): + pods = {} + for inputZip in inputZips: + with ZipFile(inputZip, 'r') as zip: + members = zip.namelist() + #print(members) + for path in members: + #print(path) + dirs = path.split("/") + if len(dirs) < 3: + continue + source = dirs[0] + pod = dirs[1] + host = dirs[2] + if host == '': + continue + + if not pod in pods: + pods[pod] = {} + if not host in pods[pod]: + pods[pod][host] = set() + pods[pod][host].add(source) + #print("%s -> %s" % (pod, host)) + for pod in pods: + for host in pods[pod]: + print("%s -> %s - %s" % (pod, host, pods[pod][host])) + return pods + + +def walk(input_zip, pods2hosts2sources, dest_host, dest_port, additional_tags): + tasks = split_into_tasks( pods2hosts2sources) # returns [(path_in_zip, pod, host), ...] + futures = start_tasks(input_zip, tasks, dest_host, dest_port, additional_tags) + wait_for_futures(futures) + + +def get_files(root_dir): + result = [] + for dirName, subdirList, fileList in os.walk(root_dir): + for fname in fileList: + result.append((root_dir, dirName, fname)) + return result + + +def split_into_tasks(pods2hosts2sources): + result = [] + for pod in pods2hosts2sources: + for host in pods2hosts2sources[pod]: + for source in pods2hosts2sources[pod][host]: + relative_path_in_zip = "%s%s%s%s%s" % (source, "/", pod, "/", host) + result.append( (relative_path_in_zip, source, pod, host) ) + return result + + +def start_tasks(input_zip, tasks, dest_host, dest_port, additional_tags): + pool = Pool(processes=max(cpu_count()-3, 1)) + #pool = Pool(processes=2) + futures = [] + for task in tasks: + (path_in_zip, source, pod, host) = task + future = pool.apply_async(transform_and_send_logs, (input_zip, path_in_zip,source, pod, host, dest_host, dest_port, additional_tags)) + futures.append(future) + return futures + + +def wait_for_futures(futures): + for future in futures: + description = future.get(timeout=3600) + print("done: %s" % description); + + +def to_iso(date): + return date.replace(',', '.').replace(' ', 'T') + "Z" + + +def get_build_from_file_in_zip(zip, file_in_zip): + with zip.open(file_in_zip, "r") as f: + first_line = f.readline().decode('utf-8') + return get_build(first_line) + + +# 2017-09-28 00:00:00,041 [long-running-executor-2-thread-789] INFO com.recommind.framework.logging.BuildInfoLayout null - HOSTNAME='VAPFINRA01AP001.rmcloud.int (172.19.10.132)', BUILD='AXC_5.12_442(348cf5a22ee69be3a8e17fc42b79de3c4f2d7875)@AXC_5.12_BRANCH/fixes/AXC-40211', JAVA= +def get_build(first_log_line): + build = "(?P[^(]+)" + regex = ".*BUILD='%s" % (build) + pattern = re.compile(regex) + match = pattern.match(first_log_line) + result = match.group("build") if match else None + #print("%s -> %s" % (first_log_line, result)) + return result + + + +def transform_and_send_logs(input_zip, path_in_zip, source, pod, host, dest_host, dest_port, additional_tags): + + print("start %s" % path_in_zip) + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((dest_host, dest_port)) + + with ZipFile(input_zip, 'r') as zip: + members = zip.namelist() + for file_in_zip in members: + if file_in_zip.startswith(path_in_zip): + if file_in_zip.endswith(".log"): + #print("%s -> %s" % (path_in_zip, file_in_zip)) + # TODO enable again (was only disabled to test metrics csv sending) + #parse_and_send_performance_log_files(s, zip, file_in_zip, source, pod, host, additional_tags) + print("skipping ingestion of log files for debugging reasons") + elif file_in_zip.endswith(".csv") and "performance." in file_in_zip: + send_performance_csv(zip, file_in_zip, source, pod, host, additional_tags) + elif file_in_zip.endswith(".csv") and "metric" in file_in_zip: + send_metric_csv(zip, file_in_zip, source, pod, host, additional_tags) + #parse_and_send_metric_csv_files(s, zip, file_in_zip, source, pod, host, additional_tags) + + s.close() + print("done %s" % path_in_zip) + return "pod: %s host: %s -> %s:%d" % (pod, host, dest_host, dest_port) + + +def send_metric_csv(zip, file_in_zip, source, pod, host, additional_tags): + + # #2020-04-21 00:03:10,002 HOSTNAME='172-28-6-197-VAPNYSE001AA101.rmcloud.int (127.0.1.1)', BUILD='AXC_5.17_1109(12356525fb304c6c9057fd83476620053f518c51)@AXC_5.17_BRANCH', JAVA='OpenJDK 64-Bit Server VM (12.0.1+12)', CLASSPATH='/opt/apache/tomcat/bin/bootstrap.jar:/opt/apache/tomcat/bin/tomcat-juli.jar', PID='48' + # time;metric;value;project;trace_id + # 2020-04-21 00:03:18,951;assignments.batches.totalBatches;0;axcelerate.app_server_monitoring;k6m10054.esf4.3fsx7 + + tags = additional_tags + tags['pod'] = pod + tags['host'] = host + tags['source'] = source + tags['type'] = "number" + tags['build'] = get_build_from_file_in_zip(zip, file_in_zip) + + + csvSettings = json.dumps({ + "timeColumn": "time", + "valueColumn": "value", + "comment": '#', + "separator": ';', + "columnDefinitions": { + "columns": { + "time": {}, + "metric": {}, + "value": {}, + "project": {"postProcessors": ["LOWER_CASE"]}, + "trace_id": {"ignore" : True} + } + }, + "additionalTags": additional_tags + }) + + send_csv(zip, file_in_zip, csvSettings) + + +def send_performance_csv(zip, file_in_zip, source, pod, host, additional_tags): + # #HOSTNAME='172-28-37-108-VAPBDCOM01AA001.rmcloud.int (127.0.1.1)', BUILD='AXC_5.17_1109(12356525fb304c6c9057fd83476620053f518c51)@AXC_5.17_BRANCH', JAVA='OpenJDK 64-Bit Server VM (12.0.1+12)', CLASSPATH='/opt/apache/tomcat/bin/bootstrap.jar:/opt/apache/tomcat/bin/tomcat-juli.jar', PID='54' + # time;duration;method;project;user;trace_id + # 2020-04-21 00:02:11,329;0;HealthCheckService.isOperable;;;k6ufq4tg.8ert.110y8 + tags = additional_tags + tags['pod'] = pod + tags['host'] = host + tags['source'] = source + tags['type'] = "duration" + tags['build'] = get_build_from_file_in_zip(zip, file_in_zip) + + + csvSettings = json.dumps({ + "timeColumn": "time", + "valueColumn": "duration", + "comment": '#', + "separator": ';', + "columnDefinitions": { + "columns": { + "time": {}, + "duration": {}, + "method": {}, + "project": {"postProcessors": ["LOWER_CASE"]}, + "user": {"ignore" : True}, + "trace_id": {"ignore" : True} + } + }, + "additionalTags": additional_tags + }) + + send_csv(zip, file_in_zip, csvSettings) + + +def send_csv(zip, file_in_zip, csvSettings): + print(csvSettings) + os.makedirs('tmp', exist_ok=True) + tmp_folder = join('tmp',str(uuid.uuid4())) + try: + # zip file does not support seeking - this forces us to extract the file + csvFileToSend= zip.extract(file_in_zip, tmp_folder) + + with open(csvFileToSend, 'rb') as data: + m = MultipartEncoder( + fields={ + 'file': ('0.csv', data, 'text/csv'), + 'settings': ('csvReaderSettings.json', csvSettings , 'application/json') + }) + + r = requests.post('http://localhost:17333/api/data?waitUntilFinished=true', data=m, + headers={ + 'Accept': 'text/plain, application/json, application/*+json, */*', + 'Content-Type': m.content_type.replace("form-data", "mixed;charset=UTF-8") + }) + print("response:") + print(r) + finally: + shutil.rmtree(tmp_folder) + + + + +def parse_and_send_metric_csv_files(socket, zip, file_in_zip, source, pod, host, additional_tags): + date = "(?P\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d,\d\d\d)" + method = "(?P[\S]+)" + duration = "(?P\d+)" + status = "(?P[\w]+)" + project = "project=(?P[^\s,\]]+)" + + regex = "%s.*- Executed %s in %s ms %s" % (date, method, duration, status) + regex_project = ".*%s" % project + + pattern = re.compile(regex) + pattern_project = re.compile(regex_project) + + + with zip.open(file_in_zip, "r") as f: + first_line = f.readline().decode('utf-8') + build = get_build(first_line) + + # example + # time;metric;value;project;trace_id + # 2018-10-31 09:36:12,428;applicationSync.applications;75;;jnwyp2jc.2f + + + csv_header_line = f.readline().decode('utf-8') + #print(csv_header_line) + header_columns = csv_header_line.split(";") + + time_index = header_columns.index("time") + metric_index = header_columns.index("metric") + value_index = header_columns.index("value") + project_index = header_columns.index("project") + + + for line in f: + line = line.decode('utf-8') + cols = line.split(";") + + event = { + "@timestamp": to_iso(cols[time_index]), + "pod": pod, + "host": host, + "source": source, + "type": "number", + "metric" : cols[metric_index], + "duration" : int(cols[value_index]), + } + if cols[project_index]: + event["project"] = cols[project_index].lower() + if build: + event["build"] = build + + event.update(additional_tags) + + out_line = str(event).replace("'", '"').encode('utf-8') + #print(out_line) + socket.sendall(out_line) + socket.sendall(b'\n') + +def parse_and_send_performance_log_files(socket, zip, file_in_zip, source, pod, host, additional_tags): + date = "(?P\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d,\d\d\d)" + method = "(?P[\S]+)" + duration = "(?P\d+)" + status = "(?P[\w]+)" + project = "project=(?P[^\s,\]]+)" + + regex = "%s.*- Executed %s in %s ms %s" % (date, method, duration, status) + regex_project = ".*%s" % project + + pattern = re.compile(regex) + pattern_project = re.compile(regex_project) + + with zip.open(file_in_zip, "r") as f: + first_line = f.readline().decode('utf-8') + build = get_build(first_line) + + for line in f: + line = line.decode('utf-8') + match = pattern.match(line) + match_project = pattern_project.match(line) + + if match: + event = { + "@timestamp": to_iso(match.group("date")), + "pod": pod, + "host": host, + "source": source, + "type": "time", + "method" : match.group("method"), + "duration" : int(match.group("duration")), + "status" : match.group("status"), + } + if match_project: + event["project"] = match_project.group("project").lower() + if build: + event["build"] = build + + event.update(additional_tags) + + out_line = str(event).replace("'", '"').encode('utf-8') + #print(out_line) + socket.sendall(out_line) + socket.sendall(b'\n') + +def to_map(serialized_map): + if serialized_map == "": + return {} + + result = {} + tags = serialized_map.split(";") + + for tag in tags: + key_value = tag.split("=") + if len(key_value) == 2: + result[key_value[0]] = key_value[1] + + return result + + +def help(returnValue): + print(sys.argv[0] + ' -i -h -p ') + print('Examples: ') + print(sys.argv[0] + ' -i "/path/to/logs" -h "localhost" -p 17333') + sys.exit(returnValue) + + +def main(argv): + input_zips = [] + host='localhost' + port=17333 + try: + opts, args = getopt.getopt(argv,"i:h:p:",["input", "host", "port"]) + except getopt.GetoptError: + help(2) + for opt, arg in opts: + if opt == '--help': + help(0) + elif opt in ("-i", "--input"): + if os.path.isdir(arg): + input_zips = [join(arg, f) for f in os.listdir(arg) if os.path.isfile(join(arg, f)) and f.endswith(".zip")] + else: + input_zips = arg.split(' ') + elif opt in ("-h", "--host"): + host = arg + elif opt in ("-p", "--port"): + print(arg) + port = int(arg) + print('Input file(s) is "' + str(input_zips) + '"') + + for input_zip in input_zips: + if not os.path.isfile(input_zip) or not input_zip.endswith(".zip"): + print("input file is not a zip file: %s"% (input_zip)) + help(2) + + + if None == re.match('.+', host): + print("host must not be empty, but was %s" % host) + help(2) + + pods2hosts2sources = print_pod_host_and_source(input_zips) + answer = input("continue y or n ?") + if answer == 'y': + additional_tags = to_map(input("additional tags (e.g. test=lt_axc_5.16.02;keyword=RiC): ")) + print("using additional tags: %s" % additional_tags) + start = time.time() + for input_zip in input_zips: + print("start Zip: %s" % input_zip) + walk(input_zip, pods2hosts2sources, host, port, additional_tags) + end = time.time() + print("duration %s seconds" % (end - start)) + + +if __name__ == "__main__": + main(sys.argv[1:])