Externe Sortierung mit O (1) zusätzlichen Speicher

Nachdem ich diesen Artikel gelesen hatte , fiel mir ein, dass ich eine externe Sortierung geschrieben hatte, die O (1) externen Speicher verwendete. Die Funktion erhielt eine Binärdatei und die maximale Speichergröße, die sie dem Array zuweisen konnte:

void ext_sort(const std::string filename, const size_t memory)

Ich habe den Algorithmus von Effective Performance of External Sorting ohne zusätzlichen Speicherplatz verwendet :

  1. Teilen Sie die Datei in Blöcke auf, die in den verfügbaren Speicher passen. Bezeichnen Sie diese Blöcke Block_1, Block_2, ..., Block_ (S-1), Block_S. Setze P = 1.
  2. Block_P in den Speicher lesen.
  3. Sortieren Sie die Daten im Speicher und schreiben Sie sie zurück in Block_P. Setze P = P + 1 und wenn P ≤ S, dann lies Block_P in den Speicher und wiederhole diesen Schritt. Mit anderen Worten, sortieren Sie jeden Block der Datei.
  4. Teilen Sie jeden Block in kleinere Blöcke B_1 und B_2. Jeder dieser Blöcke belegt die Hälfte des verfügbaren Speichers.
  5. Wir lesen Block B_1 von Block Block_1 in der ersten Hälfte des verfügbaren Speichers. Setze Q = 2.
  6. Wir lesen den Block B_1 des Block_Q-Blocks in der zweiten Hälfte des verfügbaren Speichers.
  7. Wir vereinen die Arrays im Speicher unter Verwendung der In-Place-Zusammenführung, schreiben die zweite Hälfte des Speichers in Block B_1 von Block_Q und setzen Q = Q + 1, wenn Q ≤ S ist, lesen Sie Block B_1 von Block_Q auf die zweite Hälfte des verfügbaren Speichers und wiederholen Sie diesen Schritt.
  8. Wir schreiben die erste Hälfte des verfügbaren Speichers in Block B_1 von Block Block_1. Da wir immer weniger als die Hälfte der Elemente im Speicher belassen und mit allen Blöcken zusammengeführt haben, werden in diesem Teil des Speichers M minimale Elemente der gesamten Datei gespeichert.
  9. Lesen Sie den Block B_2 des Blocks Block_S in die zweite Hälfte des verfügbaren Speichers. Setze Q = S −1.
  10. Wir lesen den Block B_2 des Block_Q-Blocks in der ersten Hälfte des verfügbaren Speichers.
  11. Wir vereinen die Arrays im Speicher durch In-Place-Zusammenführung, schreiben die erste Hälfte des verfügbaren Speichers in Block B_2 von Block_Q und setzen Q = Q −1. Wenn Q ≥ 1 ist, lesen Sie Block B_2 von Block_Q in der ersten Hälfte des verfügbaren Speichers und wiederholen Sie diesen Schritt.
  12. Wir schreiben die zweite Hälfte des verfügbaren Speichers in Block B_2 von Block_S. Ähnlich wie in Schritt 8 werden hier die maximalen Elemente der gesamten Datei gespeichert.
  13. Definieren Sie ab Block B_2 von Block_1 bis Block B_1 von Block_S neue Blöcke in der Datei und nummerieren Sie sie erneut von Block_1 bis Block_S. Teilen Sie jeden Block in die Blöcke B_1 und B_2. Setze P = 1.
  14. Wir lesen B_1 und B_2 des Block_P-Blocks in den Speicher. Kombinieren Sie die Arrays im Speicher. schreibe das sortierte Array zurück in Block_P und setze P = P +1. Wenn P ≤ S ist, wiederholen Sie diesen Schritt.
  15. Wenn S> 1, kehren wir zu Schritt 5 zurück. Jedes Mal, wenn wir M minimale und maximale Elemente auswählen, schreiben Sie diese an den Anfang bzw. das Ende der Datei und machen das Gleiche mit den übrigen Elementen, bis wir die Mitte der Datei erreichen.

Der Vorteil dieses Algorithmus besteht neben dem Fehlen eines Puffers auf der Festplatte darin, dass Daten in relativ großen Abschnitten von der Festplatte gelesen werden, was den Algorithmus beschleunigt.

Wir implementieren den Algorithmus in C ++.

Bestimmen Sie zuerst die Anzahl der Blöcke und die Blockgröße in Bytes und in Elementen und ordnen Sie Speicher zu:

	const size_t type_size = sizeof(T);
	const uint64_t filesize = file_size(filename);
	std::fstream data(filename, std::ios::in | std::ios::out | std::ios::binary);
	const uint64_t chunk_number = filesize / memory;
	const size_t chunk_size = memory / type_size - (memory / type_size) % 2, chunk_byte_size =
		chunk_size * type_size, half_chunk_byte_size = chunk_byte_size / 2, half_chunk_size = chunk_size / 2;
	std::vector *chunk = new std::vector(chunk_size);

Jetzt punkte 2-3 - sortiere jeden Block:

	for (uint64_t i = 0; i < chunk_number; ++i) {
		data.seekg(chunk_byte_size * i);
		data.read((char *) chunk->data(), chunk_byte_size);
		flat_quick_sort(chunk->begin(), chunk->end());
		data.seekp(chunk_byte_size * i);
		data.write((char *) chunk->data(), chunk_byte_size);
	}

Wir werden die Sortierung etwas später schreiben.

Wir gehen zu den Fusionen. Untere Hälfte:

	int64_t s = chunk_number, start = 0;
	while (s > 0) {
		data.seekp(half_chunk_byte_size * start);
		data.read((char *) chunk->data(), half_chunk_byte_size);
		for (int64_t q = 1; q < s; ++q) {
			data.seekg(half_chunk_byte_size * start + chunk_byte_size * q);
			data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
			in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
			data.seekp(half_chunk_byte_size * start + chunk_byte_size * q);
			data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
		}
		data.seekp(half_chunk_byte_size * start);
		data.write((char *) chunk->data(), half_chunk_byte_size);

Und ähnlich top:

		data.seekp(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
		data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
		for (int64_t q = s - 2; q >= 0; --q) {
			data.seekg(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
			data.read((char *) chunk->data(), half_chunk_byte_size);
			in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
			data.seekp(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
			data.write((char *) chunk->data(), half_chunk_byte_size);
		}
		data.seekg(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
		data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);

Wir verteilen die Blöcke neu, beenden den Zyklus und vergessen nicht, den Speicher freizugeben:

		--s;
		++start;
		for (int64_t p = 0; p < s; ++p) {
			data.seekp(half_chunk_byte_size * start + chunk_byte_size * p);
			data.read((char *) chunk->data(), chunk_byte_size);
			in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
			data.seekg(half_chunk_byte_size * start + chunk_byte_size * p);
			data.write((char *) chunk->data(), chunk_byte_size);
		}
	}
	delete chunk;
}

Volle Funktion
template
void ext_sort(const std::string filename, const size_t memory) {
	const size_t type_size = sizeof(T);
	const uint64_t filesize = file_size(filename);
	std::fstream data(filename, std::ios::in | std::ios::out | std::ios::binary);
	const uint64_t chunk_number = filesize / memory;
	const size_t chunk_size = memory / type_size - (memory / type_size) % 2, chunk_byte_size =
		chunk_size * type_size, half_chunk_byte_size = chunk_byte_size / 2, half_chunk_size = chunk_size / 2;
	std::vector *chunk = new std::vector(chunk_size);
	for (uint64_t i = 0; i < chunk_number; ++i) {
		data.seekg(chunk_byte_size * i);
		data.read((char *) chunk->data(), chunk_byte_size);
		flat_quick_sort(chunk->begin(), chunk->end());
		data.seekp(chunk_byte_size * i);
		data.write((char *) chunk->data(), chunk_byte_size);
	}
	int64_t s = chunk_number, start = 0;
	while (s > 0) {
		data.seekp(half_chunk_byte_size * start);
		data.read((char *) chunk->data(), half_chunk_byte_size);
		for (int64_t q = 1; q < s; ++q) {
			data.seekg(half_chunk_byte_size * start + chunk_byte_size * q);
			data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
			in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
			data.seekp(half_chunk_byte_size * start + chunk_byte_size * q);
			data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
		}
		data.seekp(half_chunk_byte_size * start);
		data.write((char *) chunk->data(), half_chunk_byte_size);
		data.seekp(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
		data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
		for (int64_t q = s - 2; q >= 0; --q) {
			data.seekg(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
			data.read((char *) chunk->data(), half_chunk_byte_size);
			in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
			data.seekp(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
			data.write((char *) chunk->data(), half_chunk_byte_size);
		}
		data.seekg(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
		data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
		--s;
		++start;
		for (int64_t p = 0; p < s; ++p) {
			data.seekp(half_chunk_byte_size * start + chunk_byte_size * p);
			data.read((char *) chunk->data(), chunk_byte_size);
			in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
			data.seekg(half_chunk_byte_size * start + chunk_byte_size * p);
			data.write((char *) chunk->data(), chunk_byte_size);
		}
	}
	delete chunk;
}


Осталось реализовать функции flat_quick_sort и in_place_merge. Идею (и большую часть реализации) flat quick sort я взял в статье хабраюзера ripatti. Я только заменил median of medians (посчитал это оверкиллом в среднем случае) на median-of-nine и добавил сортировку вставками для маленьких частей массива.

flat_quicksort.h
#ifndef EXTERNAL_SORT_FLAT_QUICKSORT_H
#define EXTERNAL_SORT_FLAT_QUICKSORT_H
template
void insertion_sort(ForwIt be, ForwIt en) {
	for (ForwIt ii = be + 1; ii != en; ii++) {
		ForwIt jj = ii - 1;
		auto val = *ii;
		while (jj >= be and *jj > val) {
			*(jj + 1) = *jj;
			--jj;
		}
		*(jj + 1) = val;
	}
}
template
ForwIt median_of_3(ForwIt it1, ForwIt it2, ForwIt it3) {
	return (*it1 > *it2) ?
	       (*it3 > *it2) ? (*it1 > *it3) ? it3 : it1 : it2 :
	       (*it3 > *it1) ? (*it2 > *it3) ? it3 : it2 : it1;
}
template
ForwIt choose_pivot(ForwIt be, ForwIt en) {
	ptrdiff_t s = (en - be) / 8;
	ForwIt mid = be + (en - be) / 2;
	ForwIt best1 = median_of_3(be, be + s, be + 2 * s), best2 = median_of_3(mid - s, mid, mid + s), best3 = median_of_3(
		en - 2 * s, en - s, en);
	return median_of_3(best1, best2, best3);
}
// search for the end of the current block
template
ForwIt block_range(ForwIt be, ForwIt en) {
	ForwIt it = be;
	while (it != en) {
		if (*be < *it)
			return it;
		++it;
	}
	return it;
}
// warning: after the partition outer pivot may point to random element
template
std::pair partition_range(ForwIt be, ForwIt en, ForwIt pivot) {
	std::pair re;
	ForwIt j = be;
	for (ForwIt i = be; i != en; ++i)
		if (*i < *pivot) {
			if (pivot == i) pivot = j; else if (pivot == j) pivot = i;
			std::swap(*j, *i);
			++j;
		}
	re.first = j;
	for (ForwIt i = j; i != en; ++i)
		if (*pivot == *i) {
			if (pivot == i) pivot = j; else if (pivot == j) pivot = i;
			std::swap(*j, *i);
			++j;
		}
	re.second = j;
	return re;
}
// makes largest element the first
template
void blockify(ForwIt be, ForwIt en) {
	for (ForwIt it = be; it != en; ++it)
		if (*be < *it)
			std::swap(*be, *it);
}
template
void flat_quick_sort(ForwIt be, ForwIt en) {
	ForwIt tmp = en; // the current end of block
	while (be != en) {
		if (std::is_sorted(be, tmp)) {
			be = tmp;
			tmp = block_range(be, en);
			continue;
		}
		if (tmp - be < 32)
			insertion_sort(be, tmp);
		else {
			ForwIt pivot = choose_pivot(be, tmp);
			std::pair range = partition_range(be, tmp, pivot);
			blockify(range.second, tmp);
			tmp = range.first;
		}
	}
}
#endif //EXTERNAL_SORT_FLAT_QUICKSORT_H


Со слиянием было сложнее. Сначала я использовал заглушку, использующую O(n) памяти:

template
void merge(std::vector &chunk, size_t s, size_t q, size_t r) {
	std::vector *chunk2 = new std::vector(q - s + 1);
	auto it2 = chunk2->begin(), it1 = chunk.begin() + q + 1, it = chunk.begin() + s;
	std::copy(it, it1, it2);
	while (it2 != chunk2->end() && it1 != chunk.begin() + r + 1) {
		if (*it1 > *it2) {
			*it = *it2;
			++it2;
		} else {
			*it = *it1;
			++it1;
		}
		++it;
	}
	if (it1 == chunk.begin() + r + 1)
		std::copy(it2, chunk2->end(), it);
	else
		std::copy(it1, chunk.begin() + r + 1, it);
	delete chunk2;
}

Когда я захотел заменить заглушку in-place версией, оказалось, что быстрые алгоритмы in-place слияния в большинстве своем достаточно запутанные (посмотрите, например, On optimal and efficient in place merging). Мне надо было что-то попроще, и я выбрал алгоритм из статьи A simple algorithm for in-place merging:

in-place merge
template
void merge_B_and_Y(Iter z, Iter y, Iter yn) {
	for (; z < y && y < yn; ++z) {
		Iter j = std::min_element(z, y);
		if (*j <= *y)
			std::swap(*z, *j);
		else {
			std::swap(*z, *y);
			++y;
		}
	}
	if (z < y)
		flat_quick_sort(z, yn);
}
template
Iter find_next_X_block(Iter x0, Iter z, Iter y, size_t k, size_t f, Iter b1,
                       Iter b2, auto max) {
	auto min1 = max, min2 = max;
	Iter m = x0 + (ptrdiff_t) floor((z - x0 - f) / k) * k + f, x = x0;
	if (m <= z)
		m += k;
	for (auto i = m; i + k <= y; i += k) {
		if (i != b1 && i != b2) {
			Iter j = (i < b1 && b1 < i + k) ? m - 1 : i + k - 1;
			if (*i <= min1 && *j <= min2) {
				x = i;
				min1 = *i;
				min2 = *j;
			}
		}
	}
	return x;
}
template
void in_place_merge(Iter x0, Iter y0, Iter yn, int64_t k, bool rec) {
	if (k == -1)
		k = (int64_t) sqrt(yn - x0);
	size_t f = (y0 - x0) % k;
	Iter x = (f == 0) ? y0 - 2 * k : y0 - k - f;
	auto t = *x, max = *std::max_element(x0, yn);
	*x = *x0;
	Iter z = x0, y = y0, b1 = x + 1, b2 = y0 - k;
	int i = 0;
	while (y - z > 2 * k) {
		++i;
		if (*x <= *y || y >= yn) {
			*z = *x;
			*x = *b1;
			++x;
			if ((x - x0) % k == f) if (z < x - k)
				b2 = x - k;
			x = find_next_X_block(x0, z, y, k, f, b1, b2, max);
		} else {
			*z = *y;
			*y = *b1;
			++y;
			if ((y - y0) % k == 0)
				b2 = y - k;
		}
		++z;
		*b1 = *z;
		if (z == x)
			x = b1;
		if (z == b2)
			b2 = yn + 1;
		++b1;
		if ((b1 - x0) % k == f)
			b1 = (b2 == yn + 1) ? b1 - k : b2;
	}
	*z = t;
	if (rec)
		merge_B_and_Y(z, y, yn);
	else  {
		flat_quick_sort(z, y);
		in_place_merge(z,y,yn,(int64_t)sqrt(k),true);
	}
}


Но на моем компьютере замена merge на in-place merge замедляла алгоритм почти на порядок. Возможно я ошибся в реализации или просто выбрал медленный алгоритм в погоне за простотой. Времени разбираться, как всегда, не было, к тому же gprof почему-то падал. И тут меня осенило. Если мы выделям M байт динамической памяти, то не важно, как мы её используем, мы все равно получаем O(1). Тогда просто выделим 2/3 под данные, а треть — под буфер слияния. Замедление будет гораздо меньше. И правда:
АлгоритмВремя (75MB int64 в 7,5MB памяти)Скорость (75MB int64 в 7,5MB памяти)Время (7,5MB int64 в 75KB памяти)Скорость (7,5MB int64 в 75KB памяти)Время (750MB int64 в 75MB памяти)Скорость (750MB int64 в 75MB памяти)
In-place merge6.04329 s1 241 045 B/s24.2993 s3 086 508 B/s--
Merge0.932663 s8 041 489 B/s2.73895 s27 382 756 B/s47.7946 s15 691 689 B/s
Algorithmus SLY_G1.79629 s4175272 B / s3,84775 s19 491 910 B / s39,77 s18 858 436 B / s
Leider verlangsamt sich der Algorithmus bei großen Volumes, was zu erwarten ist, da wir überhaupt keinen Puffer verwenden. Trotzdem ist die Geschwindigkeit des Algorithmus völlig ausreichend, und ich bin sicher, dass sie verbessert werden kann.

Alle Quellen sind hier .

Jetzt auch beliebt: