Using PySpark and AWS Glue to analyze multi-line log files

In this post I’m going to explain how you can analyze multiline-logs using PySpark and/or AWS Glue. If you’re not familiar with Glue yet, you can check out this article for an introduction.

One of the main challenges with log analyses is the peculiar file format. In lots of cases tools produce multiline log messages like these:

2021-12-02T14:00:00,000Z	DEBUG	This is
a message that
spans multiple linees
2021-12-02T14:00:01,000Z	DEBUG	Single-line-message
2021-12-02T14:00:02,000Z	DEBUG	Another message
2021-12-02T14:00:03,000Z	INFO	This is
another multi-line message

Why is this a challenge? When you read data in PySpark using most connectors, they operate on a line by line basis, i.e. you get a record in your data frame for each line in the file. So your data frame will look something like this:

+-------------------------------------------------------+
| 2021-12-02T14:00:00,000Z	DEBUG	This is             |
+-------------------------------------------------------+
| a message that                                        |
+-------------------------------------------------------+
| spans multiple linees                                 |
+-------------------------------------------------------+
| 2021-12-02T14:00:01,000Z	DEBUG	Single-line-message |
+-------------------------------------------------------+
| 2021-12-02T14:00:02,000Z	DEBUG	Another message     |
+-------------------------------------------------------+
| 2021-12-02T14:00:03,000Z	INFO	This is             |
+-------------------------------------------------------+
| another multi-line message                            |
+-------------------------------------------------------+

That makes meaningful analyses tricky. Data that belongs together is distributed across multiple records, because the message part of the log is spread out. Fortunately there is another approach that circumvents these issues.

In order to correctly process the logs, we need to treat the file as a whole when we read it. We have to use a reader that doesn’t create records from the individual lines in the files, but one record per file. Fortunately there is something that does exactly that.

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

import pyspark.sql.functions as f

# Change this to your data source
S3_INPUT_PATH = "s3a://<my-log-bucket>/"

SC = SparkContext.getOrCreate()
SPARK = SparkSession(SC)

# Load all files as individual records, i.e. each record has the path as _1 and the content as _2
logs_df = SC.wholeTextFiles(S3_INPUT_PATH).toDF()

The wholeTextFiles reader loads the files into a data frame with two columns. The column _1 contains the path to the file and _2 its content. (Note: I’d avoid printing the column _2 in jupyter notebooks, in most cases the content will be too much to handle.) This is important, because treating the file as a whole allows us to use our own splitting logic to separate the individual log records.

We can achieve this by using the split function in combination with the explode function like this:

multiline_str_df = logs_df.select(
    f.explode(
        f.split("_2", r"(?=\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2},\d{3}[\S\s]*\t)")
    ).alias("value")
)

What’s going on here is probably not that intuitive, since a regular expression is involved. Let’s talk about it. Now that we’re working on the content of the whole file, we need to split the file into log records. We can’t use new line characters for this precisely because of the reason outlined above. Instead we can use the timestamp, because each new record starts with a timestamp followed by the tab-separator (\t).

Unfortunately the regular split operator removes the character which we use to split a string, so splitting 11A11A11 at A would yield [11, 11, 11] and we’d lose the A. If we now split at the timestamp, we’d lose it, which is not good. This is where regular expressions can help. They allow for a look-ahead match. The details don’t really matter, but if you start a capture group with ?= it will match everything before the pattern. By using a look ahead capture group, we’re able to match everything before the timestamp:

(?=\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2},\d{3}[\S\s]*\t)

The next step is to move all the matches into rows in a data frame, which we can use explode for. It returns a new row for all matches. After running our code, the new dataframe multiline_str_df looks roughly like this:

+-------------------------------------------------------+
|                        value                          |
+-------------------------------------------------------+
| 2021-12-02T14:00:00,000Z	DEBUG	This is             |
| a message that                                        |
| spans multiple linees                                 |
+-------------------------------------------------------+
| 2021-12-02T14:00:01,000Z	DEBUG	Single-line-message |
+-------------------------------------------------------+
| 2021-12-02T14:00:02,000Z	DEBUG	Another message     |
+-------------------------------------------------------+
| 2021-12-02T14:00:03,000Z	INFO	This is             |
| another multi-line message                            |
+-------------------------------------------------------+

Now we can use another regex to filter the individual rows and extract timestamp, log level as well as the message:

REGEX_PATTERN = r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2},\d{3})[\S\s]*\t([\S\s]*?)\s*\t([\s\S]*)'
# 1: Timestamp
# 2: Log Level
# 3: Message


log_data_df = multiline_str_df.select(
    f.regexp_extract('value', REGEX_PATTERN, 1).alias('timestamp'),
    f.regexp_extract('value', REGEX_PATTERN, 2).alias('log_level'),
    f.regexp_extract('value', REGEX_PATTERN, 3).alias('message'),
)

Finally log_data_df will look like this and you can do further processing based on that:

+---------------------------------------------------------------+
| timestamp                | log_level | message                |
+---------------------------------------------------------------+
| 2021-12-02T14:00:00,000Z | DEBUG     | This is                |
|                          |           | a message that         |
|                          |           | spans multiple linees  |
+---------------------------------------------------------------+
| 2021-12-02T14:00:01,000Z | DEBUG     | Single-line-message    |
+---------------------------------------------------------------+
| 2021-12-02T14:00:02,000Z | DEBUG     | Another message        |
+---------------------------------------------------------------+
| 2021-12-02T14:00:03,000Z | INFO      | This is                |
|                          |           | another multi-line[...]|
+---------------------------------------------------------------+

Now that we’ve gotten our data into shape, it’s time to upload it to S3 and configure a Glue Job to process our log files in S3. If you’re not using Glue or S3 that’s also fine - the code is plain PySpark and not AWS-dependent.

Is this a one-size-fits-all solution? No, definitely not, but the approach is broadly applicable. In your case the identifier for new log records will most likely be different and you’ll have to adapt the regex to locate it. Just make sure to use a look-ahead group in the split logic (starting with ?=). I recommend you use something like regex101.com to tinker with your regular expression until it works as you want to use it.

Hopefully this helped you. If you have any feedback, feel free to reach out via the social media channels in my bio.

— Maurice

Similar Posts You Might Enjoy

What I wish somebody had explained to me before I started to use AWS Glue

There are many components under the Glue umbrella that can fit together into a cohesive big picture. In this introduction to Glue I’m explaining my version of this big picture. - by Maurice Borgmeier

Complexity costs: Read performance for nested DynamoDB items with different Lambda configurations

DynamoDB allows us to store complex data structures and deeply nested objects, but this complexity isn’t free. In this post we take a look at how different Lambda configurations impact the read times from boto3. We examine how different resource configurations can improve the read time of the same item by more than a factor of 12. - by Maurice Borgmeier

How to ingest MQTT data from VerneMQ into your Data Lake using IoT Core

This post explains how you can ingest data from a MQTT broker such as VerneMQ into your data lake via IoT Core and Kinesis Data Firehose. We’ll set up a data processing pipeline from start to finish in Terraform. - by Maurice Borgmeier