Hot questions for Using RabbitMQ in scala

Question:

I am using docker compose to run a sbt process which connect to rabbitmq-server . Following is my docker compose file : -

version: "3"
services:
  web:
    # replace username/repo:tag with your name and image details
    image: abhishekkumargaya/messanger
    ports:
      - "1883:1883"
    links:
      - rabbit-server
      - redis
      - mysql
    networks:
      - webnet
  rabbit-server:
    image: "rabbitmq:3-management"
    hostname: localhost
    environment:
      #RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
      RABBITMQ_DEFAULT_USER : guest
      RABBITMQ_DEFAULT_PASS : guest
    ports:
      - "5672:5672"
      - "15672:15672"

    networks:
      - webnet

  redis:
    image: "redis:alpine"

  mysql:
    image: mysql
    container_name: database.dev
    command: mysqld --user=root --verbose

    ports:
      - "3306:3306"
    environment:
      MYSQL_DATABASE: "user_messages"
      MYSQL_USER: "test"
      MYSQL_PASSWORD: "root"
      MYSQL_ROOT_PASSWORD: "root"
      MYSQL_ALLOW_EMPTY_PASSWORD: "yes"

networks:
  webnet:
volumes:
  redis-data:

I am getting connection refused error. I am using default value to connect to rabbit server in my code .

private lazy val factory = new ConnectionFactory
val connection = factory.newConnection()

My docker file : -

FROM openjdk:8
ENV SBT_VERSION 0.13.16
RUN \
  curl -L -o sbt-$SBT_VERSION.deb http://dl.bintray.com/sbt/debian/sbt-$SBT_VERSION.deb && \
  dpkg -i sbt-$SBT_VERSION.deb && \
  rm sbt-$SBT_VERSION.deb && \
  apt-get update && \
  apt-get install sbt && \
  sbt sbtVersion

WORKDIR /app
ADD target/scala-2.11/messanger-assembly-1.0.jar /app
EXPOSE 1883
CMD java -jar messanger-assembly-1.0.jar

The error which I am getting is as follows :-

Caused by: java.net.ConnectException: Connection refused (Connection refused)
web_1            |  at java.net.PlainSocketImpl.socketConnect(Native Method)
web_1            |  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
web_1            |  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
web_1            |  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
web_1            |  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
web_1            |  at java.net.Socket.connect(Socket.java:589)
web_1            |  at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
web_1            |  at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:62)
web_1            |  at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
web_1            |  at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:948)
web_1            |  at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907)
web_1            |  at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:865)
web_1            |  at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1018)
web_1            |  at com.abhishek.rabbitmq.RabbitMqConnectionFactory$.<init>(RabbitMqConnectionFactory.scala:15)
web_1            |  at com.abhishek.rabbitmq.RabbitMqConnectionFactory$.<clinit>(RabbitMqConnectionFactory.scala)

I am not properly understanding how these variable works :-

hostname: localhost
environment:
  #RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
  - RABBITMQ_DEFAULT_USER = guest
  - RABBITMQ_DEFAULT_PASS = guest

I kept this variable like this because by default my client is using host as localhost , user as guest and password as guest to connect to rabbitmq-server If I am just running rabbitmq-server image with command -

docker-compose up --build

I am able to connect it through my localmachine .


Answer:

I am able to connect now. To connect to rabbit container, I need to put host as rabbit in my web container.Similarily for connection to mysql and redis , I need to use host as mysql and redis respectively.

Question:

I am using a Reactive RabbitMQ library which is written in Scala.

I came across this construct:

public interface DeliveryMode { }

public final class Persistent {
    public static String toString() {
        return Persistent$.MODULE$.toString();
    }

    // Other static functions.
}

public final class Persistent$ implements DeliveryMode, Product, Serializable {
    public static final Persistent$ MODULE$;

    static {
        new Persistent$();
    }

    // Other code
    public String toString() {
        return "Persistent";
    }

    private Persistent$() {
        MODULE$ = this;
        class.$init$(this);
    }
}

If they want to access a Function that accepts DeliveryMode like

public Message(DeliveryMode mode) {}

They simply pass in Scala

Message(Persistent);

Can this type be used in Java too?


Answer:

The code is in java so I don't see where the confusion is. Of course it works from java just as you expect. The class Persistent$ implements DeliveryMode has a static instance of itself called MODULE$.

Message(Persistent$.MODULE$);

Assuming Persistent$ is properly imported.

Question:

I want to post a message in JSON format to RabbitMQ and have that message consumed successfully. I'm attempting to use Camel to integrate producers and consumers. However, I'm struggling to understand how to create a route to make this happen. I'm using JSON Schema to define the interface between the Producer and Consumer. My application creates JSON, converts it to a byte[] and a Camel ProducerTemplate is used to send the message to RabbitMQ. On the consumer end, the byte[] message needs to be converted to a String, then to JSON, and then marshalled to an Object so I can process it. The following code line doesn't work however

from(startEndpoint).transform(body().convertToString()).marshal().json(JsonLibrary.Jackson, classOf[Payload]).bean(classOf[JsonBeanExample]), 

It's as if the bean is passed the original byte[] content and not the object created by JSON json(JsonLibrary.Jackson, classOf[Payload]). All the camel examples I've seen which use the json(..) call seem be followed by a to(..) which is the end of the route? Here is the error message

Caused by: org.apache.camel.InvalidPayloadException: No body available of type: uk.co.techneurons.messaging.Payload but has value: [B@48898819 of type: byte[] on: Message: "{\"id\":1}". Caused by: No type converter available to convert from type: byte[] to the required type:     uk.co.techneurons.messaging.Payload with value [B@48898819. Exchange[ID-Tonys-    iMac-local-54996-1446407983661-0-2][Message: "{\"id\":1}"]. Caused by:     [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: byte[] to the required type: uk.co.techneurons.messaging.Payload with value [B@48898819]`    

I don't really want to use Spring, Annotations etc, would like to service activation as simple as possible. Use Camel as much as possible

This is the producer

package uk.co.techneurons.messaging
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext

object RabbitMQProducer extends App {
   val camelContext = new DefaultCamelContext
   val rabbitMQEndpoint: String = "rabbitmq:localhost:5672/advert?autoAck=false&threadPoolSize=1&username=guest&password=guest&exchangeType=topic&autoDelete=false&declare=false"
   val rabbitMQRouteBuilder = new RouteBuilder() {
     override def configure(): Unit = {
       from("direct:start").to(rabbitMQEndpoint)
     }
   }
   camelContext.addRoutes(rabbitMQRouteBuilder)
   camelContext.start
   val producerTemplate = camelContext.createProducerTemplate
   producerTemplate.setDefaultEndpointUri("direct:start")
   producerTemplate.sendBodyAndHeader("{\"id\":1}","rabbitmq.ROUTING_KEY","advert.edited")
  camelContext.stop
}

This is the consumer..

package uk.co.techneurons.messaging
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.model.dataformat.JsonLibrary

object RabbitMQConsumer extends App {
  val camelContext = new DefaultCamelContext
  val startEndpoint = "rabbitmq:localhost:5672/advert?queue=es_index&exchangeType=topic&autoDelete=false&declare=false&autoAck=false"
  val consumer = camelContext.createConsumerTemplate
  val routeBuilder = new RouteBuilder() {
    override def configure(): Unit = {
        from(startEndpoint).transform(body().convertToString()).marshal().json(JsonLibrary.Jackson, classOf[Payload]).bean(classOf[JsonBeanExample])
    }
 }
 camelContext.addRoutes(routeBuilder)
 camelContext.start
 Thread.sleep(1000)
 camelContext.stop
}

case class Payload(id: Long)

class JsonBeanExample {
   def process(payload: Payload): Unit = {
     println(s"JSON ${payload}")
   }
}

For completeness, this is the sbt file for easy replication..

name := """camel-scala"""

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= {
 val scalaTestVersion = "2.2.4"
 val camelVersion: String = "2.16.0"
 val rabbitVersion: String = "3.5.6"
 val slf4jVersion: String = "1.7.12"
 val logbackVersion: String = "1.1.3"
 Seq(
   "org.scala-lang.modules" %% "scala-xml" % "1.0.3",
   "org.apache.camel" % "camel-core" % camelVersion,
   "org.apache.camel" % "camel-jackson" % camelVersion,
   "org.apache.camel" % "camel-scala" % camelVersion,
   "org.apache.camel" % "camel-rabbitmq" % camelVersion,
   "com.rabbitmq" % "amqp-client" % rabbitVersion,
   "org.slf4j" % "slf4j-api" % slf4jVersion,
   "ch.qos.logback" % "logback-classic" % logbackVersion,
   "org.apache.camel" % "camel-test" % camelVersion % "test",
   "org.scalatest" %% "scalatest" % scalaTestVersion % "test")
}

Thanks


Answer:

I decided that I needed to create a Bean and Register it (easier said than done! - for some as yet unknown reason JNDIRegistry didn't work with DefaultCamelContext - so I used a SimpleRegistry),

  val registry: SimpleRegistry  = new SimpleRegistry()
  registry.put("myBean", new JsonBeanExample())
  val camelContext = new DefaultCamelContext(registry)

Then I changed the consuming routeBuilder - seems like I had been over transforming the message.

  from(startEndpoint).unmarshal.json(JsonLibrary.Jackson, classOf[Payload]).to("bean:myBean?method=process")

I also changed the Bean so setter methods were available, and added a toString

class Payload {
   @BeanProperty var id: Long = _
   override def toString = s"Payload($id)"
} 
class JsonBeanExample() {
  def process(payload: Payload): Unit = {
     println(s"recieved ${payload}")
  }
}

The next problem now is to get dead letter queues working, and ensuring that failures in the Bean handler make their way properly back up the stack

Question:

I am trying to import the RabbitMQ library into my SBT Scala project, but I cannot use it. SBT builds just fine.

Here is my build.sbt rabbitmq line:

libraryDependencies += "com.rabbitmq" % "amqp-client" % "3.4.2"

and here is my import line in a .scala file:

import com.rabbitmq.client.AMQP;

and here is my compile error:

Error:(5, 12) object rabbitmq is not a member of package com
    import com.rabbitmq.client.AMQP;
       ^

It seems that the import is simply not working, but I have no idea why...


Answer:

The issue was with another import. I am using IntelliJ, and SBT importing failed silently with no errors until I used the 'SBT tasks' pane and the 'Refresh all SBT projects' button. Once the offending import was gone, everything works again.