Camel & RabbitMQ: Custom ConnectionFactory

@drse / updated June 26, 2015

Share:  

as of Camel+RabbitMQ v2.15.1:

By default each new camel rabbitmq route (rabbitmq://localhost/) will open up a new connection to the server.

If you are using the context to send messages or dynamically create routes, this can eat up connections quickly and become a performance drain.

The simplest solution is to provide a custom ConnectionFactory used by camel-rabbitmq when creating a connection via the connectionFactory uri attribute.

First, you'll need to subclass your own bean. Make sure to set the host, port, vhost, username, and password. When using connectionFactory, rabbitmq route does not pick up these values from the query URI. Also, override getHost and getPort as these are always called.

Failure to override cred methods will lead to strange connection errors! (that can often be confusing to troubleshoot in production since locally, the default connection prefs and creds may work, ie. localhost:5672)

Ofcourse, cache the connection as well:

import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Connection
import com.rabbitmq.client.Address

/**
 * http://www.rabbitmq.com/javadoc/com/rabbitmq/client/ConnectionFactory.html
 */
class CachedConnectionFactory extends ConnectionFactory {
  Connection cachedConnection

  protected Map config = [
    host: 'localhost',
    port: '5672',
    virtualHost: '/',
    username: 'guest',
    password: 'guest'
  ]

  CachedConnectionFactory() {
    super()
    setHost(this.config.host)
    setPort(this.config.port)
    setVirtualHost(this.config.virtualHost)
    setUsername(this.config.username)
    setPassword(this.config.password)
  }

  def getConfig() {
    [host: "localhost", port: "1234",
    virtualHost:'/', username:'guest', password:'guest']
  }

  /**
   * See: http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Connection.html
   */
  Connection createConnection(Connection newConnection) {
    cachedConnection = newConnection
    cachedConnection
  }

  Connection newConnection() {
    cachedConnection ?: createConnection(super.newConnection())
  } // newConnection

  Connection newConnection(Address[] addrs) {
    cachedConnection ?: createConnection(super.newConnection(addrs))
  }

  Connection newConnection(java.util.concurrent.ExecutorService executor) {
    cachedConnection ?: createConnection(super.newConnection(executor))
  }

  Connection newConnection(java.util.concurrent.ExecutorService executor, Address[] addrs) {
    cachedConnection ?: createConnection(super.newConnection(executor, addrs))
  }

  boolean isAutomaticRecoveryEnabled() {
    true
  }

  boolean isTopologyRecoveryEnabled() {
    true
  }

  long getNetworkRecoveryInterval() {
    5000
  }

  int getConnectionTimeout() {
    5000
  }

  Map getClientProperties() {
    super.getClientProperties()
  }

  // called always
  String getHost() {
    this.config.host ?: super.getHost()
  }

  // called always
  int getPort() {
    this.config.port ?: super.getPort()
  }

  String getVirtualHost() {
    this.config.virtualHost ?: super.getVirtualHost()
  }

  String getUsername() {
    this.config.username ?: super.getUsername()
  }

  String getPassword() {
    this.config.password ?: super.getPassword()
  }

  // called always
  boolean isSSL() {
    super.isSSL()
  }
} // CachedConnectionFactory

Now, you'll want to use it in the route. Note: you are passing in a bean name, not an instance. You'll either need to access the camelContext's spring context or provide a registry of some sort:

def connectionFactory = new CachedConnectionFactory()
def camelContext = new DefaultCamelContext()
def registry = new SimpleRegistry()
def beanName = "cachedConnectionFactory"
registry.put(beanName, connectionFactory)
camelContext.setRegistry(registry)
camelContext.addRoutes(new RouteBuilder() {
  def void configure() {
    from("direct:input")
    .to("rabbitmq://localhost/q?connectionFactory=$beanName")
  }
})

Note: you'll probably want use the CachedConnectionFactory in a static context.