Analysis - TiDB: A Raft-based HTAP Database
Abstract
Hybrid Transactional and Analytical Processing (HTAP) databases require processing transactional and analytical queries in isolation to remove the interference between them.
(Problem: separate replicas must be maintained for AP and TP) To achieve this, it is necessary to maintain different replicas of data specified for the two types of queries.
(Challenge: consistency across replicas, and freshness of the AP replicas) However, it is challenging to provide a consistent view for distributed replicas within a storage system, where analytical requests can efficiently read consistent and fresh data from transactional workloads at scale and with high availability.
(Idea) To meet this challenge, we propose extending replicated state machine-based consensus algorithms to provide consistent replicas for HTAP workloads.
(The system built on this idea) Based on this novel idea, we present a Raft-based HTAP database: TiDB. In the database, we design a multi-Raft storage system which consists of a row store and a column store.
The row store is built based on the Raft algorithm. It is scalable to materialize updates from transactional requests with high availability.
In particular, it asynchronously replicates Raft logs to learners which transform row format to column format for tuples, forming a real-time updatable column store.
This column store allows analytical queries to efficiently read fresh and consistent data with strong isolation from transactions on the row store.
(Supporting features) Based on this storage system, we build an SQL engine to process large-scale distributed transactions and expensive analytical queries. The SQL engine optimally accesses row-format and column-format replicas of data.
We also include a powerful analysis engine, TiSpark, to help TiDB connect to the Hadoop ecosystem.
(Experiments) Comprehensive experiments show that TiDB achieves isolated high performance under CH-benCHmark, a benchmark focusing on HTAP workloads.
Introduction
(Trends and current state) Relational database management systems (RDBMS) are popular with their relational model, strong transactional guarantees, and SQL interface. They are widely adopted in traditional applications, like business systems. However, old RDBMSs do not provide scalability and high availability.
(The current state, seen through RDBMS, NoSQL, NewSQL, and AP systems) Therefore, at the beginning of the 2000s [11], internet applications preferred NoSQL systems like Google Bigtable [12] and DynamoDB [36].
NoSQL systems loosen the consistency requirements and provide high scalability and alternative data models, like key-value pairs, graphs, and documents.
However, many applications also need strong transactions, data consistency, and an SQL interface, so NewSQL systems appeared.
NewSQL systems like CockroachDB [38] and Google Spanner [14] provide the high scalability of NoSQL for Online Transactional Processing (OLTP) read/write workloads and still maintain ACID guarantees for transactions [32].
In addition, SQL-based Online Analytical Processing (OLAP) systems are being developed quickly, like many SQL-on-Hadoop systems [16].
These systems follow the “one size does not fit all” paradigm [37], using different data models and technologies for the different purposes of OLAP and OLTP.
However, multiple systems are very expensive to develop, deploy, and maintain. In addition, analyzing the latest version of data in real time is compelling.
(Summary of the problem with "one size does not fit all" and the motivation for HTAP: one unified system) This has given rise to hybrid OLTP and OLAP (HTAP) systems in industry and academia [30].
HTAP systems should implement scalability, high availability, and transactional consistency like NewSQL systems.
(What HTAP is and what it requires) Besides, HTAP systems need to efficiently read the latest data to guarantee the throughput and latency for OLTP and OLAP requests under two additional requirements: freshness and isolation.
(Requirement analysis, freshness: how the AP replicas read the latest data) Freshness means how recent data is processed by the analytical queries [34]. Analyzing the latest data in real time has great business value.
But it is not guaranteed in some HTAP solutions, such as those based on an Extraction-Transformation-Loading (ETL) process. Through the ETL process, OLTP systems periodically refresh a batch of the latest data to OLAP systems.
The ETL costs several hours or days, so it cannot offer real-time analysis. The ETL phase can be replaced by streaming the latest updates to OLAP systems to reduce synchronization time.
However, because these two approaches lack a global data governance model, it is more complex to consider consistency semantics. Interfacing with multiple systems introduces additional overhead.
(Requirement analysis, isolation: how TP and AP workloads avoid interfering with each other) Isolation refers to guaranteeing isolated performance for separate OLTP and OLAP queries. Some in-memory databases (such as HyPer [18]) enable analytical queries to read the latest version of data from transactional processing on the same server.
Although this approach provides fresh data, it cannot achieve high performance for both OLTP and OLAP. This is due to data synchronization penalties and workload interference. This effect is studied in [34] by running CH-benCHmark [13], an HTAP benchmark, on HyPer and SAP HANA.
The study found that when a system co-runs analytical queries, its maximum attainable OLTP throughput is significantly reduced. SAP HANA [22] throughput was reduced by at least three times, and HyPer by at least five times. Similar results are confirmed in MemSQL [24].
Furthermore, in-memory databases cannot provide high availability and scalability if they are only deployed on a single server.
To guarantee isolated performance, it is necessary to run OLTP and OLAP requests on different hardware resources. The essential difficulty is to maintain up-to-date replicas for OLAP requests from OLTP workloads within a single system.
Besides, the system needs to maintain data consistency among more replicas. Note that maintaining consistent replicas is also required for availability [29]. High availability can be achieved using well-known consensus algorithms, such as Paxos [20] and Raft [29].
They are based on replicated state machines to synchronize replicas. It is possible to extend these consensus algorithms to provide consistent replicas for HTAP workloads. To the best of our knowledge, this idea has not been studied before.
(Solution derived from the analysis: add a Learner role to Raft that takes no part in elections or decisions, only replicates data asynchronously, and converts the data into a columnar format suited to queries)
Following this idea, we propose a Raft-based HTAP database: TiDB.
It introduces dedicated nodes (called learners) to the Raft consensus algorithm. The learners asynchronously replicate transactional logs from leader nodes to construct new replicas for OLAP queries.
In particular, the learners transform the row-format tuples in the logs into column format so that the replicas are better-suited to analytical queries. Such log replication incurs little overhead on transactional queries running on leader nodes.
Moreover, the latency of such replication is so short that it can guarantee data freshness for OLAP. We use different data replicas to separately process OLAP and OLTP requests to avoid interference between them.
We can also optimize HTAP requests based on both row-format and column-format data replicas.
Based on the Raft protocol, TiDB provides high availability, scalability, and data consistency.
(Contributions) We summarize our contributions as follows.
- We propose building an HTAP system based on consensus algorithms and have implemented a Raft-based HTAP database, TiDB. It is an open-source project [7] that provides high availability, consistency, scalability, data freshness, and isolation for HTAP workloads.
- We introduce the learner role to the Raft algorithm to generate a columnar store for real-time OLAP queries.
- We implement a multi-Raft storage system and optimize its reads and writes so that the system offers high performance when scaling to more nodes.
- We tailor an SQL engine for large-scale HTAP queries. The engine can optimally choose to use a row-based store and a columnar store.
- We conduct comprehensive experiments to evaluate TiDB’s performance on OLTP, OLAP, and HTAP workloads using CH-benCHmark, an HTAP benchmark.
The remainder of this paper is organized as follows. We describe the main idea, Raft-based HTAP, in Section 2, and illustrate the architecture of TiDB in Section 3.
TiDB’s multi-Raft storage and HTAP engines are elaborated upon in Sections 4 and 5. Experimental evaluation is presented in Section 6. We summarize related work in Section 7.
Finally, we conclude our paper in Section 8.
RAFT-BASED HTAP
(Describes the idea in detail, stressing that the novelty is building an HTAP database on a consensus algorithm) Consensus algorithms such as Raft and Paxos are the foundation of building consistent, scalable, and highly-available distributed systems.
Their strength is that data is reliably replicated among servers in real time using the replicated state machine.
We adapt this function to replicate data to different servers for different HTAP workloads.
In this way, we guarantee that OLTP and OLAP workloads are isolated from each other, but also that OLAP requests have a fresh and consistent view of the data.
To the best of our knowledge, there is no previous work to use these consensus algorithms to build an HTAP database.
Since the Raft algorithm is designed to be easy to understand and implement, we focus our Raft extension on implementing a production-ready HTAP database.
As illustrated in Figure 1, at a high level, our ideas are as follows:
(Describes the learner) Data is stored in multiple Raft groups using row format to serve transactional queries.
Each group is composed of a leader and followers. We add a learner role for each group to asynchronously replicate data from the leader.
This approach is low-overhead and maintains data consistency. Data replicated to learners is transformed to column-based format.
The query optimizer is extended to explore physical plans accessing both the row-based and column-based replicas.
In a standard Raft group, each follower can become the leader to serve read and write requests.
Simply adding more followers, therefore, will not isolate resources.
Moreover, adding more followers will impact the performance of the group because the leader must wait for responses from a larger quorum of nodes before responding to clients.
Therefore, we introduced a learner role to the Raft consensus algorithm. A learner does not participate in leader elections, nor is it part of a quorum for log replication.
Log replication from the leader to a learner is asynchronous; the leader does not need to wait for success before responding to the client. The strong consistency between the leader and the learner is enforced during the read time.
By design, the log replication lag between the leader and learners is low, as demonstrated in the evaluation section.
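The effect of excluding learners from the quorum can be sketched as follows. This is a minimal illustration, not TiDB's implementation: the `Replica` type, its field names, and the sample group are assumptions made for the example, and the check that a committed entry belongs to the leader's current term is omitted.

```python
from dataclasses import dataclass


@dataclass
class Replica:
    name: str          # hypothetical node name
    role: str          # "leader", "follower", or "learner"
    match_index: int   # highest log index known to be stored on this replica


def commit_index(replicas):
    """Highest log index stored on a majority of voters; learners are ignored."""
    voters = sorted(
        (r.match_index for r in replicas if r.role != "learner"), reverse=True
    )
    majority = len(voters) // 2 + 1
    return voters[majority - 1]


group = [
    Replica("a", "leader", 10),
    Replica("b", "follower", 10),
    Replica("c", "follower", 7),
    Replica("columnar-1", "learner", 3),  # a lagging learner
]

# The lagging learner does not hold the commit point back.
print(commit_index(group))
```

Because the learner's `match_index` never enters the calculation, a slow columnar replica delays only its own freshness, not the acknowledgement of transactional writes.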
(Analysis of row and column storage, leading to a hybrid row-plus-column design) Transactional queries require efficient data updates, while analytical queries such as join or aggregation require reading a subset of columns, but a large number of rows for those columns.
Row-based format can leverage indexes to efficiently serve transactional queries. Column-based format can leverage data compression and vectorized processing efficiently.
Therefore, when replicating to Raft learners, data is transformed from row-based format to column-based format.
Moreover, learners can be deployed in separate physical resources. As a result, transactional queries and analytical queries are processed in isolated resources.
Our design also provides new optimization opportunities. Because data is kept consistent between both the row-based format and column-based format, our query optimizer can produce physical plans which access either or both stores.
We have presented our ideas of extending Raft to satisfy the freshness and isolation requirements of an HTAP database.
(Engineering challenges in implementing this idea)
To make an HTAP database production ready, we have overcome many engineering challenges, mainly including:
(1) (How to support highly concurrent reads and writes) How to build a scalable Raft storage system to support highly concurrent read/write?
If the amount of data exceeds the available space on each node managed by the Raft algorithm, we need a partition strategy to distribute data on servers.
Besides, in the basic Raft process, requests are processed sequentially, and any request must be approved by the quorum of Raft nodes before responding to clients.
This process involves network and disk operations, and thus is time-consuming. This overhead makes the leader become a bottleneck to processing requests, especially on large datasets.
(2) (How to keep synchronization latency low) How to synchronize logs into learners with low latency to keep data fresh? Ongoing transactions can generate some very large logs.
These logs need to be quickly replayed and materialized in learners so that the fresh data can be read. Transforming log data into column format may encounter errors due to mismatched schemas.
This may delay log synchronization.
(3) (How to execute TP and AP efficiently) How to efficiently process both transactional and analytical queries with guaranteed performance?
Large transactional queries need to read and write huge amounts of data distributed in multiple servers.
Analytical queries also consume intensive resources and should not impact online transactions.
To reduce execution overhead, they also need to choose optimal plans on both a row-format store and a column-format store.
(The sections below are organized around these engineering problems.)
In the following sections, we will elaborate the design and implementation of TiDB to address these challenges.
ARCHITECTURE
In this section, we describe the high-level structure of TiDB, which is illustrated in Figure 2.
TiDB supports the MySQL protocol and is accessible by MySQL-compatible clients.
It has three core components: a distributed storage layer, a Placement Driver (PD), and a computation engine layer.
(Storage layer: TiKV and TiFlash) The distributed storage layer consists of a row store (TiKV) and a columnar store (TiFlash). Logically, the data stored in TiKV is an ordered key-value map.
Each tuple is mapped into a key-value pair. The key is composed of its table ID and row ID, and the value is the actual row data, where the table ID and row ID are unique integers, and the row ID would be from a primary key column.
For example, a tuple with four columns is encoded as:
Key: {table{tableID} record{rowID}}
Value: {col0, col1, col2, col3}
To scale out, we take a range partition strategy to split the large key-value map into many contiguous ranges, each of which is called a Region (partition).
Each Region has multiple replicas for high availability. The Raft consensus algorithm is used to maintain consistency among replicas for each Region, forming a Raft group.
The leaders of different Raft groups asynchronously replicate data from TiKV to TiFlash. TiKV and TiFlash can be deployed in separate physical resources and thus offer isolation when processing transactional and analytical queries.
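As a rough sketch of how an ordered key space and range partitioning fit together, the example below encodes a (table ID, row ID) pair into a byte string and routes it to a Region by binary search. The byte layout, the `RegionMap` class, and the split points are assumptions for illustration only; the notes do not specify TiKV's actual encoding.

```python
import bisect
import struct


def encode_key(table_id: int, row_id: int) -> bytes:
    # Assumed layout: fixed-width big-endian integers, so that byte order
    # matches numeric order for non-negative IDs.
    return b"t" + struct.pack(">Q", table_id) + b"_r" + struct.pack(">Q", row_id)


class RegionMap:
    """Region i covers [starts[i], starts[i + 1]); the last one is unbounded."""

    def __init__(self, split_keys):
        self.starts = [b""] + sorted(split_keys)

    def locate(self, key: bytes) -> int:
        # The owning Region is the last one whose start key is <= key.
        return bisect.bisect_right(self.starts, key) - 1


regions = RegionMap([encode_key(1, 100), encode_key(1, 200)])
for row_id in (5, 100, 250):
    print(row_id, "-> Region", regions.locate(encode_key(1, row_id)))
```

Keeping Regions contiguous in key order is what lets a range scan touch only the Regions whose ranges overlap the scan.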
(Control layer: because transactions must be supported, it provides global timestamps to preserve ordering) Placement Driver (PD) is responsible for managing Regions, including supplying each key’s Region and physical location, and automatically moving Regions to balance workloads.
PD is also our timestamp oracle, providing strictly increasing and globally unique timestamps. These timestamps also serve as our transaction IDs.
PD may contain multiple PD members for robustness and performance. PD has no persistent state, and on startup a PD member gathers all necessary data from other members and TiKV nodes.
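One common way to produce strictly increasing, unique timestamps is to combine a physical clock reading with a logical counter. The sketch below assumes that approach; the `TimestampOracle` class, the 18-bit logical width, and the injectable `clock` parameter are illustrative choices, not details taken from the text, and it covers only a single process.

```python
import threading
import time


class TimestampOracle:
    LOGICAL_BITS = 18  # assumed width of the logical counter

    def __init__(self, clock=lambda: int(time.time() * 1000)):
        self._clock = clock          # returns milliseconds; injectable for tests
        self._lock = threading.Lock()
        self._physical = 0
        self._logical = 0

    def next_ts(self) -> int:
        with self._lock:
            now = self._clock()
            if now > self._physical:
                self._physical, self._logical = now, 0
            else:
                # Same millisecond, or the clock moved backwards: bump the counter.
                self._logical += 1
                if self._logical >> self.LOGICAL_BITS:
                    # Counter exhausted: borrow the next physical tick.
                    self._physical += 1
                    self._logical = 0
            return (self._physical << self.LOGICAL_BITS) | self._logical


oracle = TimestampOracle(clock=lambda: 1000)  # frozen clock for the demo
stamps = [oracle.next_ts() for _ in range(5)]
print(stamps == sorted(set(stamps)))
```

Because every call returns a value larger than the previous one even when the clock stalls, the same number can serve as both a timestamp and a transaction ID.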
(Computation layer) The computation engine layer is stateless and is scalable. Our tailored SQL engine has a cost-based query optimizer and a distributed query executor.
TiDB implements a two-phase commit (2PC) protocol based on Percolator [33] to support transactional processing.
The query optimizer can optimally select to read from TiKV and TiFlash based on the query.
(Architecture summary) The architecture of TiDB meets the requirement of an HTAP database.
Each component of TiDB is designed to have high availability and scalability.
The storage layer uses the Raft algorithm to achieve consistency between data replicas. The low latency replication between TiKV and TiFlash makes fresh data available to analytical queries.
The query optimizer, together with the strongly consistent data between TiKV and TiFlash, offers fast analytical query processing with little impact on transactional processing.
Besides the components mentioned above, TiDB also integrates with Spark, which is helpful to integrate data stored in TiDB and the Hadoop Distributed File System (HDFS).
TiDB has a rich set of ecosystem tools to import data to and export data from TiDB and migrate data from other databases to TiDB.
In the following sections, we will do a deep dive on the distributed storage layer, the SQL engine, and TiSpark to demonstrate the capability of TiDB, a production-ready HTAP database.
MULTI-RAFT STORAGE
(Overview of the storage layer) Figure 3 shows the architecture of the distributed storage layer in TiDB, where the objects with the same shape play the same role.
The storage layer consists of a row-based store, TiKV, and a column-based store, TiFlash.
The storage maps a large table into a big key-value map which is split into many Regions stored in TiKV.
Each Region uses the Raft consensus algorithm to maintain the consistency among replicas to achieve high availability.
Multiple Regions can be merged into one partition when data is replicated to TiFlash to facilitate table scan. (A partition here is a TiFlash partition; it corresponds to one or more Regions, which makes scans easier.)
The data between TiKV and TiFlash is kept consistent through asynchronous log replication.
Since multiple Raft groups manage data in the distributed storage layer, we call it multi-Raft storage. (The "multi" label feels somewhat forced.)
In the following sections, we describe TiKV and TiFlash in detail, focusing on optimizations to make TiDB a production-ready HTAP database.
(TiKV builds on RocksDB underneath; a Region's maximum size defaults to 96 MB) A TiKV deployment consists of many TiKV servers. Regions are replicated between TiKV servers using Raft.
Each TiKV server can be either a Raft leader or follower for different Regions.
On each TiKV server, data and metadata are persisted to RocksDB, an embeddable, persistent, key-value store [5].
Each Region has a configurable max size, which is 96 MB by default. The TiKV server for a Raft leader handles read/write requests for the corresponding Region.
(The standard Raft flow: requests commit serially, so throughput is limited) When the Raft algorithm responds to read and write requests, the basic Raft process is executed between a leader and its followers:
(1) A Region leader receives a request from the SQL engine layer.
(2) The leader appends the request to its log.
(3) The leader sends the new log entries to its followers, which in turn append the entries to their logs.
(4) The leader waits for its followers to respond. If a quorum of nodes respond successfully, then the leader commits the request and applies it locally.
(5) The leader sends the result to the client and continues to process incoming requests.
This process ensures data consistency and high availability.
However, it does not provide efficient performance because the steps happen sequentially, and may incur large I/O overheads (both disk and network).
The following sections describe how we have optimized this process to achieve high read/write throughput, i.e., solving the first challenge described in Section 2.
Optimization between Leaders and Followers
(Steps 2 and 3 run in parallel: while writing to disk, the leader also sends to followers) In the process described above, the second and third steps can happen in parallel because there is no dependency between them. Therefore, the leader appends logs locally and sends logs to followers at the same time.
If appending logs fails on the leader but a quorum of the followers successfully append the logs, the logs can still be committed.
(Step 3: buffer and send in batches) In the third step, when sending logs to followers, the leader buffers log entries and sends them to its followers in batches.
(Do not wait for followers to reply; assume success and keep sending) After sending the logs, the leader does not have to wait for the followers to respond. Instead, it can assume success and send further logs with the predicted log index.
If errors occur, the leader adjusts the log index and resends the replication requests.
(Asynchronous apply) In the fourth step, the leader applying committed log entries can be handled asynchronously by a different thread because at this stage there is no risk to consistency.
Based on the optimizations above, the Raft process is updated as follows:
(1) A leader receives requests from the SQL engine layer.
(2) The leader sends corresponding logs to followers and appends logs locally in parallel.
(3) The leader continues to receive requests from clients and repeats step (2).
(4) The leader commits the logs and sends them to another thread to be applied.
(5) After applying the logs, the leader returns the results to the client.
In this optimized process, any request from a client still runs all the Raft steps, but requests from multiple clients are run in parallel, so the overall throughput increases.
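The batching and "assume success" behaviour can be illustrated with a small single-threaded simulation. It is only a sketch under simplifying assumptions: one follower, no terms or conflicting entries, and message loss modelled by a hypothetical `drop` set of message sequence numbers. The `Follower` class and `replicate` function are invented for the example.

```python
class Follower:
    def __init__(self):
        self.log = []

    def append(self, prev_index, entries):
        """Accept a batch only if it starts right after our last entry."""
        if prev_index != len(self.log):
            return False, len(self.log)   # reject and report our real position
        self.log.extend(entries)
        return True, len(self.log)


def replicate(leader_log, follower, batch_size=2, drop=frozenset()):
    """Send batches using a predicted index; rewind and resend after a rejection."""
    next_index = 0   # the leader's prediction of where the follower's log ends
    acked = 0        # last position confirmed by a follower response
    sent = 0
    while acked < len(leader_log):
        # Queue every batch up front, assuming each earlier one succeeds.
        inflight = []
        while next_index < len(leader_log):
            batch = leader_log[next_index:next_index + batch_size]
            inflight.append((next_index, batch))
            next_index += len(batch)      # predicted, not yet acknowledged
        for prev_index, batch in inflight:
            sent += 1
            if sent in drop:              # simulated lost message
                continue
            _ok, acked = follower.append(prev_index, batch)
        next_index = acked                # adjust the index after any error
    return sent


follower = Follower()
messages = replicate(list("abcde"), follower, batch_size=2, drop={2})
print(follower.log, messages)
```

In the run above the second batch is lost, the third is rejected because it no longer lines up with the follower's log, and the leader resends from the follower's reported position.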
Accelerating Read Requests from Clients
Reading data from TiKV leaders is provided with linearizable semantics.
This means when a value is read at time t from a Region leader, the leader must not return a previous version of the value for read requests after t.
This can be achieved by using Raft as described above:
issuing a log entry for every read request and waiting for that entry to be committed before returning.
However, this process is expensive because the log must be replicated across the majority of nodes in a Raft group, incurring the overhead of network I/O.
To improve performance, we can avoid the log synchronization phase.
Raft guarantees that once the leader successfully writes its data, the leader can respond to any read requests without synchronizing logs across servers.
However, after a leader election, the leader role may be moved between servers in a Raft group. To ensure that reads are served by the current leader, TiKV implements the following read optimizations as described in [29].
The first approach is called read index.
When a leader responds to a read request, it records the current commit index as a local read index, and then sends heartbeat messages to followers to confirm its leader role.
If it is indeed the leader, it can return the value once its applied index is greater than or equal to the read index.
This approach improves read performance, though it adds a small network overhead for the heartbeats.
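The read index flow described above can be sketched as follows. The `RegionLeader` class, its fields, and the `heartbeat` callable (which returns the number of followers that acknowledged) are hypothetical; a real leader would wait for its apply thread rather than apply entries inline as this toy does.

```python
class NotLeaderError(Exception):
    pass


class RegionLeader:
    def __init__(self, log, commit_index, group_size, heartbeat):
        self.log = log                    # list of (key, value) writes; entry i is log[i - 1]
        self.commit_index = commit_index
        self.applied_index = 0
        self.state = {}
        self.group_size = group_size      # number of voters, leader included
        self.heartbeat = heartbeat        # hypothetical: returns count of follower acks

    def _apply_next(self):
        key, value = self.log[self.applied_index]
        self.state[key] = value
        self.applied_index += 1

    def read(self, key):
        read_index = self.commit_index            # 1. remember the commit index
        acks = 1 + self.heartbeat()               # 2. confirm leadership (count ourselves)
        if acks <= self.group_size // 2:
            raise NotLeaderError("no quorum acknowledged the heartbeat")
        while self.applied_index < read_index:    # 3. catch up to the read index
            self._apply_next()
        return self.state.get(key)                # 4. serve the read locally


log = [("k", "v1"), ("k", "v2"), ("k", "v3")]     # the third entry is not yet committed
leader = RegionLeader(log, commit_index=2, group_size=3, heartbeat=lambda: 1)
print(leader.read("k"))
```

No log entry is written for the read; the only network cost is the heartbeat round that rules out a stale leader.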
Another approach is lease read, which reduces the network overhead of heartbeats caused by the read index.
The leader and followers agree on a lease period, during which followers do not issue election requests so that the leader is not changed.
During the lease period, the leader can respond to any read request without connecting to its followers.
This approach works well if the clocks on the nodes do not drift much relative to one another.
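A minimal sketch of the lease check, assuming the lease is measured from the moment the last quorum-acknowledged heartbeat was sent. The `LeaseLeader` class, the `max_clock_drift` safety margin, and the injectable `clock` are illustrative assumptions rather than TiKV's actual interface.

```python
import time


class LeaseLeader:
    def __init__(self, lease_seconds, clock=time.monotonic):
        self.lease_seconds = lease_seconds
        self.clock = clock
        self.lease_expiry = 0.0

    def on_quorum_heartbeat(self, sent_at):
        # Start the lease at send time, not ack time, so it errs on the short side.
        self.lease_expiry = sent_at + self.lease_seconds

    def can_serve_local_read(self, max_clock_drift=0.0):
        # Leave a margin for clock drift between nodes before trusting the lease.
        return self.clock() + max_clock_drift < self.lease_expiry


now = [0.0]                                   # fake clock for the demo
leader = LeaseLeader(lease_seconds=10, clock=lambda: now[0])
leader.on_quorum_heartbeat(sent_at=0.0)

now[0] = 5.0
print(leader.can_serve_local_read())          # inside the lease
now[0] = 11.0
print(leader.can_serve_local_read())          # lease expired: fall back to read index
```

The drift margin is where the clock assumption shows up: the larger the possible drift, the earlier the leader must stop trusting its lease.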
In addition to the leader, followers can also respond to read requests from clients, which is called follower read.
After a follower receives a read request, it asks the leader for the newest read index.
If the locally-applied index is equal to or greater than the read index, the follower can return the value to the client; otherwise, it has to wait for the log to be applied.
Follower read can alleviate the pressure on the leader of a hot Region, thus improving read performance.
Read performance can then be further improved by adding more followers.
Managing Massive Regions
Massive Regions are distributed on a cluster of servers.
The servers and data size are dynamically changing, and Regions may cluster in some servers, especially leader replicas.
This causes some servers’ disks to become overused, while others are free. In addition, servers may be added to or removed from the cluster.
To balance Regions across servers, the Placement Driver (PD) schedules Regions with constraints on the number and location of replicas.
One critical constraint is to place at least three replicas of a Region on different TiKV instances to ensure high availability.
PD is initialized by collecting specific information from servers through heartbeats. It also monitors the workloads of each server and migrates hot Regions to different servers without impacting applications.
On the other hand, maintaining massive Regions involves sending heartbeats and managing metadata, which can cause a lot of network and storage overhead.
However, if a Raft group does not have any workloads, the heartbeat is unnecessary.
Depending on how busy the Regions’ workloads are, we can adjust the frequency of sending heartbeats.
This reduces the likelihood of running into issues like network latency or overloaded nodes.
Dynamic Region Split and Merge
(Describes the Region split and merge process; both are replicated by Raft as commands)
A large Region may become too hot to be read or written in a reasonable time.
Hot or large Regions should be split into smaller ones to better distribute workload. On the other hand, it is possible that many Regions are small and seldom accessed;
however, the system still needs to maintain the heartbeat and metadata. In some cases, maintaining these small Regions incurs significant network and CPU overhead.
Therefore, it is necessary to merge smaller Regions. Note that to maintain the order between Regions, we only merge adjacent Regions in the key space.
Based on observed workloads, PD dynamically sends split and merge commands to TiKV.
A split operation divides a Region into several new, smaller Regions, each of which covers a continuous range of keys in the original Region.
The Region that covers the rightmost range reuses the Raft group of the original Region. Other Regions use new Raft groups.
The split process is similar to a normal update request in the Raft process:
(1) PD issues a split command to the leader of a Region.
(2) After receiving the split command, the leader transforms the command into a log and replicates the log to all its follower nodes. The log only includes a split command, instead of modifying actual data.
(3) Once a quorum replicates the log, the leader commits the split command, and the command is applied to all the nodes in the Raft group.
The apply process involves updating the original Region’s range and epoch metadata, and creating new Regions to cover the remaining range. Note that the command is applied atomically and synced to disk.
(4) For each replica of a split Region, a Raft state machine is created and starts to work, forming a new Raft group. The leader of the original Region reports the split result to PD. The split process completes.
Note that, as with committing other Raft logs, the split process succeeds once a majority of nodes commit the split log; it does not require all nodes to finish splitting the Region.
After the split, if the network is partitioned, the group of nodes with the most recent epoch wins.
The overhead of Region split is low as only a metadata change is needed. After a split command finishes, the newly split Regions may be moved across servers due to PD’s regular load balancing.
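The metadata-only nature of the apply step can be sketched as below. The `Region` dataclass, the `apply_split` helper, the choice to give new Regions the bumped epoch, and the caller-supplied Region IDs are assumptions for illustration; the text only states that the range and epoch are updated and that the rightmost range keeps the original Raft group.

```python
from dataclasses import dataclass


@dataclass
class Region:
    region_id: int
    start: bytes       # inclusive
    end: bytes         # exclusive
    epoch: int = 1


def apply_split(region, split_keys, new_ids):
    """Apply a committed split command: only ranges and epochs change."""
    assert len(new_ids) == len(split_keys), "one new Region per split key"
    bounds = [region.start] + sorted(split_keys) + [region.end]
    assert all(a < b for a, b in zip(bounds, bounds[1:])), "split keys must lie inside the range"
    new_epoch = region.epoch + 1
    # New Regions (and new Raft groups) take every range except the rightmost.
    created = [
        Region(rid, bounds[i], bounds[i + 1], new_epoch)
        for i, rid in enumerate(new_ids)
    ]
    # The original Region shrinks to the rightmost range and keeps its Raft group.
    region.start = bounds[-2]
    region.epoch = new_epoch
    return created + [region]


original = Region(1, b"a", b"z")
for r in apply_split(original, [b"h", b"p"], new_ids=[2, 3]):
    print(r)
```

A replica that missed the split still carries the old epoch, which is how a stale group can be recognized after a network partition.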
Merging two adjacent Regions is the opposite of splitting one.
PD moves replicas of the two Regions to colocate them on separate servers.
Then, the colocated replicas of the two Regions are merged locally on each server through a two-phase operation; that is, stopping the service of one Region and merging it with another one.
This approach is different from splitting a Region, because it cannot use the log replication process between two Raft groups to agree on merging them.
Column-based Storage (TiFlash)
Even though we optimize reading data from TiKV as described above, the row-format data in TiKV is not well-suited for fast analysis.
Therefore, we incorporate a column store (TiFlash) into TiDB. TiFlash is composed of learner nodes, which just receive Raft logs from Raft groups and transform row-format tuples into columnar data.
They do not participate in the Raft protocols to commit logs or elect leaders so they induce little overhead on TiKV.
A user can set up a column-format replica for a table using an SQL statement:
ALTER TABLE x SET TiFLASH REPLICA n;
where x is the name of the table and n is the number of replicas. The default value is 1.
Adding a column replica resembles adding an asynchronous columnar index to a table.
Each table in TiFlash is divided into many partitions, each of which covers a contiguous range of tuples, in accordance with several continuous Regions from TiKV.
The larger partition facilitates range scan.
When initializing a TiFlash instance, the Raft leaders of the relevant Regions begin to replicate their data to the new learners.
If there is too much data for fast synchronization, the leader sends a snapshot of its data.
Once initialization is complete, the TiFlash instance begins listening for updates from the Raft groups.
After a learner node receives a package of logs, it applies the logs to the local state machine, including replaying the logs, transforming the data format, and updating the referred values in local storage.
In the following sections, we illustrate how TiFlash efficiently applies logs and maintains a consistent view with TiKV.
This meets the second challenge we described in Section 2.
Log Replayer
(The replay process has three steps: compact, decode, and convert to columnar) In accordance with the Raft algorithm, the logs received by learner nodes are linearizable.
To keep the linearizable semantics of committed data, they are replayed according to a first-in, first-out (FIFO) strategy. The log replay has three steps:
(1) Compacting logs: According to the transaction model described later in Section 5.1, the transactional logs are classified into three statuses: prewritten, committed, or rolled back.
The data in the rolled-back logs does not need to be written to disks, so a compact process deletes invalid prewritten logs according to rolled-back logs and puts valid logs into a buffer.
(2) Decoding tuples: The logs in the buffer are decoded into row-format tuples, removing redundant information about transactions. Then, the decoded tuples are put into a row buffer.
(3) Transforming data format: If the data size in the row buffer exceeds a size limit or its time duration exceeds a time interval limit,
these row-format tuples are transformed to columnar data and are written to a local partition data pool.
Transformation refers to local cached schemas, which are periodically synchronized with TiKV as described later.
To illustrate the details of the log replay process, consider the following example.
We abstract each Raft log item as transaction ID-operation type[transaction status][@start ts][#commit ts]operation data.
According to typical DMLs, the operation type includes inserting, updating, and deleting tuples.
Transactional status may be prewritten, committed, or rolled back. Operation data may be a specifically-inserted or updated tuple, or a deleted key.
In our example shown in Table 1, the raw logs contain eight items which attempt to insert two tuples, update one tuple, and delete one tuple.
But inserting k1 is rolled back, so only six of the eight raw log items are preserved, from which three tuples are decoded.
Finally, the three decoded tuples are transformed into five columns: operation types, commit timestamps, keys, and two columns of data.
These columns are appended to the DeltaTree.
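The three replay steps can be sketched on a small made-up log that mirrors the counts in the example (eight items, six kept, three tuples, five columns). The tuple layout, the keys other than k1, the timestamps, and the row values are invented for illustration and are not the contents of Table 1.

```python
# Each item: (txn_id, operation, status, timestamp, key, row data).
# Prewrite items carry the start ts and the data; commit items carry the commit ts.
raw_logs = [
    (1, "insert", "prewritten", 100, "k1", (1, "a")),
    (1, "insert", "rolled_back", None, "k1", None),
    (2, "insert", "prewritten", 110, "k2", (2, "b")),
    (2, "insert", "committed", 120, "k2", None),
    (3, "update", "prewritten", 130, "k2", (2, "c")),
    (3, "update", "committed", 140, "k2", None),
    (4, "delete", "prewritten", 150, "k2", None),
    (4, "delete", "committed", 160, "k2", None),
]


def compact(logs):
    """Step 1: drop every item that belongs to a rolled-back transaction."""
    rolled_back = {log[0] for log in logs if log[2] == "rolled_back"}
    return [log for log in logs if log[0] not in rolled_back]


def decode(logs):
    """Step 2: pair each commit with its prewrite, in FIFO order."""
    prewrites, tuples = {}, []
    for txn, op, status, ts, key, data in logs:
        if status == "prewritten":
            prewrites[txn] = data
        elif status == "committed":
            tuples.append((op, ts, key, prewrites.pop(txn)))
    return tuples


def to_columns(tuples, width=2):
    """Step 3: turn row-format tuples into column lists."""
    ops, commit_ts, keys = [], [], []
    data_cols = [[] for _ in range(width)]
    for op, ts, key, data in tuples:
        ops.append(op)
        commit_ts.append(ts)
        keys.append(key)
        row = data if data is not None else (None,) * width   # deletes carry no data
        for col, value in zip(data_cols, row):
            col.append(value)
    return [ops, commit_ts, keys, *data_cols]


kept = compact(raw_logs)
rows = decode(kept)
columns = to_columns(rows)
print(len(kept), len(rows), len(columns))
```

The size and time limits that trigger step 3 are left out; the sketch converts the whole row buffer at once.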
Columnar Delta Tree
(The overall idea resembles an LSM-tree, but the implementation differs: a B+ tree index is added for looking up deltas, and merging is not split into levels; deltas are merged directly into chunks, where a chunk is equivalent to a row group)
To efficiently write and read the columnar data with high throughput, we design a new columnar storage engine, DeltaTree, which appends delta updates immediately and later merges them with the previously stable version for each partition.
The delta updates and the stable data are stored separately in the DeltaTree, as shown in Figure 4.
In the stable space, partition data is stored as chunks, and each of them covers a smaller range of the partition’s tuples.
Moreover, these row-format tuples are stored column by column. In contrast, deltas are directly appended into the delta space in the order TiKV generates them.
The store format of columnar data in TiFlash is similar to Parquet [4]. It also stores row groups into columnar chunks.
Unlike Parquet, which uses a single file, TiFlash stores the column data of a row group and its metadata in different files so that the files can be updated concurrently.
TiFlash just compresses data files using the common LZ4 [2] compression to save their disk size.
New incoming deltas are an atomic batch of inserted data or a deleted range. These deltas are cached in memory and materialized to disk.
They are stored in order, so they achieve the function of a write-ahead log (WAL). These deltas are usually stored in many small files and therefore induce large IO overhead when read.
To reduce this cost, we periodically compact these small deltas into a larger one, then flush the larger delta to disk and replace the previously materialized small deltas.
The in-memory copy of incoming deltas facilitates reading the latest data, and if the old deltas reach a limited size, they are removed.
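A minimal sketch of this append-then-compact behavior follows. It is an in-memory stand-in under stated assumptions, not TiFlash code: the class name DeltaSpace, the compact_threshold parameter, and the use of Python lists in place of files are all hypothetical, and the eviction of old cached deltas is omitted.

```python
class DeltaSpace:
    """In-memory stand-in for the delta space; each 'file' is a list of deltas."""

    def __init__(self, compact_threshold=4):
        self.files = []   # materialized delta files, oldest first
        self.cache = []   # in-memory copy of incoming deltas
        self.compact_threshold = compact_threshold  # hypothetical tuning knob

    def append(self, batch):
        """Append one atomic batch: cache it and materialize it as a small file."""
        batch = list(batch)
        self.cache.extend(batch)
        self.files.append(batch)
        if len(self.files) >= self.compact_threshold:
            self.compact()

    def compact(self):
        """Replace the existing files with one larger file, preserving order."""
        merged = [delta for small_file in self.files for delta in small_file]
        self.files = [merged]

    def read_all(self):
        """Return every delta in arrival order, as a WAL replay would."""
        return [delta for small_file in self.files for delta in small_file]
```

Because compaction only concatenates files in their original order, the deltas keep the arrival order that lets them act as a write-ahead log, while a reader has fewer files to open.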
When reading the latest data of some specific tuples, it is necessary to merge all delta files with their stable tuples (i.e., read amplification),
because it is not known in advance where the related deltas are distributed. Such a process is expensive because it reads many files.
In addition, many delta files may contain useless data (i.e., space amplification) that wastes storage space and slows down merging them with stable tuples.
Therefore, we periodically merge the deltas into the stable space. Each delta file and its related chunks are read into memory and merged. Inserted tuples in deltas are added into the stable space, modified tuples replace the original tuples, and deleted tuples are removed.
The merged chunks atomically replace the original chunks on disk.
Merging deltas is expensive because the related keys are disordered in the delta space.
Such disorder also slows down integrating deltas with stable chunks to return the latest data for read requests.
Therefore, we build a B+ tree index on top of the delta space.
Each delta item of updates is inserted into the B+ tree ordered by its key and timestamp.
This order priority helps to efficiently locate updates for a range of keys or to look up a single key in the delta space when responding to read requests.
Also, the ordered data in the B+ tree is easy to merge with stable chunks.
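The effect of the (key, timestamp) ordering can be illustrated with a small Python sketch. A sorted list searched with bisect stands in for the B+ tree here, which is an assumption made for brevity. The names DeltaIndex and merge_read, the read_ts snapshot parameter, and the operation strings are likewise hypothetical rather than taken from TiFlash.

```python
import bisect


class DeltaIndex:
    """Sorted list standing in for the B+ tree; entries ordered by (key, ts)."""

    def __init__(self):
        self.sort_keys = []  # (key, ts) pairs, kept sorted
        self.entries = []    # (key, ts, op, value), parallel to sort_keys

    def insert(self, key, ts, op, value=None):
        pos = bisect.bisect_right(self.sort_keys, (key, ts))
        self.sort_keys.insert(pos, (key, ts))
        self.entries.insert(pos, (key, ts, op, value))

    def range(self, lo, hi):
        """Return entries with lo <= key < hi, in (key, ts) order."""
        # A 1-tuple (k,) sorts before every (k, ts), so it marks the start of k.
        start = bisect.bisect_left(self.sort_keys, (lo,))
        end = bisect.bisect_left(self.sort_keys, (hi,))
        return self.entries[start:end]


def merge_read(stable, index, lo, hi, read_ts):
    """Merge stable (key, value) rows with deltas visible at read_ts."""
    result = {k: v for k, v in stable if lo <= k < hi}
    for key, ts, op, value in index.range(lo, hi):
        if ts > read_ts:
            continue  # delta is newer than the read snapshot
        if op == "delete":
            result.pop(key, None)
        else:  # "insert" or "update"
            result[key] = value
    return sorted(result.items())
```

Since the deltas for each key arrive from the index already sorted by timestamp, applying them in a single pass leaves the newest visible version of each key, and the same ordered pass could drive a merge into the stable chunks.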
We conduct a micro-experiment to compare the DeltaTree’s performance to the log-structured-merge (LSM) tree [28] in TiFlash, where data is read as it is updated according to Raft logs.
We set up three TiKV nodes and one TiFlash node, and the hardware configurations are listed in the experimental section.
We run the write-only workload of Sysbench [6] on TiKV and run “select count(id), count(k) from sbtest1” on TiFlash.
To avoid the large write amplification of data compaction, we implement the LSM storage engine using universal compaction, rather than level-style compaction.
This implementation is also adopted in ClickHouse [1], a column-oriented OLAP database.
As shown in Table 2, reading from the delta tree is about two times faster than reading from the LSM tree, regardless of whether there are 100 or 200 million tuples, and regardless of the transactional workload.
This is because in the delta tree each read accesses at most one level of delta files that are indexed in a B+ tree, while it accesses more overlapped files in the LSM tree.
The performance remains almost stable under different write workloads because the ratio of delta files is nearly the same. Although the write amplification of DeltaTree (16.11) is greater than that of the LSM tree (4.74), it remains acceptable.
EXPERIMENTS
(First, an outline of what is measured) In this section, we first separately evaluate TiDB’s OLTP and OLAP abilities.
For OLAP, we investigate the SQL engine’s ability to choose between TiKV and TiFlash, and compare TiSpark to other OLAP systems.
Then, we measure TiDB’s HTAP performance, including the log replication delay between TiKV and TiFlash.
Finally, we compare TiDB to MemSQL in terms of isolation.
Experimental Setup
(Cluster setup) Cluster.
We perform comprehensive experiments on a cluster of six servers; each has 188 GB memory and two Intel® Xeon® CPU E5-2630 v4 processors, i.e., two NUMA nodes.
Each processor has 10 physical cores (20 threads) and a 25 MB shared L3 cache.
The servers run CentOS version 7.6.1810 and are connected by a 10 Gbps Ethernet network.
(Workload setup) Workload.
Our experiments are conducted under a hybrid OLTP and OLAP workload using CH-benCHmark. Source code is published online [7].
The benchmark is composed of standard OLTP and OLAP benchmarks: TPC-C and TPC-H.
It is built from the unmodified version of the TPC-C benchmark.
The OLAP part contains 22 analytical queries inspired by TPC-H, whose schema is adapted from TPC-H to the CH-benCHmark schema, plus three missing TPC-H relations.
At run time, the two workloads are issued simultaneously by multiple clients; the number of clients is varied in the experiments.
Throughput is measured in queries per second (QPS) or transactions per second (TPS), respectively. The unit of data in CH-benCHmark is called a warehouse, the same as in TPC-C. 100 warehouses take about 70 GB of memory.
......
RELATED WORK
Common approaches for building HTAP systems are: evolving from an existing database, extending an open source analytical system, or building from scratch.
TiDB is built from scratch and differs from other systems in architecture, data organization, computation engines, and consistency guarantees.
Evolving from an existing database.
Mature databases can provide HTAP solutions based on existing products, and they especially focus on accelerating analytical queries.
They take custom approaches to separately achieve data consistency and high availability.
In contrast, TiDB naturally benefits from the log replication in Raft to achieve data consistency and high availability.
Oracle [19] introduced the Database In-Memory option in 2014 as the industry’s first dual-format, in-memory RDBMS.
This option aims to break performance barriers in analytic query workloads without compromising (or even improving) the performance of regular transactional workloads.
The columnar storage is a read-only snapshot, consistent at a point in time, and it is updated using a fully-online repopulation mechanism.
Oracle’s later work [27] presents the high availability aspects of its distributed architecture and provides fault-tolerant analytical query execution.
SQL Server [21] integrates two specialized storage engines into its core: the Apollo column storage engine for analytical workloads and the Hekaton in-memory engine for transactional workloads.
Data migration tasks periodically copy data from the tail of Hekaton tables into the compressed column store.
SQL Server uses column store indexes and batch processing to efficiently process analytical queries, utilizing SIMD [15] for data scans.
SAP HANA supports efficiently evaluating separate OLAP and OLTP queries, and uses different data organizations for each.
To scale OLAP performance, it asynchronously copies row-store data to a columnar store distributed on a cluster of servers [22]. This approach provides MVCC data with sub-second visibility.
However, it requires a lot of effort to handle errors and keep data consistent. Importantly, the transactional engine lacks high availability because it is only deployed on a single node.
Transforming an open-source system.
Apache Spark is an open-source framework for data analysis. It needs a transactional module to achieve HTAP.
Many systems listed below follow this idea. TiDB does not deeply depend on Spark, as TiSpark is an extension.
TiDB is an independent HTAP database without TiSpark.
Wildfire [10, 9] builds an HTAP engine based on Spark. It processes both analytical and transactional requests on the same columnar data organization, i.e., Parquet.
It adopts last-write-wins semantics for concurrent updates and snapshot isolation for reads.
For high availability, shard logs are replicated to multiple nodes without help from consensus algorithms. Analytical queries and transactional queries can be processed on separate nodes;
however, there is a noticeable delay in processing the newest updates. Wildfire uses a unified multi-version and multi-zone indexing method for large-scale HTAP workloads [23].
SnappyData [25] presents a unified platform for OLTP, OLAP, and stream analytics.
It integrates a computational engine for high-throughput analytics (Spark) with a scale-out, in-memory transactional store (GemFire).
Recent updates are stored in row format, and then age into a columnar format for analytical queries.
Transactions follow a 2PC protocol using GemFire’s Paxos implementation to ensure consensus and a consistent view across the cluster.
Building from scratch.
Many new HTAP systems have investigated different aspects of hybrid workloads, which include utilizing in-memory computing to improve performance, optimal data storage, and availability.
Unlike TiDB, they cannot provide high availability, data consistency, scalability, data freshness, and isolation at the same time.
MemSQL [3] has an engine for both scalable in-memory OLTP and fast analytical queries.
MemSQL can store database tables either in row or column format. It can keep some portion of data in row format and convert it to columnar format for fast analysis when writing data to disk.
It compiles repeated queries into low-level machine code to accelerate analytic queries, and it uses many lock-free structures to aid transactional processing.
However, it cannot provide isolated performance for OLAP and OLTP when running HTAP workloads.
HyPer [18] used the operating system’s fork system call to provide snapshot isolation for analytical workloads.
Its newer versions adopt an MVCC implementation to offer serializability, fast transaction processing, and fast scans.
ScyPer [26] extends HyPer to evaluate analytical queries at scale on remote replicas by propagating updates using either a logical or a physical redo log.
BatchDB [24] is an in-memory database engine designed for HTAP workloads. It relies on primary-secondary replication with dedicated replicas, each optimized for a particular workload type (i.e., OLTP or OLAP).
It minimizes load interaction between the transactional and analytical engines, thus enabling real-time analysis over fresh data under tight SLAs for HTAP workloads.
Note that it executes analytical queries on row-format replicas and does not promise high availability.
Lineage-based data store (L-Store) [35] combines real-time analytical and transactional query processing within a single unified engine by introducing an update-friendly, lineage-based storage architecture.
The storage enables a contention-free update mechanism over a native, multi-version columnar storage model in order to lazily and independently stage stable data from a write-optimized columnar format into a read-optimized columnar layout.
Peloton [31] is a self-driving SQL database management system.
It attempts to adapt data organization [8] for HTAP workloads at run time. It uses lock-free, multi-version concurrency control to support real-time analytics.
However, it is a single-node, in-memory database by design.
CockroachDB [38] is a distributed SQL database which offers high availability, data consistency, scalability, and isolation.
Like TiDB, it is built on top of the Raft algorithm and supports distributed transactions. It offers a stronger isolation property: serializability, rather than snapshot isolation.
However, it does not support dedicated OLAP or HTAP functionality.
CONCLUSION
We have presented a production-ready HTAP database: TiDB.
TiDB is built on top of TiKV, a distributed, row-based store, which uses the Raft algorithm.
We introduce columnar learners for real-time analysis, which asynchronously replicate logs from TiKV, and transform row-format data into column format.
Such log replication between TiKV and TiFlash provides real-time data consistency with little overhead.
TiKV and TiFlash can be deployed on separate physical resources to efficiently process both transactional and analytical queries.
TiDB can optimally choose between them when scanning tables for both transactional and analytical queries.
Experimental results show TiDB performs well under an HTAP benchmark, CH-benCHmark.
TiDB provides a generic solution to evolve NewSQL systems into HTAP systems.