Hot questions for Using GlassFish in message driven bean


Question:

I have a web application running on GlassFish 4.1 that contains a couple of features that require JMS/MDB. In particular, I am having problems generating a report using JMS/MDB, that is, reading data from a table and dumping it to a file.

This is what happens: I have a JMS/MDB message that performs a couple of tasks in an Oracle database and, once the final result is in a table, I would like to produce a CSV report from that table (which usually holds 30M+ records).

So, inside the JMS/MDB, this is what generates the report:

public boolean handleReportContent() {

    Connection conn = null;

    try {
        System.out.println("Handling report content... " + new Date());
        conn = DriverManager.getConnection(data.getUrl(), data.getUsername(), data.getPassword());
        int reportLine = 1;
        String sql = "SELECT FIELD_NAME, VALUE_A, VALUE_B, DIFFERENCE FROM " + data.getDbTableName() + " WHERE SET_PK IN ( SELECT DISTINCT SET_PK FROM " + data.getDbTableName() + " WHERE IS_VALID=? )";
        PreparedStatement ps = conn.prepareStatement(sql);
        ps.setBoolean(1, false);
        ResultSet rs = ps.executeQuery();

        List<ReportLine> lst = new ArrayList<>();
        int columns = data.getLstFormats().size();
        int size = 0;
        int linesDone = 0;

        while (rs.next()) {

            ReportLine rl = new ReportLine(reportLine, rs.getString("FIELD_NAME"), rs.getString("VALUE_A"), rs.getString("VALUE_B"), rs.getString("DIFFERENCE"));
            lst.add(rl);
            linesDone = columns * (reportLine - 1);
            size++;
            if ((size - linesDone) == columns) {
                reportLine++;

                if (lst.size() > 4000) {
                    appendReportContentNew(lst);
                    lst.clear();
                }
            }
        }

        if (lst.size() > 0) {
            appendReportContentNew(lst);
            lst.clear();
        }

        ps.close();
        conn.close();
        return true;
    } catch (Exception e) {
        System.out.println("exception handling report content new: " + e.toString());
        return false;
    }
}

This works, but I am aware it is slow and inefficient, and most likely there is a better way to perform the same operation. What this method does is:

  • collect the data from the ResultSet;
  • dump it into a List;
  • every 4K objects, call the method appendReportContentNew();
  • append the data in the List to the file.

public void appendReportContentNew(List<ReportLine> lst) {

    File f = new File(data.getJobFilenamePath());

    try {
        if (!f.exists()) {
            f.createNewFile();
        }

        FileWriter fw = new FileWriter(data.getJobFilenamePath(), true);
        BufferedWriter bw = new BufferedWriter(fw);

        for (ReportLine rl : lst) {
            String rID = "R" + rl.getLine();
            String fieldName = rl.getFieldName();
            String rline = rID + "," + fieldName + "," + rl.getValue1() + "," + rl.getValue2() + "," + rl.getDifference();
            bw.append(rline);
            bw.append("\n");
        }

        bw.close();

    } catch (IOException e) {
        System.out.println("exception appending report content: " + e.toString());
    }
}

With this method, it wrote 800k lines (a 30 MB file) in 20 minutes; the full report usually reaches 4 GB or more. This is what I want to improve, if possible.
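For comparison, one way to avoid reopening the file for every batch is to keep a single buffered writer open for the whole export and write each row as it is read, instead of collecting rows in a List first. The following is only a minimal pure-JDK sketch of that idea. The class name CsvStreamSketch, the method writeRows, and the Iterator of String arrays standing in for the JDBC ResultSet are all assumptions made for illustration; they are not part of the original code.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper: streams rows to one writer instead of batching them in a List.
final class CsvStreamSketch {

    // Writes each row as "R<col0>,<col1>,..." followed by a newline and
    // returns the number of rows written. The caller owns 'out': it is
    // flushed here but not closed.
    static int writeRows(Iterator<String[]> rows, Writer out) {
        try {
            BufferedWriter bw = new BufferedWriter(out);
            int written = 0;
            while (rows.hasNext()) {
                String[] row = rows.next();
                bw.write("R");
                bw.write(String.join(",", row));
                bw.write('\n');
                written++;
            }
            bw.flush();
            return written;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Two made-up rows in the column order used by the question:
        // line number, field name, value A, value B, difference.
        List<String[]> demo = Arrays.asList(
                new String[] {"1", "FIELD_X", "a", "b", "diff"},
                new String[] {"2", "FIELD_Y", "c", "c", ""});
        StringWriter sink = new StringWriter();
        int n = writeRows(demo.iterator(), sink);
        System.out.print(sink);
        System.out.println(n + " rows written");
    }
}
```

In the real method, the loop would presumably iterate over the ResultSet, and the Writer would be a FileWriter opened once (ideally in a try-with-resources block) for the whole report.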

So I decided to try OpenCSV, and ended up with the following method:

public boolean handleReportContentv2() {

    Connection conn = null;

    try {
        FileWriter fw = new FileWriter(data.getJobFilenamePath(), true);
        System.out.println("Handling report content v2... " + new Date());
        conn = DriverManager.getConnection(data.getUrl(), data.getUsername(), data.getPassword());
        String sql = "SELECT NLINE, FIELD_NAME, VALUE_A, VALUE_B, DIFFERENCE FROM " + data.getDbTableName() + " WHERE SET_PK IN ( SELECT DISTINCT SET_PK FROM " + data.getDbTableName() + " WHERE IS_VALID=? )";
        PreparedStatement ps = conn.prepareStatement(sql);
        ps.setBoolean(1, false);
        ps.setFetchSize(500);
        ResultSet rs = ps.executeQuery();

        BufferedWriter out = new BufferedWriter(fw);
        CSVWriter writer = new CSVWriter(out, ',', CSVWriter.NO_QUOTE_CHARACTER);
        writer.writeAll(rs, false);

        writer.close(); // flushes the buffer and closes the underlying BufferedWriter and FileWriter
        rs.close();
        ps.close();
        conn.close();
        return true;
    } catch (Exception e) {
        System.out.println("exception handling report content v2: " + e.toString());
        return false;
    }
}

So I am reading all the data from the ResultSet and dumping it into the CSVWriter. In the same 20 minutes, this operation wrote only 7k lines.

But the same method, used outside the JMS/MDB, makes an incredible difference: in just the first 4 minutes it wrote 3M rows to the file, and in the same 20 minutes it generated a file of 500 MB+.

Clearly OpenCSV is by far the best option if I want to improve performance. My question is: why doesn't it perform the same way inside the JMS/MDB? If that is not possible, is there any other way to improve the same task?

I appreciate any feedback and help on this matter; I am trying to understand why the behavior/performance is different inside and outside the JMS/MDB.

EDIT:

@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "MessageQueue")})

public class JobProcessorBean implements MessageListener {

private static final int TYPE_A_ID = 0;
private static final int TYPE_B_ID = 1;

@Inject
JobDao jobsDao;

@Inject
private AsyncReport generator;

public JobProcessorBean() {
}

@Override
public void onMessage(Message message) {
    int jobId = -1;
    ObjectMessage msg = (ObjectMessage) message;
    try {
        boolean valid = true;
        JobWrapper jobw = (JobWrapper) msg.getObject();
        jobId = jobw.getJob().getJobId().intValue();

        switch (jobw.getJob().getJobTypeId().getJobTypeId().intValue()) {
            case TYPE_A_ID:
                jobsDao.updateJobStatus(jobId, 0);
                valid = processTask1(jobw);
                if(valid) {
                    jobsDao.updateJobFileName(jobId, generator.getData().getJobFilename());
                    System.out.println(":: :: JOBW FileName :: "+generator.getData().getJobFilename());
                    jobsDao.updateJobStatus(jobId, 0);
                }
                else {
                    System.out.println("error...");
                    jobsDao.updateJobStatus(jobId, 1);
                }
                boolean validfile = handleReportContentv2(); // the call in question
                if(!validfile) {
                    System.out.println("error file...");
                    jobsDao.updateJobStatus(jobId, 1);
                }
                break;
            case TYPE_B_ID:
                (...)
        }
        if(valid) {        
            jobsDao.updateJobStatus(jobw.getJob().getJobId().intValue(), 2); //updated to complete
        }
        System.out.println("***********---------Finished JOB " + jobId + "-----------****************");
        System.out.println();
        jobw = null;
    } catch (JMSException ex) {
        Logger.getLogger(JobProcessorBean.class.getName()).log(Level.SEVERE, null, ex);
        jobsDao.updateJobStatus(jobId, 1);
    } catch (Exception ex) {
        Logger.getLogger(JobProcessorBean.class.getName()).log(Level.SEVERE, null, ex);
        jobsDao.updateJobStatus(jobId, 1);
    } finally {
        msg = null;
    }
}

private boolean processTask1(JobWrapper jobw) throws Exception {

    boolean valid = true;
    jobsDao.updateJobStatus(jobw.getJob().getJobId().intValue(), 0);

    generator.setData(jobw.getData());
    valid = generator.deployGenerator();
    if(!valid) return false;
    jobsDao.updateJobParameters(jobw.getJob().getJobId().intValue(),new ReportContent());

    Logger.getLogger(JobProcessorBean.class.getName()).log(Level.INFO, null, "Job Finished");
    return true;
}

So if the same method, handleReportContent(), is executed inside generator.deployGenerator(), it has those slow results. If I wait for everything inside that method to finish and create the file in this bean, JobProcessorBean, it is much faster. I am just trying to figure out why it behaves like this.


Answer:

Adding the @TransactionAttribute(NOT_SUPPORTED) annotation on the bean might solve the problem (and it did, as your comment indicates).

Why is this so? Because if you don't put any transaction annotation on a message-driven bean, the default is @TransactionAttribute(REQUIRED), so everything the bean does is supervised by a transaction manager. Apparently, this slows things down.
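As a rough sketch of where the annotation goes, assuming the standard javax.ejb API available in GlassFish 4: the activation properties are copied from the question, and the body of onMessage is left as a placeholder. It needs the Java EE API on the classpath and an EJB container to run.

```java
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Message;
import javax.jms.MessageListener;

// Sketch only: same bean as in the question, with the transaction attribute added.
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "MessageQueue")})
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class JobProcessorBean implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // With NOT_SUPPORTED, the container does not start a JTA transaction
        // for this call, so the long-running report export is not tied to one.
        // The job handling shown in the question would go here.
    }
}
```

For message-driven beans, the EJB specification only allows REQUIRED and NOT_SUPPORTED on the message listener method. One consequence worth keeping in mind is that with NOT_SUPPORTED the message receipt is no longer part of a transaction, so a failure in onMessage does not roll the message back onto the queue in the same way.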

Question:

I'm running GlassFish v4 and I use a message-driven bean. Currently I'm defining the principal under which the bean runs in glassfish-ejb-jar.xml like this:

<enterprise-beans>
  <ejb>
    <ejb-name>MessageConsumerBean</ejb-name>
    <principal>
      <name>MDBPrincipal</name>
    </principal>
  </ejb>
</enterprise-beans>

Is it possible to do the same using just annotations, e.g. @MessageDriven?


Answer:

According to https://java.net/downloads/ejb-spec/mdb.no-method.interface.pdf (section 5.4.14, page 133), a Principal may be propagated in the security context, but the details are not governed by the EJB spec. In other words, it's a platform-dependent feature.

One can use @RunAs if the MDB needs to be granted a specific role in order to call a method protected by @RolesAllowed.

If you need the Principal for some application logic, I'm afraid there's only a platform-dependent solution such as RunAsPrincipal (JBoss) or glassfish-ejb-jar.xml in your case.
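To illustrate the @RunAs part, here is a minimal sketch using the standard javax.annotation.security.RunAs annotation. The role name "mdbRole" and the JNDI name "jms/MyQueue" are hypothetical placeholders, and the code needs the Java EE API and an EJB container.

```java
import javax.annotation.security.RunAs;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;

// Sketch only: "mdbRole" and "jms/MyQueue" are made-up names.
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "jms/MyQueue")})
@RunAs("mdbRole")
public class MessageConsumerBean implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // Calls made from here to other EJBs are performed with the role
        // "mdbRole", so methods annotated @RolesAllowed("mdbRole") can be invoked.
    }
}
```

This only covers the role. Which principal backs that role is still decided by the server's own role mapping, which is why the principal itself stays in glassfish-ejb-jar.xml.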

Question:

My basic problem: the message-driven bean throws an exception somewhere, which leads to a rollback in the JMS queue, over and over, until it just blows up and the server breaks down. I can't even look for the exceptions to solve the problem because the server logs grow to a few hundred MB in only a few minutes.

What I am looking for: a way to first clear the JMS Queue and then stop the rollbacks once and for all.

This is what I do:


   the object         stateless bean to         Message-Driven Bean     here it throws
 I want to send     send the ObjectMessage     to receive the message   the Exception
  +-----------+      +-----------------+        +-----------------+     +----------+
  |  MyObject |----->| MessageProducer |------->| MessageReceiver |---->| do stuff |
  +-----------+      +-----------------+        +-----------------+     +----------+  

The object I send is a simple serializable POJO.

The MessageProducer looks like this:

@Stateless
public class MessageProducer{

    @Resource(mappedName = "jms/JMSConnectionFactory")
    private ConnectionFactory connectionFactory;

    @Resource(mappedName = "jms/MessageReceiver")
    Queue messageReceiver;

    public void sendMessage(MyObject myObject){
        // try with resources to close everything on Exception
        try(Connection connection = connectionFactory.createConnection(); 
               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)){
            // fully qualified because this bean is itself named MessageProducer
            javax.jms.MessageProducer messageProducer = session.createProducer(messageReceiver);

            ObjectMessage objectMessage = session.createObjectMessage();
            objectMessage.setObject(myObject);
            objectMessage.setJMSRedelivered(false);  // this doesn't seem to have any effect  
            messageProducer.send(objectMessage);
            messageProducer.close();
        }catch(Exception ex){
            System.err.println("error when sendingMessage .. ignoring"); // when "ignoring" the Exception I thought to at least save me the enormous log messages
        }
    }
}

The message bean looks like this:

@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "jms/MessageReceiver")
})
public class MessageReceiver implements MessageListener{

    public MessageReceiver(){
    }

    @Override
    public void onMessage(Message message){
        try{
            if(message instanceof ObjectMessage){
                ObjectMessage objectMessage = (ObjectMessage) message;
                MyObject myObject = (MyObject) objectMessage.getObject();
                doStuff(myObject); // the method that handles the incoming object - here the Exception gets thrown
            }
        }catch(Exception ex){
           System.err.println("message could not be received: " + ex.getMessage());  // still trying to ignore exceptions here
        }
    }
}

Some part of the Exception

javax.ejb.TransactionRolledbackLocalException: Exception thrown from bean
    at com.sun.ejb.containers.EJBContainerTransactionManager.checkExceptionClientTx(EJBContainerTransactionManager.java:662)
    at com.sun.ejb.containers.EJBContainerTransactionManager.postInvokeTx(EJBContainerTransactionManager.java:507)
    at com.sun.ejb.containers.BaseContainer.postInvokeTx(BaseContainer.java:4566)
    ......... (and so on....)
    at com.me.MessageBeans.MessageReceiver.doStuff(MessageReceiver.java:86)
    at com.me.MessageBeans.MessageReceiver.onMessage(MessageReceiver.java:61)
    at sun.reflect.GeneratedMethodAccessor313.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.glassfish.ejb.security.application.EJBSecurityManager.runMethod(EJBSecurityManager.java:1081)
    at org.glassfish.ejb.security.application.EJBSecurityManager.invoke(EJBSecurityManager.java:1153)
    at com.sun.ejb.containers.BaseContainer.invokeBeanMethod(BaseContainer.java:4786)
    at com.sun.ejb.EjbInvocation.invokeBeanMethod(EjbInvocation.java:656)
    at com.sun.ejb.containers.interceptors.AroundInvokeChainImpl.invokeNext(InterceptorManager.java:822)
    at com.sun.ejb.EjbInvocation.proceed(EjbInvocation.java:608)
    at org.jboss.weld.ejb.AbstractEJBRequestScopeActivationInterceptor.aroundInvoke(AbstractEJBRequestScopeActivationInterceptor.java:55)
    at org.jboss.weld.ejb.SessionBeanInterceptor.aroundInvoke(SessionBeanInterceptor.java:52)
    at sun.reflect.GeneratedMethodAccessor310.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.sun.ejb.containers.interceptors.AroundInvokeInterceptor.intercept(InterceptorManager.java:883)
    ......... (and so on....)
Caused by: javax.validation.ConstraintViolationException: Bean Validation constraint(s) violated while executing Automatic Bean Validation on callback event:'prePersist'. Please refer to embedded ConstraintViolations for details.

 // I try to persist myObject with JPA into the database (that's what's not working right now, but it's beside the point here)
    ......... (and so on....)

Now my actual question :)

How can I stop my JMS queue from redelivering messages that cause exceptions or could not be delivered successfully for any other reason (e.g. by catching the exceptions, or by setting some parameters when sending the message)?

Is there a parameter/option I have to set in GlassFish when setting up the DestinationResources or ConnectionFactories?

And how can I clear the queue manually? Right now it's full of messages awaiting redelivery, and every time I deploy my application I get flooded with log messages within minutes.

Thank you all for your time and for reading :)


Answer:

Transaction management in MDBs is tricky.

What is happening is that a java.lang.RuntimeException is being thrown from your JPA Facade service method, and this automatically marks the current transaction for rollback. Therefore it's already too late by the time it reaches the catch clause in your MDB. JMS will redeliver messages whose transaction was rolled back.

Any java.lang.RuntimeException escaping from your service method will mark the MDB's transaction for rollback. You can prevent this by catching it inside the service method.

You may need to annotate the service method in your JPA Facade with @TransactionAttribute(REQUIRES_NEW), in case JPA marks the transaction for rollback (not sure about that) or another EJB call from within the service method fails.
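A minimal sketch of what the REQUIRES_NEW variant might look like, assuming a stateless facade with an injected EntityManager. The class name PersistFacade and the method save are hypothetical, and the code needs the Java EE API, a persistence unit, and an EJB container.

```java
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

// Sketch only: a hypothetical facade that persists in its own transaction.
@Stateless
public class PersistFacade {

    @PersistenceContext
    private EntityManager em;

    // REQUIRES_NEW suspends the caller's (the MDB's) transaction and runs
    // the persist in a separate one. If validation or the commit fails,
    // only this inner transaction is rolled back.
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public void save(Object entity) {
        em.persist(entity);
    }
}
```

With this arrangement, a failure in save should reach the MDB as a javax.ejb.EJBException that its existing catch block can handle, without the MDB's own transaction having been marked for rollback, so the message should not be redelivered for that reason.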

You can follow the instructions in "To Purge Messages From a Physical Destination" in the GlassFish administration guide (the asadmin flush-jmsdest subcommand) to clear your queue.