Getting started with PHP and ActiveMQ
What is ActiveMQ?
ActiveMQ is a message broker - software which is designed to facilitate the exchange of information between separate systems. Brokers are able to take messages sent by one system, using one protocol, and dispatch them to another system which may be using a different messaging protocol. These protocols may be AMQP, Stomp, HTTP, MQTT or other possibilities.
The basic principle is simply that we are able to send some data (a message) to a broker and another different system or component can receive and process it. It therefore then doesn't matter whether the systems on either side of the broker speak the same protocol, whether they're written in the same programming language, etc., because we can rely on the broker to act as a common intermediary.
Central to the function of message brokers or message-oriented middleware are the concepts of queues (1-1, point-to-point messaging) and topics (publisher/subscriber). We will take a look at these in more detail below.
In this tutorial, we are going to download and run ActiveMQ on our local machine, write a PHP script to send some messages to a queue and another script to act as a consumer of the queue, receiving the messages and processing them.
As we do this, we will cover the core concepts involved. We will be using the Stomp protocol, via a library providing a pure PHP implementation of this protocol (i.e. no special extensions required).
When to use a message broker
Message brokers are suitable in a range of use cases, including:
- High throughput / high availability between user-facing services and backend systems. Rather than calling a specific API, relying on it being available and waiting for a response, you can shunt a message to a queue and allow any available consumer to pick it up.
- Dispatching data updates to multiple systems at once which use their own localised data stores.
- Facilitating transactions in distributed systems, by dispatching an ordered sequence of events which can in turn fire completed or failed events.
- Communicating between on-prem and cloud systems.
Queue versus topic
A queue is a first-in-first-out (FIFO) system. Messages are received and dispatched to a consumer in the order they are received. Even if there are multiple consumers listening to a queue, only one consumer will receive a message before it is removed from the queue. Systems like ActiveMQ will implement load balancing to distribute messages between consumers. If there is no consumer available, messages will build up in the queue until a consumer becomes available. Thus there is one message to one consumer.
A topic is a publisher/subscriber model. The key difference here is the same message will be sent to all actively subscribed consumers. Consumers joining a subscription later will not receive earlier messages.
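To make the difference concrete, here is a small in-memory sketch of the two delivery models. It does not talk to ActiveMQ at all: SimpleQueue and SimpleTopic are hypothetical classes written for illustration, and the round-robin dispatch is a simplifying assumption rather than a description of how ActiveMQ actually balances consumers.

```php
<?php
// Illustration only: hypothetical in-memory stand-ins for a queue and a topic.
final class SimpleQueue
{
    /** @var callable[] */
    private array $consumers = [];
    /** @var string[] */
    private array $pending = [];
    private int $next = 0;

    public function addConsumer(callable $consumer): void
    {
        $this->consumers[] = $consumer;
        $this->dispatch();
    }

    public function send(string $message): void
    {
        $this->pending[] = $message;
        $this->dispatch();
    }

    // Each message is handed to exactly one consumer, in FIFO order.
    // Messages wait in $pending while no consumer is available.
    private function dispatch(): void
    {
        while ($this->consumers !== [] && $this->pending !== []) {
            $message = array_shift($this->pending);
            $consumer = $this->consumers[$this->next % count($this->consumers)];
            $this->next++;
            $consumer($message);
        }
    }
}

final class SimpleTopic
{
    /** @var callable[] */
    private array $subscribers = [];

    public function subscribe(callable $subscriber): void
    {
        $this->subscribers[] = $subscriber;
    }

    // Every current subscriber gets a copy; nothing is kept for later subscribers.
    public function publish(string $message): void
    {
        foreach ($this->subscribers as $subscriber) {
            $subscriber($message);
        }
    }
}

$log = ['queue' => [], 'topic' => []];
$recorder = function (string $kind, string $name) use (&$log): callable {
    return function (string $message) use (&$log, $kind, $name): void {
        $log[$kind][$name][] = $message;
    };
};

$queue = new SimpleQueue();
$queue->send('order-1');                       // buffered, no consumer yet
$queue->addConsumer($recorder('queue', 'A'));  // A receives order-1
$queue->addConsumer($recorder('queue', 'B'));
$queue->send('order-2');                       // goes to B only
$queue->send('order-3');                       // goes to A only

$topic = new SimpleTopic();
$topic->publish('price-1');                    // nobody subscribed, so it is lost
$topic->subscribe($recorder('topic', 'A'));
$topic->subscribe($recorder('topic', 'B'));
$topic->publish('price-2');                    // both A and B receive it

print_r($log);
```

The queue delivers each message once in total, while the topic delivers each message once per active subscriber.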
Setting up a project
Now we understand the basic principles of brokers, queues and topics, let's build an example in PHP.
First you will need to download ActiveMQ. It is of course possible to run this via Docker, so feel free to do so if you're comfortable, but as ActiveMQ is a Java application it will run fine on a local machine with the JRE installed. If you don't already have the Java runtime on your system, be sure to install it too.
Once you've downloaded ActiveMQ, extract the archive to a new folder on your system. There will be a bin folder inside containing an executable script, activemq. We are going to run bin/activemq start.
Once you've started ActiveMQ, you should see something like the following on your terminal:
You will also need PHP (at least version 7.4) and Composer.
Create a new directory for your project and run the following command, which will create a composer.json file and install the required dependencies.
composer require stomp-php/stomp-php symfony/console
Now create a subdirectory in your project folder called src and another called bin. Your project directory should now look like this:
activemq_php/
├─ bin/
├─ src/
├─ composer.json
├─ vendor/
Lastly, open up composer.json inside your favourite IDE / editor and add the autoload section for our own classes (run composer dump-autoload afterwards so that Composer regenerates its autoloader):
{
    "require": {
        "stomp-php/stomp-php": "^5.0",
        "symfony/console": "^5.3"
    },
    "autoload": {
        "psr-4": {
            "App\\": "src/"
        }
    }
}
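If the autoload mapping looks like magic, the following rough sketch shows what a PSR-4 rule such as "App\\": "src/" amounts to: the namespace prefix is swapped for the base directory and the remaining namespace separators become directory separators. The psr4Path function is a hypothetical helper written purely for illustration; Composer's real autoloader does considerably more.

```php
<?php
// Hypothetical helper: maps a class name to a file path using one PSR-4 rule.
function psr4Path(string $class, string $prefix = 'App\\', string $baseDir = 'src/'): ?string
{
    // The rule only applies to classes inside the configured namespace prefix.
    if (strncmp($class, $prefix, strlen($prefix)) !== 0) {
        return null;
    }

    // Strip the prefix, then turn namespace separators into directory separators.
    $relativeClass = substr($class, strlen($prefix));

    return $baseDir . str_replace('\\', '/', $relativeClass) . '.php';
}

echo psr4Path('App\\Broker'), PHP_EOL;               // src/Broker.php
echo psr4Path('App\\QueueListenerCommand'), PHP_EOL; // src/QueueListenerCommand.php
```

This is why the classes we create in the src directory need to live in the App namespace and be named after their files.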
Setting up the Stomp connection
Inside your src directory, create a file called Broker.php with the following content:
<?php

namespace App;

use Stomp\Client;
use Stomp\Exception\ConnectionException;
use Stomp\Network\Connection;
use Stomp\Network\Observer\HeartbeatEmitter;
use Stomp\StatefulStomp;
use Stomp\Transport\Frame;
use Stomp\Transport\Message;

class Broker
{
    // The internal Stomp client
    private StatefulStomp $client;

    // A list of subscriptions held by this broker
    private array $subscriptions = [];

    /**
     * For our constructor, we'll pass in the hostname (or IP address) and port number.
     * @throws ConnectionException
     */
    public function __construct(string $host, int $port)
    {
        $connection = new Connection('tcp://' . $host . ':' . $port);
        $client = new Client($connection);

        // Once we've created the Stomp connection and client, we will add a heartbeat
        // to periodically let ActiveMQ know our connection is alive and healthy.
        $client->setHeartbeat(500);
        $connection->setReadTimeout(0, 250000);

        // We add a HeartbeatEmitter and attach it to the connection to automatically send these signals.
        $emitter = new HeartbeatEmitter($client->getConnection());
        $client->getConnection()->getObservers()->addObserver($emitter);

        // Lastly, we create our internal Stomp client which will be used in our methods to interact with ActiveMQ.
        $this->client = new StatefulStomp($client);
        $client->connect();
    }
}
The constructor sets up our Stomp connection and we can now use the object's client property to create some other methods which will talk to ActiveMQ.
Create a file in the bin directory of your project called send.php and add the following code:
<?php

use App\Broker;

require_once __DIR__.'/../vendor/autoload.php';

try {
    $broker = new Broker('localhost', 61613);
} catch (Exception $e) {
    echo "Failed to connect to broker\n";
    exit(1);
}

$broker->sendQueue('orders', json_encode([
    'items' => [
        [
            'code' => '3AF09',
            'description' => 'Jeans',
            'size' => 'M',
            'colour' => 'Yellow',
            'price' => '24.99',
        ],
        [
            'code' => '3AF16',
            'description' => 'Sweatshirt',
            'size' => 'M',
            'colour' => 'Black',
            'price' => '32.49',
        ],
    ],
    'customer' => 1,
]),
[
    'type' => 'order',
]);

$broker->sendQueue('orders', json_encode([
    'items' => [
        'code' => '3AF11',
        'description' => 'T-Shirt',
        'size' => 'M',
        'colour' => 'Grey',
        'price' => '11.99'
    ],
    'customer' => 2,
]),
[
    'type' => 'order',
]);

exit("Messages sent\n");
We'll run this script from the command line (php bin/send.php) to send a couple of sample messages to a queue called orders.
We are including an array represented as JSON and a message header called type. This is a special header name in an ActiveMQ Stomp connection which will set the JMSType of the message on the queue, which we will see in the ActiveMQ web UI.
The first time we run this script, the orders queue will be automatically created.
Sending a message to a queue
First, we need to implement the sendQueue method on our Broker class. In the Broker.php file, add a new method:
public function sendQueue(string $queueName, string $message, array $headers = [])
{
    $destination = '/queue/' . $queueName;
    $this->client->send($destination, new Message($message, $headers + ['persistent' => 'true']));
}
Note that we add the persistent header to the $headers array only if the caller has not already set it, and that its value is the string 'true' (rather than boolean true). Stomp messages are not persistent by default, which means they will be lost if ActiveMQ is restarted. By setting this special header, we instruct ActiveMQ to persist the messages to disk.
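The "only if not already set" behaviour comes from PHP's array union operator (+), which keeps every key from the left-hand array and only adds keys from the right-hand array that are missing. A minimal sketch, using a hypothetical withDefaultHeaders helper that mirrors the expression inside sendQueue:

```php
<?php
// Hypothetical helper mirroring the header merge used in sendQueue().
function withDefaultHeaders(array $headers): array
{
    // Keys already present in $headers win; 'persistent' is only added when absent.
    return $headers + ['persistent' => 'true'];
}

// The default is added alongside the caller's own headers.
var_export(withDefaultHeaders(['type' => 'order']));
echo PHP_EOL;

// A caller can still opt out, because their value takes precedence.
var_export(withDefaultHeaders(['type' => 'order', 'persistent' => 'false']));
echo PHP_EOL;
```

Had we used array_merge($headers, ['persistent' => 'true']) instead, the default would overwrite whatever the caller passed in.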
Now, with ActiveMQ running, open up another terminal / console and run php bin/send.php.
$ php bin/send.php
Messages sent
Now in your browser, visit http://localhost:8161/admin/queues.jsp
You will be prompted for a login; the default username and password are both admin.
Once logged in, click on Queues on the top menu and you should see our orders queue has been created and contains 2 messages.
Click on the queue name and click on the first message. You can see the type, persistence and message content we sent from the PHP script.
We have successfully sent messages from PHP to ActiveMQ. Now it's time to build another script to read them.
Reading messages from a queue
In the bin directory of your project, create a new file console.php. In the src directory, create a file called QueueListenerCommand.php.
Our console.php file will instantiate a simple Symfony console application to execute a long-running process which will establish a connection to a queue and read messages as they arrive. You could, for example, daemonize such a process as a service through systemd, upstart or other process managers.
Inside console.php, add the following content:
#!/usr/bin/env php
<?php
require __DIR__.'/../vendor/autoload.php';
use App\QueueListenerCommand;
use Symfony\Component\Console\Application;
$application = new Application("ActiveMQ Stomp Demo", "1.0");
$application->add(new QueueListenerCommand);
$application->run();
Inside QueueListenerCommand.php, add the following code:
<?php

namespace App;

use Exception;
use Stomp\Transport\Frame;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class QueueListenerCommand extends Command
{
    protected static $defaultName = 'queue:listen';

    protected function configure(): void
    {
        $this->setDescription('Listens to orders queue and prints messages.');
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        try {
            $broker = new Broker('localhost', 61613);
        } catch (Exception $e) {
            $output->writeln('<error>Failed to connect to broker</error>');
            return Command::FAILURE;
        }

        $output->writeln('<comment>Connected to broker, listening for messages...</comment>');
        $broker->subscribeQueue('orders');

        while (true) {
            $message = $broker->read();
            if ($message instanceof Frame) {
                if ($message['type'] === 'terminate') {
                    $output->writeln('<comment>Received shutdown command</comment>');
                    // Acknowledge the terminate message too, otherwise it stays on the
                    // queue and is redelivered the next time a listener connects.
                    $broker->ack($message);
                    return Command::SUCCESS;
                }
                $output->writeln('<info>Processed message: ' . $message->getBody() . '</info>');
                $broker->ack($message);
            }
            usleep(100000);
        }
    }
}
All we are doing here is reading messages in a continuous loop (pausing for 100 milliseconds between reads) and printing them to the console. Of course in a real application, you would do some processing here instead. If processing is successful, you ack() the message. If processing is not successful, you can replace this with a negative acknowledgement, nack(), to let ActiveMQ know it should either attempt to redeliver the message or move it to the appropriate dead letter queue (more on that below).
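As a rough idea of what "some processing" might look like, the sketch below validates an order payload and reports whether the message should be acknowledged. The handleOrder function and its validation rules are assumptions made for illustration; they are not part of the Stomp library or of the listener above.

```php
<?php
// Hypothetical handler: returns true if the order was processed (ack it),
// false if the message could not be handled (nack it).
function handleOrder(string $body): bool
{
    $order = json_decode($body, true);

    // Anything that is not a JSON object with items and a customer is rejected.
    if (!is_array($order) || !isset($order['items'], $order['customer'])) {
        return false;
    }

    // ... real work (saving the order, calling other services) would go here ...

    return true;
}

// Inside the listener loop, the result would decide between ack and nack, e.g.:
//   handleOrder($message->getBody()) ? $broker->ack($message) : $broker->nack($message);

var_dump(handleOrder('{"items":[],"customer":1}')); // bool(true)
var_dump(handleOrder('this is not JSON'));          // bool(false)
```

Keeping the handler free of any broker calls makes it easy to test without ActiveMQ running.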
We also check if the message we received has a JMSType of "terminate", in which case we end the program. This is just for example purposes, allowing us to terminate our script by sending a special type of message.
In order to receive these messages, we first need to tell ActiveMQ our listener is a subscriber to the orders queue. You can see in the code above we need to amend our Broker class and add a couple of methods to subscribe to a queue, read from it and ack the messages once they are processed. Open up Broker.php again and add the following methods:
public function subscribeQueue(string $queueName, ?string $selector = null): void
{
    $destination = '/queue/' . $queueName;
    $this->subscriptions[$destination] = $this->client->subscribe($destination, $selector, 'client-individual');
}

public function unsubscribeQueue(?string $queueName = null): void
{
    if ($queueName) {
        $destination = '/queue/' . $queueName;
        if (isset($this->subscriptions[$destination])) {
            $this->client->unsubscribe($this->subscriptions[$destination]);
        }
    } else {
        $this->client->unsubscribe();
    }
}

public function read(): ?Frame
{
    return ($frame = $this->client->read()) ? $frame : null;
}

public function ack(Frame $message): void
{
    $this->client->ack($message);
}

public function nack(Frame $message): void
{
    $this->client->nack($message);
}
You should now be in a position to run php bin/console.php and see the Symfony console app run:
$ php bin/console.php
ActiveMQ Stomp Demo 1.0
Usage:
command [options] [arguments]
Options:
-h, --help Display help for the given command. When no command is given display help for the list command
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi|--no-ansi Force (or disable --no-ansi) ANSI output
-n, --no-interaction Do not ask any interactive question
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
Available commands:
help Display help for a command
list List commands
queue
queue:listen Listens to orders queue and prints messages.
Let's run php bin/console.php queue:listen and we should pick up the 2 messages we have sitting in the orders queue:
$ php bin/console.php queue:listen
Connected to broker, listening for messages...
Processed message: {"items":[{"code":"3AF09","description":"Jeans","size":"M","colour":"Yellow","price":"24.99"},{"code":"3AF16","description":"Sweatshirt","size":"M","colour":"Black","price":"32.49"}],"customer":1}
Processed message: {"items":{"code":"3AF11","description":"T-Shirt","size":"M","colour":"Grey","price":"11.99"},"customer":2}
If you check the Queues list in the ActiveMQ UI in your browser again (refresh the page), you will see our messages, having been acknowledged, have now disappeared:
In a separate terminal tab/window, run the php bin/send.php script again, to send 2 more messages. Now go back and check the terminal where our listener is running:
Connected to broker, listening for messages...
Processed message: {"items":[{"code":"3AF09","description":"Jeans","size":"M","colour":"Yellow","price":"24.99"},{"code":"3AF16","description":"Sweatshirt","size":"M","colour":"Black","price":"32.49"}],"customer":1}
Processed message: {"items":{"code":"3AF11","description":"T-Shirt","size":"M","colour":"Grey","price":"11.99"},"customer":2}
Processed message: {"items":[{"code":"3AF09","description":"Jeans","size":"M","colour":"Yellow","price":"24.99"},{"code":"3AF16","description":"Sweatshirt","size":"M","colour":"Black","price":"32.49"}],"customer":1}
Processed message: {"items":{"code":"3AF11","description":"T-Shirt","size":"M","colour":"Grey","price":"11.99"},"customer":2}
The additional 2 messages have been picked up and consumed by the listener.
Now, in the ActiveMQ web UI, on the queues page click the link next to our orders queue which says "Send to" and fill in the type field on the form with terminate and a message body saying "Goodbye!". Then click the Send button.
Check the console where our listener was running. You should see:
$ bin/console.php queue:listen
Connected to broker, listening for messages...
Processed message: {"items":[{"code":"3AF09","description":"Jeans","size":"M","colour":"Yellow","price":"24.99"},{"code":"3AF16","description":"Sweatshirt","size":"M","colour":"Black","price":"32.49"}],"customer":1}
Processed message: {"items":{"code":"3AF11","description":"T-Shirt","size":"M","colour":"Grey","price":"11.99"},"customer":2}
Processed message: {"items":[{"code":"3AF09","description":"Jeans","size":"M","colour":"Yellow","price":"24.99"},{"code":"3AF16","description":"Sweatshirt","size":"M","colour":"Black","price":"32.49"}],"customer":1}
Processed message: {"items":{"code":"3AF11","description":"T-Shirt","size":"M","colour":"Grey","price":"11.99"},"customer":2}
Received shutdown command
$
and the listener program has stopped.
Redeliver failed messages
If you modify our listener command to nack the messages instead of acking them and refresh the queues page in your web UI, you'll see ActiveMQ has automatically created a Dead Letter Queue (DLQ) where the messages which could not be successfully processed have been sent. This prevents unprocessable messages from clogging up your primary queue, since it operates on a first-in-first-out basis.
You can however configure ActiveMQ to attempt limited redelivery of these messages. Messages can go to any active consumer and in the real world it's quite likely you will have multiple instances of your listeners running on different servers, so just because one server couldn't process a message successfully, doesn't mean we don't want to retry with another.
In the directory on your system where you extracted/installed ActiveMQ, there is a subdirectory called conf and in that directory, a file called activemq.xml. Open it up in your favourite editor and look for the broker element, which will look something like this:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
Change that to read:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
Now, inside the body of the broker element, add a plugins element like the following:
<plugins>
    <redeliveryPlugin fallbackToDeadLetter="true"
                      sendToDlqIfMaxRetriesExceeded="true">
        <redeliveryPolicyMap>
            <redeliveryPolicyMap>
                <defaultEntry>
                    <!-- the fallback policy for all other destinations -->
                    <redeliveryPolicy maximumRedeliveries="2"
                                      initialRedeliveryDelay="5000"
                                      redeliveryDelay="10000"/>
                </defaultEntry>
            </redeliveryPolicyMap>
        </redeliveryPolicyMap>
    </redeliveryPlugin>
</plugins>
You will then need to save the file and stop and restart ActiveMQ.
Try changing the listener command to nack occasional messages at random (for example when random_int(1, 5) === 1) instead of acking them. Monitor the console output of the listener. You should find ActiveMQ will now attempt to redeliver the message twice, only shunting it to the DLQ if it is not ack'd on subsequent attempts.
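The retry behaviour described here can be pictured with a small simulation. It involves no broker: deliverWithRetries is a hypothetical function, and it assumes, as the text above describes, that maximumRedeliveries counts the extra attempts made after the first delivery.

```php
<?php
// Simulation only: one initial delivery plus up to $maximumRedeliveries retries.
// The handler receives the attempt number and returns true to ack, false to nack.
function deliverWithRetries(callable $handler, int $maximumRedeliveries): array
{
    $attempts = 0;

    do {
        $attempts++;
        if ($handler($attempts)) {
            return ['outcome' => 'acked', 'attempts' => $attempts];
        }
    } while ($attempts <= $maximumRedeliveries);

    // Every attempt was nack'd, so the message ends up on the dead letter queue.
    return ['outcome' => 'dead-lettered', 'attempts' => $attempts];
}

// A consumer that always fails: 1 delivery + 2 redeliveries, then the DLQ.
print_r(deliverWithRetries(fn (int $attempt): bool => false, 2));

// A consumer that succeeds on the final redelivery.
print_r(deliverWithRetries(fn (int $attempt): bool => $attempt === 3, 2));
```

With maximumRedeliveries set to 2, a message is therefore seen by consumers at most three times before it is dead-lettered.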
Future enhancements
In the real world, you may wish to abstract out your queues, topics and messages into your business domain, with individual classes representing, for example, each specific type of message and being responsible for its (de)serialization. These classes can wrap a Broker dependency, facilitating independent test coverage and slimmer interfaces to send and receive messages. Thus in our example, we might have had classes such as OrderQueue and OrderMessage.
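As a starting point, an OrderMessage class might look something like the sketch below. The class shape, its method names and its validation are all assumptions made for illustration; the field names simply follow the sample payload we sent from send.php.

```php
<?php
// Hypothetical value object responsible for (de)serializing an order message.
final class OrderMessage
{
    private int $customer;
    private array $items;

    public function __construct(int $customer, array $items)
    {
        $this->customer = $customer;
        $this->items = $items;
    }

    public function getCustomer(): int
    {
        return $this->customer;
    }

    public function getItems(): array
    {
        return $this->items;
    }

    // Serializes to the same JSON layout used by the sample messages.
    public function toJson(): string
    {
        return json_encode(
            ['items' => $this->items, 'customer' => $this->customer],
            JSON_THROW_ON_ERROR
        );
    }

    // Rebuilds the object from a message body, rejecting malformed payloads.
    public static function fromJson(string $json): self
    {
        $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
        if (!is_array($data) || !isset($data['customer'], $data['items']) || !is_array($data['items'])) {
            throw new InvalidArgumentException('Not a valid order payload');
        }

        return new self((int) $data['customer'], $data['items']);
    }
}

$order = OrderMessage::fromJson('{"items":[{"code":"3AF11"}],"customer":2}');
echo $order->getCustomer(), PHP_EOL; // 2
echo $order->toJson(), PHP_EOL;      // {"items":[{"code":"3AF11"}],"customer":2}
```

An OrderQueue class could then accept and return OrderMessage objects, keeping JSON and header handling out of the rest of the application.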
You will also want to read further into ActiveMQ's configuration. In particular, one thing we have not covered here is security, though once configured in ActiveMQ the PHP Stomp client does provide a setLogin method for specifying a username and password. See Further Reading below.
I hope having followed this PHP ActiveMQ tutorial, you are now better equipped to take these steps in your real world applications. As always, if you have any feedback please feel free to contact me or comment below.
Further reading
- Stomp protocol reference
- ActiveMQ Stomp reference
- ActiveMQ JMS Selectors reference
- ActiveMQ security reference
- ActiveMQ redelivery reference
- Stomp PHP library
You can download the sample code found in this blog post on my GitHub repository.