as of Camel+RabbitMQ v2.15.1:
By default each new camel rabbitmq route (rabbitmq://localhost/
) will open up a new connection to the server.
If you are using the context to send messages or dynamically create routes, this can eat up connections quickly and become a performance drain.
The simplest solution is to provide a custom ConnectionFactory used by camel-rabbitmq when creating a connection via the connectionFactory
uri attribute.
First, you'll need to subclass your own bean. Make sure to set the host, port, vhost, username, and password. When using connectionFactory, rabbitmq route does not pick up these values from the query URI. Also, override getHost
and getPort
as these are always called.
Failure to override cred methods will lead to strange connection errors! (that can often be confusing to troubleshoot in production since locally, the default connection prefs and creds may work, ie. localhost:5672
)
Ofcourse, cache the connection as well:
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Connection
import com.rabbitmq.client.Address
/**
* http://www.rabbitmq.com/javadoc/com/rabbitmq/client/ConnectionFactory.html
*/
class CachedConnectionFactory extends ConnectionFactory {
Connection cachedConnection
protected Map config = [
host: 'localhost',
port: '5672',
virtualHost: '/',
username: 'guest',
password: 'guest'
]
CachedConnectionFactory() {
super()
setHost(this.config.host)
setPort(this.config.port)
setVirtualHost(this.config.virtualHost)
setUsername(this.config.username)
setPassword(this.config.password)
}
def getConfig() {
[host: "localhost", port: "1234",
virtualHost:'/', username:'guest', password:'guest']
}
/**
* See: http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Connection.html
*/
Connection createConnection(Connection newConnection) {
cachedConnection = newConnection
cachedConnection
}
Connection newConnection() {
cachedConnection ?: createConnection(super.newConnection())
} // newConnection
Connection newConnection(Address[] addrs) {
cachedConnection ?: createConnection(super.newConnection(addrs))
}
Connection newConnection(java.util.concurrent.ExecutorService executor) {
cachedConnection ?: createConnection(super.newConnection(executor))
}
Connection newConnection(java.util.concurrent.ExecutorService executor, Address[] addrs) {
cachedConnection ?: createConnection(super.newConnection(executor, addrs))
}
boolean isAutomaticRecoveryEnabled() {
true
}
boolean isTopologyRecoveryEnabled() {
true
}
long getNetworkRecoveryInterval() {
5000
}
int getConnectionTimeout() {
5000
}
Map getClientProperties() {
super.getClientProperties()
}
// called always
String getHost() {
this.config.host ?: super.getHost()
}
// called always
int getPort() {
this.config.port ?: super.getPort()
}
String getVirtualHost() {
this.config.virtualHost ?: super.getVirtualHost()
}
String getUsername() {
this.config.username ?: super.getUsername()
}
String getPassword() {
this.config.password ?: super.getPassword()
}
// called always
boolean isSSL() {
super.isSSL()
}
} // CachedConnectionFactory
Now, you'll want to use it in the route. Note: you are passing in a bean name, not an instance. You'll either need to access the camelContext's spring context or provide a registry of some sort:
def connectionFactory = new CachedConnectionFactory()
def camelContext = new DefaultCamelContext()
def registry = new SimpleRegistry()
def beanName = "cachedConnectionFactory"
registry.put(beanName, connectionFactory)
camelContext.setRegistry(registry)
camelContext.addRoutes(new RouteBuilder() {
def void configure() {
from("direct:input")
.to("rabbitmq://localhost/q?connectionFactory=$beanName")
}
})
Note: you'll probably want use the CachedConnectionFactory in a static context.