# Source code for rs_mrt_dau_utilities.ip_analysis.ip_analysis

import base64
import gzip
import json
import logging
import re

import fast_json_normalize
import polars as pl

# Parse a SCPI result obtained with FETCh:DATA:MEASurement:IPANalysis:RESult?
# The parser returns a list of dicts with the keys 'time' and 'json_messages'.
# Example of use (print the parsed sequences):
# parsed_sequences = ipanalysis_parse_scpi_result(scpi_result)
# for sequence in parsed_sequences:
#    print(f"Time: {sequence['time']}")
#    for message in sequence['json_messages']:
#        print(f"message: {message}")
#        #print(json.dumps(message, indent=2))
#    print()


def ipanalysis_parse_scpi_result(scpi_result: str) -> list[dict]:
    """
    Split a raw IP-analysis SCPI result into timestamped, decoded sequences.

    The input is cut at every ``YYYY-MM-DD HH:MM:SS",`` timestamp. The text
    that follows a timestamp, up to the next one, is that timestamp's SCPI
    block. Any text before the first timestamp is discarded.

    Args:
        scpi_result (str): The raw SCPI result string.

    Returns:
        list: One dict per timestamp, as produced by
        ``ipanalysis_parse_json_result`` (keys ``"time"`` and
        ``"json_messages"``).
    """
    # With a single capture group, re.split yields
    # [prefix, t1, block1, t2, block2, ...]; dropping the prefix leaves
    # alternating timestamp/block entries.
    pieces = re.split(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})",', scpi_result)[1:]
    timestamps = pieces[0::2]
    blocks = pieces[1::2]

    # For each block, keep only the text after the first "#<digits>" marker,
    # which drops the length header before the encoded payload.
    return [
        ipanalysis_parse_json_result(timestamp, re.split(r"#(\d+)", block)[2])
        for timestamp, block in zip(timestamps, blocks)
    ]
def ipanalysis_parse_json_result(time: str, encoded_json_block: str) -> dict:
    """
    Decode one IP-analysis payload into a list of JSON messages.

    Processing steps:
      - base64-decode ``encoded_json_block`` into gzip-compressed bytes,
      - gzip-decompress them and decode the result as UTF-8,
      - split the text on newlines and parse each non-blank line as one
        JSON message.

    Blank lines, including an entirely empty payload, are skipped silently.
    Lines that are not valid JSON are reported with ``logging.warning`` and
    skipped.

    Args:
        time (str): Timestamp associated with the messages; copied into the
            result unchanged.
        encoded_json_block (str): Base64 text of a gzip stream containing
            newline-separated JSON messages.

    Returns:
        dict: ``{"time": time, "json_messages": [parsed messages, in order]}``.

    Raises:
        Errors from base64 decoding, gzip decompression or UTF-8 decoding
        (e.g. ``binascii.Error``, ``gzip.BadGzipFile``, ``EOFError``,
        ``UnicodeDecodeError``) are not caught here.
    """
    compressed = base64.b64decode(encoded_json_block)
    json_block = gzip.decompress(compressed).decode("utf-8")

    parsed_json_messages = []
    # split("\n") rather than splitlines(): JSON strings may legally contain
    # raw U+2028/U+2029, which splitlines() would treat as line breaks.
    for message in json_block.split("\n"):
        if not message.strip():
            # Empty/whitespace-only lines are separators, not malformed JSON.
            continue
        try:
            parsed_json_messages.append(json.loads(message))
        except json.JSONDecodeError as err:
            logging.warning("json.JSONDecodeError (%s): %s", err, message)

    return {"time": time, "json_messages": parsed_json_messages}
[docs] def ipanalysis_parse_scpi_schema_result(schema_result: str) -> dict | None: """ Parses the SCPI schema result string and extracts the JSON schema. Args: schema_result (str): A string containing the SCPI schema result. Returns: dict: A dictionary representing the parsed JSON schema, or None if the schema is not found or if there is an error in parsing. """ try: # Find the index of '{"$schema"' start_index = schema_result.find('{"$schema"') if start_index != -1: # Create a new string starting from '{"$schema"' json_schema_str = schema_result[start_index:].strip() json_schema = json.loads(json_schema_str) return json_schema else: logging.warning( 'The keyword {"$schema"} was not found in the input string.' ) return None except json.JSONDecodeError as e: logging.error(f"Error decoding JSON schema: {e}") return None except Exception as e: logging.error(f"An unexpected error occurred: {e}") return None
def ipanalysis_init_dataframes() -> dict[str, pl.DataFrame]:
    """
    Build the starting set of per-category DataFrames for IP analysis.

    Each category is mapped to its own new, empty ``pl.DataFrame``. This
    is suitable as the ``list_of_dfs`` argument of
    ``ipanalysis_update_dataframes``.

    Returns:
        dict: Category name -> empty Polars DataFrame, with the keys
        "flow_started", "report", "upd_classification", "upd_network",
        "upd_fqdn", "upd_ssl" and "flow_closed" (in that order).
    """
    categories = (
        "flow_started",
        "report",
        "upd_classification",
        "upd_network",
        "upd_fqdn",
        "upd_ssl",
        "flow_closed",
    )
    # A fresh DataFrame per key, so no two categories share an object.
    return {category: pl.DataFrame() for category in categories}
# Non-REPORT message types and the DataFrame key each one is appended to.
# Checked in this order; the first type present in a message wins.
_IPANALYSIS_EVENT_KEYS: dict[str, str] = {
    "FLOW_STARTED": "flow_started",
    "UPDATE_CLASSIFICATION": "upd_classification",
    "UPDATE_NETWORK": "upd_network",
    "UPDATE_FQDN": "upd_fqdn",
    "UPDATE_SSL": "upd_ssl",
    "FLOW_CLOSED": "flow_closed",
}


def _ipanalysis_time_to_ns(time_value):
    """Convert a ``{"secs", "nanos"}`` dict to integer nanoseconds; return any other value unchanged."""
    if isinstance(time_value, dict):
        return time_value["secs"] * 1000000000 + time_value["nanos"]
    return time_value


def _ipanalysis_collect_rows(message: dict) -> tuple[str, list[dict]]:
    """Return the target DataFrame key and the row dicts carried by *message* (shallow copies; *message* is not modified)."""
    if "REPORT" in message:
        report = message["REPORT"]
        flows = report["flows_stat"]
        if not flows:
            return "report", []
        # Every flow entry of a REPORT is stamped with the report's own time.
        report_ns = _ipanalysis_time_to_ns(report["time"])
        return "report", [{**flow, "time": report_ns} for flow in flows]

    for event_name, df_key in _IPANALYSIS_EVENT_KEYS.items():
        if event_name in message:
            row = dict(message[event_name])
            row["time"] = _ipanalysis_time_to_ns(row["time"])
            return df_key, [row]

    # Unrecognised message type: nothing to append.
    return "report", []


def ipanalysis_update_dataframes(
    list_of_dfs: dict[str, pl.DataFrame], message: dict
) -> dict[str, pl.DataFrame]:
    """
    Append the rows carried by one IP-analysis message to the matching DataFrame.

    Row selection:
      - ``REPORT``: each entry of ``message["REPORT"]["flows_stat"]`` becomes
        a row whose ``time`` is the report time (``secs * 1e9 + nanos``).
      - ``FLOW_STARTED``, ``UPDATE_CLASSIFICATION``, ``UPDATE_NETWORK``,
        ``UPDATE_FQDN``, ``UPDATE_SSL``, ``FLOW_CLOSED``: the payload dict
        becomes a single row. A dict-valued ``time`` (``secs``/``nanos``) is
        converted to integer nanoseconds.
      - Any other message leaves ``list_of_dfs`` unchanged.

    Each row is flattened with ``fast_json_normalize`` (nested keys joined
    with ``"_"``). Its ``time`` column is converted from epoch nanoseconds
    to a UTC datetime. All rows of the message are then appended to
    ``list_of_dfs[key]`` with a single ``diagonal_relaxed`` concat. The
    caller's ``message`` is not modified.

    Args:
        list_of_dfs (dict): Category name -> Polars DataFrame
            (``ipanalysis_init_dataframes`` may be used to get the initial
            values). Updated in place.
        message (dict): One parsed IP-analysis JSON message.

    Returns:
        dict: The same ``list_of_dfs`` object, updated.

    Raises:
        KeyError: If a row has no ``time`` field, a REPORT lacks
            ``flows_stat``/``time``, or ``list_of_dfs`` lacks the target key.
    """
    key, rows = _ipanalysis_collect_rows(message)
    if not rows:
        return list_of_dfs

    row_dfs = []
    for row in rows:
        msg_norm = fast_json_normalize.fast_json_normalize(
            row,
            separator="_",
            to_pandas=False,
            order_to_pandas=False,
        )
        # convert the time to datetime with the correct timezone
        row_dfs.append(
            pl.from_dicts([msg_norm]).with_columns(
                time=pl.from_epoch("time", time_unit="ns").dt.replace_time_zone("UTC")
            )
        )

    # One concat per message instead of one per row: avoids re-copying the
    # accumulated frame for every flow of a REPORT, and leaves the frame
    # untouched if building any row fails.
    list_of_dfs[key] = pl.concat(
        [list_of_dfs[key], *row_dfs], how="diagonal_relaxed"
    )
    return list_of_dfs