Configuring ActiveMQ with Master-Slave setup

Before following this guide, it is recommended that you first understand how to configure a single remote ActiveMQ instance with TORO Integrate, and how to configure your remote ActiveMQ instance with authentication and authorization.

Prerequisites

It is recommended that you have two (2) separate machines, each running an ActiveMQ instance, although you can alternatively deploy two (2) instances of ActiveMQ on one machine.

An NFS server is required to share data between your master and slave instances.

Configuring NFS Server & Client

NFS, or Network File System, is a protocol that involves a server and a client. NFS enables you to mount shared directories and directly access files within a network.

Below are the steps to configure an NFS server and client on CentOS 7, although a correctly configured NFS server on any other operating system will also work.

Configurations for the NFS Server

Step 1: Determine the NFS server

Determine the instance which will act as the NFS server.

Step 2: Install NFS packages

Next, install the NFS packages with yum.

yum install nfs-utils

nfs-utils package

The nfs-utils package contains the tools for both the server and the client that are necessary for the kernel's NFS support.

Step 3: Create or determine the directory to be shared

For this setup, we recommend creating or targeting the directory /datastore/activemq-data, but you may choose a different location according to your preference.

mkdir -p /datastore/activemq-data

If this directory already exists, you may skip this step.

Step 4: Set permissions on the NFS directory

Once you have created or determined the NFS directory on the server, set the folder's permissions using the chmod and chown commands.

chmod -R 777 /datastore/activemq-data && chown <user>:<group> /datastore/activemq-data
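For example, assuming a hypothetical activemq user and group that the brokers run as (the UID/GID should match across machines so permissions resolve consistently over NFS):

chmod -R 777 /datastore/activemq-data && chown activemq:activemq /datastore/activemq-data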

Step 5: Start and enable the services

After the packages have been installed and the directories have been created and configured, you may now enable and start the services that make up the NFS server.

systemctl enable rpcbind
systemctl enable nfs-server
systemctl enable nfs-lock
systemctl enable nfs-idmap
systemctl start rpcbind
systemctl start nfs-server
systemctl start nfs-lock
systemctl start nfs-idmap
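Before moving on, you can check that the server came up cleanly:

systemctl status nfs-server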

Step 6: Share the NFS directory over the network

We will now share the NFS directory so that clients can access it. First, open the /etc/exports file with your preferred text editor.

vi /etc/exports

Next, add the following entry:

/datastore/activemq-data    <nfs-client-ip>(rw,sync,no_root_squash,no_all_squash)

Note: The server and client must be able to ping one another.

Step 7: Restart the NFS service

You can now restart the NFS service to apply the export.

systemctl restart nfs-server
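After restarting, you can confirm the directory is actually being exported. exportfs lists the server's active exports, and showmount (also part of nfs-utils) can query them from the client once the firewall is open (see the next step):

exportfs -v
showmount -e <nfs-server-ip>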

Step 8: Configure the firewall for CentOS 7

firewall-cmd --permanent --zone=public --add-service=nfs
firewall-cmd --permanent --zone=public --add-service=mountd
firewall-cmd --permanent --zone=public --add-service=rpc-bind   
firewall-cmd --reload
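To verify that the rules took effect, list the services enabled in the zone:

firewall-cmd --zone=public --list-services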

Setting up the NFS client

After you have set up the NFS server, it is time to set up the client side.

Step 1: Install NFS packages

As with the server, install the NFS packages with yum.

yum install nfs-utils

Step 2: Create the NFS directory mount point(s)

mkdir -p /datastore/activemq-data

Step 3: Mount the directory

mount -t nfs <nfs-server-ip>:/datastore/activemq-data /datastore/activemq-data

Step 4: Verify NFS mount

df -kh

You will see a list of mounted file systems; the NFS mount you configured earlier should appear in that list.

After verifying that your mounts exist, you may now proceed to populate your NFS directory.

Setting up a Permanent NFS Mount

By default, you will have to remount your NFS directories after every reboot. To make the mount available on boot, follow these steps:

Step 1: Modify the fstab file

Open the fstab file with your text editor of choice.

vi /etc/fstab

Step 2: Add the configuration to the file

Add the following line to the file so the mount point is automatically restored after a reboot.

<nfs-server-ip>:/datastore/activemq-data    /datastore/activemq-data    nfs   defaults 0 0
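You can test the entry without rebooting: unmount the share, then run mount -a, which mounts everything listed in fstab that is not already mounted:

umount /datastore/activemq-data
mount -a
df -kh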

Master-Slave Configuration for TORO Integrate

To use more than one (1) instance of ActiveMQ, simply make the following change to your override.properties file in your data directory.

jms.url=failover:tcp://<activemq1-ip-address>:61616,tcp://<activemq2-ip-address>:61616?CloseAsync=false
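For example, with two hypothetical broker addresses, the entry would read:

jms.url=failover:tcp://192.168.1.101:61616,tcp://192.168.1.102:61616?CloseAsync=false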

Embedded ActiveMQ

You can also include the embedded ActiveMQ instance as a failover option. Simply modify the jms.url entry in the override.properties file as follows:

jms.url=failover:tcp://<activemq1-ip-address>:61616,tcp://<activemq2-ip-address>:61616,tcp://0.0.0.0:61616?CloseAsync=false

tcp://0.0.0.0:61616 is what defines the embedded ActiveMQ instance.

Duplicate JMS Client IDs

Note that no two connections or instances may use the same jms.clientId; otherwise, the connection will fail.
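For example, if two TORO Integrate instances connect to the same brokers, give each its own ID in its respective override.properties file (hypothetical values):

jms.clientId=integrate-node1

and on the second instance:

jms.clientId=integrate-node2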

ActiveMQ Configuration

The Broker Bean Element

The broker bean element is used to configure the ActiveMQ brokerName, dataDirectory, and other broker attributes. The recommended setup for the remote ActiveMQ instance is as follows:

<broker xmlns="http://activemq.apache.org/schema/core"
    brokerName="activemq-broker1"
    dataDirectory="${activemq.data}"
    useJmx="true" advisorySupport="true"
    persistent="true"
    deleteAllMessagesOnStartup="false"
    useShutdownHook="false"
    schedulerSupport="true"
    start="false">

Destination Policies

A number of different per-destination policies can be attached to individual destinations (queues or topics) or to wildcards of queue/topic hierarchies. The following shows the configuration for topics:

<destinationPolicy>
<policyMap>
    <policyEntries>
        <policyEntry topic=">"
            producerFlowControl="false"
            gcInactiveDestinations="true"
            inactiveTimeoutBeforeGC="5000"
            memoryLimit="5mb"
            expireMessagesPeriod="0"
            advisoryWhenFull="true"
            maxDestinations="200" >
            <!-- maxDestinations limits how many hierarchical destinations can be created under this wildcard; -1, the default, means unlimited -->

            <pendingMessageLimitStrategy>
                <constantPendingMessageLimitStrategy limit="20"/> <!-- keep at most 20 pending messages per slow consumer -->
            </pendingMessageLimitStrategy>
            <dispatchPolicy>
                <strictOrderDispatchPolicy />
            </dispatchPolicy>

            <subscriptionRecoveryPolicy>
                <timedSubscriptionRecoveryPolicy recoverDuration="60000" /> <!-- recover the last 60 seconds (1 minute) of messages -->
            </subscriptionRecoveryPolicy>
        </policyEntry>
    </policyEntries>
</policyMap>
</destinationPolicy>

Changes

Changes to these policies are applied as soon as the file is saved, because the broker is configured with the runtimeConfigurationPlugin (see the complete configuration below). Check the logs for confirmation.

Management Context

The next element is for manageability. The following can be configured if you'd like to control the behavior of the broker via JMX MBeans. This will allow you to connect remotely via JConsole.

<managementContext>
    <managementContext createConnector="true" rmiServerPort="1098" connectorPort="1099"/>
</managementContext>

JConsole

Connect with JConsole through service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi.
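If the broker runs on another machine, you can pass the service URL to JConsole directly, substituting the broker's host for localhost:

jconsole service:jmx:rmi:///jndi/rmi://<broker-host>:1099/jmxrmi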

KahaDB Directory

KahaDB is a file-based persistence database that is local to the message broker using it. Point it to the /datastore/activemq-data directory created earlier. In a shared-storage master/slave pair, both brokers reference the same directory: the first broker to acquire the lock on the store becomes the master, and the other waits as a slave until the lock is released.

<persistenceAdapter>
    <kahaDB directory="/datastore/activemq-data" indexWriteBatchSize="1000" enableIndexWriteAsync="true"/>
</persistenceAdapter>

Transport Connectors

Since this documentation is about failover, the OpenWire transport connector should be used so that brokers can talk to each other over the network. The configuration below shows the transport connectors for the first instance of our broker:

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?jms.useAsyncSend=true&amp;randomize=false&amp;initialReconnectDelay=100&amp;maximumConnections=1000&amp;nested.wireFormat.maxInactivityDuration=1000" updateClusterClients="true"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?jms.useAsyncSend=true&amp;randomize=false&amp;initialReconnectDelay=100&amp;maximumConnections=1000&amp;nested.wireFormat.maxInactivityDuration=1000" updateClusterClients="true"/>
    ....
</transportConnectors>

Default Ports

  • 61616 - Default TCP port

  • 8161 - Default web console port

  • 5672 - Default AMQP port

Summary

You have successfully configured a master-slave setup for ActiveMQ. Your completed activemq.xml file should look something like this:

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance withzb
    the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

<!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
        lazy-init="false" scope="singleton"
        init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="activemq-broker1"
        dataDirectory="${activemq.data}"
        useJmx="true" advisorySupport="true"
        persistent="true"
        deleteAllMessagesOnStartup="false"
        useShutdownHook="false"
        schedulerSupport="true"
        start="false">

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic=">"
                producerFlowControl="false"
                gcInactiveDestinations="true"
                inactiveTimeoutBeforeGC="5000"
                memoryLimit="5mb"
                expireMessagesPeriod="0"
                advisoryWhenFull="true"
                maxDestinations="200" >
                <!-- maxDestinations limits how many hierarchical destinations can be created under this wildcard; -1, the default, means unlimited -->

                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="20"/> <!-- keep at most 20 pending messages per slow consumer -->
                </pendingMessageLimitStrategy>
                <dispatchPolicy>
                    <strictOrderDispatchPolicy />
                </dispatchPolicy>

                <subscriptionRecoveryPolicy>
                    <timedSubscriptionRecoveryPolicy recoverDuration="60000" /> <!-- recover the last 60 seconds (1 minute) of messages -->
                </subscriptionRecoveryPolicy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="true" rmiServerPort="1098" connectorPort="1099"/>
        </managementContext>
    <plugins>
            <jaasAuthenticationPlugin configuration="activemq-domain" />
            <runtimeConfigurationPlugin checkPeriod="1000" />
        <authorizationPlugin>
        <map>
            <authorizationMap>
                <authorizationEntries>
                    <!-- To secure every topic, each application must be configured with its own unique identifier in its topic names -->
                    <authorizationEntry topic="jmsPrefix.statistics.>" read="admins" write="admins" admin="admins" />
                    <authorizationEntry topic="jmsPrefix.io.toro.integrate.>" read="admins" write="admins" admin="admins" />
                    <authorizationEntry queue="jmsPrefix.io.toro.integrate.>" read="admins" write="admins" admin="admins" />
                    <authorizationEntry topic="ActiveMQ.Advisory.>"
                        read="admins"
                        write="admins"
                        admin="admins"/>

                </authorizationEntries>
            </authorizationMap>
        </map>
        </authorizationPlugin>
    </plugins>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="/datastore/activemq-data" indexWriteBatchSize="1000" enableIndexWriteAsync="true"/>
        </persistenceAdapter>


        <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?jms.useAsyncSend=true&amp;randomize=false&amp;initialReconnectDelay=100&amp;maximumConnections=1000&amp;nested.wireFormat.maxInactivityDuration=1000" updateClusterClients="true"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?jms.useAsyncSend=true&amp;randomize=false&amp;initialReconnectDelay=100&amp;maximumConnections=1000&amp;nested.wireFormat.maxInactivityDuration=1000" updateClusterClients="true"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web console requires a login by default; you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

Jetty Settings

The activemq.xml file will often either contain Jetty settings or import them from another file.

<import resource="jetty.xml"/>

If you're not using the web console to manage ActiveMQ, leaving it enabled may be a security risk.
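If you do not need the console at all, one option is to comment out the import at the bottom of activemq.xml and restart the broker:

<!-- <import resource="jetty.xml"/> -->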