Calculating Moving Average (or other aggregations) using LINQ

A while ago I wrote about calculating a moving average of live data using the Buffer() method in Reactive Extensions. The method creates a buffer of updates whose boundary you, as the developer, specify, and hands you this running buffer with every subsequent update. The ‘current’ update is last in the buffer, which lets you calculate the inclusive moving average, one where the latest value is included in the result.

Recently, I had to do something similar with static data in an application which was not using Rx, but had a well-defined LINQ-based data provider. To my surprise, I found that LINQ does not provide a buffering method out of the box. To my greater surprise, I found that it was easy to implement one. The code is below.

I did not want to assume that we would only ever calculate averages, so I kept my solution fairly generic. It is up to the caller of my function to code up the aggregation logic. In this post I demonstrate average aggregations, but the caller can do anything with a given set of items.

First, let’s define a data structure called Aggregation, which wraps the original data point and adds a new AggregatedValue field that is populated during the aggregation process. Then I define a sample TimedDataPoint, which holds the stock prices in my example.

using System;

public struct Aggregation<TSource, TValue>
{
	private TSource _dp;

	public Aggregation(TSource dataPoint)
	{
		_dp = dataPoint;
		AggregatedValue = default(TValue);
	}

	public TSource DataPoint
	{
		get { return _dp; }
	}

	public TValue AggregatedValue;
}

public struct TimedDataPoint<T>
{
	public DateTime Date { get; set; }

	public T Value { get; set; }

	public override string ToString()
	{
		return string.Format("{0} @ {1}", 
 			Date.ToShortDateString(), Value);
	}
}

Finally, let’s define a LINQ extension method that buffers a set of items from the original sequence and returns a shorter sequence, in which each item carries the aggregated value for its buffered set.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MovingAverage.Linq
{
    public static class Aggregations
    {
        public static IEnumerable<Aggregation<TSource, TValue>> MovingAggregation<TSource, TValue>(
            this IEnumerable<TSource> source,
            Func<TSource, int, bool> shouldBuffer,
            Func<Aggregation<TSource, TValue>, TSource, IEnumerable<TSource>, TSource, int, TValue> projection)
        {
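            // Sliding window; holds the items handed to the projection.
            Queue<TSource> buffer = new Queue<TSource>();
            // Item that fell out of the window after the previous step (default on the first step).
            TSource droppedItem = default(TSource);
            var runningAgg = default(Aggregation<TSource, TValue>);

            // Pre-fill the window for as long as the caller asks to keep buffering.
            foreach (var item in source.TakeWhile(val => shouldBuffer(val, buffer.Count)))
            {
                buffer.Enqueue(item);
            }

            // Note: source is enumerated a second time here, so it must be repeatable.
            int skipCount = 0;
            foreach (var item in source.SkipWhile(val => shouldBuffer(val, skipCount++)))
            {
                // Add the current item so the aggregation is inclusive
                buffer.Enqueue(item);
                runningAgg = new Aggregation<TSource, TValue>(item)
                {
                    AggregatedValue = projection(runningAgg, item, buffer, droppedItem, buffer.Count)
                };
                // Remove the oldest item; it is passed to the next projection call
                droppedItem = buffer.Dequeue();

                yield return runningAgg;
            }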
        }
    }
}
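One thing to keep in mind about the function above: it walks the source twice (once through TakeWhile, once through SkipWhile), which is fine for a List but not for a sequence that can only be read once. If you only need a fixed-size window, a single-pass variant might look like the sketch below. This is not the function from this post; the SlidingWindow name and its size parameter are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SlidingWindowSketch
{
    // Hypothetical helper, not part of LINQ and not the MovingAggregation method.
    // Yields a snapshot of the last `size` items each time the window is full.
    public static IEnumerable<T[]> SlidingWindow<T>(this IEnumerable<T> source, int size)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (size <= 0) throw new ArgumentOutOfRangeException("size");

        return Iterate(source, size);
    }

    private static IEnumerable<T[]> Iterate<T>(IEnumerable<T> source, int size)
    {
        var window = new Queue<T>(size);

        foreach (var item in source)        // the source is enumerated exactly once
        {
            window.Enqueue(item);

            if (window.Count > size)
            {
                window.Dequeue();           // drop the oldest item
            }

            if (window.Count == size)
            {
                yield return window.ToArray();   // copy, oldest item first
            }
        }
    }
}
```

With this in place, new[] { 1, 2, 3, 4 }.SlidingWindow(3).Select(w => w.Average()) produces the two averages 2 and 3. The trade-off is that it copies the window on every step and gives the caller no access to the previous aggregate or the dropped item, both of which MovingAggregation provides.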

To claim success, I need to test the function. To play with real numbers, let’s get the weekly price history of MSFT stock from Yahoo. Let’s export this data to Excel and create a set of moving averages of the Close price to test against. Here’s what the spreadsheet should look like:

[Image: Moving Average in Excel]

We can then create a sample dataset to represent the original closing prices, and test our function by running a moving average calculation over it. The code is below.

First, I create the dataset. Then I iterate through it, piping the data into my aggregator function, which in turn calls me back and asks me to make two important decisions:

1. Should it continue buffering? (The first lambda, (item, count) => count < 5, in both calls.)
2. Given a buffer, provide an aggregation. (The second lambda: a single buffer.Average call in the first example, and the if/else block in the second.)

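With count < 5, five items are buffered up front and the current item makes the sixth, so each average covers six data points. The output of the sample should match the 6-day Avg column in the attached spreadsheet.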

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MovingAverage.Linq;

namespace MovingAverage
{
    class Program
    {
        static void Main(string[] args)
        {
            var datapoints = new List<TimedDataPoint<decimal>>()
            {
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 1, 2), Value = 36.91m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 1, 6), Value = 36.04m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 1, 13), Value = 36.38m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 1, 21), Value = 36.81m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 1, 27), Value = 37.84m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 2, 3), Value = 36.56m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 2, 10), Value = 37.62m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 2, 18), Value = 37.98m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 2, 24), Value = 38.31m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 3, 3), Value = 37.90m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 3, 10), Value = 37.70m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 3, 17), Value = 40.16m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 3, 24), Value = 40.30m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 3, 31), Value = 39.87m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 4, 7), Value = 39.21m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 4, 14), Value = 40.01m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 4, 21), Value = 39.91m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 4, 28), Value = 39.69m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 5, 5), Value = 39.54m },
                new TimedDataPoint<decimal> { Date = new DateTime(2014, 5, 12), Value = 39.83m },
            };


            foreach (var item in datapoints.MovingAggregation<TimedDataPoint<decimal>, decimal>(
                    (item, count) => count < 5,
                    (seed, item, buffer, droppedoff, count) => buffer.Average(val => val.Value)))
            {
                Console.WriteLine("Value {0} MovingAverage {1:F3}",
                                    item.DataPoint,
                                    item.AggregatedValue);
            }

            Console.WriteLine("==================================");

            foreach (var item in datapoints.MovingAggregation<TimedDataPoint<decimal>, decimal>(
                    (item, count) => count < 5,
                    (aggSoFar, item, buffer, droppedoff, count) =>
                    {
                        if (aggSoFar.AggregatedValue == 0)
                        {
                            return buffer.Average(val => val.Value);
                        }
                        else
                        {
                            return aggSoFar.AggregatedValue +
                              (item.Value - droppedoff.Value) / count;
                        }
                    }))
            {
                Console.WriteLine("Value {0} MovingAverage {1:F3}",
                                    item.DataPoint,
                                    item.AggregatedValue);
            }
        }
    }
}
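The second call in the listing above avoids re-averaging the whole buffer on every step. The reasoning behind its else branch can be written out as below, where the symbols are my own shorthand and do not appear in the code: A_t is the average after item t, x_t is item.Value, x_{t-n} is droppedoff.Value, and n is count (6 in this sample).

```latex
\begin{aligned}
A_t     &= \frac{1}{n}\sum_{i=t-n+1}^{t} x_i \\
A_{t-1} &= \frac{1}{n}\sum_{i=t-n}^{t-1} x_i \\
A_t - A_{t-1} &= \frac{x_t - x_{t-n}}{n}
\quad\Longrightarrow\quad
A_t = A_{t-1} + \frac{x_t - x_{t-n}}{n}
\end{aligned}
```

Checking against the sample data: the first six closes sum to 220.54, so the first average is 220.54 / 6 ≈ 36.757. The next item is 37.62 and the dropped item is 36.91, giving 36.757 + (37.62 - 36.91) / 6 = 36.875, which is the same as averaging the new window directly (221.25 / 6). Note that the lambda treats an AggregatedValue of 0 as "no previous value"; a genuine average of exactly zero would simply fall back to the full buffer.Average call, which is slower but still correct.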

I found this aggregator function very useful. Hope you will too.
