Massively Distributed Indexing of Time Series for Apache Spark

Find Out More

Work in progress...

Sketch method

This code is a Scala implementation of a sketch (random projection) based method for efficient parallel indexing of time series and similarity search over them.



What We Offer

Index construction

Query processing



Indexing and Querying Time Series with Spark-parSketch ...

Index Construction + Query Processing

Parallel index construction

Time-series index construction on the input dataset D within distributed data processing frameworks proceeds as follows:

  1. At the sketch computation stage, each time series is multiplied (dot product) by a random transformation matrix R, whose elements are random variables in {+1, -1}, yielding a vector of much lower dimension. At each node, the sketch vectors SD of the input time series D are built locally and then partitioned into equal-size sub-vectors. Each sub-vector corresponds to a grid, so each sketch is assigned to one grid cell in each of the grids.
  2. At the next stage, sketches are grouped by grid cell, which requires data shuffling between computation nodes. As a result, each grid cell is mapped to the list of identifiers of the time series assigned to that cell.
  3. The resulting grids GD are stored in a set of relational database instances, previously created and distributed across the nodes.
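The sketch computation and grid assignment of step 1 can be illustrated with a minimal, single-node Python sketch. This is not the Scala/Spark implementation: the function names, the cell discretization by a fixed `cell_size`, and the `(grid, coordinates)` cell keys are simplifying assumptions for illustration.

```python
import random

def make_projection(series_len, sketch_len, seed=42):
    """Random matrix R whose entries are drawn uniformly from {+1, -1}."""
    rng = random.Random(seed)
    return [[rng.choice((1, -1)) for _ in range(series_len)]
            for _ in range(sketch_len)]

def sketch(series, R):
    """Dot product of the time series with each row of R: a vector of
    much lower dimension than the original series."""
    return [sum(x * r for x, r in zip(series, row)) for row in R]

def grid_cells(sk, sub_len, cell_size):
    """Split a sketch into equal sub-vectors; each sub-vector corresponds
    to one grid, and its discretized coordinates identify a grid cell."""
    cells = []
    for g in range(len(sk) // sub_len):
        sub = sk[g * sub_len:(g + 1) * sub_len]
        cells.append((g, tuple(int(v // cell_size) for v in sub)))
    return cells
```

In the distributed setting, each node runs this locally over its partition of D, and the resulting `(grid, cell) -> time series id` pairs are what get shuffled and grouped in step 2.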

Parallel query processing

Given a collection of queries Q, in the form of a distributed time series dataset, and a previously constructed index on a dataset D, we consider the problem of finding the time series in D that are similar to Q. We perform this search in the following steps:

  1. The sketches SQ of the query time series Q are computed in parallel, using the same random matrix R as for grid construction. The resulting sketches are then partitioned into sub-vectors in parallel, and the sub-vectors are sent to their appropriate grids and placed in grid cells.
  2. The contents of the corresponding grid cells in the index, previously stored as a collection of grids GD, are then retrieved from all database instances in parallel on each node. Thus, if a sub-vector of a query time series q lands in the same grid cell as a database time series d, then d is a possible match for q. Two time series (a query and an indexed one) are considered similar if they are assigned to the same grid cell in a large, user-tunable fraction of the grids.
  3. Gathering the candidates found across the grids requires global communication.
  4. Because sketching is approximate, each candidate match between a query q and a data vector d is verified by computing the actual correlation.
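Steps 2-4 can be sketched as candidate voting followed by exact verification. This is an illustrative single-node Python stand-in, not the Spark implementation: `find_candidates`, the in-memory `index` dict, and the Pearson correlation as the verification measure are assumptions made for the example.

```python
from collections import defaultdict
from math import sqrt

def find_candidates(query_cells, index, min_fraction, n_grids):
    """index maps (grid, cell) -> list of series ids. A series becomes a
    candidate if it shares a cell with the query in at least a
    user-tunable fraction of the grids."""
    votes = defaultdict(int)
    for key in query_cells:
        for sid in index.get(key, ()):
            votes[sid] += 1
    return [sid for sid, v in votes.items() if v >= min_fraction * n_grids]

def pearson(a, b):
    """Exact correlation used to verify each approximate candidate."""
    n = len(a)
    ma, mb = sum(a) / n, sum(b) / n
    cov = sum((x - ma) * (y - mb) for x, y in zip(a, b))
    va = sqrt(sum((x - ma) ** 2 for x in a))
    vb = sqrt(sum((y - mb) ** 2 for y in b))
    return cov / (va * vb)
```

For example, with two grids and `min_fraction = 0.8`, a series must match the query's cell in both grids to survive the filter; each survivor is then checked with `pearson` against the raw query series.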



The main components of Spark-parSketch, i.e., Index Construction ( IC ) and Query Processing ( QP ), are developed within Spark. Spark is deployed on top of the Hadoop Distributed File System ( HDFS ) in order to efficiently read input and query time series, as well as to store intermediate data and final results, thus overcoming the bottleneck of centralized data storage.

Spark-parSketch stores the grids (the resulting indexes) in a distributed relational storage ( RDB Store ), set up as a number of PostgreSQL instances. Each Spark worker connects to each of the databases through JDBC and persists the contents of each grid cell as soon as it is identified. This pipelined procedure avoids the in-memory storage of large intermediate datasets, reducing memory consumption during grid construction and preventing memory overflow at the Spark workers. Moreover, the built-in indexing features of the RDBMS make query processing more efficient.
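The storage pattern can be illustrated with a minimal Python example, using the standard-library sqlite3 as a stand-in for one PostgreSQL instance of the RDB Store. The table schema, names, and `persist_cell` helper are illustrative assumptions, not the actual Spark-parSketch schema.

```python
import sqlite3

# Stand-in for one PostgreSQL instance of the RDB Store.
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE grid_cells (
                    grid INTEGER, cell TEXT, ts_id TEXT)""")
# The RDBMS index on (grid, cell) is what makes grid-cell lookups
# during query processing efficient.
conn.execute("CREATE INDEX idx_cells ON grid_cells (grid, cell)")

def persist_cell(grid, cell, ts_ids):
    """Persist one grid cell's time series ids as soon as the cell's
    contents are identified, instead of holding all grids in memory."""
    conn.executemany("INSERT INTO grid_cells VALUES (?, ?, ?)",
                     [(grid, str(cell), t) for t in ts_ids])
    conn.commit()

persist_cell(0, (1, -2), ["ts42", "ts7"])
rows = conn.execute("SELECT ts_id FROM grid_cells WHERE grid=0 AND cell=?",
                    (str((1, -2)),)).fetchall()
```

In the real system each Spark worker does the equivalent over JDBC, writing cells as they are produced so that no large intermediate dataset accumulates in memory.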

Source code


Demonstration Scenarios

Use Case #1: Synthetic datasets

In this use case, each time series is identified by an ID and consists of 256 values. At each time point, a random walk generator draws a random number from a Gaussian distribution N(0,1) and adds it to the previous value. The number of time series varies from 50 million to 300 million, depending on the experiment.
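A minimal Python version of such a random walk generator could look as follows; the function name and fixed seeding are assumptions for reproducibility of the example, not details of the actual generator used in the experiments.

```python
import random

def random_walk(length=256, seed=0):
    """Random-walk time series: each point is the previous value plus a
    draw from the standard normal distribution N(0, 1)."""
    rng = random.Random(seed)
    series, value = [], 0.0
    for _ in range(length):
        value += rng.gauss(0.0, 1.0)
        series.append(value)
    return series

# Each synthetic object pairs an ID with its 256 generated values.
dataset = [(i, random_walk(256, seed=i)) for i in range(10)]
```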

Use Case #2: Seismic datasets

In this use case, the real-world data are seismic time series collected from the IRIS Seismic Data Access repository at various earthquake zones. After preprocessing, we obtained two seismic datasets: the first contains 600 thousand time series of 462 values each, and the second 40 million time series of 200 values each.

We show below a set of charts demonstrating the performance of Spark-parSketch (vs. a parallelized Linear Search algorithm) in terms of response time and precision. The bar chart compares the time performance of the two approaches. The two line charts depict the precision of similarity search as the top candidates (up to 10) for a selected query in a given dataset.

Parameters: The user can observe the tool's performance on a range of input datasets ( Input Time Series ), along with query datasets of various sizes ( Queries Number ). To step through individual queries in the selected dataset, the user may use the bottom slide bar ( Query # ).

The Cell size combo box demonstrates how some parameters of the spark-parSketch tool, related to the sketch-based algorithm, can impact the precision of the results.


Input Time Series :
Batch Size (number of queries) :
Cell Size1 :

Response Time

Quality Ratio2: N/A


Parallel Linear Search


Query # : 0

1 this parameter does not affect the PLS precision chart

2 the ratio of the 10th-highest correlation among the time series found by sketches to the 10th-highest correlation found by parallel linear search
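The quality ratio of footnote 2 is straightforward to compute given the correlation scores returned by each method; the `quality_ratio` helper below is a hypothetical illustration, not code from the tool.

```python
def quality_ratio(sketch_correlations, pls_correlations):
    """Quality ratio: the 10th-highest correlation among the results of
    the sketch method, divided by the 10th-highest correlation found by
    parallel linear search (the exact baseline). A ratio near 1.0 means
    the approximate search matches the exact one on the top-10 results."""
    tenth = lambda corrs: sorted(corrs, reverse=True)[9]
    return tenth(sketch_correlations) / tenth(pls_correlations)
```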

Experiments were conducted on a cluster of 16 compute nodes, each with two 8-core Intel Xeon E5-2630 v3 CPUs, 128 GB RAM, and 2 x 558 GB of storage. The cluster runs Hadoop version 2.7, Spark v. 2.1, and PostgreSQL v. 9.4 as the relational database system.

* Experimental runs of PLS on the larger datasets could not be completed: the search was too time-consuming for the available cluster reservation time.

The video of the demonstration use case is available here.



Oleksandra Levchenko

Reza Akbarinia

Florent Masseglia

Dennis Shasha

Boyan Kolev