Getting started with PHP and ActiveMQ

What is ActiveMQ?

ActiveMQ is a message broker - software which is designed to facilitate the exchange of information between separate systems. Brokers are able to take messages sent by one system, using one protocol, and dispatch them to another system which may be using a different messaging protocol. These protocols may be AMQP, Stomp, HTTP, MQTT or other possibilities.

The basic principle is simply that we are able to send some data (a message) to a broker and another different system or component can receive and process it. It therefore then doesn't matter whether the systems on either side of the broker speak the same protocol, whether they're written in the same programming language, etc., because we can rely on the broker to act as a common intermediary.

Central to the function of message brokers or message-oriented middleware are the concepts of queues (1-1, point-to-point messaging) and topics (publisher/subscriber). We will take a look at these in more detail below.

In this tutorial, we are going to download and run ActiveMQ on our local machine, write a PHP script to send some messages to a queue and another script to act as a consumer of the queue, receiving the messages and processing them.

As we do this, we will cover the core concepts involved. We will be using the Stomp protocol, via a library providing a pure PHP implementation of this protocol (i.e. no special extensions required).

When to use a message broker

Message brokers are suitable in a range of use cases, including:

  • High throughput / high availability between user-facing services and backend systems. Rather than calling a specific API, relying on it being available and waiting for a response, you can shunt a message to a queue and allow any available consumer to pick it up.
  • Dispatching data updates to multiple systems at once which use their own localised data stores.
  • Facilitating transactions in distributed systems, by dispatching an ordered sequence of events which can in turn fire completed or failed events.
  • Communicating between on-prem and cloud systems.

Queue versus topic

A queue is a first-in-first-out (FIFO) system. Messages are received and dispatched to a consumer in the order they are received. Even if there are multiple consumers listening to a queue, only one consumer will receive a message before it is removed from the queue. Systems like ActiveMQ will implement load balancing to distribute messages between consumers. If there is no consumer available, messages will build up in the queue until a consumer becomes available. Thus there is one message to one consumer.

A topic is a publisher/subscriber model. The key difference here is the same message will be sent to all actively subscribed consumers. Consumers joining a subscription later will not receive earlier messages.

Setting up a project

Now we understand the basic principles of brokers, queues and topics, let's build an example in PHP.

First you will need to download ActiveMQ. It is of course possible to run this via Docker, feel free to do so if you're comfortable, but as ActiveMQ is a Java application it will run fine on a local machine with the JRE installed. If you don't already have the Java runtime on your system, be sure to install Java too.

Once you've downloaded ActiveMQ, extract the archive to a new folder on your system. There will be a bin folder inside containing an executable script activemq. We are going to run bin/activemq start.

Screenshot showing how to start ActiveMQ

Once you've started ActiveMQ, you should see something like the following on your terminal:

Screenshot showing ActiveMQ initialization

You will also need PHP (at least version 7.4) and Composer.

Create a new directory for your project and run the following command, which will create a composer.json file and install the required dependencies.

composer require stomp-php/stomp-php symfony/console

Now create a subdirectory in your project folder called src and another called bin. Your project directory should now look like this:

activemq_php/
├─ bin/
├─ src/
├─ composer.json
├─ vendor/

Lastly, open up composer.json inside your favourite IDE / editor and add the autoload section for our own classes:

{
    "require": {
        "stomp-php/stomp-php": "^5.0",
        "symfony/console": "^5.3"
    },
    "autoload": {
        "psr-4": {
            "App\\": "src/"
        }
    }
}

Setting up the Stomp connection

Inside your src directory, create a file called Broker.php with the following content:

<?php

namespace App;

use Stomp\Client;
use Stomp\Exception\ConnectionException;
use Stomp\Network\Connection;
use Stomp\Network\Observer\HeartbeatEmitter;
use Stomp\StatefulStomp;
use Stomp\Transport\Frame;
use Stomp\Transport\Message;

class Broker
{
    // The internal Stomp client
    private StatefulStomp $client;
    // A list of subscriptions held by this broker
    private array $subscriptions = [];

    /**
     * For our constructor, we'll pass in the hostname (or IP address) and port number.
     * @throws ConnectionException
     */
    public function __construct(string $host, int $port)
    {
        $connection = new Connection('tcp://' . $host . ':' . $port);
        $client = new Client($connection);
        // Once we've created the Stomp connection and client, we will add a heartbeat
        // to periodically let ActiveMQ know our connection is alive and healthy.
        $client->setHeartbeat(500);
        $connection->setReadTimeout(0, 250000);
        // We add a HeartBeatEmitter and attach it to the connection to automatically send these signals.
        $emitter = new HeartbeatEmitter($client->getConnection());
        $client->getConnection()->getObservers()->addObserver($emitter);
        // Lastly, we create our internal Stomp client which will be used in our methods to interact with ActiveMQ.
        $this->client = new StatefulStomp($client);
        $client->connect();
    }
}

The constructor sets up our Stomp connection and we can now use the object's client property to create some other methods which will talk to ActiveMQ.

Create a file in the bin of your project called send.php and add the following code:

<?php
use App\Broker;

require_once __DIR__.'/../vendor/autoload.php';

try {
    $broker = new Broker('localhost', 61613);
} catch (Exception $e) {
    echo "Failed to connect to broker\n";
    exit(1);
}

$broker->sendQueue('orders', json_encode([
    'items' => [
        [
            'code' => '3AF09',
            'description' => 'Jeans',
            'size' => 'M',
            'colour' => 'Yellow',
            'price' => '24.99',
        ],
        [
            'code' => '3AF16',
            'description' => 'Sweatshirt',
            'size' => 'M',
            'colour' => 'Black',
            'price' => '32.49',
        ],
    ],
    'customer' => 1,
]),
[
    'type' => 'order',
]);

$broker->sendQueue('orders', json_encode([
    'items' => [
        'code' => '3AF11',
        'description' => 'T-Shirt',
        'size' => 'M',
        'colour' => 'Grey',
        'price' => '11.99'
    ],
    'customer' => 2,
]),
[
    'type' => 'order',
]);

exit("Messages sent\n");

We'll run this script from the command line (php bin/send.php) to send a couple of sample messages to a queue called orders. We are including an array represented as JSON and a message header called type. This is a special header name in an ActiveMQ Stomp connection which will set the JMSType of the message on the queue, which we will see in the ActiveMQ web UI.

The first time we run this script, the orders queue will be automatically created.

Sending a message to a queue

First, we need to implement the sendQueue method on our Broker class. In the Broker.php file, add a new method:

public function sendQueue(string $queueName, string $message, array $headers = [])
{
    $destination = '/queue/' . $queueName;
    $this->client->send($destination, new Message($message, $headers + ['persistent' => 'true']));
}

Note to the $headers array, we are adding the persistent value if it is not already set and it is set to the string 'true' (rather than boolean true). Stomp messages are not persistent by default, which means they will be lost if ActiveMQ is restarted. By setting this special header, we instruct ActiveMQ to persist the messages to disk.

Now, with ActiveMQ running, open up another terminal / console and run php bin/send.php.

$ php bin/send.php
Messages sent

Now in your browser, visit http://localhost:8161/admin/queues.jsp You will prompted for a login; the default username and password are both admin.

Once logged in, click on Queues on the top menu and you should see our orders queue has been created and contains 2 messages.

Screenshot showing ActiveMQ queues UI

Click on the queue name and click on the first message. You can see the type, persistence and message content we sent from the PHP script.

Screenshot showing message content

We have successfully sent messages from PHP to ActiveMQ. Now it's time to build another script to read them.

Reading messages from a queue

In the bin directory of your project, create a new file console.php. In the src directory, create a file called QueueListenerCommand.php.

Our console.php file will instantiate a simple Symfony console application to execute a long-running process which will establish a connection to a queue and read messages as they arrive. You could for example daemonize such a process as a service through systemd, upstart or other process managers.

Inside console.php, add the following content:

#!/usr/bin/env php
<?php

require __DIR__.'/../vendor/autoload.php';

use App\QueueListenerCommand;

use Symfony\Component\Console\Application;

$application = new Application("ActiveMQ Stomp Demo", "1.0");
$application->add(new QueueListenerCommand);
$application->run();

Inside QueueListenerCommand.php, add the following code:

<?php

namespace App;

use Exception;
use Stomp\Transport\Frame;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class QueueListenerCommand extends Command
{
    protected static $defaultName = 'queue:listen';

    protected function configure(): void
    {
        $this->setDescription('Listens to orders queue and prints messages.');
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        try {
            $broker = new Broker('localhost', 61613);
        } catch (Exception $e) {
            $output->writeln('<error>Failed to connect to broker</error>');
            return Command::FAILURE;
        }
        $output->writeln('<comment>Connected to broker, listening for messages...</comment>');

        $broker->subscribeQueue('orders');

        while (true) {
            $message = $broker->read();
            if ($message instanceof Frame) {
                if ($message['type'] === 'terminate') {
                    $output->writeln('<comment>Received shutdown command</comment>');
                    return Command::SUCCESS;
                }
                $output->writeln('<info>Processed message: ' . $message->getBody() . '</info>');
                $broker->ack($message);
            }
            usleep(100000);
        }
    }
}

All we are doing here is reading messages in a continuous loop (every 100 milliseconds) and printing them to the console. Of course in a real application, you would do some processing here instead. If processing is successful, you ack() the message. If processing is not successful, you can replace this with a negative acknowledgement nack() to let ActiveMQ know it should either attempt to redeliver the message or move it to the appropriate dead letter queue (more on that below).

We also check if the message we received has a JMSType of "terminate", in which case we end the program. This is just for example purposes, allowing us to terminate our script by sending a special type of message.

In order to receive these messages, we first need to tell ActiveMQ our listener is a subscriber to the orders queue. You can see in the code above we need to amend our Broker class and add a couple of methods to subscribe to a queue, read from it and ack the messages once they are processed. Open up Broker.php again and add the following methods:

public function subscribeQueue(string $queueName, ?string $selector = null): void
{
    $destination = '/queue/' . $queueName;
    $this->subscriptions[$destination] = $this->client->subscribe($destination, $selector, 'client-individual');
}

public function unsubscribeQueue(?string $queueName = null): void
{
    if ($queueName) {
        $destination = '/queue/' . $queueName;
        if (isset($this->subscriptions[$destination])) {
            $this->client->unsubscribe($this->subscriptions[$destination]);
        }
    } else {
        $this->client->unsubscribe();
    }
}

public function read(): ?Frame
{
    return ($frame = $this->client->read()) ? $frame : null;
}

public function ack(Frame $message): void
{
    $this->client->ack($message);
}

public function nack(Frame $message): void
{
    $this->client->nack($message);
}

You should now be in a position to run php bin\console.php and see the Symfony console app run:

$ php bin/console.php

ActiveMQ Stomp Demo 1.0

Usage:
  command [options] [arguments]

Options:
  -h, --help            Display help for the given command. When no command is given display help for the list command
  -q, --quiet           Do not output any message
  -V, --version         Display this application version
      --ansi|--no-ansi  Force (or disable --no-ansi) ANSI output
  -n, --no-interaction  Do not ask any interactive question
  -v|vv|vvv, --verbose  Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Available commands:
  help          Display help for a command
  list          List commands
 queue
  queue:listen  Listens to orders queue and prints messages.

Let's run php bin\console.php queue:listen and we should pick up the 2 messages we have sitting in the orders queue:

$ php bin/console.php queue:listen

Connected to broker, listening for messages...
Processed message: {"items":[{"code":"3AF09","description":"Jeans","size":"M","colour":"Yellow","price":"24.99"},{"code":"3AF16","description":"Sweatshirt","size":"M","colour":"Black","price":"32.49"}],"customer":1}
Processed message: {"items":{"code":"3AF11","description":"T-Shirt","size":"M","colour":"Grey","price":"11.99"},"customer":2}

If you check the Queues list in the ActiveMQ UI in your browser again (refresh the page), you will see our messages, having been acknowledged, have now disappeared:

Screenshot showing empty queue in web UI

In a separate terminal tab/window, run the php bin/send.php script again, to send 2 more messages. Now go back and check the terminal where our listener is running:

Connected to broker, listening for messages...
Processed message: {"items":[{"code":"3AF09","description":"Jeans","size":"M","colour":"Yellow","price":"24.99"},{"code":"3AF16","description":"Sweatshirt","size":"M","colour":"Black","price":"32.49"}],"customer":1}
Processed message: {"items":{"code":"3AF11","description":"T-Shirt","size":"M","colour":"Grey","price":"11.99"},"customer":2}
Processed message: {"items":[{"code":"3AF09","description":"Jeans","size":"M","colour":"Yellow","price":"24.99"},{"code":"3AF16","description":"Sweatshirt","size":"M","colour":"Black","price":"32.49"}],"customer":1}
Processed message: {"items":{"code":"3AF11","description":"T-Shirt","size":"M","colour":"Grey","price":"11.99"},"customer":2}

The additional 2 messages have been picked up and consumed by the listener.

Now, in the ActiveMQ web UI, on the queues page click the link next to our orders queue which says "Send to" and fill in the type field on the form with terminate and a message body saying "Goodbye!". Then click the Send button.

Screenshot showing sending a message via the web UI

Check the console where our listener was running. You should see:

$ bin/console.php queue:listen
Connected to broker, listening for messages...
Processed message: {"items":[{"code":"3AF09","description":"Jeans","size":"M","colour":"Yellow","price":"24.99"},{"code":"3AF16","description":"Sweatshirt","size":"M","colour":"Black","price":"32.49"}],"customer":1}
Processed message: {"items":{"code":"3AF11","description":"T-Shirt","size":"M","colour":"Grey","price":"11.99"},"customer":2}
Processed message: {"items":[{"code":"3AF09","description":"Jeans","size":"M","colour":"Yellow","price":"24.99"},{"code":"3AF16","description":"Sweatshirt","size":"M","colour":"Black","price":"32.49"}],"customer":1}
Processed message: {"items":{"code":"3AF11","description":"T-Shirt","size":"M","colour":"Grey","price":"11.99"},"customer":2}
Received shutdown command

$

and the listener program has stopped.

Redeliver failed messages

If you modify our listener command to nack the message instead of ack them and refresh the queues page in your web UI, you'll see ActiveMQ has automatically created a Dead Letter Queue (DLQ) where the messages which could not be successfully processed have been sent. This prevents unprocessable messages from clogging up your primary queue, since it operates on a first-in-first-out basis.

You can however configure ActiveMQ to attempt limited redelivery of these messages. Messages can go to any active consumer and in the real world it's quite likely you will have multiple instances of your listeners running on different servers, so just because one server couldn't processs a message successfully, doesn't mean we don't want to retry with another.

In the directory on your system you extracted/installed ActiveMQ, there is a subdirectory called conf and in that directory, a file called activemq.xml. Open it up in your favourite editor and look for the broker element, which will look something like this:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

Change that to read:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

Now, inside the body of the broker element, add a plugins element like the following:

        <plugins>
            <redeliveryPlugin fallbackToDeadLetter="true" 
                              sendToDlqIfMaxRetriesExceeded="true">
                <redeliveryPolicyMap>
                    <redeliveryPolicyMap>
                        <defaultEntry>
                            <!-- the fallback policy for all other destinations -->
                            <redeliveryPolicy maximumRedeliveries="2" 
                                              initialRedeliveryDelay="5000"
                                              redeliveryDelay="10000"/>
                        </defaultEntry>
                    </redeliveryPolicyMap>
                </redeliveryPolicyMap>
            </redeliveryPlugin>
        </plugins>

You will then need to save the file and stop and restart ActiveMQ.

Try changing the listener command to randomly nack occasional messages (random_int(1,5) === 1 or something) instead of ack. Monitor the console output of the listener. You should find ActiveMQ will now attempt to redeliver the message twice, only shunting it to the DLQ if it is not ack'd on subsequent attempts.

Future enhancements

In the real world, you may wish to abstract out your queues, topics and messages in to your business domain, with individual classes representing for example each specific type of message and being responsible for its (de)serialization. These classes can wrap a Broker dependency, facilitating independent test coverage and slimmer interfaces to send and receive messages. Thus in our example, we might have had classes such as OrderQueue and OrderMessage.

You will also want to read further in to ActiveMQ's configuration and in particular one thing we have not covered here is security, though once configured in ActiveMQ the PHP Stomp client does provide a setLogin method for specifying a username and password. See Further Reading below.

I hope having followed this PHP ActiveMQ tutorial, you are now better equipped to take these steps in your real world applications. As always, if you have any feedback please feel free to contact me or comment below.

Further reading

You can download the sample code found in this blog post on my GitHub repository.


Comments

Add a comment

All comments are pre-moderated and will not be published until approval.

You can write in _italics_ or **bold** like this.

DevDev Sunday 16 January 2022, 22:29

Thank you for this tutorial. Awesome work :)

ganesan Wednesday 22 December 2021, 01:34

Thanks for this great tutorial.. Clean explain....

Recent posts


Tuesday 31 May 2022, 22:00

...and how not to handle customer service

musings

Tuesday 12 April 2022, 21:40

Ever wondered the best way to do encryption in PHP? This tutorial shows you how!

php

SPONSORED AD

Buy this advertising space. Your product, your logo, your promotional text, your call to action, visible on every page. Space available for 3, 6 or 12 months.

Get in touch

Tuesday 29 March 2022, 18:47

...the AI trained on open source which writes code for you!

php coding

Sunday 06 March 2022, 01:03

...when world affairs and programming collide.

musings

Friday 28 January 2022, 23:46

...just add water! Spin up an Apache server running your choice of PHP version with this free helper.

php