Intersection of sorted lists is a cornerstone operation in many applications, including search engines and databases, because indexes are often implemented as sorted structures of different kinds. At Grid Dynamics, we recently worked on a custom database for real-time web analytics where fast intersection of very large lists of IDs was a must for good performance. From a functional point of view we mainly needed standard Boolean query processing, so it would have been possible to use Solr/Lucene as a platform. Nevertheless, it was interesting to evaluate the performance of alternative approaches. In this article I describe several useful techniques based on SSE instructions and provide performance testing results for Lucene, Java, and C implementations. I'd like to mention that this study focuses on the general case where the selectivity of the intersection is low or unknown, so optimization techniques like skip lists are not necessarily beneficial.

## Scalar Intersection

Our starting point is a simple element-by-element intersection algorithm (also known as Zipper). Its C implementation is shown below and does not require a lengthy explanation:

```c
#define int32 unsigned int

// A, B     - operands, sorted arrays
// s_a, s_b - sizes of A and B
// C        - result buffer
// returns the size of the result C
size_t intersect_scalar(int32 *A, int32 *B, size_t s_a, size_t s_b, int32 *C) {
    size_t i_a = 0, i_b = 0;
    size_t counter = 0;
    while(i_a < s_a && i_b < s_b) {
        if(A[i_a] < B[i_b]) {
            i_a++;
        } else if(B[i_b] < A[i_a]) {
            i_b++;
        } else {
            C[counter++] = A[i_a];
            i_a++; i_b++;
        }
    }
    return counter;
}
```

The performance of this procedure, both in C and Java, is evaluated in the last section. I believe this approach could be improved further with a branchless implementation, but I had no chance to try it out.
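For illustration, the branchless idea could be sketched as follows (my own untested variant, not the code we benchmarked): the pointer increments and the output write are driven by comparison results, so the loop body contains no data-dependent branches.

```c
#include <assert.h>
#include <stddef.h>

// Branchless Zipper sketch: a speculative write to C[counter] happens on
// every iteration and is kept only when the heads are equal, so C must have
// room for min(s_a, s_b) elements.
size_t intersect_scalar_branchless(const unsigned int *A, const unsigned int *B,
                                   size_t s_a, size_t s_b, unsigned int *C) {
    size_t i_a = 0, i_b = 0, counter = 0;
    while (i_a < s_a && i_b < s_b) {
        unsigned int a = A[i_a], b = B[i_b];
        C[counter] = a;          // speculative write, kept only on a match
        counter += (a == b);
        i_a += (a <= b);         // advance the list(s) with the smaller head
        i_b += (b <= a);
    }
    return counter;
}
```

Whether this actually beats the branchy version depends on how well the branch predictor copes with the data, which is why it would need to be measured rather than assumed.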

## Vectorized Intersection

It is intuitively clear that the performance of intersection can be improved by processing multiple elements at once using SIMD instructions. Let us start with the following question: how do we find and extract the common elements of two short sorted arrays (let's call them segments)? The SSE instruction set allows one to do a pairwise comparison of two segments of four 32-bit integers each with a single instruction (the _mm_cmpeq_epi32 intrinsic) that produces a bit mask highlighting the positions of equal elements. Given two 4-element registers, A and B, one can obtain a mask of common elements by comparing A with the different cyclic shifts of B (the left part of the figure below) and OR-ing the masks produced by each comparison (the right part of the figure):

The resulting *comparison mask* highlights the required elements in segment A. This 128-bit mask can be transformed into a 4-bit value (the *shuffling mask index*) using the _mm_movemask_ps intrinsic. Once this short mask of common elements is obtained, we have to copy out the common elements efficiently. This can be done by shuffling the original elements according to a shuffling mask that is looked up in a precomputed dictionary by the shuffling mask index (i.e. each of the 16 possible 4-bit indexes is mapped to some permutation). All common elements are placed at the beginning of the register, so the register can be copied to the output buffer C in one shot, as shown in the figure above.
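The building block can be modeled without intrinsics; the following scalar sketch (my own helper names, not from the real implementation) computes the 4-bit mask by comparing segment a against all four cyclic shifts of segment b, and then compacts the flagged elements to the front, exactly as the shuffle does:

```c
#include <assert.h>

// Scalar emulation of the SIMD comparison: bit i of the mask is set iff
// a[i] equals some element of b, discovered by checking a against the four
// cyclic shifts of b.
static int segment_mask(const unsigned a[4], const unsigned b[4]) {
    int mask = 0;
    for (int shift = 0; shift < 4; shift++)
        for (int i = 0; i < 4; i++)
            if (a[i] == b[(i + shift) % 4])   // one cyclic shift of b
                mask |= 1 << i;
    return mask;
}

// Scalar analog of the dictionary shuffle: move the flagged elements of a
// to the front of out and return how many there are.
static int compact(const unsigned a[4], int mask, unsigned *out) {
    int n = 0;
    for (int i = 0; i < 4; i++)
        if (mask & (1 << i))
            out[n++] = a[i];
    return n;
}
```

In the SSE version both steps collapse into a handful of instructions, but the data flow is the same: mask first, then a mask-indexed permutation.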

The described technique gives us a building block that can be used for intersection of long sorted lists. The process is somewhat similar to the scalar intersection:

In this example, during the first cycle we compare the first 4-element segments (highlighted in red) and copy out the common elements (2 and 11). Similarly to the scalar intersection algorithm, we move the pointer of list B forward because the tail element of its segment is the smaller one (11 vs 14). In the second cycle (in green) we compare the first segment of A with the second segment of B; their intersection is empty, and we move the pointer of A. And so on. In this example, we need 5 comparisons to process two lists of 12 elements each.
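The stepping logic above can be sketched end-to-end without intrinsics. The following scalar model (my own code, for clarity only) intersects 4-element segments by an all-pairs comparison, advances whichever list has the smaller segment tail, and finishes with the scalar Zipper on the tails:

```c
#include <assert.h>
#include <stddef.h>

// Scalar model of the vectorized loop: the inner double loop stands in for
// the four shifted SIMD comparisons, and the tail-based pointer moves are
// the same as in the SSE code. Assumes the lists contain unique values.
size_t intersect_blocked(const unsigned *A, const unsigned *B,
                         size_t s_a, size_t s_b, unsigned *C) {
    size_t count = 0, i_a = 0, i_b = 0;
    size_t st_a = (s_a / 4) * 4, st_b = (s_b / 4) * 4;
    while (i_a < st_a && i_b < st_b) {
        for (int i = 0; i < 4; i++)              // all-pairs comparison of
            for (int j = 0; j < 4; j++)          // the two 4-element segments
                if (A[i_a + i] == B[i_b + j])
                    C[count++] = A[i_a + i];
        unsigned a_max = A[i_a + 3], b_max = B[i_b + 3];
        i_a += (a_max <= b_max) * 4;             // advance the list with the
        i_b += (a_max >= b_max) * 4;             // smaller segment tail
    }
    while (i_a < s_a && i_b < s_b) {             // scalar tail
        if (A[i_a] < B[i_b]) i_a++;
        else if (B[i_b] < A[i_a]) i_b++;
        else { C[count++] = A[i_a]; i_a++; i_b++; }
    }
    return count;
}
```

Note that advancing by a whole segment is safe for unique sorted values: once a segment's tail is not greater than the other list's tail, none of its elements can match anything beyond the other segment.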

The complete implementation of the described technique is shown below:

```c
static __m128i shuffle_mask[16]; // precomputed dictionary

size_t intersect_vector(int32 *A, int32 *B, size_t s_a, size_t s_b, int32 *C) {
    size_t count = 0;
    size_t i_a = 0, i_b = 0;

    // trim lengths to be a multiple of 4
    size_t st_a = (s_a / 4) * 4;
    size_t st_b = (s_b / 4) * 4;

    while(i_a < st_a && i_b < st_b) {
        //[ load segments of four 32-bit elements
        __m128i v_a = _mm_load_si128((__m128i*)&A[i_a]);
        __m128i v_b = _mm_load_si128((__m128i*)&B[i_b]);
        //]

        //[ move pointers
        int32 a_max = _mm_extract_epi32(v_a, 3);
        int32 b_max = _mm_extract_epi32(v_b, 3);
        i_a += (a_max <= b_max) * 4;
        i_b += (a_max >= b_max) * 4;
        //]

        //[ compute mask of common elements
        int32 cyclic_shift = _MM_SHUFFLE(0,3,2,1);
        __m128i cmp_mask1 = _mm_cmpeq_epi32(v_a, v_b);  // pairwise comparison
        v_b = _mm_shuffle_epi32(v_b, cyclic_shift);     // shuffling
        __m128i cmp_mask2 = _mm_cmpeq_epi32(v_a, v_b);  // again...
        v_b = _mm_shuffle_epi32(v_b, cyclic_shift);
        __m128i cmp_mask3 = _mm_cmpeq_epi32(v_a, v_b);  // and again...
        v_b = _mm_shuffle_epi32(v_b, cyclic_shift);
        __m128i cmp_mask4 = _mm_cmpeq_epi32(v_a, v_b);  // and again.
        __m128i cmp_mask = _mm_or_si128(
                _mm_or_si128(cmp_mask1, cmp_mask2),
                _mm_or_si128(cmp_mask3, cmp_mask4)
        ); // OR-ing of comparison masks

        // convert the 128-bit mask to the 4-bit mask
        int32 mask = _mm_movemask_ps((__m128)cmp_mask);
        //]

        //[ copy out common elements
        __m128i p = _mm_shuffle_epi8(v_a, shuffle_mask[mask]);
        _mm_storeu_si128((__m128i*)&C[count], p);
        count += _mm_popcnt_u32(mask); // the number of elements is the weight of the mask
        //]
    }

    // intersect the tail using scalar intersection
    ...

    return count;
}
```

The implementation above uses the *shuffle_mask* dictionary to map the mask of common elements to the shuffling parameter. Building this dictionary is straightforward (each bit in the mask corresponds to 4 bytes in the register):

```c
// a simple implementation, we don't care about performance here
void prepare_shuffling_dictionary() {
    for(int i = 0; i < 16; i++) {
        int counter = 0;
        char permutation[16];
        memset(permutation, 0xFF, sizeof(permutation));
        for(char b = 0; b < 4; b++) {
            if(getBit(i, b)) {
                permutation[counter++] = 4*b;
                permutation[counter++] = 4*b + 1;
                permutation[counter++] = 4*b + 2;
                permutation[counter++] = 4*b + 3;
            }
        }
        __m128i mask = _mm_loadu_si128((const __m128i*)permutation);
        shuffle_mask[i] = mask;
    }
}

int getBit(int value, int position) {
    return ( ( value & (1 << position) ) >> position );
}
```

## Partitioned Vectorized Intersection

The SSE 4.2 instruction set offers the PCMPESTRM instruction, which compares two segments of eight 16-bit values each and produces a bit mask that highlights the common elements. This sounds like an extremely efficient primitive for intersection of sorted lists, but in its basic form it is limited to 16-bit values in the lists. That is not enough for many applications, so a workaround was recently suggested by Benjamin Schlegel et al. in this article. The main idea is to store indexes in a partitioned format where elements with the same most significant bits are grouped together. This approach has limited applicability of its own because each partition should contain a sufficient number of elements, i.e. it works well in the case of very large lists or a favorable distribution of the values.

Each partition has a header that includes a prefix (the most significant bits common to all elements in the partition) and the number of elements in the partition. The following figure illustrates the partitioning process:

The partitioning procedure that converts 32-bit values into 16-bit values is shown in the code snippet below:

```c
// A   - sorted array
// s_a - size of A
// R   - partitioned sorted array
size_t partition(int32 *A, size_t s_a, int16 *R) {
    int16 high = 0;
    size_t partition_length = 0;
    size_t partition_size_position = 1;
    size_t counter = 0;
    for(size_t p = 0; p < s_a; p++) {
        int16 chigh = _high16(A[p]); // upper 16 bits
        int16 clow = _low16(A[p]);   // lower 16 bits
        if(chigh == high && p != 0) { // add the element to the current partition
            R[counter++] = clow;
            partition_length++;
        } else { // start a new partition
            R[counter++] = chigh; // partition prefix
            R[counter++] = 0;     // reserve a place for the partition size
            R[counter++] = clow;  // write the first element
            R[partition_size_position] = partition_length;
            partition_length = 1; // reset counters
            partition_size_position = counter - 2;
            high = chigh;
        }
    }
    R[partition_size_position] = partition_length;
    return counter;
}
```
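To make the layout concrete, here is a small self-contained harness (it repeats the same logic with stdint types and a name of my own, so the snippet compiles on its own) that partitions three 32-bit values and shows the resulting 16-bit sequence:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// Same partitioning logic as above, written with explicit stdint types.
// Output layout per partition: [prefix, size, element, element, ...]
size_t partition16(const uint32_t *A, size_t s_a, uint16_t *R) {
    uint16_t high = 0;
    size_t partition_length = 0, partition_size_position = 1, counter = 0;
    for (size_t p = 0; p < s_a; p++) {
        uint16_t chigh = (uint16_t)(A[p] >> 16);    // upper 16 bits
        uint16_t clow  = (uint16_t)(A[p] & 0xFFFF); // lower 16 bits
        if (chigh == high && p != 0) {  // extend the current partition
            R[counter++] = clow;
            partition_length++;
        } else {                        // start a new partition
            R[counter++] = chigh;       // partition prefix
            R[counter++] = 0;           // placeholder for the size
            R[counter++] = clow;        // first element
            R[partition_size_position] = (uint16_t)partition_length;
            partition_length = 1;
            partition_size_position = counter - 2;
            high = chigh;
        }
    }
    R[partition_size_position] = (uint16_t)partition_length;
    return counter;
}
```

For the input {0x00010003, 0x00010005, 0x00020001} this produces the sequence 1, 2, 3, 5, 2, 1, 1: a partition with prefix 1 holding two low words (3 and 5), followed by a partition with prefix 2 holding one low word (1).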

A pair of partitions can be intersected using the following procedure, which computes a mask of common elements using the _mm_cmpestrm intrinsic and then shuffles these elements out, similarly to the vectorized intersection procedure described in the previous section.

```c
size_t intersect_vector16(int16 *A, int16 *B, size_t s_a, size_t s_b, int16 *C) {
    size_t count = 0;
    size_t i_a = 0, i_b = 0;

    size_t st_a = (s_a / 8) * 8;
    size_t st_b = (s_b / 8) * 8;

    while(i_a < st_a && i_b < st_b) {
        __m128i v_a = _mm_loadu_si128((__m128i*)&A[i_a]);
        __m128i v_b = _mm_loadu_si128((__m128i*)&B[i_b]);

        __m128i res_v = _mm_cmpestrm(v_b, 8, v_a, 8,
                _SIDD_UWORD_OPS|_SIDD_CMP_EQUAL_ANY|_SIDD_BIT_MASK);
        int r = _mm_extract_epi32(res_v, 0);
        __m128i p = _mm_shuffle_epi8(v_a, shuffle_mask16[r]);
        _mm_storeu_si128((__m128i*)&C[count], p);
        count += _mm_popcnt_u32(r);

        int16 a_max = _mm_extract_epi16(v_a, 7);
        int16 b_max = _mm_extract_epi16(v_b, 7);
        i_a += (a_max <= b_max) * 8; // segments contain 8 elements here
        i_b += (a_max >= b_max) * 8;
    }

    // intersect the tail using scalar intersection
    ...

    return count;
}
```

The whole intersection algorithm looks similar to the scalar intersection. It receives two partitioned operands, iterates over the partition headers, and intersects particular partitions when their prefixes match:

```c
// A, B - partitioned operands
size_t intersect_partitioned(int16 *A, int16 *B, size_t s_a, size_t s_b, int16 *C) {
    size_t i_a = 0, i_b = 0;
    size_t counter = 0;

    while(i_a < s_a && i_b < s_b) {
        if(A[i_a] < B[i_b]) {
            i_a += A[i_a + 1] + 2;
        } else if(B[i_b] < A[i_a]) {
            i_b += B[i_b + 1] + 2;
        } else {
            C[counter++] = A[i_a]; // write the partition prefix
            int16 partition_size = intersect_vector16(
                    &A[i_a + 2], &B[i_b + 2],
                    A[i_a + 1], B[i_b + 1], &C[counter + 1]);
            C[counter++] = partition_size; // write the partition size
            counter += partition_size;
            i_a += A[i_a + 1] + 2;
            i_b += B[i_b + 1] + 2;
        }
    }
    return counter;
}
```

The output of this procedure is also a partitioned vector that can be used in further operations.
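When 32-bit values are eventually needed again, the partitioned result can be unpacked; a minimal sketch of the inverse step (my own helper, not part of the article's code):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// Inverse of the partitioning step: walk the partition headers and
// reassemble each 32-bit value from the partition prefix (upper 16 bits)
// and the low word stored in the partition body.
size_t unpartition(const uint16_t *R, size_t s_r, uint32_t *A) {
    size_t i = 0, counter = 0;
    while (i < s_r) {
        uint32_t prefix = (uint32_t)R[i] << 16;   // common upper 16 bits
        size_t size = R[i + 1];                   // number of elements
        for (size_t k = 0; k < size; k++)
            A[counter++] = prefix | R[i + 2 + k];
        i += size + 2;                            // skip to the next header
    }
    return counter;
}
```

Because intersect_partitioned emits the same header format it consumes, this unpacking only has to happen once, after the last intersection in a query plan.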

## Performance Evaluation

The performance of the described techniques was evaluated on intersections of sorted lists of 1 million elements each, with an average intersection selectivity of about 30%. All evaluated methods except partitioned vectorized intersection do not require specific properties of the values in the lists. For partitioned vectorized intersection, values were selected from the range [0, 3M] to produce relatively large partitions.

In the case of Lucene, a corpus of documents with two fields was generated to provide the mentioned index sizes and selectivity; RAMDirectory was used. Intersection was done using a standard Boolean query with top hits limited to 1 to prevent generation of a large result set. Of course, this is not a fair comparison because Lucene is much more than a list intersector, but it is still interesting to try it out.

Performance testing was done on an ordinary Linux desktop with 2.8GHz cores. JDK 1.6 and gcc 4.5.2 (with the -O3 option) were used.

Cool! On a side note, I'm wondering what tool you used for the drawings.

I typically use PowerPoint; CorelDraw in some cases.

Is the sequence of cyclic permutations actually faster than just duplicating scalars four times?

Paul,

Do you mean comparison with four duplicates of the first element, the second element, etc.? I didn't try it, but I think it would be the same thing, the same instruction – for example, see this thread. Anyway, this is a good question, thank you.

Right, that’s what I meant. Duplicating also comes down to a single PSHUFD. The difference is that all four duplicate + compare sequences are independent of each other, while the rotates form a dependency chain (and each comparison obviously depends on one of the rotates in that chain).

I get it now, a good point. I quickly benchmarked the following routine:

```c
__m128i v_b0 = _mm_shuffle_epi32(v_b, _MM_SHUFFLE(0,0,0,0));
__m128i cmp_mask1 = _mm_cmpeq_epi32(v_a, v_b0);
__m128i v_b1 = _mm_shuffle_epi32(v_b, _MM_SHUFFLE(1,1,1,1));
__m128i cmp_mask2 = _mm_cmpeq_epi32(v_a, v_b1);
__m128i v_b2 = _mm_shuffle_epi32(v_b, _MM_SHUFFLE(2,2,2,2));
__m128i cmp_mask3 = _mm_cmpeq_epi32(v_a, v_b2);
__m128i v_b3 = _mm_shuffle_epi32(v_b, _MM_SHUFFLE(3,3,3,3));
__m128i cmp_mask4 = _mm_cmpeq_epi32(v_a, v_b3);
```

and got the same result as for cyclic shifts, even a little bit slower (probably because of the additional shuffling at the beginning).

Interesting! Thanks for trying it out.

Hi,

Your scalar intersection implementation has some errors. In the while body you forgot to copy to the destination in the first two cases:

```c
if (a[i_a] < b[i_b]) {
    c[counter++] = a[i_a++];
} else if (b[i_b] < a[i_a]) {
    c[counter++] = b[i_b++];
} else {
    c[counter++] = a[i_a];
    ++i_a; ++i_b;
}
```

You also don't treat the cases when one array is shorter than the other or the size is zero, but maybe that's beside the point.

Fernando,

Please note that this is intersection. You are probably talking about union, aren’t you?

Oh, I’m sorry, you’re right.

Amazing perf boost, I am also curious what application you used to mark up your drawings.

This is definitely the most popular question ;) I typically use PowerPoint; CorelDraw in some cases.

very cool technique!

There is a typo at line 11:

```c
 3 size_t intersect_vector(int32 *A, int32 *B, size_t s_a, size_t s_b, int32 *C) {
   ...
11     while(i_a < st_a && i_b < st_b) {
```

Alexander,

I don’t quite see what the typo is. Could you please elaborate on what exactly is incorrect?

Is it possible to implement the vectorized or partitioned vectorized intersection in Java? I have exactly the same problem of intersecting millions of sorted arrays with unique elements (dataset row IDs), and in my algorithm this intersection operation (I use the scalar method) takes about 70% of execution time. I see that in C++ you can speed it up significantly.

What about in Java?

Obviously, you cannot use SIMD instructions in Java explicitly. The JIT in the HotSpot JVM has very limited support for SIMD, so it is not able to vectorize such algorithms. The only option is to write a native lib and call it from Java. However, I think that could be nontrivial – native procedures receive the data as Java structures, and format conversions can eat a lot of time.