Biglist for Single-Machine, Out-of-Memory Long Sequence

When dealing with large amounts of data, a common situation is that the data fit comfortably on a single machine but are unwieldy to load into memory all at once, for example when the size on disk is above 10GB and growing.

To address this problem, we need a solution to persist these data on disk, and only load parts of it into memory as needed. In addition, we should not have to store all the data in a single file, which will become unmanageable sooner or later. Instead, we should use a number of files in a way that is transparent to the user. The number of files will grow with the amount of data, limited only by the disk size.

In this post, we’ll tackle a specific yet very common case: the data consist of a long sequence of “records” (or “elements”). The idea is to build a tool that operates on such data through an interface as close to a regular list as possible. We are going to create a Python utility called biglist for this.

At the start, these are the design requirements:

  1. It takes a directory as the storage location. What happens within the directory is pretty transparent to the user.
  2. It supports append and extend, just like the built-in list.
  3. It supports element access by index, slice, and iteration.
  4. It does not need to support mutation of existing elements, that is, it’s append-only.
  5. The type of elements is very general: anything that can be pickled will do, because we’ll store the elements using pickle.

Let’s get started.

Laying the groundwork

At a very high level, on disk we’ll have a small info file for metadata (such as the number of files) and a data store holding a series of data files. We are going to name them 0.pickle, 1.pickle, etc. In memory we’re going to have a buffer for reading and another buffer for appending. Each buffer holds at most the content of a single data file, whose size is measured in number of elements and capped by batch_size.

The append-buffer is always “moving forward”: elements are appended to it until their count reaches batch_size; at that point the buffer’s content will be saved in a file, and the buffer will restart empty, waiting for new elements.
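
To make the “moving forward” behavior concrete, here is a minimal in-memory sketch. It is not the Biglist code: a plain list named files stands in for the data files on disk, and the helper name append_with_batches is made up for this illustration.

```python
from typing import Any, List


def append_with_batches(
    files: List[List[Any]], buffer: List[Any], x: Any, batch_size: int
) -> List[Any]:
    """Append x to buffer; once the buffer is full, move it into files.

    `files` is a stand-in for the on-disk data files.
    Returns the buffer to use for the next append.
    """
    buffer.append(x)
    if len(buffer) >= batch_size:
        files.append(buffer)  # in Biglist this would be a write to N.pickle
        return []             # restart with an empty buffer
    return buffer


files: List[List[int]] = []
buffer: List[int] = []
for value in range(7):
    buffer = append_with_batches(files, buffer, value, batch_size=3)

print(files)   # [[0, 1, 2], [3, 4, 5]]
print(buffer)  # [6]
```

The last element stays in the buffer because the buffer has not reached capacity yet; this is the partial buffer that flush has to deal with later.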

Independent of append, we may want to access any element of the list at any time. Depending on the element’s index, we will load the relevant file into the read-buffer to provision the element.

First, we need to create a few helper functions:

import json
import os, os.path
import pickle
from typing import Any


def json_load(path: str, *path_elements) -> Any:
    with open(os.path.join(path, *path_elements), 'r') as f:
        return json.load(f)


def pickle_load(path: str, *path_elements) -> Any:
    with open(os.path.join(path, *path_elements), 'rb') as f:
        return pickle.load(f)


def prepare_path(path: str, *path_elements):
    ff = os.path.join(path, *path_elements)
    dirname = os.path.dirname(os.path.abspath(ff))
    if not os.path.isdir(dirname):
        os.makedirs(dirname)
    return ff


def json_dump(x: Any, path: str, *path_elements) -> None:
    ff = prepare_path(path, *path_elements)
    with open(ff, 'w') as f:
        json.dump(x, f)


def pickle_dump(x: Any, path: str, *path_elements) -> None:
    ff = prepare_path(path, *path_elements)
    with open(ff, 'wb') as f:
        pickle.dump(x, f, protocol=pickle.HIGHEST_PROTOCOL)
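
As a quick sanity check of the idea behind these helpers, the following standalone sketch writes one batch under a nested directory and reads it back. The temporary directory and the sample batch are assumptions made for the illustration; the file name mirrors the store/0.pickle layout described above.

```python
import os
import pickle
import tempfile

root = tempfile.mkdtemp()
target = os.path.join(root, 'store', '0.pickle')

# Same idea as `prepare_path`: make sure the parent directory exists.
os.makedirs(os.path.dirname(target), exist_ok=True)

batch = [{'id': 1}, ('a', 2), 3.5]  # any picklable elements will do
with open(target, 'wb') as f:
    pickle.dump(batch, f, protocol=pickle.HIGHEST_PROTOCOL)

with open(target, 'rb') as f:
    restored = pickle.load(f)

print(restored == batch)  # True
```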

Our main class starts like this:

import shutil
import tempfile
from typing import Iterable, List, Union


class Biglist:
    def __init__(
            self,
            path: str = None,
            batch_size: int = None,
            ):
        self.path = path or tempfile.mkdtemp()
        assert self.path.startswith('/')

        self.file_lengths = []
        self.cum_file_lengths = [0]

        if os.path.isdir(self.path):
            z = os.listdir(self.path)
            if z:  
                # directory is not empty; open for reading and/or appending.
                if not os.path.isfile(self.info_file) or not os.path.isdir(self.data_dir):
                    raise RuntimeError(f"path '{self.path}' is not empty "
                            f"but is not a valid {self.__class__.__name__} folder")
                info = json_load(self.info_file)

                self.file_lengths = info['file_lengths']
                batch_size = info['batch_size']
                for n in self.file_lengths:
                    self.cum_file_lengths.append(self.cum_file_lengths[-1] + n)
            else: 
                # directory is empty
                os.makedirs(self.data_dir)
        else:
            # directory does not exist
            os.makedirs(self.data_dir)

        if batch_size is None:
            batch_size = 10000
        else:
            assert batch_size > 0
        self.batch_size = batch_size
        self._read_buffer = None
        self._read_buffer_file_idx = None
        # `self._read_buffer` contains the content of the file 
        # indicated by `self._read_buffer_file_idx`.
        self._append_buffer = None

    @property
    def info_file(self) -> str:
        return os.path.join(self.path, 'info.json')

    @property
    def data_dir(self) -> str:
        return os.path.join(self.path, 'store')

    def data_file(self, file_idx: int) -> str:
        return os.path.join(self.data_dir, str(file_idx) + '.pickle')

    def __len__(self) -> int:
        n = self.cum_file_lengths[-1]
        if self._append_buffer:
            return n + len(self._append_buffer)
        return n

    def __bool__(self) -> bool:
        return len(self) > 0

Note that there are two situations when initializing a Biglist object. In one, the specified location is nonexistent or empty; in this case, a new Biglist is being created. In the other, the specified location is the storage of a previously created Biglist; in this case, the meta info gets read in, and the new Biglist object is ready to read existing data as well as to append new data.

Writing data to files

An element is added to the end of the list by the method append. The element is first appended to the in-memory list self._append_buffer. Once the length of the buffer reaches self.batch_size, the method flush is called (as a final step in the method append) to dump the buffer to a file. After flush, the buffer will restart empty. As such, the length of self._append_buffer will never exceed self.batch_size. However, it can be anything below this size, because the user is allowed to call flush anytime.

Before diving into append, let’s get flush taken care of:

class Biglist:

    def flush(self) -> None:
        if not self._append_buffer:
            return

        buffer_len = len(self._append_buffer)
        pickle_dump(self._append_buffer, self.data_file(len(self.file_lengths)))
        self.file_lengths.append(buffer_len)
        self.cum_file_lengths.append(self.cum_file_lengths[-1] + buffer_len)
        json_dump(
            {'file_lengths': self.file_lengths,
             'batch_size': self.batch_size,
             },
            self.info_file)

        if buffer_len == self.batch_size:
            self._append_buffer = []
        else:
            self._append_buffer = None

The final segment of the method sets the append-buffer to an empty list or None, depending on whether the append-buffer was at capacity. As a result, if the append-buffer is an empty list, it must have been set by flush just after writing a full-length append-buffer. Consequently, the currently final data file has “full length”, meaning it contains exactly self.batch_size elements.

On the other hand, if the append-buffer is None, it either has been set in flush after writing a partial-length data file, or has not been touched since __init__, which sets it to None. In the second case, the currently final data file on disk may or may not be partial. Consider the case where we initialize a Biglist object to read an existing Biglist that has a partial-length final data file.

These two empty values of the append-buffer are useful indicators, as we’ll see shortly in append.

When we are done appending elements to the list, it’s entirely possible that the append-buffer is not at capacity, in which case flush was not called when the last element was appended. As a result, the partial append-buffer has not been persisted. We could ask the user to call flush. As a safeguard, we also add a call to flush when the object is destroyed.

class Biglist:

    def __del__(self):
        self.flush()

With this foundation in place, we are now ready to add elements to the list.

Appending elements

If the append-buffer has some elements but has not reached capacity, append simply appends to the buffer and calls flush as needed. Simple enough.

If the append-buffer is an empty list, as has been explained above, it’s guaranteed that the currently final data file has “full length”. It’s in good shape to start filling a new append-buffer, hence append also simply appends to the append-buffer and calls flush as needed.

It requires more thinking if the append-buffer is None. This happens either because a previous call to flush has written a partial data file, or because the buffer has not been touched since being created with value None in __init__. In the second case, the currently final data file, if any, may also be partial.

It is not essential to make all data files (except the final one) contain the same number of elements. However, this regularity is nice to have. Moreover, if we provide this guarantee, then some index calculations could be simplified.
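
To see what kind of simplification the guarantee allows, here is a small sketch. It assumes every data file before the one being looked up holds exactly batch_size elements; the function name locate is made up for this illustration, and the code in this post does not rely on this shortcut.

```python
from typing import Tuple


def locate(idx: int, batch_size: int) -> Tuple[int, int]:
    """Return (file_idx, offset_in_file) for the element at index idx.

    Valid only if all files preceding the target are full-length.
    """
    return divmod(idx, batch_size)


print(locate(25, 10))  # (2, 5): sixth element of the third file
print(locate(9, 10))   # (0, 9): last element of the first file
```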

We choose to provide this guarantee by the method called _init_append_buffer. With this, append and extend are easy.

class Biglist:

    def _init_append_buffer(self) -> None:
        if self.file_lengths and self.file_lengths[-1] < self.batch_size:
            self._append_buffer = pickle_load(
                self.data_file(len(self.file_lengths) - 1))

            if self._read_buffer_file_idx == len(self.file_lengths) - 1:
                self._read_buffer_file_idx = None
                self._read_buffer = None

            # Note:
            # do not delete the last data file.
            # the next call to `flush` will overwrite this file.

            self.file_lengths.pop()
            self.cum_file_lengths.pop()
        else:
            self._append_buffer = []

    def append(self, x) -> None:
        if self._append_buffer is None:
            self._init_append_buffer()

        self._append_buffer.append(x)
        if len(self._append_buffer) >= self.batch_size:
            self.flush()
            # Note that `flush` resets `self._append_buffer`.

    def extend(self, x: Iterable) -> None:
        for v in x:
            self.append(v)

For the purpose of reading, the content of the Biglist consists of all the disk files plus the append-buffer. In _init_append_buffer, if the final data file has partial length, its content is loaded into the append-buffer, and that file is dropped from the bookkeeping lists self.file_lengths and self.cum_file_lengths. We also check whether the read-buffer contains the content of the partial data file. If it does, we null the read buffer. This could happen if we have accessed elements in the final file when the append-buffer has been None since __init__.

Accessing a random element

Our Biglist implements the “Sequence Protocol”, which, besides the special method __len__, requires the method __getitem__ for accessing an arbitrary element by index. The main logic in this task involves determining which data file contains the requested element. This is done in the method _get_file_idx_for_item, listed below.

class Biglist:

    def _load_file_to_buffer(self, file_idx: int):
        self._read_buffer = pickle_load(self.data_file(file_idx))
        self._read_buffer_file_idx = file_idx

    def _get_file_idx_for_item(self, idx: int) -> int:
        if idx >= self.cum_file_lengths[-1]:
            return len(self.file_lengths)
            # This suggests the requested element at index `idx`
            # resides in `self._append_buffer`.
        if self._read_buffer_file_idx is None:
            for k, n in enumerate(self.cum_file_lengths):
                if idx < n:
                    return k-1
        elif idx < self.cum_file_lengths[self._read_buffer_file_idx]:
            for k in range(self._read_buffer_file_idx - 1, -1, -1):
                if idx >= self.cum_file_lengths[k]:
                    return k
        elif idx >= self.cum_file_lengths[self._read_buffer_file_idx + 1]:
            for k in range(self._read_buffer_file_idx + 2, len(self.cum_file_lengths)):
                if idx < self.cum_file_lengths[k]:
                    return k - 1
        else:
            return self._read_buffer_file_idx

    def __getitem__(self, idx: int):
        '''
        Element access by single index; negative index works as expected.
        '''
        if not isinstance(idx, int):
            raise TypeError('A single integer index is expected. To use slice, use `view`.')

        idx = range(len(self))[idx]
        file_idx = self._get_file_idx_for_item(idx)

        if file_idx >= len(self.file_lengths):
            return self._append_buffer[idx - self.cum_file_lengths[-1]]

        if file_idx != self._read_buffer_file_idx:
            self._load_file_to_buffer(file_idx)

        n1 = self.cum_file_lengths[file_idx]
        n2 = self.cum_file_lengths[file_idx + 1]
        assert n1 <= idx < n2
        return self._read_buffer[idx - n1]

A couple of details need some explanation. First, the whole Biglist consists of the data files on disk plus the append-buffer (refer to the method __len__ above). The requested element may fall in the append-buffer.

Second, when the current read-buffer contains the content of a data file, and the requested element is not in that file, we need to find out which file contains it. This can be done based on the element counts of the data files. Instead of a linear search from the first file to the last file, we search backward from the current file (i.e. the file corresponding to the current read-buffer) to the first file, or forward from the current file to the last file, depending on whether the requested element lies before or after the current file. We take this approach because, presumably, element access tends to be in sequential order in most use cases.
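
For comparison, the same lookup can be written as a binary search over the cumulative lengths using the standard bisect module. This is an alternative sketch, not the method used in this post; it gives up the “start from the current file” heuristic in exchange for brevity. The standalone function name file_idx_for_item is made up here.

```python
from bisect import bisect_right
from typing import List


def file_idx_for_item(cum_file_lengths: List[int], idx: int) -> int:
    """Index of the data file that holds element idx.

    cum_file_lengths starts with 0, as in Biglist. A return value equal to
    the number of files means idx lies past all files, i.e. in the
    append-buffer.
    """
    return bisect_right(cum_file_lengths, idx) - 1


cum = [0, 3, 6, 8]  # three files holding 3, 3 and 2 elements
print([file_idx_for_item(cum, i) for i in range(9)])
# [0, 0, 0, 1, 1, 1, 2, 2, 3]
```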

Iteration

Iteration can be trivially implemented using __len__ and __getitem__. However, __getitem__ contains some logic to jump to any arbitrary index, which is not needed in a sequential walk-through. The solution below walks through all the data files one by one, followed by a walk-through of the append-buffer.

class Biglist:

    def __iter__(self):
        for file_idx in range(len(self.file_lengths)):
            if file_idx == self._read_buffer_file_idx:
                buffer = self._read_buffer
            else:
                buffer = pickle_load(self.data_file(file_idx))
            yield from buffer

        if self._append_buffer is not None:
            yield from self._append_buffer

Slicing

By convention, slicing returns a new object of the same type as the original object. Let’s verify with a couple of examples:

>>> x = range(13)
>>> y = x[-5:]
>>> type(y)
<class 'range'>
>>> type(x)
<class 'range'>
>>> 
>>> xx = list(x)
>>> yy = xx[2:7]
>>> type(yy)
<class 'list'>
>>> type(xx)
<class 'list'>
>>> 

For Biglist, we want slicing to return some kind of read-only “view” into the list, so the result cannot be of the class Biglist. (For example, we don’t want to support append in this view.) For this reason, the __getitem__ method does not accept a slice as the argument.

Instead, we have a simple method view that returns an instance of a different class—let’s name it BiglistView—which supports element access by index, slice, and iteration. In line with the aforementioned convention, slicing a BiglistView returns a new BiglistView.

As a first cut, we start off like this:

class Biglist:

    def view(self):
        return BiglistView(self)


class BiglistView:

    def __init__(self, biglist: Biglist, range_: range = None):
        self._list = biglist
        if range_ is None:
            range_ = range(len(biglist))
        self._range = range_

    def __len__(self):
        return len(self._range)

    def __bool__(self) -> bool:
        return len(self) > 0

    def __getitem__(self, idx: Union[int, slice]):
        if isinstance(idx, int):
            return self._list[self._range[idx]]

        if not isinstance(idx, slice):
            raise TypeError(f"an integer or slice is expected")

        return self.__class__(
            self._list,
            view_slice_range(self._range, idx)
            )

    def __iter__(self):
        for idx in self._range:
            yield self._list[idx]

This listing contains a yet-to-be-defined function view_slice_range, which we’ll come to in a bit.

BiglistView takes a Biglist object and a range value, which specifies the “window” into the Biglist. When range_ is missing, it defaults to the entire range of the Biglist. However, once we slice the BiglistView object (in __getitem__ with an argument of type slice), we need to work out the new window into the original Biglist. Some examples can make this concrete.

x = Biglist()
x.extend(range(100))
v = x.view()
v1 = v[10:20]
v2 = v1[::2]
v3 = v2[-2:]

In v, the indices specified by v._range are 0, 1,..., 99.

In v1, the indices are 10, 11,..., 19.

In v2, the indices are 10, 12, 14, 16, 18.

In v3, the indices are 16, 18.

BiglistView.__init__ expects a range in terms of the original, entire Biglist. When we slice a BiglistView, we are slicing the range BiglistView._range. We need to turn this slice on BiglistView._range into a simple range on the original Biglist. Then we can create a new BiglistView.

This slice-to-range conversion happens in BiglistView.__getitem__ and is accomplished by the function view_slice_range. The function name means “given a view and a slice, return the range that is the slice on the view”.

We rarely manipulate the Python classes range and slice. According to the official documentation, range has three forms. With a single argument:

>>> list(range(0))
[]
>>> list(range(1))
[0]
>>> list(range(4))
[0, 1, 2, 3]
>>> list(range(-4))
[]

With two arguments:

>>> list(range(2, 6))
[2, 3, 4, 5]
>>> list(range(6, 2))
[]
>>> list(range(3, 3))
[]
>>> list(range(-3, 2))
[-3, -2, -1, 0, 1]

With three arguments:

>>> list(range(2, 5, 1))
[2, 3, 4]
>>> list(range(1, 8, 3))
[1, 4, 7]
>>> list(range(1, 8, -1))
[]
>>> list(range(8, 1, -2))
[8, 6, 4, 2]

The slice class is very similar to range, except that a slice object is almost never created by directly calling slice(...). Instead, it is created for us when we call __getitem__ with the “colon” syntax like x[2:4], or x[2:8:2].

At first, I wrote some very careful logic for view_slice_range and I got it right with a few dozen lines of code. That code has been used for quite some time. Just as I was writing this post, I realized that we can index or slice a range, and it will do the right thing™:

>>> x = range(20)
>>> x
range(0, 20)
>>> x[8]
8
>>> x[-4:]
range(16, 20)
>>> x[::2]
range(0, 20, 2)
>>> x[::2][::-1]
range(18, -2, -2)

For the concrete example above, we have

>>> v = range(100)
>>> v1 = v[10:20]
>>> v2 = v1[::2]
>>> v3 = v2[-2:]
>>> list(v1)
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
>>> list(v2)
[10, 12, 14, 16, 18]
>>> list(v3)
[16, 18]

With this enlightenment, the function view_slice_range becomes totally trivial:

def view_slice_range(view_range: range, idx_on_view: slice):
    return view_range[idx_on_view]

Really this doesn’t even need to be a function!
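
To show the whole view mechanism in one runnable piece, here is a minimal sketch of the same idea on top of an ordinary in-memory sequence. The class name SeqView is made up for this illustration and is not part of biglist; it only mirrors the structure of BiglistView.

```python
from typing import Sequence


class SeqView:
    """Read-only window into a sequence, tracked as a range of indices."""

    def __init__(self, data: Sequence, range_: range = None):
        self._data = data
        self._range = range(len(data)) if range_ is None else range_

    def __len__(self) -> int:
        return len(self._range)

    def __getitem__(self, idx):
        if isinstance(idx, slice):
            # Slicing a range yields the new window in terms of `data`.
            return SeqView(self._data, self._range[idx])
        return self._data[self._range[idx]]

    def __iter__(self):
        return (self._data[i] for i in self._range)


letters = 'abcdefghijklmnopqrstuvwxyz'
v3 = SeqView(letters)[10:20][::2][-2:]
print(list(v3))  # ['q', 's'], i.e. the elements at indices 16 and 18
```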

Supporting parallel processing

The Biglist class is designed for large amounts of data, for example 30GB split into 300 files. Now we don’t have to load the entire data into memory at once, and the data don’t need to be contained in a single file. We have gained some scalability here. Processing this data can take a long time, though. A natural next step is to go parallel and process multiple segments of the data simultaneously.

Now we have BiglistView to provide a “window” to part of the list. Can we open multiple windows and use them to work on multiple parts of the list at the same time? Conceptually, yes, but the design of BiglistView has flaws.

First, it contains a reference to the “host” Biglist. As we read elements of a Biglist, it loads the relevant data file as needed. If we have multiple BiglistViews referencing the same Biglist object, then when the views read different parts of the list, the single underlying list will have to keep loading different files to satisfy whichever view is accessing an element at the moment. This will be extremely inefficient. It will not work. A view cannot reference a shared Biglist object; it has to read data independently of other views.

Second, there may come a time when we want to use multiprocessing and send a BiglistView to another process. When this happens, the BiglistView object will be pickled and transmitted across the process boundary. It is best that this object does not contain another object of a custom class like Biglist: as the design of Biglist evolves, who knows what kind of things it will pull in, and all of them would need to be pickled and transmitted.

A solution is to not initiate BiglistView with a Biglist object. Instead, just provide the file path and let the BiglistView create a Biglist object itself. Further, don’t create the Biglist object in BiglistView.__init__—which is run to initialize the view object before the object is sent to the other process. Instead, delay the creation of a Biglist object to the time when it is actually needed for element access, and that happens only after the BiglistView object has landed in the other process.
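
The deferred-creation idea can be sketched in isolation. In the snippet below, the class LazyView, its opener parameter, and the function fake_open are all hypothetical stand-ins invented for this illustration; opener plays the role of calling Biglist(path).

```python
from typing import Callable, List, Optional


class LazyView:
    """Holds only a path and a range until an element is actually requested."""

    def __init__(self, path: str, range_: range, opener: Callable[[str], List]):
        self._path = path
        self._range = range_
        self._opener = opener  # hypothetical stand-in for `Biglist(path)`
        self._list: Optional[List] = None

    def __getitem__(self, idx: int):
        if self._list is None:  # first access: open the backing store now
            self._list = self._opener(self._path)
        return self._list[self._range[idx]]


opened = []


def fake_open(path: str) -> List[int]:
    opened.append(path)  # record each time the store is opened
    return list(range(100))


view = LazyView('/some/path', range(10, 20), fake_open)
print(opened)    # []  (nothing is opened at construction time)
print(view[3])   # 13
print(view[-1])  # 19
print(opened)    # ['/some/path']  (opened once, on first access)
```

Until the first element access, the object carries nothing heavier than a string and a range, which is what we want to send across a process boundary.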

So we make some revisions like below:


class BiglistView:

    def __init__(self, path: str, range_: range = None):
        self._path = path
        self._list = None
        self._range = range_

    def _open_list(self):
        self._list = Biglist(self._path)
        if self._range is None:
            self._range = range(len(self._list))

    def __getitem__(self, idx: Union[int, slice]):
        if self._list is None:
            self._open_list()

        if isinstance(idx, int):
            return self._list[self._range[idx]]

        if not isinstance(idx, slice):
            raise TypeError(f"an integer or slice is expected")

        return self.__class__(self._path, self._range[idx])

    def __iter__(self):
        if self._list is None:
            self._open_list()

        for idx in self._range:
            yield self._list[idx]

    def __len__(self) -> int:
        if self._range is None:
            self._open_list()
        return len(self._range)

    # ... other methods same as above ...


class Biglist:

    def view(self):
        return BiglistView(self.path)

When we want concurrent processing, we simply chop up the whole range of the Biglist into a series of BiglistViews. The views will read their elements independent of all the other views. It will work.
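
One simple way to do the chopping is to split the full index range into contiguous pieces of near-equal length, and pass each piece as range_ to a BiglistView. The helper below is a sketch under that assumption; the name split_range is made up for this illustration.

```python
from typing import List


def split_range(n: int, n_parts: int) -> List[range]:
    """Split range(n) into n_parts contiguous ranges of near-equal length."""
    size, extra = divmod(n, n_parts)
    ranges = []
    start = 0
    for k in range(n_parts):
        # The first `extra` parts each take one additional element.
        stop = start + size + (1 if k < extra else 0)
        ranges.append(range(start, stop))
        start = stop
    return ranges


print(split_range(10, 3))
# [range(0, 4), range(4, 7), range(7, 10)]
```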

Can we do better?

Think about how we are going to chop up the Biglist. Noticing the list is backed by a series of files, one logical way is to create one view per file. This way, each file is loaded into memory exactly once. In order to do this, we need to work out the index range for each file, which is easy. Further, we may as well add a method to Biglist to iterate over a particular file. It will be used in a BiglistView when that view is accessing exactly the elements of one file.

class Biglist:

    def iterfile(self, file_idx):
        assert 0 <= file_idx < len(self.file_lengths)
        if file_idx == self._read_buffer_file_idx:
            yield from self._read_buffer
        else:
            yield from pickle_load(self.data_file(file_idx))
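
As for the per-file index ranges mentioned above, they follow directly from the file lengths. Here is a standalone sketch; the function name file_ranges is made up for this illustration, and inside Biglist the same numbers are already available in cum_file_lengths.

```python
from itertools import accumulate
from typing import List


def file_ranges(file_lengths: List[int]) -> List[range]:
    """Return, for each data file, the range of list indices it covers."""
    cum = [0, *accumulate(file_lengths)]  # same layout as cum_file_lengths
    return [range(a, b) for a, b in zip(cum, cum[1:])]


print(file_ranges([3, 3, 2]))
# [range(0, 3), range(3, 6), range(6, 8)]
```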

Now add a method to return a BiglistView for a particular file:

class Biglist:

    def fileview(self, file_idx: int) -> 'BiglistView':
        assert not self._append_buffer
        assert 0 <= file_idx < len(self.file_lengths)
        return BiglistView(
            self.path,
            range(self.cum_file_lengths[file_idx], self.cum_file_lengths[file_idx+1]),
            )

and a method to return a list of views covering the entire Biglist:

class Biglist:

    def fileviews(self) -> List['BiglistView']:
        return [
            self.fileview(i)
            for i in range(len(self.file_lengths))
        ]

One more thing: we did not tell BiglistView that its range covers exactly one file of the host Biglist. As a result, when we access an element or iterate over the view, the element index is treated totally generically: Biglist.__getitem__ is called, which determines which file contains the element, whether the file needs to be loaded, and such. When we iterate over the view, which is the most common use case, this index logic is largely a waste of time. A much better approach is to use Biglist.iterfile. To that end, BiglistView needs to determine whether its range covers exactly one file of the host Biglist. The following revision does this.

class BiglistView:

    def _fileview_idx(self):
        if self._range.step != 1:
            return None
        try:
            idx = self._list.cum_file_lengths.index(self._range.start)
        except ValueError:
            return None
        if idx >= len(self._list.cum_file_lengths) - 1:
            return None
        if self._range.stop != self._list.cum_file_lengths[idx + 1]:
            return None
        return idx

    def __iter__(self):
        if self._list is None:
            self._open_list()

        file_idx = self._fileview_idx()
        if file_idx is None:
            for idx in self._range:
                yield self._list[idx]
        else:
            yield from self._list.iterfile(file_idx)

We could also overload the range_ parameter of BiglistView.__init__, and let it indicate a file index when it’s an int. That overloading might be a little ugly, but it works, too.
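
A sketch of how that overloading might be resolved is shown below. The helper name resolve_window is hypothetical; in BiglistView this logic would presumably live in _open_list, since the cumulative file lengths are known only after the Biglist has been opened.

```python
from typing import List, Optional, Tuple, Union


def resolve_window(
    range_: Union[range, int], cum_file_lengths: List[int]
) -> Tuple[range, Optional[int]]:
    """Return (index range, file index or None) for an overloaded argument."""
    if isinstance(range_, int):
        file_idx = range_  # an int is interpreted as a file index
        if not 0 <= file_idx < len(cum_file_lengths) - 1:
            raise IndexError(f'no data file with index {file_idx}')
        window = range(cum_file_lengths[file_idx], cum_file_lengths[file_idx + 1])
        return window, file_idx
    return range_, None  # a plain range is passed through unchanged


cum = [0, 3, 6, 8]
print(resolve_window(1, cum))            # (range(3, 6), 1)
print(resolve_window(range(2, 5), cum))  # (range(2, 5), None)
```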

Written on April 5, 2020