High Performance Log parsing in C# - First Attempt

Before I get too far into this, let me be up front: I went through a lot of iterations of this code. Some of them were really bad or really stupid. I’m going to tell you what I remember of the process, no matter how dumb it may have been or how incompetent it may make me look. Why would I do this? Because creation is a process, and if you hide your mistakes, you won’t learn from them. If I hide my mistakes, how can I teach you not to make them?

How did this first attempt go? In short: bad.

So admittedly I went into this version with the lowest possible standards. With a mindset akin to:

Parallel All The Things!

As you can imagine, if you just throw “all teh tasks” at the problem, it doesn’t turn out well. Once I had worked out the majority of the race conditions, I ended up with something that looked a bit like this:

var readerComplete = false;
var preParsedLines = new ConcurrentQueue<string>();
var parsedLines = new ConcurrentQueue<LogEntry>();
var readerTask = Task.Run(async () => 
{
    using(var fs = File.OpenRead("logFile.log"))
    using(var sr = new StreamReader(fs))
    {
        string line;
        while((line = await sr.ReadLineAsync()) != null)
        {
            preParsedLines.Enqueue(line);
        }
    }
    readerComplete = true;
});
// ToArray() forces the tasks to actually start; Enumerable.Select is lazy.
var parserTasks = Enumerable.Range(0, 10).Select(x => Task.Run(() =>
{
    // Spin-polling like this burns CPU whenever the queue is empty.
    while(preParsedLines.Any() || readerComplete == false)
    {
        string line;
        if (preParsedLines.TryDequeue(out line))
        {
            var entry = LogEntry.Parse(line);
            parsedLines.Enqueue(entry);
        }
    }
})).ToArray();
var cacherTask = /* to be expanded upon later */
await readerTask;
await Task.WhenAll(parserTasks);

Now you may notice a few issues here:

  • I just throw 10 tasks at it and cross my fingers
  • Both queues are unbounded

So what happened when I ran this? Fire, brimstone, melting silicon, magic smoke escaping left and right. OK, maybe not quite that bad: I hit 100% CPU while achieving only 3–5 MB/s of read throughput. To put that in perspective, my current implementation achieves ~140 MB/s in the same basic setup.

10 Tasks and Cross Your Fingers!

In this producer/consumer model, I was assuming that we can produce strings for parsing ten times faster than we can parse them. It turns out this isn’t the case. The result is a significant amount of contention chewing up processor time while yielding little in terms of useful work. That was lesson number one for me in the async/await concurrency model: while I may not be explicitly calling anything as heavyweight as lock, locking is still occurring; the framework is just being really nice and hiding it from me. (Interesting tangent: Why Skylake CPUs Are Sometimes 50% Slower – How Intel Has Broken Existing Code.)
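One common way to avoid the spin-polling in my parser loop is BlockingCollection&lt;T&gt;, whose GetConsumingEnumerable blocks the consumer until work arrives instead of burning CPU. This is a minimal sketch, not the code from this project; the producer loop, line contents, and task counts are all illustrative:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var lines = new BlockingCollection<string>();
var producer = Task.Run(() =>
{
    for (var i = 0; i < 100; i++)
        lines.Add($"log line {i}");
    lines.CompleteAdding(); // replaces the readerComplete flag
});

var consumed = 0;
var consumers = Enumerable.Range(0, 4).Select(_ => Task.Run(() =>
{
    // Blocks when the collection is empty and exits cleanly
    // once CompleteAdding has been called and the queue drains.
    foreach (var line in lines.GetConsumingEnumerable())
        Interlocked.Increment(ref consumed);
})).ToArray();

Task.WaitAll(consumers);
producer.Wait();
Console.WriteLine(consumed); // 100
```

CompleteAdding also removes the need for a shared boolean flag, which is one less race condition to chase down.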

Unbounded Queues!

What could go wrong? Turns out, just about exactly what you’d expect: parsed entries piled up in memory, waiting to be dequeued by the CachingTask.
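The usual fix is to bound the queue so the producer feels backpressure instead of racing ahead and filling memory. A small sketch (a real bound would be in the hundreds or thousands, not 2; the tiny capacity here just makes the effect visible):

```csharp
using System;
using System.Collections.Concurrent;

var queue = new BlockingCollection<int>(boundedCapacity: 2);
queue.Add(1);
queue.Add(2);
// The queue is full, so a timed TryAdd fails instead of growing the queue.
var rejected = !queue.TryAdd(3, millisecondsTimeout: 10);
Console.WriteLine(rejected); // True
queue.Take();
// After a consumer takes an item, there is room again.
var accepted = queue.TryAdd(3, millisecondsTimeout: 10);
Console.WriteLine(accepted); // True
```

A plain Add on a full bounded collection simply blocks, which is exactly the behavior an unbounded ConcurrentQueue can never give you.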

Caching

I tried to get creative with my caching on the first run-through. It ended up working pretty well, but it had some issues. Namely, compression algorithms tend to be very slow, and I wanted my cache files to be not just machine readable but human readable as well, for debugging purposes. So I decided to combine some off-the-shelf components into a cache file format.

This file is basically a zip file, with one compressed file for every 10,000 LogEntry objects. Each LogEntry is written to the file as JSON, one per line. This yields a structure something like:

Zip Container
    |- Batch1 Container
    |   |- LogEntry1
    |   |- LogEntry2
    |   |- ...
    |   |- LogEntry10000
    |- Batch2 Container
    |   |- LogEntry1
    |   |- LogEntry2
    |   |- ...
    |   |- LogEntry10000

Each LogEntry contains one crucial piece of information beyond the data parsed from the line: the original line number in the file. This allows us to tie a point on a graph to a line in the file, so a user of the software can bring their own tools to dig deeper into a particular line or set of lines after looking at our charts.
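The layout above can be sketched with the built-in System.IO.Compression.ZipArchive, one compressed entry per batch of JSON lines. This is an assumption-laden illustration, not the project's actual writer: the entry names, the tiny batch size, and the LogEntry shape are all made up for the demo (the real cache uses 10,000 entries per batch):

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text.Json;

const int batchSize = 3; // 10,000 in the real cache; small here for the demo
var zipStream = new MemoryStream();
using (var zip = new ZipArchive(zipStream, ZipArchiveMode.Create, leaveOpen: true))
{
    var batchNum = 0;
    StreamWriter writer = null;
    for (long i = 0; i < 7; i++)
    {
        if (i % batchSize == 0)
        {
            // Finish the previous batch before starting the next entry.
            writer?.Dispose();
            writer = new StreamWriter(zip.CreateEntry($"batch{++batchNum}.jsonl").Open());
        }
        // One JSON object per line; the line number is what ties a
        // chart point back to the original log file.
        writer.WriteLine(JsonSerializer.Serialize(new LogEntry(i + 1, $"parsed line {i + 1}")));
    }
    writer?.Dispose();
}

zipStream.Position = 0;
int batchCount;
using (var reread = new ZipArchive(zipStream, ZipArchiveMode.Read))
    batchCount = reread.Entries.Count;
Console.WriteLine(batchCount); // 3 batches for 7 entries

record LogEntry(long LineNumber, string Message);
```

Because each batch is a separate zip entry, you can still unzip the cache by hand and read the JSON lines directly, which keeps the human-readable-for-debugging property.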

So let’s see how this fits into my parsing code above.

var readerComplete = false;
var parserComplete = false;
var preParsedLines = new ConcurrentQueue<string>();
var parsedLines = new ConcurrentQueue<LogEntry>();
var readerTask = Task.Run(async () => 
{
    using(var fs = File.OpenRead("logFile.log"))
    using(var sr = new StreamReader(fs))
    {
        string line;
        while((line = await sr.ReadLineAsync()) != null)
        {
            preParsedLines.Enqueue(line);
        }
    }
    readerComplete = true;
});
var parserTasks = Enumerable.Range(0, 10).Select(x => Task.Run(() =>
{
    while(preParsedLines.Any() || readerComplete == false)
    {
        string line;
        if (preParsedLines.TryDequeue(out line))
        {
            var entry = LogEntry.Parse(line);
            parsedLines.Enqueue(entry);
        }
    }
    // Note: the first parser to finish flips this flag while the other
    // nine may still be working. Another latent race.
    parserComplete = true;
})).ToArray();
var writerTask = Task.Run(async () => 
{
    /* What follows is pseudo code. I lost the actual code, but you'll get the gist. */
    using(var zf = new ZipFile("cache.zip"))
    {
        var batchNum = 0;
        var linesInBatch = new List<string>(10000);
        while(parsedLines.Any() || parserComplete == false)
        {
            LogEntry entry;
            if(parsedLines.TryDequeue(out entry))
            {
                linesInBatch.Add(JsonConvert.SerializeObject(entry));
            }
            if (linesInBatch.Count == 10000 || parserComplete)
            {
                using(var ze = new ZipEntry(batchNum))
                using(var ms = new MemoryStream())
                using(var sw = new StreamWriter(ms))
                {
                    foreach(var line in linesInBatch)
                    {
                        await sw.WriteLineAsync(line);
                    }
                    await sw.FlushAsync();
                    ms.Position = 0;
                    ms.CopyTo(ze.Stream);
                }
                linesInBatch.Clear();
                batchNum++;
            }
        }
    }
});
await readerTask;
await Task.WhenAll(parserTasks);
await writerTask;

Because of how zip files work, they’re not really thread safe, at least not in the libraries I tried for this project. So, much like the reader task, the writer task becomes another bottleneck.

Ultimately, this code is pretty simple, has a few problems, and got me 3–5 MB/s of read throughput. It allowed me to get a proof of concept on the table and taught me that you can’t just throw Tasks at a problem and have it all automagically work. My next attempt was much better: it took advantage of multiple kinds of parallel programming and avoided some of the memory issues present in this incarnation.
