{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "import json\n", "from urllib.parse import unquote\n", "import glob\n", "import plotly.express as xp\n", "import matplotlib.pyplot as plt\n", "import seaborn as sns\n", "sns.set_style(\"white\")\n", "%matplotlib inline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Initializations - Finction Definitions" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "#Katrin log path\n", "path = '/mnt/ands/logs/katrin'\n", "all_files = glob.glob(path + \"/*.log\")\n", "update_request_type = 'SERVICE(update)'\n", "get_request_type = 'SERVICE(get)'\n", "downaload_request_type = 'SERVICE(download)'\n", "\n", "#function to filterlog data based on request type\n", "def filter_log(df, request_type):\n", " logs=df[0].str.split(\" \",1)\n", " logs = pd.DataFrame(logs.values.tolist())\n", " data = pd.DataFrame(logs.apply(lambda logs: logs[1][:int(logs[0])-2], axis=1).str.split(\" \").values.tolist())\n", " data.columns=[\"Timestamp\", \"Severity\", \"Setup\", \"Server\", \"Source\", \"Target\", \"SessionID\", \"Request\",\"Latency\", \"PID\", \"ClientID\", \"Len\", \"MsgLen\"]\n", " data_filtered = data.loc[data[\"Source\"] == request_type]\n", " data_filtered[\"Content\"] = logs.iloc[data_filtered.index][1]\n", " return data_filtered\n", "\n", "#function to load query logs of type 'SERVICE(update)'\n", "def process_update_katrin_log(df): \n", " data_filtered = filter_log(df, update_request_type)\n", " \n", " data_filtered[\"Message\"] = data_filtered.apply(lambda data_filtered: data_filtered[\"Content\"][-int(data_filtered[\"Len\"])+int(data_filtered[\"MsgLen\"])+2: ], axis=1)\n", " data_filtered[\"Content\"] = data_filtered.apply(lambda data_filtered: data_filtered[\"Content\"][:len(data_filtered[\"Content\"]) -int(data_filtered[\"Len\"])+int(data_filtered[\"MsgLen\"])+1], axis=1)\n", " data_filtered[\"db_server\"] = data_filtered.apply(lambda data_filtered: json.loads(json.loads(unquote(data_filtered[\"Message\"]), strict=False)['POST']['props'], strict=False)[\"db_server\"] if 'POST' in json.loads(unquote(data_filtered[\"Message\"]), strict=False) else'' , axis=1)\n", " data_filtered[\"srctree\"] = data_filtered.apply(lambda data_filtered: json.loads(json.loads(unquote(data_filtered[\"Message\"]), strict=False)['POST']['props'], strict=False)[\"srctree\"] if 'POST' in json.loads(unquote(data_filtered[\"Message\"]), strict=False) else'' , axis=1)\n", " data_filtered[\"width\"] = data_filtered.apply(lambda data_filtered: json.loads(json.loads(unquote(data_filtered[\"Message\"]), strict=False)['POST']['props'], strict=False)[\"width\"] if 'POST' in json.loads(unquote(data_filtered[\"Message\"]), strict=False) else'' , axis=1)\n", " data_filtered[\"height\"] = data_filtered.apply(lambda data_filtered: json.loads(json.loads(unquote(data_filtered[\"Message\"]), strict=False)['POST']['props'], strict=False)[\"height\"] if 'POST' in json.loads(unquote(data_filtered[\"Message\"]), strict=False) else'' , axis=1)\n", " data_filtered[\"db_name\"] = data_filtered.apply(lambda data_filtered: json.loads(json.loads(unquote(data_filtered[\"Message\"]), strict=False)['POST']['props'], strict=False)[\"db_name\"] if 'POST' in json.loads(unquote(data_filtered[\"Message\"]), strict=False) else'' , axis=1)\n", " data_filtered[\"db_group\"] = data_filtered.apply(lambda data_filtered: json.loads(json.loads(unquote(data_filtered[\"Message\"]), strict=False)['POST']['props'], strict=False)[\"db_group\"] if 'POST' in json.loads(unquote(data_filtered[\"Message\"]), strict=False) else'' , axis=1)\n", " data_filtered[\"control_group\"] = data_filtered.apply(lambda data_filtered: json.loads(json.loads(unquote(data_filtered[\"Message\"]), strict=False)['POST']['props'], strict=False)[\"control_group\"] if 'POST' in json.loads(unquote(data_filtered[\"Message\"]), strict=False) else'' , axis=1)\n", " data_filtered[\"db_mask\"] = data_filtered.apply(lambda data_filtered: json.loads(json.loads(unquote(data_filtered[\"Message\"]), strict=False)['POST']['props'], strict=False)[\"db_mask\"] if 'POST' in json.loads(unquote(data_filtered[\"Message\"]), strict=False) else'' , axis=1)\n", " data_filtered[\"window\"] = data_filtered.apply(lambda data_filtered: json.loads(json.loads(unquote(data_filtered[\"Message\"]), strict=False)['POST']['props'], strict=False)[\"window\"] if 'POST' in json.loads(unquote(data_filtered[\"Message\"]), strict=False) else'' , axis=1)\n", " data_filtered[\"aggregation\"] = data_filtered.apply(lambda data_filtered: json.loads(json.loads(unquote(data_filtered[\"Message\"]), strict=False)['POST']['props'], strict=False)[\"aggregation\"] if 'POST' in json.loads(unquote(data_filtered[\"Message\"]), strict=False) else'' , axis=1)\n", " data_filtered[\"resample\"] = data_filtered.apply(lambda data_filtered: json.loads(json.loads(unquote(data_filtered[\"Message\"]), strict=False)['POST']['props'], strict=False)[\"resample\"] if 'POST' in json.loads(unquote(data_filtered[\"Message\"]), strict=False) else'' , axis=1)\n", " data_filtered[\"mask_mode\"] = data_filtered.apply(lambda data_filtered: json.loads(json.loads(unquote(data_filtered[\"Message\"]), strict=False)['POST']['props'], strict=False)[\"mask_mode\"] if 'POST' in json.loads(unquote(data_filtered[\"Message\"]), strict=False) else'' , axis=1)\n", " data_filtered.drop(['Message','MsgLen', 'Len'], axis=1, inplace = True) \n", " return data_filtered\n", "\n", "#function to load query logs of type 'SERVICE(get)'\n", "def process_get_katrin_log(df) :\n", " data_filtered = filter_log(df, get_request_type)\n", " data_filtered[\"Content\"] = logs.iloc[data_filtered.index][1]\n", " data_filtered[\"Message\"] = data_filtered.apply(lambda data_filtered: data_filtered[\"Content\"][-int(data_filtered[\"Len\"])+int(data_filtered[\"MsgLen\"])+2: ], axis=1)\n", " data_filtered[\"rt\"] = data_filtered.apply(lambda data_filtered: json.loads(unquote(data_filtered[\"Message\"]), strict=False)['GET'].get(\"rt\") , axis=1)\n", " data_filtered[\"db_name\"] = data_filtered.apply(lambda data_filtered: json.loads(unquote(data_filtered[\"Message\"]), strict=False)['GET'].get(\"db_name\") , axis=1)\n", " data_filtered[\"db_group\"] = data_filtered.apply(lambda data_filtered: json.loads(unquote(data_filtered[\"Message\"]), strict=False)['GET'].get(\"db_group\") , axis=1)\n", " data_filtered[\"srctree\"] = data_filtered.apply(lambda data_filtered: json.loads(unquote(data_filtered[\"Message\"]), strict=False)['GET'].get(\"srctree\") , axis=1)\n", " data_filtered[\"db_mask\"] = data_filtered.apply(lambda data_filtered: json.loads(unquote(data_filtered[\"Message\"]), strict=False)['GET'].get(\"db_mask\") , axis=1)\n", " data_filtered[\"window\"] = data_filtered.apply(lambda data_filtered: json.loads(unquote(data_filtered[\"Message\"]), strict=False)['GET'].get(\"window\") , axis=1)\n", " data_filtered[\"resample\"] = data_filtered.apply(lambda data_filtered: json.loads(unquote(data_filtered[\"Message\"]), strict=False)['GET'].get(\"resample\") , axis=1)\n", " data_filtered.drop(['MsgLen', 'Severity', 'Len','Target', 'Content', 'Message'], axis=1, inplace = True)\n", " return data_filtered\n", "\n", "#function to load query logs of type 'SERVICE(download)'\n", "def process_download_katrin_log(df) :\n", " data_filtered = filter_log(df, downaload_request_type)\n", " data_filtered[\"Content\"] = logs.iloc[data_filtered.index][1]\n", " data_filtered = data_filtered.loc[~data_filtered['Content'].isnull()]\n", " if len(data_filtered) >= 1:\n", " data_filtered[\"Message\"] = data_filtered.apply(lambda data_filtered: data_filtered[\"Content\"][-int(data_filtered[\"Len\"])+int(data_filtered[\"MsgLen\"])+2: ], axis=1)\n", " data_filtered[\"Target\"] = data_filtered.apply(lambda data_filtered: json.loads(unquote(data_filtered[\"Message\"]), strict=False)['GET'].get(\"target\") , axis=1)\n", " data_filtered = data_filtered.loc[data_filtered['Target'] == 'dlmanager_add']\n", " data_filtered.drop(['Content', 'MsgLen', 'Len'], axis=1, inplace = True)\n", " return data_filtered\n", "\n", "#function to chunk a log file and process each chunck individually\n", "def process_chunks(filename, request_type):\n", " chunksize = 10 ** 6\n", " i=0\n", " with pd.read_csv(filename, chunksize=chunksize, sep=r'\\t', engine='python', header = None) as reader:\n", " for chunk in reader: \n", " if (request_type == update_request_type):\n", " processed_df = process_update_katrin_log(chunk)\n", " elif (request_type == get_request_type):\n", " processed_df = process_get_katrin_log(chunk)\n", " elif (request_type == get_request_type):\n", " processed_df = process_download_katrin_log(chunk)\n", " if i == 0: \n", "\n", " df = processed_df\n", " else:\n", " df = df.append(processed_df, ignore_index=True) \n", " i+=1\n", " return df\n", "\n", "\n", "def process_all_files(request_type):\n", " i = 0\n", " for filename in all_files:\n", " if i == 0:\n", " df = process_chunks(filename, update_request_type)\n", " else:\n", " df = df.append(process_chunks(filename, update_request_type), ignore_index=True) \n", " i+=1\n", " return df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Loading KATRIN Logs\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Loading service(update) logs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ ":15: SettingWithCopyWarning: \n", "A value is trying to be set on a copy of a slice from a DataFrame.\n", "Try using .loc[row_indexer,col_indexer] = value instead\n", "\n", "See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy\n", " data_filtered[\"Content\"] = logs.iloc[data_filtered.index][1]\n" ] } ], "source": [ "processed_update_logs = process_all_files(update_request_type)\n", "processed_update_logs.to_csv(\"katrin_update.csv\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Loading service(get) logs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "processed_get_logs = process_all_files(get_request_type)\n", "processed_get_logs.to_csv(\"katrin_get.csv\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Loading service(download) logs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "processed_download_logs = process_all_files(download_request_type)\n", "processed_download_logs.to_csv(\"katrin_download.csv\")" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 5 }