Growing with the Web

Merge sort

Published , updated
Tags:

Merge sort is a sorting algorithm that runs in O(n \log n) time. It is a divide and conquer algorithm, so it can get the most out of today’s multi-cored systems. It works by continually splitting up the array until each item stands on its own. The items are then merged back with the items that they were split with in the correct order.

Merge sort example

Merge sort is also a stable sort, this means that if there are elements considered equal, they will be in the same order in the final list. This is illustrated in the below image, teal and blue (9) are in the same order in both the source and sorted lists.

Complexity

Time Space
Worst case Best case Average case Worst case
O(n \log n) O(n \log n) O(n \log n) O(n) auxiliary

Parallelism

Due to it being a divide and conquer algorithm the work can be easily split up and worked on using different threads, a very useful trait for high performance systems with large amounts of data.

Here is an example of how you could run merge sort using two threads to potentially halve the runtime of the middle section.

Parallel merge sort

Code

Top-down

The top-down variant is considered the ‘normal’ merge sorting algorithm.

public class MergeSort<T> : IGenericSortingAlgorithm<T> where T : IComparable
{
    public void Sort(IList<T> list) {
        var sorted = InternalSort(list);
        for (var i = 0; i < list.Count; i++) {
            list[i] = sorted[i]; 
        }
    }
        
    public IList<T> InternalSort(IList<T> list)
    {
        if (list.Count <= 1)
            return list;

        int middle = list.Count / 2;
        IList<T> left = new T[middle];
        IList<T> right = new T[list.Count - middle];

        for (int i = 0; i < left.Count; i++) {
            left[i] = list[i];
        }
        for (int i = 0; i < right.Count; i++) {
            right[i] = list[i + left.Count];
        }

        left = InternalSort(left);
        right = InternalSort(right);

        return Merge(left, right);
    }

    private T[] Merge(IList<T> left, IList<T> right) {
        T[] result = new T[left.Count + right.Count];
        int leftIndex = 0;
        int rightIndex = 0;
        int resultIndex = 0;
        while (leftIndex < left.Count || rightIndex < right.Count) {
            if (leftIndex < left.Count && rightIndex < right.Count) {
                if (left[leftIndex].CompareTo(right[rightIndex]) <= 0) {
                    result[resultIndex++] = left[leftIndex++];
                } else {
                    result[resultIndex++] = right[rightIndex++];
                }
            } else if (leftIndex < left.Count) {
                result[resultIndex++] = left[leftIndex++];
            } else if (rightIndex < right.Count) {
                result[resultIndex++] = right[rightIndex++];
            }
        }
        return result;
    }
}
public class MergeSort {
    public static Integer[] sort(Integer[] array) {
        if (array.length <= 1)
            return array;

        int middle = array.length / 2;
        Integer[] left = new Integer[middle];
        Integer[] right = new Integer[array.length - middle];

        for (int i = 0; i < left.length; i++) {
            left[i] = array[i];
        }
        for (int i = 0; i < right.length; i++) {
            right[i] = array[i + left.length];
        }

        left = sort(left);
        right = sort(right);

        return merge(left, right);
    }

    public static Integer[] merge(Integer[] left, Integer[] right) {
        Integer[] result = new Integer[left.length + right.length];
        int leftIndex = 0;
        int rightIndex = 0;
        int resultIndex = 0;
        while (leftIndex < left.length || rightIndex < right.length) {
            if (leftIndex < left.length && rightIndex < right.length) {
                if (left[leftIndex] <= right[rightIndex]) {
                    result[resultIndex++] = left[leftIndex++];
                } else {
                    result[resultIndex++] = right[rightIndex++];
                }
            } else if (leftIndex < left.length) {
                result[resultIndex++] = left[leftIndex++];
            } else if (rightIndex < right.length) {
                result[resultIndex++] = right[rightIndex++];
            }
        }
        return result;
    }
}
/**
 * Merges two arrays in to a new array.
 * @param {Array} left The first array.
 * @param {Array} left The second array.
 * @param {function} compare The compare function.
 * @return The merged array.
 */
function merge(left, right, compare) {
  var result = [];
  var leftIndex = 0;
  var rightIndex = 0;

  while (leftIndex < left.length || rightIndex < right.length) {
    if (leftIndex < left.length && rightIndex < right.length) {
      if (compare(left[leftIndex], right[rightIndex]) <= 0) {
        result.push(left[leftIndex++]);
      } else {
        result.push(right[rightIndex++]);
      }
    } else if (leftIndex < left.length) {
      result.push(left[leftIndex++]);
    } else if (rightIndex < right.length) {
      result.push(right[rightIndex++]);
    }
  }
  return result;
}

/**
 * Sorts an array using top down merge sort.
 * @param {Array} array The array to sort.
 * @param {function} compare The compare function.
 * @returns The sorted array.
 */
function sort(array, compare) {
  if (array.length <= 1) {
    return array;
  }

  var i;
  var middle = Math.floor(array.length / 2);
  var left = new Array(middle);
  var right = new Array(array.length - middle);

  for (i = 0; i < left.length; i++) {
    left[i] = array[i];
  }
  for (i = 0; i < right.length; i++) {
    right[i] = array[i + left.length];
  }

  return merge(sort(left, compare), sort(right, compare), compare);
}
def default_compare(a, b):
  if a < b:
    return -1
  elif a > b:
    return 1
  return 0

def sort(array, compare=default_compare):
  if len(array) <= 1:
    return array

  middle = math.floor(len(array) / 2)
  left = []
  right = []

  for i in range(0, middle):
    left.append(array[i])
  for i in range(middle, len(array)):
    right.append(array[i])

  return merge(sort(left, compare), sort(right, compare), compare)

def merge(left, right, compare):
  result = []
  leftIndex = 0
  rightIndex = 0

  while leftIndex < len(left) or rightIndex < len(right):
    if leftIndex < len(left) and rightIndex < len(right):
      if compare(left[leftIndex], right[rightIndex]) <= 0:
        result.append(left[leftIndex])
        leftIndex += 1
      else:
        result.append(right[rightIndex])
        rightIndex += 1
    elif leftIndex < len(left):
      result.append(left[leftIndex])
      leftIndex += 1
    elif rightIndex < len(right):
      result.append(right[rightIndex])
      rightIndex += 1

  return result
module MergeSort
  # Sorts an array using merge sort, returning the sorted array.
  def self.sort(array, compare = lambda { |a, b| a <=> b })
    if array.length <= 1
      return array
    end

    middle = (array.length / 2).floor
    left = array.take(middle)
    right = Array.new(array.length - middle)
    (0..right.length - 1).each do |i|
      right[i] = array[i + left.length]
    end

    return self.merge(self.sort(left, compare), self.sort(right, compare), compare)
  end

  # Sorts an array using merge sort, overriding the original array.
  def self.sort!(array, compare = lambda { |a, b| a <=> b })
    result = self.sort(array, compare)
    (0..array.length - 1).each do |i|
      array[i] = result[i]
    end
  end

private

  # Merges two arrays in to a new array.
  def self.merge(left, right, compare)
    result = []
    left_index = 0
    right_index = 0
    while left_index < left.length or right_index < right.length
      if left_index < left.length and right_index < right.length
        if compare.call(left[left_index], right[right_index]) <= 0
          result.push(left[left_index])
          left_index += 1
        else
          result.push(right[right_index])
          right_index += 1
        end
      elsif left_index < left.length
        result.push(left[left_index])
        left_index += 1
      elsif right_index < right.length
        result.push(right[right_index])
        right_index += 1
      end
    end
    return result
  end
end

Bottom-up

Bottom up is a non-recursive variant of merge sort that splits the array into chunks of size one, uses an in-place merge to merge adjacent elements and then repeats with the chunks doubling in size.

public class MergeSortBottomUp<T> : IGenericSortingAlgorithm<T> where T : IComparable
{
    public void Sort(IList<T> list) {
        IList<T> workList = new T[list.Count];
        int chunkSize = 1;
        while (chunkSize < list.Count) {
            int i = 0;
            while (i < list.Count - chunkSize) {
                Merge(list, i, chunkSize, workList);
                i += chunkSize * 2;
            }
            chunkSize *= 2;
        }
    }

    private void Merge(IList<T> list, int leftPosition, int chunkSize, IList<T> workList) {
        int rightPosition = leftPosition + chunkSize;
        int endPosition = Math.Min(leftPosition + chunkSize * 2 - 1, list.Count - 1);
        int leftIndex = leftPosition;
        int rightIndex = rightPosition;

        for (int i = 0; i <= endPosition - leftPosition; i++) {
            if (leftIndex < rightPosition &&
                    (rightIndex > endPosition ||
                    list[leftIndex].CompareTo(list[rightIndex]) <= 0)) {
                workList[i] = list[leftIndex++];
            } else {
                workList[i] = list[rightIndex++];
            }
        }

        for (int i = leftPosition; i <= endPosition; i++) {
            list[i] = workList[i - leftPosition];
        }
    }
}
public class MergeSortBottomUp {
    public static void sort(Integer[] array) {
        Integer[] workArray = new Integer[array.length];
        int chunkSize = 1;
        while (chunkSize < array.length) {
            int i = 0;
            while (i < array.length - chunkSize) {
                merge(array, i, chunkSize, workArray);
                i += chunkSize * 2;
            }
            chunkSize *= 2;
        }
    }

    public static void merge(Integer[] array, int leftPosition, int chunkSize, Integer[] workArray) {
        int rightPosition = leftPosition + chunkSize;
        int endPosition = Math.min(leftPosition + chunkSize * 2 - 1, array.length - 1);
        int leftIndex = leftPosition;
        int rightIndex = rightPosition;

        for (int i = 0; i <= endPosition - leftPosition; i++) {
            if (leftIndex < rightPosition &&
                    (rightIndex > endPosition ||
                    array[leftIndex] <= array[rightIndex])) {
                workArray[i] = array[leftIndex++];
            } else {
                workArray[i] = array[rightIndex++];
            }
        }

        for (int i = leftPosition; i <= endPosition; i++) {
            array[i] = workArray[i - leftPosition];
        }
    }
}
/**
 * Merge a portion of the array.
 * @param {Array} array The array to merge.
 * @param {number} leftPosition The starting index of the array to merge.
 * @param {number} chunkSize The size of the portion of the array to merge.
 * @param {Array} workArray A work array the size of the array argument, this
 * parameter is for performance reasons so many arrays aren't created by the
 * algorithm.
 * @param {function} compare The compare function.
 * @returns The sorted array.
 */
function bottomUpMerge(
    array, leftPosition, chunkSize, workArray, compare) {
  var i;
  var rightPosition = leftPosition + chunkSize;
  var endPosition = Math.min(leftPosition + chunkSize * 2 - 1,
                             array.length - 1);
  var leftIndex = leftPosition;
  var rightIndex = rightPosition;

  for (i = 0; i <= endPosition - leftPosition; i++) {
    if (leftIndex < rightPosition &&
        (rightIndex > endPosition ||
        compare(array[leftIndex], array[rightIndex]) <= 0)) {
      workArray[i] = array[leftIndex++];
    } else {
      workArray[i] = array[rightIndex++];
    }
  }

  for (i = leftPosition; i <= endPosition; i++) {
    array[i] = workArray[i - leftPosition];
  }
}

/**
 * Sorts an array using bottom up merge sort.
 * @param {Array} array The array to sort.
 * @param {function} compare The compare function.
 * @returns The sorted array.
 */
function sort(array, compare) {
  var workArray = new Array(array.length);
  var chunkSize = 1;
  while (chunkSize < array.length) {
    var i = 0;
    while (i < array.length - chunkSize) {
      bottomUpMerge(array, i, chunkSize, workArray, compare);
      i += chunkSize * 2;
    }
    chunkSize *= 2;
  }
  return array;
}
def default_compare(a, b):
  if a < b:
    return -1
  elif a > b:
    return 1
  return 0

def sort(array, compare=default_compare):
  workArray = [0] * len(array)
  chunkSize = 1

  while chunkSize < len(array):
    i = 0
    while i < len(array) - chunkSize:
      bottomUpMerge(array, i, chunkSize, workArray, compare)
      i += chunkSize * 2
    chunkSize *= 2

  return array

def bottomUpMerge(array, leftPosition, chunkSize, workArray, compare):
  rightPosition = leftPosition + chunkSize
  endPosition = min(leftPosition + chunkSize * 2 - 1, len(array) - 1)
  leftIndex = leftPosition
  rightIndex = rightPosition

  for i in range(0, endPosition - leftPosition + 1):
    if leftIndex < rightPosition and \
        (rightIndex > endPosition or \
         compare(array[leftIndex], array[rightIndex]) <= 0):
      workArray[i] = array[leftIndex]
      leftIndex += 1
    else:
      workArray[i] = array[rightIndex]
      rightIndex += 1

  for i in range(leftPosition, endPosition + 1):
    array[i] = workArray[i - leftPosition]
module MergeSortBottomUp
  # Sorts an array using bottom up merge sort.
  def self.sort(array, compare = lambda { |a, b| a <=> b })
    work_array = Array.new(array.length)
    chunk_size = 1
    while chunk_size < array.length
      i = 0
      while i < array.length - chunk_size
        self.merge(array, i, chunk_size, work_array, compare)
        i += chunk_size * 2
      end
      chunk_size *= 2
    end
  end

private

  # Merges two arrays in to a new array.
  def self.merge(array, left_position, chunk_size, work_array, compare)
    right_position = left_position + chunk_size
    end_position = [left_position + chunk_size * 2 - 1, array.length - 1].min
    left_index = left_position
    right_index = right_position
    (0..end_position - left_position).each do |i|
      if left_index < right_position and
          (right_index > end_position or
           compare.call(array[left_index], array[right_index]) <= 0)
        work_array[i] = array[left_index]
        left_index += 1
      else
        work_array[i] = array[right_index]
        right_index += 1
      end
    end
    (left_position..end_position).each do |i|
      array[i] = work_array[i - left_position]
    end
  end
end

This algorithm could be improved by alternating array and workArray whenever chunkSize is incremented. I didn’t want to do this in the above code as I wanted to focus more on the implementation of the algorithm, not so much optimisation.

Comments

comments powered by Disqus
Like this article?
Subscribe for more!