Biglist for Single-Machine, Out-of-Memory Long Sequence
When dealing with large amounts of data, a common situation is that the data are manageable on a single machine but unwieldy to load into memory all at once, for example when the data size on disk is above 10GB and growing.
To address this problem, we need a way to persist the data on disk and load only parts of it into memory as needed. In addition, we should not store all the data in a single file, which would become unmanageable sooner or later. Instead, we should use a number of files in a way that is transparent to the user. The number of files will grow with the amount of data, limited only by the disk size.
In this post, we’ll tackle a specific yet very common case: the data consist of a long sequence of “records” (or “elements”). The idea is to build a tool to operate on such data, with operations that resemble those of a regular list as closely as possible. We are going to create a Python utility called biglist for this.
At the start, these are the design requirements:
- It takes a directory as the storage location. What happens within the directory is pretty transparent to the user.
- It supports append and extend, just like the built-in list.
- It supports element access by index, slice, and iteration.
- It does not need to support mutation of existing elements; that is, it’s append-only.
- The type of elements is very general, as long as they can be pickled, because we’ll store them using pickle.
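To make the target concrete, here is the kind of usage we are aiming for (a sketch; the path and batch size are arbitrary):

mylist = Biglist('/tmp/my-biglist', batch_size=1000)
mylist.extend(range(100))                 # bulk append
mylist.append({'any': 'picklable value'})
print(mylist[0], mylist[-1])              # access by index
for x in mylist:                          # iteration
    pass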
Let’s get started.
Laying the groundwork
On a very high level, on disk we’ll have a small info file for metadata, such as the number of files, plus a data directory holding a series of data files. We are going to name the data files 0.pickle, 1.pickle, etc.
In memory, we’re going to have a buffer for reading and another buffer for appending. The size of each buffer is at most the size of a single file on disk, measured in number of elements; this is specified by batch_size.
The append-buffer is always “moving forward”: elements are appended to it until their count reaches batch_size; at that point the buffer’s content will be saved in a file, and the buffer will restart empty, waiting for new elements.
Independent of append, we may want to access any element of the list at any time. Depending on the element’s index, we will load the relevant file into the read-buffer to provision the element.
First, we need to create a few helper functions:
import json
import os, os.path
import pickle
from typing import Any

def json_load(path: str, *path_elements) -> Any:
    with open(os.path.join(path, *path_elements), 'r') as f:
        return json.load(f)

def pickle_load(path: str, *path_elements) -> Any:
    with open(os.path.join(path, *path_elements), 'rb') as f:
        return pickle.load(f)

def prepare_path(path: str, *path_elements) -> str:
    ff = os.path.join(path, *path_elements)
    dirname = os.path.dirname(os.path.abspath(ff))
    if not os.path.isdir(dirname):
        os.makedirs(dirname)
    return ff

def json_dump(x: Any, path: str, *path_elements) -> None:
    ff = prepare_path(path, *path_elements)
    with open(ff, 'w') as f:
        json.dump(x, f)

def pickle_dump(x: Any, path: str, *path_elements) -> None:
    ff = prepare_path(path, *path_elements)
    with open(ff, 'wb') as f:
        pickle.dump(x, f, protocol=pickle.HIGHEST_PROTOCOL)
Our main class starts like this:
import shutil
import tempfile
from typing import Iterable, List, Union

class Biglist:
    def __init__(
        self,
        path: str = None,
        batch_size: int = None,
    ):
        self.path = path or tempfile.mkdtemp()
        assert self.path.startswith('/')
        self.file_lengths = []
        self.cum_file_lengths = [0]

        if os.path.isdir(self.path):
            z = os.listdir(self.path)
            if z:
                # Directory is not empty; open for reading and/or appending.
                if not os.path.isfile(self.info_file) or not os.path.isdir(self.data_dir):
                    raise RuntimeError(f"path '{self.path}' is not empty "
                                       f"but is not a valid {self.__class__.__name__} folder")
                info = json_load(self.info_file)
                self.file_lengths = info['file_lengths']
                batch_size = info['batch_size']
                for n in self.file_lengths:
                    self.cum_file_lengths.append(self.cum_file_lengths[-1] + n)
            else:
                # Directory is empty.
                os.makedirs(self.data_dir)
        else:
            # Directory does not exist.
            os.makedirs(self.data_dir)

        if batch_size is None:
            batch_size = 10000
        else:
            assert batch_size > 0
        self.batch_size = batch_size

        self._read_buffer = None
        self._read_buffer_file_idx = None
        # `self._read_buffer` contains the content of the file
        # indicated by `self._read_buffer_file_idx`.

        self._append_buffer = None

    @property
    def info_file(self) -> str:
        return os.path.join(self.path, 'info.json')

    @property
    def data_dir(self) -> str:
        return os.path.join(self.path, 'store')

    def data_file(self, file_idx: int) -> str:
        return os.path.join(self.data_dir, str(file_idx) + '.pickle')

    def __len__(self) -> int:
        n = self.cum_file_lengths[-1]
        if self._append_buffer:
            return n + len(self._append_buffer)
        return n

    def __bool__(self) -> bool:
        return len(self) > 0
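Given info_file, data_dir, and data_file above, the on-disk layout looks like this (with <path> being the storage directory):

<path>/
    info.json          # {"file_lengths": [...], "batch_size": ...}
    store/
        0.pickle
        1.pickle
        ...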
Note that there are two situations when initializing a Biglist object. In one situation, the specified location is nonexistent or empty; in this case, a new Biglist is being created. In the other situation, the specified location is the storage of a previously created Biglist; in this case, the meta info gets read in, and the new Biglist object is ready to read existing data, in addition to appending new data.
Writing data to files
An element is added to the end of the list by the method append. The element is first appended to the in-memory list self._append_buffer. Once the length of the buffer reaches self.batch_size, the method flush is called (as the final step of append) to dump the buffer to a file. After flush, the buffer restarts empty. As such, the length of self._append_buffer never exceeds self.batch_size. However, it can be anything below this size, because the user is allowed to call flush at any time.
Before diving into append, let’s get flush taken care of:
class Biglist:
    def flush(self) -> None:
        if not self._append_buffer:
            return
        buffer_len = len(self._append_buffer)
        pickle_dump(self._append_buffer, self.data_file(len(self.file_lengths)))
        self.file_lengths.append(buffer_len)
        self.cum_file_lengths.append(self.cum_file_lengths[-1] + buffer_len)
        json_dump(
            {'file_lengths': self.file_lengths,
             'batch_size': self.batch_size,
             },
            self.info_file)
        if buffer_len == self.batch_size:
            self._append_buffer = []
        else:
            self._append_buffer = None
The final segment of the method sets the append-buffer to an empty list or None, depending on whether the append-buffer was at capacity. As a result, if the append-buffer is an empty list, it must have been set by flush just after writing a full-length append-buffer. Consequently, the currently final data file has “full length”, meaning it contains exactly self.batch_size elements.
On the other hand, if the append-buffer is None, it was either set in flush after writing a partial-length data file, or has not been touched since __init__, which sets it to None. In the second case, the currently final data file on disk may or may not be partial; consider, for example, initializing a Biglist object to read an existing Biglist that has a partial-length final data file.
These two empty values of the append-buffer are useful indicators, as we’ll see shortly in append.
When we are done appending elements to the list, it’s entirely possible that the append-buffer does not happen to be at capacity, in which case flush was not called when the last element was appended. As a result, the partial append-buffer has not been persisted. We could ask the user to call flush. As a safeguard, we also add a call to flush when the object goes out of scope.
class Biglist:
    def __del__(self):
        self.flush()
With this foundation in place, we are now ready to add elements to the list.
Appending elements
If the append-buffer has some elements but has not reached capacity, append simply appends to the buffer and calls flush as needed. Simple enough.
If the append-buffer is an empty list, as explained above, it’s guaranteed that the currently final data file has “full length”. We are in good shape to start filling a new append-buffer, hence append likewise simply appends to the buffer and calls flush as needed.
It requires more thinking if the append-buffer is None. This happens either when a previous call to flush has written a partial data file, or when the buffer has not been touched since being created with value None in __init__. In the second case, the currently final data file, if any, may also be partial.
It is not essential to make all data files (except the final one) contain the same number of elements. However, this regularity is nice to have. Moreover, if we provide this guarantee, then some index calculations can be simplified.
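For example, with this guarantee in place, the file holding element idx could in principle be computed directly rather than searched for (a hypothetical helper just to illustrate the point, not used in the code below):

def file_idx_for(idx: int, batch_size: int) -> int:
    # Valid when every data file except the last is full-length.
    return idx // batch_size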
We choose to provide this guarantee in the method _init_append_buffer. With this, append and extend are easy.
class Biglist:
    def _init_append_buffer(self) -> None:
        if self.file_lengths and self.file_lengths[-1] < self.batch_size:
            # The final data file is partial; absorb its content into
            # the append-buffer so it can be filled up to full length.
            self._append_buffer = pickle_load(
                self.data_file(len(self.file_lengths) - 1))
            if self._read_buffer_file_idx == len(self.file_lengths) - 1:
                self._read_buffer_file_idx = None
                self._read_buffer = None
            # Note:
            # do not delete the last data file;
            # the next call to `flush` will overwrite this file.
            self.file_lengths.pop()
            self.cum_file_lengths.pop()
        else:
            self._append_buffer = []

    def append(self, x) -> None:
        if self._append_buffer is None:
            self._init_append_buffer()
        self._append_buffer.append(x)
        if len(self._append_buffer) >= self.batch_size:
            self.flush()
            # Note that `flush` resets `self._append_buffer`.

    def extend(self, x: Iterable) -> None:
        for v in x:
            self.append(v)
For the purpose of reading, the content of the Biglist consists of all the disk files plus the append-buffer.
In _init_append_buffer, if the final data file has partial length, its content is loaded into the append-buffer, and that file is dropped from the bookkeeping lists self.file_lengths and self.cum_file_lengths.
We also check whether the read-buffer contains the content of the partial data file. If it does, we null the read-buffer. This could happen if we accessed elements in the final file while the append-buffer had been None since __init__.
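A small demo of the behavior just described (the directory path is hypothetical):

x = Biglist('/tmp/biglist-append-demo', batch_size=3)
x.extend(range(7))   # flush fires twice: '0.pickle' and '1.pickle' are full
x.flush()            # writes the partial '2.pickle' holding one element
del x

y = Biglist('/tmp/biglist-append-demo')  # batch_size is read from info.json
y.append(7)          # _init_append_buffer absorbs the partial '2.pickle'
assert len(y) == 8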
Accessing a random element
Our Biglist implements the “Sequence Protocol”, which, besides the special method __len__, requires the method __getitem__ for accessing an arbitrary element by index.
The main logic in this task involves determining which data file contains the requested element. This is done in the method _get_file_idx_for_item, listed below.
class Biglist:
    def _load_file_to_buffer(self, file_idx: int):
        self._read_buffer = pickle_load(self.data_file(file_idx))
        self._read_buffer_file_idx = file_idx

    def _get_file_idx_for_item(self, idx: int) -> int:
        if idx >= self.cum_file_lengths[-1]:
            # The requested element at index `idx`
            # resides in `self._append_buffer`.
            return len(self.file_lengths)
        if self._read_buffer_file_idx is None:
            for k, n in enumerate(self.cum_file_lengths):
                if idx < n:
                    return k - 1
        elif idx < self.cum_file_lengths[self._read_buffer_file_idx]:
            for k in range(self._read_buffer_file_idx - 1, -1, -1):
                if idx >= self.cum_file_lengths[k]:
                    return k
        elif idx >= self.cum_file_lengths[self._read_buffer_file_idx + 1]:
            for k in range(self._read_buffer_file_idx + 2, len(self.cum_file_lengths)):
                if idx < self.cum_file_lengths[k]:
                    return k - 1
        else:
            return self._read_buffer_file_idx

    def __getitem__(self, idx: int):
        '''
        Element access by single index; negative index works as expected.
        '''
        if not isinstance(idx, int):
            raise TypeError('A single integer index is expected. To use slice, use `view`.')
        idx = range(len(self))[idx]  # normalize; raises IndexError if out of range
        file_idx = self._get_file_idx_for_item(idx)
        if file_idx >= len(self.file_lengths):
            return self._append_buffer[idx - self.cum_file_lengths[-1]]
        if file_idx != self._read_buffer_file_idx:
            self._load_file_to_buffer(file_idx)
        n1 = self.cum_file_lengths[file_idx]
        n2 = self.cum_file_lengths[file_idx + 1]
        assert n1 <= idx < n2
        return self._read_buffer[idx - n1]
A couple of details need some explanation.
First, the whole Biglist consists of the on-disk data files plus the append-buffer (refer to the method __len__ above). The requested element may fall in the append-buffer.
Second, when the current read-buffer contains the content of a data file and the requested element is not in that file, we need to find out which file contains it. This can be done based on the element counts of the data files. Instead of a linear search from the first file to the last, we search backward from the current file (i.e., the file corresponding to the current read-buffer) to the first file, or forward from the current file to the last file, depending on whether the requested element lies before or after the current file. We choose this approach because, presumably, element access tends to be sequential in most use cases.
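To make the bookkeeping concrete, a small worked example (the path is hypothetical):

x = Biglist('/tmp/biglist-index-demo', batch_size=3)
x.extend(range(9))
assert x.file_lengths == [3, 3, 3]
assert x.cum_file_lengths == [0, 3, 6, 9]
# Element 7 satisfies cum_file_lengths[2] <= 7 < cum_file_lengths[3],
# so it lives in '2.pickle' at in-file offset 7 - 6 == 1.
assert x[7] == 7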
Iteration
Iteration can be trivially implemented using __len__ and __getitem__. However, __getitem__ contains some logic to jump to an arbitrary index, which is not needed in a sequential walk-through. The solution below walks through all the data files one by one, followed by a walk-through of the append-buffer.
class Biglist:
    def __iter__(self):
        for file_idx in range(len(self.file_lengths)):
            if file_idx == self._read_buffer_file_idx:
                buffer = self._read_buffer
            else:
                buffer = pickle_load(self.data_file(file_idx))
            yield from buffer
        if self._append_buffer is not None:
            yield from self._append_buffer
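A quick sanity check of the iteration order (the path is hypothetical):

x = Biglist('/tmp/biglist-iter-demo', batch_size=3)
x.extend(range(7))                # two full files on disk, one element buffered
assert list(x) == list(range(7))  # data files first, then the append-buffer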
Slicing
By convention, slicing returns a new object of the same type as the original object. Let’s verify with a couple of examples:
>>> x = range(13)
>>> y = x[-5:]
>>> type(y)
<class 'range'>
>>> type(x)
<class 'range'>
>>>
>>> xx = list(x)
>>> yy = xx[2:7]
>>> type(yy)
<class 'list'>
>>> type(xx)
<class 'list'>
>>>
For Biglist, we want slicing to return some kind of read-only “view” into the list; therefore it cannot be of the class Biglist. (For example, we don’t want to support append in this view.) For this reason, the __getitem__ method does not accept a slice as the argument. Instead, we have a simple method view that returns an instance of a different class, which we name BiglistView, and which supports element access by index, slice, and iteration. In line with the aforementioned convention, slicing a BiglistView returns a new BiglistView.
As a first cut, we start off like this:
class Biglist:
    def view(self):
        return BiglistView(self)

class BiglistView:
    def __init__(self, biglist: Biglist, range_: range = None):
        self._list = biglist
        if range_ is None:
            range_ = range(len(biglist))
        self._range = range_

    def __len__(self):
        return len(self._range)

    def __bool__(self) -> bool:
        return len(self) > 0

    def __getitem__(self, idx: Union[int, slice]):
        if isinstance(idx, int):
            return self._list[self._range[idx]]
        if not isinstance(idx, slice):
            raise TypeError('an integer or slice is expected')
        return self.__class__(
            self._list,
            view_slice_range(self._range, idx),
        )

    def __iter__(self):
        for idx in self._range:
            yield self._list[idx]
This listing contains a yet-to-be-defined function view_slice_range, which we’ll come to in a bit.
BiglistView takes a Biglist object and a range value, which specifies the “window” into the Biglist. When range_ is missing, it defaults to the entire range of the Biglist. However, once we slice the BiglistView object (in __getitem__ with an argument of type slice), we need to work out the new window into the original Biglist. Some examples can make this concrete.
x = Biglist()
x.extend(range(100))
v = x.view()
v1 = v[10:20]
v2 = v1[::2]
v3 = v2[-2:]
In v, the indices specified by v._range are 0, 1, ..., 99. In v1, the indices are 10, 11, ..., 19. In v2, the indices are 10, 12, 14, 16, 18. In v3, the indices are 16, 18.
BiglistView.__init__ expects a range in terms of the original, entire Biglist. When we slice a BiglistView, we are slicing the range BiglistView._range. We need to turn this slice on BiglistView._range into a simple range on the original Biglist. Then we can create a new BiglistView.
This slice-to-range conversion happens in BiglistView.__getitem__ and is accomplished by the function view_slice_range. The function name means “given a view and a slice, return the range that is the slice on the view”.
We rarely manipulate the Python classes range and slice. According to the official documentation, range has three forms.
With a single argument:
>>> list(range(0))
[]
>>> list(range(1))
[0]
>>> list(range(4))
[0, 1, 2, 3]
>>> list(range(-4))
[]
With two arguments:
>>> list(range(2, 6))
[2, 3, 4, 5]
>>> list(range(6, 2))
[]
>>> list(range(3, 3))
[]
>>> list(range(-3, 2))
[-3, -2, -1, 0, 1]
With three arguments:
>>> list(range(2, 5, 1))
[2, 3, 4]
>>> list(range(1, 8, 3))
[1, 4, 7]
>>> list(range(1, 8, -1))
[]
>>> list(range(8, 1, -2))
[8, 6, 4, 2]
The slice class is very similar to range, except that a slice object is almost never created by directly calling slice(...). Instead, it is created for us when we call __getitem__ with the “colon” syntax, like x[2:4] or x[2:8:2].
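A quick way to see this, using a throwaway Probe class just for illustration:

>>> class Probe:
...     def __getitem__(self, idx):
...         return idx
...
>>> Probe()[2:8:2]
slice(2, 8, 2)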
At first, I wrote some very careful logic for view_slice_range and got it right with a few dozen lines of code. That code has been in use for quite some time. Just as I was writing this post, I realized that we can index or slice a range directly, and it will do the right thing™:
>>> x = range(20)
>>> x
range(0, 20)
>>> x[8]
8
>>> x[-4:]
range(16, 20)
>>> x[::2]
range(0, 20, 2)
>>> x[::2][::-1]
range(18, -2, -2)
For the concrete example above, we have
>>> v = range(100)
>>> v1 = v[10:20]
>>> v2 = v1[::2]
>>> v3 = v2[-2:]
>>> list(v1)
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
>>> list(v2)
[10, 12, 14, 16, 18]
>>> list(v3)
[16, 18]
With this enlightenment, the function view_slice_range becomes totally trivial:
def view_slice_range(view_range: range, idx_on_view: slice) -> range:
    return view_range[idx_on_view]
Really this doesn’t even need to be a function!
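Putting the pieces together, slicing a view now behaves as expected. A quick check (the path is hypothetical):

x = Biglist('/tmp/biglist-view-demo', batch_size=3)
x.extend(range(100))
v = x.view()
assert list(v[10:20][::2][-2:]) == [16, 18]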
Supporting parallel processing
The Biglist class is designed for large amounts of data, for example 30GB split into 300 files. Now we don’t have to load the entire data into memory at once, and the data don’t need to be contained in a single file. We have gained some scalability here.
Processing this data can take a long time, though. A natural next step is to go parallel and process multiple segments of the data simultaneously.
Now we have BiglistView to provide a “window” into part of the list. Can we open multiple windows and use them to work on multiple parts of the list at the same time?
Conceptually, yes, but the design of BiglistView has flaws.
First, it contains a reference to the “host” Biglist. As we read elements of a Biglist, it loads the relevant data file as needed. If we have multiple BiglistViews referencing the same Biglist object, then when the views read different parts of the list, the single underlying list will have to keep loading different files to satisfy whichever view is accessing an element at the moment. This will be extremely inefficient. It will not work. A view cannot reference a shared Biglist object. It has to read data independently of other views.
Second, the time may come when we want to use multiprocessing and send a BiglistView to another process. When this happens, the BiglistView object will be pickled and transmitted across the process boundary. It is best that this object does not contain another object of a custom class, like Biglist. The reason is that, as the design of Biglist evolves, who knows what kind of things it will pull in (and they will all need to be pickled and transmitted).
A solution is to not initialize BiglistView with a Biglist object. Instead, just provide the file path and let the BiglistView create a Biglist object itself. Further, don’t create the Biglist object in BiglistView.__init__, which runs to initialize the view object before the object is sent to the other process. Instead, delay the creation of a Biglist object to the time when it is actually needed for element access, and that happens only after the BiglistView object has landed in the other process.
So we make some revisions like below:
class BiglistView:
    def __init__(self, path: str, range_: range = None):
        self._path = path
        self._list = None
        self._range = range_

    def _open_list(self):
        self._list = Biglist(self._path)
        if self._range is None:
            self._range = range(len(self._list))

    def __getitem__(self, idx: Union[int, slice]):
        if self._list is None:
            self._open_list()
        if isinstance(idx, int):
            return self._list[self._range[idx]]
        if not isinstance(idx, slice):
            raise TypeError('an integer or slice is expected')
        return self.__class__(self._path, self._range[idx])

    def __iter__(self):
        if self._list is None:
            self._open_list()
        for idx in self._range:
            yield self._list[idx]

    def __len__(self) -> int:
        if self._range is None:
            self._open_list()
        return len(self._range)

    # ... other methods same as above ...

class Biglist:
    def view(self):
        return BiglistView(self.path)
When we want concurrent processing, we simply chop up the whole range of the Biglist into a series of BiglistViews. The views will read their elements independently of all the other views. It will work.
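For instance, a sketch of the chopping (the path is hypothetical):

x = Biglist('/tmp/biglist-chop-demo', batch_size=1000)
x.extend(range(10000))
v = x.view()
# Chop the full range into four independent views; each one, when sent
# to another process, opens its own Biglist lazily.
views = [v[i * 2500:(i + 1) * 2500] for i in range(4)]
assert sum(sum(w) for w in views) == sum(range(10000))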
Can we do better?
Think about how we are going to chop up the Biglist. Noting that the list is backed by a series of files, one logical way is to create one view per file. This way, each file is loaded into memory exactly once. In order to do this, we need to work out the index range for each file, which is easy.
Further, we may as well add a method to Biglist that iterates over a particular file. It will be used in a BiglistView when that view is accessing exactly the elements of one file.
class Biglist:
    def iterfile(self, file_idx):
        assert 0 <= file_idx < len(self.file_lengths)
        if file_idx == self._read_buffer_file_idx:
            yield from self._read_buffer
        else:
            yield from pickle_load(self.data_file(file_idx))
Now add a method to return a BiglistView for a particular file:
class Biglist:
    def fileview(self, file_idx: int) -> 'BiglistView':
        assert not self._append_buffer
        assert 0 <= file_idx < len(self.file_lengths)
        return BiglistView(
            self.path,
            range(self.cum_file_lengths[file_idx], self.cum_file_lengths[file_idx + 1]),
        )
and a method to return a list of views covering the entire Biglist:
class Biglist:
    def fileviews(self) -> List['BiglistView']:
        return [
            self.fileview(i)
            for i in range(len(self.file_lengths))
        ]
One more thing: we did not tell BiglistView that its range covers exactly one file of the host Biglist. As a result, when we access an element or iterate over the view, the element index is treated totally generically: Biglist.__getitem__ is called, which determines which file contains the element, whether the file needs to be loaded, and so on. When we iterate over the view, and that is the most common use case, this index logic is largely a waste of time. A much better approach is to use Biglist.iterfile.
To that end, BiglistView needs to determine whether its range covers exactly one file of the host Biglist. The following revision does this.
class BiglistView:
    def _fileview_idx(self):
        if self._range.step != 1:
            return None
        try:
            idx = self._list.cum_file_lengths.index(self._range.start)
        except ValueError:
            return None
        if idx >= len(self._list.cum_file_lengths) - 1:
            return None
        if self._range.stop != self._list.cum_file_lengths[idx + 1]:
            return None
        return idx

    def __iter__(self):
        if self._list is None:
            self._open_list()
        file_idx = self._fileview_idx()
        if file_idx is None:
            for idx in self._range:
                yield self._list[idx]
        else:
            yield from self._list.iterfile(file_idx)
We could also overload the range_ parameter of BiglistView.__init__ and let it indicate a file index when it’s an int. That overloading might be a little ugly, but it works, too.
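Finally, a sketch of per-file parallel processing with these pieces (the worker function and path are illustrative):

from multiprocessing import Pool

def count_evens(view):
    # Since each view covers exactly one file, iteration
    # goes through Biglist.iterfile, skipping the per-index logic.
    return sum(1 for v in view if v % 2 == 0)

if __name__ == '__main__':
    x = Biglist('/tmp/biglist-fileview-demo', batch_size=1000)
    x.extend(range(10000))
    with Pool() as pool:
        print(sum(pool.map(count_evens, x.fileviews())))  # prints 5000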