This example demonstrates a Queue Worker process that polls the default queue for jobs, and how to enqueue jobs for this Queue Worker.
To run this example, you'll need:
- PostgreSQL, version 9.5 or later, installed locally.
- The Clojure clj command-line tool, version 1.10.1.697 or later, with support for the -X option. (Installation instructions)
- Two terminal windows.
- The Proletarian Git repo cloned to a local directory: git clone git@github.com:msolli/proletarian.git. We'll assume you're in this directory going forward.
All right, let's get started.
There is a Makefile target for creating the example database. Run it with
make examples.db.install. Here's how it looks:
$ make examples.db.install
DATABASE_NAME=proletarian ./database/install.sh
Installing Proletarian Database
= = =
Creating User
- - -
» proletarian role
Creating Database
- - -
» proletarian database
Creating Schema, Tables, and Index
» proletarian schema
» proletarian.job table
» proletarian.archived_job table
» proletarian.job_queue_process_at index
Granting Privileges
- - -
» schema privileges
» table privileges
= = =
Done Installing Proletarian Database
You can uninstall the example database with make examples.db.uninstall, and
re-create it with make examples.db.recreate.
Please note that while the database install script allows for customization of
the database name, the examples assume that the default (proletarian)
is used.
You might want to exercise Proletarian with an ephemeral Postgres instance running on Docker. Here's how.
In one terminal window:
docker run -p 55432:5432 -e POSTGRES_PASSWORD=proletarian postgres
In another:
PGHOST=localhost PGPORT=55432 PGUSER=postgres PGPASSWORD=proletarian make examples.db.install
In the examples, make sure the DATABASE_URL environment variable is set:
export DATABASE_URL="jdbc:postgresql://localhost:55432/proletarian?user=postgres&password=proletarian"
In one of your terminal windows, run this command to start the Queue Worker:
clj -X:examples example-a.worker/run
It should start a process that polls the default queue for jobs every 5 seconds:
Number of jobs in :proletarian/default queue: 0
Number of jobs in proletarian.jobs table: 0
Number of jobs in proletarian.archived_jobs table: 0
Starting worker for :proletarian/default queue with polling interval 5 s
:proletarian.worker/polling-for-jobs {:worker-thread-id 1, :proletarian.worker/queue-worker-id proletarian[:proletarian/default]}
:proletarian.worker/polling-for-jobs {:worker-thread-id 1, :proletarian.worker/queue-worker-id proletarian[:proletarian/default]}
[...and so forth, until you press Ctrl-C]
Leave this process running while you continue to step 3.
What's happening here is that we've started a Queue Worker thread pool in a JVM process that continuously polls the job table for new work. The polling interval is configurable; in this example it is 5 seconds. (In a production system you'd probably use a much smaller polling interval. The default is 100 ms. For this example, though, it is easier to see what's going on with a larger polling interval.)
The code for this can be found in the example-a.worker namespace.
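To make the polling behaviour described above a little more concrete, here is a minimal sketch of a loop that checks a queue at a fixed interval. It is only an illustration and not Proletarian's implementation: the job table is replaced by an in-memory atom, and the names job-queue, poll-once! and run-polls! are made up for this example.

```clojure
(ns example.polling-sketch)

;; In-memory stand-in for the job table: a vector of jobs held in an atom.
;; Proletarian itself stores jobs in PostgreSQL; this is only an illustration.
(def job-queue (atom []))

(defn poll-once!
  "Atomically removes and returns the oldest job, or nil if the queue is empty."
  [queue]
  (let [[before _after] (swap-vals! queue #(if (seq %) (subvec % 1) %))]
    (first before)))

(defn run-polls!
  "Polls `queue` exactly `n` times. Runs `handler` on each job found and
  sleeps `interval-ms` after an empty poll. Returns the handler results."
  [queue handler interval-ms n]
  (loop [i 0
         results []]
    (if (= i n)
      results
      (if-let [job (poll-once! queue)]
        (recur (inc i) (conj results (handler job)))
        (do (Thread/sleep (long interval-ms))
            (recur (inc i) results))))))

;; Enqueue one job, then poll three times with a 10 ms interval.
(swap! job-queue conj {:job-type ::echo :payload {:message "Hello world!"}})
(run-polls! job-queue :payload 10 3)
;; => [{:message "Hello world!"}]
```

A longer interval means fewer checks but a longer wait before a new job is noticed, which is why the example uses 5 seconds for readability while a real system would likely poll far more often.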
In your other terminal window, run this command to enqueue a job:
clj -X:examples example-a.enqueue-jobs/run
It should add a job to the default queue and print the job details:
Adding new job to :proletarian/default queue:
{:job-id #uuid "...",
:job-type :example-a.enqueue-jobs/echo,
:payload {:message "Hello world!",
:timestamp #inst "..."}}
You should see this job get picked up by the queue worker process in the other terminal window. You'll see output like this:
:proletarian.worker/handling-job {:job-id #uuid "...", :job-type :example-a.enqueue-jobs/echo, :attempt 1, :worker-thread-id 1, :proletarian.worker/queue-worker-id proletarian[:proletarian/default]}
Running job :example-a.enqueue-jobs/echo. Payload:
{:message "Hello world!", :timestamp #inst "..."}
:proletarian.worker/job-finished {:job-id #uuid "...", :job-type :example-a.enqueue-jobs/echo, :attempt 1, :worker-thread-id 1, :proletarian.worker/queue-worker-id proletarian[:proletarian/default]}
Run the same command again to see the miracle unfold one more time.
Things to note:
- The job is assigned a job-id UUID. You shouldn't need to keep track of this id, it's just the unique id of this job.
- The job has a :job-type, which is a Clojure keyword. This keyword is what determines which handler function Proletarian will invoke for this job. You need to implement the proletarian.job/handle-job! multimethod for each job type you want to handle. A common practice is to use a namespaced keyword with shorthand notation for the current namespace (the double colon, ::) as your job type. In this example we use ::echo in the code, which expands to :example-a.enqueue-jobs/echo, as seen in the output above.
- The payload is serialized using Transit, and stored in a TEXT column in the database. Proletarian deserializes the payload and passes it to the handler. The serializer is configurable – implement the proletarian.protocols/Serializer protocol and pass an instance of this under the :proletarian/serializer option to proletarian.job/enqueue! and proletarian.worker/queue-worker-controller.
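The keyword dispatch and the serialize/deserialize round trip noted above can be sketched in plain Clojure. This is a rough illustration under stated assumptions, not Proletarian's API: the handle-job! multimethod below is a local, hypothetical stand-in (it is not proletarian.job/handle-job!, whose argument list is not shown here), and EDN is swapped in for Transit so the example needs no extra dependencies.

```clojure
(ns example.dispatch-sketch
  (:require [clojure.edn :as edn]))

;; Hypothetical stand-in for a job-handling multimethod; this is NOT
;; proletarian.job/handle-job!. It dispatches on its first argument.
(defmulti handle-job!
  (fn [job-type _payload] job-type))

;; ::echo is shorthand for :example.dispatch-sketch/echo in this namespace.
(defmethod handle-job! ::echo
  [_job-type payload]
  (str "Echo: " (:message payload)))

;; Fallback for job types without a handler.
(defmethod handle-job! :default
  [job-type _payload]
  (throw (ex-info "No handler for job type" {:job-type job-type})))

;; EDN stands in for Transit here: the payload becomes text on enqueue
;; and is read back into data before the handler sees it.
(defn serialize [payload] (pr-str payload))
(defn deserialize [text] (edn/read-string text))

(def stored-text (serialize {:message "Hello world!"}))

(handle-job! ::echo (deserialize stored-text))
;; => "Echo: Hello world!"
```

Because the dispatch value is a namespaced keyword, two namespaces can each define an ::echo job type without colliding; the fully qualified keyword is what ends up stored with the job.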