Intersection of sorted lists is a cornerstone operation in many applications including search engines and databases because indexes are often implemented using different types of sorted structures. At GridDynamics, we recently worked on a custom database for realtime web analytics where fast intersection of very large lists of IDs was a must for good performance. From a functional point of view, we needed mainly a standard boolean query processing, so it was possible to use Solr/Lucene as a platform. However, it was interesting to evaluate performance of alternative approaches. In this article I describe several useful techniques that are based on SSE instructions and provide results of performance testing for Lucene, Java, and C implementations. I’d like to mention that in this study we were focused on a general case when selectivity of the intersection is low or unknown and optimization techniques like skip list are not necessarily beneficial.
Scalar Intersection
Our starting point is a simple element-by-element intersection algorithm (also known as Zipper). Its implementation in C is shown below and do not require lengthy explanations:
#define int32 unsigned int
// A, B - operands, sorted arrays
// s_a, s_b - sizes of A and B
// C - result buffer
// return size of the result C
size_t intersect_scalar(int32 *A, int32 *B, size_t s_a, size_t s_b, int32 *C) {
size_t i_a = 0, i_b = 0;
size_t counter = 0;
while(i_a < s_a && i_b < s_b) {
if(A[i_a] < B[i_b]) {
i_a++;
} else if(B[i_b] < A[i_a]) {
i_b++;
} else {
C[counter++] = A[i_a];
i_a++; i_b++;
}
}
return counter;
}
Performance of this procedure both in C and Java will be evaluated in the last section. I believe that it is possible to improve this approach using a branchless implementation, but I had no chance to try it out.
Vectorized Intersection
It is intuitively clear that performance of intersection may be improved by processing of multiple elements at once using SIMD instructions. Let us start with the following question: how to find and extract common elements in two short sorted arrays (let’s call them segments). SSE instruction set allow one to do a pairwise comparison of two segments of four 32-bit integers each using one instruction (_mm_cmpeq intrinsic) that produces a bit mask that highlights positions of equal elements. If one has two 4-element registers, A and B, it is possible to obtain a mask of common elements comparing A with different cyclic shifts of B (the left part of the figure below) and OR-ing the masks produced by each comparison (the right part of the figure):
The resulting comparison mask highlights the required elements in the segment A. This 128-bit mask can be transformed to a 4-bit value (shuffling mask index) using __mm_movemask intrinsic. When this short mask of common elements is obtained, we have to efficiently copy out common elements. This can be done by shuffling of the original elements according to the shuffling mask that can be looked up in the precomputed dictionary using the shuffling mask index (i.e. each of 16 possible 4-bit shuffling mask indexes is mapped to some permutation). All common elements should be placed to the beginning of the register, in this case register can be copied in one shot to the output buffer C as it shown in the figure above.
The described technique gives us a building block that can be used for intersection of long sorted lists. This process is somehow similar to the scalar intersection:
In this example, during the first cycle we compare first 4-element segments (highlighted in red) and copy out common elements (2 and 11). Similarly to the scalar intersection algorithm, we can move forward the pointer for the list B because the tail element of the compared segments is smaller in B (11 vs 14). At the second cycle (in green) we compare the first segment of A with the second segment of B, intersection is empty, and we move pointer for A. And so on. In this example, we need 5 comparisons to process two lists of 12 elements each.
The complete implementation of the described techniques is shown below:
static __m128i shuffle_mask[16]; // precomputed dictionary
size_t intersect_vector(int32 *A, int32 *B, size_t s_a, size_t s_b, int32 *C) {
size_t count = 0;
size_t i_a = 0, i_b = 0;
// trim lengths to be a multiple of 4
size_t st_a = (s_a / 4) * 4;
size_t st_b = (s_b / 4) * 4;
while(i_a < s_a && i_b < s_b) {
//[ load segments of four 32-bit elements
__m128i v_a = _mm_load_si128((__m128i*)&A[i_a]);
__m128i v_b = _mm_load_si128((__m128i*)&B[i_b]);
//]
//[ move pointers
int32 a_max = _mm_extract_epi32(v_a, 3);
int32 b_max = _mm_extract_epi32(v_b, 3);
i_a += (a_max <= b_max) * 4;
i_b += (a_max >= b_max) * 4;
//]
//[ compute mask of common elements
int32 cyclic_shift = _MM_SHUFFLE(0,3,2,1);
__m128i cmp_mask1 = _mm_cmpeq_epi32(v_a, v_b); // pairwise comparison
v_b = _mm_shuffle_epi32(v_b, cyclic_shift); // shuffling
__m128i cmp_mask2 = _mm_cmpeq_epi32(v_a, v_b); // again...
v_b = _mm_shuffle_epi32(v_b, cyclic_shift);
__m128i cmp_mask3 = _mm_cmpeq_epi32(v_a, v_b); // and again...
v_b = _mm_shuffle_epi32(v_b, cyclic_shift);
__m128i cmp_mask4 = _mm_cmpeq_epi32(v_a, v_b); // and again.
__m128i cmp_mask = _mm_or_si128(
_mm_or_si128(cmp_mask1, cmp_mask2),
_mm_or_si128(cmp_mask3, cmp_mask4)
); // OR-ing of comparison masks
// convert the 128-bit mask to the 4-bit mask
int32 mask = _mm_movemask_ps((__m128)cmp_mask);
//]
//[ copy out common elements
__m128i p = _mm_shuffle_epi8(v_a, shuffle_mask[mask]);
_mm_storeu_si128((__m128i*)&C[count], p);
count += _mm_popcnt_u32(mask); // a number of elements is a weight of the mask
//]
}
// intersect the tail using scalar intersection
...
return count;
}
The described implementation uses the shuffle_mask dictionary to map the mask of common elements to the shuffling parameter. Building of this dictionary is straightforward (each bit in the mask corresponds to 4 bytes in the register):
// a simple implementation, we don't care about performance here
void prepare_shuffling_dictionary() {
for(int i = 0; i < 16; i++) {
int counter = 0;
char permutation[16];
memset(permutation, 0xFF, sizeof(permutation));
for(char b = 0; b < 4; b++) {
if(getBit(i, b)) {
permutation[counter++] = 4*b;
permutation[counter++] = 4*b + 1;
permutation[counter++] = 4*b + 2;
permutation[counter++] = 4*b + 3;
}
}
__m128i mask = _mm_loadu_si128((const __m128i*)permutation);
shuffle_mask[i] = mask;
}
}
int getBit(int value, int position) {
return ( ( value & (1 << position) ) >> position);
}
Partitioned Vectorized Intersection
SSE 4.2 instruction set offers PCMPESTRM instruction that allows one to compare two segments of eight 16-bit values each and obtain a bit mask that highlights common elements. This sounds like an extremely efficient approach for intersection of sorted lists, but in its basic form this approach is limited by 16-bit values in the lists. This is not the case for many applications, so a workaround was recently suggested by Benjamin Schedel et al. in this article. The main idea is to store indexes in the partitioned format, where elements with the same most significant bits are grouped together. This approach also has limited applicability because each partition should contain a sufficient number of elements, i.e. it works well in case or very large lists or favorable distribution of the values.
Each partition has a header that includes a prefix which represents most significant bits that are common for all elements in the partition and the number of elements in the partition. The following figure illustrates the partitioning process:

The partitioning procedure that coverts 32-bit values into 16-bit values is shown in the code snippet below:
// A - sorted array
// s_a - size of A
// R - partitioned sorted array
size_t partition(int32 *A, size_t s_a, int16 *R) {
int16 high = 0;
size_t partition_length = 0;
size_t partition_size_position = 1;
size_t counter = 0;
for(size_t p = 0; p < s_a; p++) {
int16 chigh = _high16(A[p]); // upper dword
int16 clow = _low16(A[p]); // lower dword
if(chigh == high && p != 0) { // add element to the current partition
R[counter++] = clow;
partition_length++;
} else { // start new partition
R[counter++] = chigh; // partition prefix
R[counter++] = 0; // reserve place for partition size
R[counter++] = clow; // write the first element
R[partition_size_position] = partition_length;
partition_length = 1; // reset counters
partition_size_position = counter - 2;
high = chigh;
}
}
R[partition_size_position] = partition_length;
return counter;
}
A pair of partitions can be intersected using the following procedure that computes a mask of common elements using _mm_cmpestrm intrinsic and then shuffles these elements similarly to the vectorized intersection procedure what was described in the previous section.
size_t intersect_vector16(int16 *A, int16 *B, size_t s_a, size_t s_b, int16 *C) {
size_t count = 0;
size_t i_a = 0, i_b = 0;
size_t st_a = (s_a / 8) * 8;
size_t st_b = (s_b / 8) * 8;
while(i_a < st_a && i_b < st_b) {
__m128i v_a = _mm_loadu_si128((__m128i*)&A[i_a]);
__m128i v_b = _mm_loadu_si128((__m128i*)&B[i_b]);
__m128i res_v = _mm_cmpestrm(v_b, 8, v_a, 8,
_SIDD_UWORD_OPS|_SIDD_CMP_EQUAL_ANY|_SIDD_BIT_MASK);
int r = _mm_extract_epi32(res_v, 0);
__m128i p = _mm_shuffle_epi8(v_a, shuffle_mask16[r]);
_mm_storeu_si128((__m128i*)&C[count], p);
count += _mm_popcnt_u32(r);
int16 a_max = _mm_extract_epi16(v_a, 7);
int16 b_max = _mm_extract_epi16(v_b, 7);
i_a += (a_max <= b_max) * 4;
i_b += (a_max >= b_max) * 4;
}
// intersect the tail using scalar intersection
...
return count;
}
The whole intersection algorithm looks similarly to the scalar intersection. It receives two partitioned operands, iterates over headers of partitions and calls intersection of particular partitions if their prefixes match:
// A, B - partitioned operands
size_t intersect_partitioned(int16 *A, int16 *B, size_t s_a, size_t s_b, int16 *C) {
size_t i_a = 0, i_b = 0;
size_t counter = 0;
while(i_a < s_a && i_b < s_b) {
if(A[i_a] < B[i_b]) {
i_a += A[i_a + 1] + 2;
} else if(B[i_b] < A[i_a]) {
i_b += B[i_b + 1] + 2;
} else {
C[counter++] = A[i_a]; // write partition prefix
int16 partition_size = intersect_vector16(&A[i_a + 2], &B[i_b + 2],
A[i_a + 1], B[i_b + 1], &C[counter + 1]);
C[counter++] = partition_size; // write partition size
counter += partition_size;
i_a += A[i_a + 1] + 2;
i_b += B[i_b + 1] + 2;
}
}
return counter;
}
The output of this procedure is also a partitioned vector that can be used in further operations.
Performance Evaluation
Performance of the described techniques was evaluated for intersection of sorted lists of size 1 million elements, with average intersection selectivity about 30%. All evaluated methods excepts partitioned vectorized intersection do not require specific properties of the values in the lists. For partitioned vectorized intersection values were selected from range [0, 3M] to provide relatively large partitions.
In case of Lucene, a corpus of documents with two fields was generated to provide the mentioned index sizes and selectivity; RAMDirectory was used. Intersection was done using standard Boolean query with top hits limited by 1 to prevent generation of large result set. Of course, this not a fair comparison because Lucene is much more than a list intersector, but it is still interesting to try it out.
Performance testing was done on the ordinary Linux desktop with 2.8GHz cores. JDK 1.6 and gcc 4.5.2 (with -O3 option) were used.



Adrian Muraru
June 6, 2012
Cool, on a side note wondering what tool u used for drawiings
Ilya Katsov
March 21, 2013
I typically use PowerPoint; CorelDraw in some cases.
Paul Khuong
June 7, 2012
Is the sequence of cyclic permutations actually faster than just duplicating scalars four times?
Ilya Katsov
June 8, 2012
Paul,
Do you mean comparison with four duplicates of the first element, the second element etc? I didn’t try it, but I think it will be the same thing, the same instruction – for example, see this thread. Anyway, this is a good question, thank you.
Paul Khuong
June 8, 2012
RIght, that’s what I meant. Duplicating also comes down to a single PSHUFD. The difference is that all four duplicate + compare sequence are independent of each other, while the rotates form a dependency chain (and each comparison obviously depends on one of the rotate in that chain).
Ilya Katsov
June 9, 2012
I get it now, a good point. I quickly benchmarked the following routine:
__m128i v_b0 = _mm_shuffle_epi32(v_b, _MM_SHUFFLE(0,0,0,0));
__m128i cmp_mask1 = _mm_cmpeq_epi32(v_a, v_b0);
__m128i v_b1 = _mm_shuffle_epi32(v_b, _MM_SHUFFLE(1,1,1,1));
__m128i cmp_mask2 = _mm_cmpeq_epi32(v_a, v_b1);
__m128i v_b2 = _mm_shuffle_epi32(v_b, _MM_SHUFFLE(2,2,2,2));
__m128i cmp_mask3 = _mm_cmpeq_epi32(v_a, v_b2);
__m128i v_b3 = _mm_shuffle_epi32(v_b, _MM_SHUFFLE(3,3,3,3));
__m128i cmp_mask4 = _mm_cmpeq_epi32(v_a, v_b3);
and got the same result as for cyclic shifts, even a little bit slower (probably because of additional shuffling in the begining).
Paul Khuong
June 9, 2012
Interesting! Thanks for trying it out.
Fernando Bertoldi
June 8, 2012
Hi,
Your scalar intersection implementation has some errors. In the while body you forgot to copy to the destination in the first two cases:
if (a[i_a] < b[i_b]) {
c[counter++] = a[i_a++];
}
else if (b[i_b] < a[i_a]) {
c[counter++] = b[i_b++];
}
else {
c[counter++] = a[i_a];
++i_a; ++i_b;
}
You also don't treat the cases when one array is shorter than the other or the size is zero, but maybe that's beside the point.
Ilya Katsov
June 8, 2012
Fernando,
Please note that this is intersection. You are probably talking about union, aren’t you?.
Fernando
June 8, 2012
Oh I’m sorry, you’re right.
jfheidt
June 8, 2012
Amazing perf boost, I am also curious what application you used to mark up your drawings.
Ilya Katsov
June 8, 2012
This is definitely the most popular question
I typically use PowerPoint; CorelDraw in some cases.
digdog
June 8, 2012
very cool technique!