PostgreSQL Queue in Java + Spring

Overview

We have already discussed queue management solutions in the past, and I am always happy to write about them. Queue managers are not easy to implement, and there is a reason why IBM MQ Series is still a successful product. A few months ago, a big bank customer asked me to provide a small queue implementation to increase the asynchronous internal processing of our payment solution.

The project had very strong constraints: I could not use an existing queue system because none was available yet, and I needed to provide microservice parallelism in a cloud environment.

I had very little time to provide a solid solution, and reinventing the wheel was not an option. Performance was important, but we planned for a manageable number of transactions per second, far below the capacity of a modern cloud database.

Challenge accepted.

Background

Very fast queue implementations are normally not based on a database (like Oracle, PostgreSQL or SQLite). A queue usually needs to preserve the order of its elements, and a database needs good locking primitives to do that without contention.

For instance, Oracle can do fine-grained row-level locking if the right indexes are in place. Other databases, such as SQL Server, apply “lock escalation” strategies when a transaction acquires many locks. “Lock escalation” means the database replaces many row or page locks with a single lock on the whole table (!), which can slow down even readers.

Distributed databases pay an extra penalty. They would rather not ‘move around’ the same database page from one node to another just to extract a few records in order, and they need an extra interconnect protocol to ask the other nodes for the next row.

Nowadays a distributed queue system can be implemented efficiently without a database… but…

  1. Modern databases have a SKIP LOCKED construct that you can use to increase throughput. For instance, in PostgreSQL (9.5 and later) you can take an element from a queue with something like this:

    DELETE FROM queue
    WHERE index = (SELECT index FROM queue ORDER BY index FOR UPDATE SKIP LOCKED LIMIT 1)
    RETURNING item;

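    You can run this query from several consumers in parallel. Each one skips the rows already locked by the others instead of waiting for them, which keeps contention to a minimum.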

  2. Lock semantics are already here and correctly implemented.

  3. Transactions and rollbacks are provided out of the box.

  4. Table partitioning can increase overall performance.

  5. You have very strong fault tolerance out of the box. We shall see that you can even increase performance if you opt for ‘unlogged’ tables.

  6. Modern databases know how to handle JSON/JSONB types. You can run complex queries on the data to gather statistics, or even index string fields for easy integration with a search engine!

  7. Last but not least, queue management is waaaay easier on a database. Have you ever tried to empty an Apache ActiveMQ queue, archiving the messages for later submission, without writing an ad-hoc application?
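Points 1 and 3 combine naturally in a Java consumer. The dequeue statement and the processing can share one transaction, so a processing failure rolls the DELETE back and the message is not lost. The sketch below assumes plain JDBC, a PostgreSQL driver on the classpath, and the `queue` table with the `index` and `item` columns used in the query above. `SkipLockedConsumer` and `consumeOne` are hypothetical names, not part of any existing library.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.function.Consumer;

// Hypothetical helper. Assumes the "queue" table with "index" and "item"
// columns from the dequeue query above, and a PostgreSQL JDBC driver.
final class SkipLockedConsumer {

    // The dequeue statement from above; DELETE ... RETURNING yields a result set.
    static final String DEQUEUE_SQL =
            "DELETE FROM queue "
            + "WHERE index = (SELECT index FROM queue ORDER BY index "
            + "FOR UPDATE SKIP LOCKED LIMIT 1) "
            + "RETURNING item";

    private SkipLockedConsumer() {}

    /**
     * Removes one item and passes it to the handler, all in one transaction.
     * If the handler throws, the DELETE is rolled back and the item stays in
     * the queue. Returns false when no unlocked row was available.
     */
    static boolean consumeOne(Connection conn, Consumer<String> handler) throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);
        try (PreparedStatement ps = conn.prepareStatement(DEQUEUE_SQL)) {
            boolean found;
            String item = null;
            try (ResultSet rs = ps.executeQuery()) {
                found = rs.next();
                if (found) {
                    item = rs.getString(1);
                }
            }
            if (!found) {
                conn.commit(); // empty queue, or every row locked by other consumers
                return false;
            }
            handler.accept(item); // processing runs while the row is still locked
            conn.commit();        // the row is really deleted only now
            return true;
        } catch (SQLException | RuntimeException e) {
            conn.rollback();      // releases the row lock; the item becomes visible again
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit);
        }
    }
}
```

A worker would typically call `consumeOne` in a loop and back off briefly when it returns false. Keeping the processing inside the transaction gives at-least-once delivery. The cost is that the row lock, and an open transaction, are held for the whole processing time.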

Oracle Advanced Queuing is database-based and pretty decent. In the past, a coworker implemented a super-efficient queue system on top of Oracle Advanced Queuing. The idea was to split the queue into K sub-queues, with K chosen to match the number of consumers. This gave approximately zero contention on the Exadata cluster, because every thread had its “own” queue, and the speed we could reach was impressive.
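The K-sub-queue trick can be sketched in memory to show why it removes contention. This is an illustration in Java, not Oracle AQ code. `ShardedQueue` is a hypothetical class, messages are routed round-robin on an assumed sequence number, and consumer k only ever polls sub-queue k.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// In-memory illustration: one logical queue split into K sub-queues,
// with K equal to the number of consumers. Hypothetical class.
final class ShardedQueue<T> {

    private final List<ConcurrentLinkedQueue<T>> shards = new ArrayList<>();

    ShardedQueue(int consumers) {
        if (consumers <= 0) {
            throw new IllegalArgumentException("consumers must be positive");
        }
        for (int i = 0; i < consumers; i++) {
            shards.add(new ConcurrentLinkedQueue<>());
        }
    }

    // Producer side: round-robin routing on the message sequence number.
    void offer(long sequence, T message) {
        int shard = (int) Math.floorMod(sequence, (long) shards.size());
        shards.get(shard).offer(message);
    }

    // Consumer side: consumer k only touches shard k, so consumers never
    // compete with each other for the same element. Returns null when empty.
    T poll(int consumer) {
        return shards.get(consumer).poll();
    }
}
```

The trade-off is ordering. Elements stay in order within each sub-queue, but there is no global order across the K sub-queues, and an idle consumer cannot help a busy one.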

There are plenty of queue implementations for PostgreSQL, but they are usually not Java-friendly. Also, most of them require installing custom modules. In my scenario that was either not possible or would have required too much review effort.

That mattered because I needed to review the code to rule out silly SQL injection vulnerabilities, and less code means less risk.
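One way to keep such a review manageable follows a simple rule. This is an assumption, not necessarily what PQUE does. Message values always travel as JDBC `?` parameters. The only strings concatenated into SQL are identifiers such as table names, which cannot be bound as parameters and are therefore checked against a strict whitelist. Here is a minimal sketch, with `QueueSql` as a hypothetical helper and the `item` column taken from the dequeue query above:

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of PQUE or any library.
// Rule: message values are always bound as JDBC "?" parameters; only identifiers,
// which cannot be bound, are concatenated, and only after a whitelist check.
final class QueueSql {

    // Plain lower-case, unquoted identifier; PostgreSQL truncates names at 63 bytes by default.
    private static final Pattern IDENTIFIER = Pattern.compile("[a-z_][a-z0-9_]{0,62}");

    private QueueSql() {}

    static String requireIdentifier(String name) {
        if (name == null || !IDENTIFIER.matcher(name).matches()) {
            throw new IllegalArgumentException("Invalid identifier: " + name);
        }
        return name;
    }

    // The payload never becomes part of the SQL text; the caller binds it with setString(1, ...).
    static String enqueueSql(String table) {
        return "INSERT INTO " + requireIdentifier(table) + " (item) VALUES (?)";
    }
}
```

A caller would use `conn.prepareStatement(QueueSql.enqueueSql("queue"))` followed by `ps.setString(1, payload)`. A name like `queue; DROP TABLE queue` is rejected before any SQL is built.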

Cut, paste and scrubbing

I ended up creating a stripped-down version of a PostgreSQL-based implementation, reducing it to the bare minimum. I packaged it with Liquibase scripts that create the API, together with a pre-existing Spring implementation. I called it PQUE, because it sounds like the name of the Star Trek: The Next Generation captain (Picard) and because it stands for PostgreSQL QUEue.

In my simple tests, I easily reached hundreds of messages per second. I ran the test on a single host with just one worker node, emulating the processing time of a FOREX market server.

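You can even use unlogged tables to make it faster, but you risk losing data if something bad happens. PostgreSQL truncates unlogged tables after a crash or unclean shutdown and does not replicate them to standby servers (though managed cloud SQL services are usually very reliable, so…)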
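Switching to unlogged tables is only a DDL change. Below is a minimal sketch of the statements an application or a migration script might issue. `QueueTableDdl` is hypothetical, and the `bigserial`/`jsonb` column types are assumptions; the `index` and `item` column names come from the dequeue query above. `ALTER TABLE ... SET LOGGED` / `SET UNLOGGED` requires PostgreSQL 9.5 or later.

```java
import java.util.regex.Pattern;

// Hypothetical DDL helper for a queue table. Column names match the dequeue
// query ("index", "item"); the bigserial and jsonb types are assumptions.
final class QueueTableDdl {

    // Only plain lower-case identifiers are accepted, to keep concatenation safe.
    private static final Pattern IDENTIFIER = Pattern.compile("[a-z_][a-z0-9_]{0,62}");

    private QueueTableDdl() {}

    private static String check(String table) {
        if (table == null || !IDENTIFIER.matcher(table).matches()) {
            throw new IllegalArgumentException("Invalid table name: " + table);
        }
        return table;
    }

    // UNLOGGED skips the write-ahead log: faster writes, but the table is
    // truncated after a crash and is not replicated to standbys.
    static String createTable(String table, boolean unlogged) {
        return "CREATE " + (unlogged ? "UNLOGGED " : "") + "TABLE " + check(table)
                + " (index bigserial PRIMARY KEY, item jsonb NOT NULL)";
    }

    // PostgreSQL 9.5+: switch an existing table between logged and unlogged.
    static String setLogged(String table, boolean logged) {
        return "ALTER TABLE " + check(table) + (logged ? " SET LOGGED" : " SET UNLOGGED");
    }
}
```

Starting unlogged and switching with `SET LOGGED` later is possible. Note that this rewrites the table and writes it to the WAL, so it is not free on a large queue.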

Partitioning the table can also help a lot with overall performance, but that story will need another article :)

Other Postgres implementations

As said, most of these implementations are Python-based and not Java-friendly out of the box.