package durablesubscribers;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jms.Message;
import javax.jms.TopicConnectionFactory;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
/**
*
* @author Pierre
*/
public class DurableSubscriberFactory implements Runnable {
public static final int NUMBER_OF_THREADS = 50;
public static final int WAIT_BETWEEN_RECEIVE = 10000;
private String clientId = null;
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
for (int i = 0; i < NUMBER_OF_THREADS; i++) {
DurableSubscriberFactory runnable = new DurableSubscriberFactory();
runnable.setClientId("PV_CLIENTID_" + i);
service.submit(runnable);
Thread.sleep(200);
}
}
public void run() {
TopicConnection topicConnection = null;
try {
System.out.println("starting");
// get the initial context, refer to your app server docs for this
Hashtable<String, String> ht = new Hashtable<String, String>();
ht.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
ht.put(Context.PROVIDER_URL, "t3://myserver:7011");
Context ctx = new InitialContext(ht);
System.out.println("looking up");
// get the connection factory, and open a connection
TopicConnectionFactory qcf = (TopicConnectionFactory) ctx.lookup("myConnectionFactory");
System.out.println("createTopicConnection");
topicConnection = qcf.createTopicConnection();
// must specify, otherwise you get an IllegalStateException
topicConnection.setClientID(getClientId());
// start connection to receive messages
topicConnection.start();
// create topic session off the connection
System.out.println("createTopicSession");
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// get handle on topic, create a durable subscriber and consume messages
System.out.println("lookup topic");
// you can only create a Durable Subscriber to a individual member, not to a Distributed Destination
Topic topic = (Topic) ctx.lookup("JMS_SERVER_01@TOPICNAME");
System.out.println("create durable subscriber");
TopicSubscriber topicSubscriber = topicSession.createDurableSubscriber(topic, "PVTEST");
while (true) {
System.out.println("receiving....");
Message message = topicSubscriber.receive();
System.out.println("received message " + message);
Thread.sleep(WAIT_BETWEEN_RECEIVE);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// close the queue connection
if (topicConnection != null) {
try {
topicConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
* @return the clientId
*/
public String getClientId() {
return clientId;
}
/**
* @param clientId the clientId to set
*/
public void setClientId(String clientId) {
this.clientId = clientId;
}
}
It works really great.... but when you exit the application, the Durable Subscribers will still appear in the Consumers for the Topic.
If you want to remove them, do this:
package durablesubscribers;
import java.util.Hashtable;
import javax.jms.TopicConnectionFactory;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
/**
*
* @author vernetto
*/
public class UnsubscribeAll {
public static final int NUMBER_OF_THREADS = 50;
private String clientId = null;
public static void main(String[] args) throws Exception {
for (int i = 0; i < NUMBER_OF_THREADS; i++) {
UnsubscribeAll runnable = new UnsubscribeAll();
runnable.setClientId("PV_CLIENTID_" + i);
runnable.unsubscribe();
Thread.sleep(200);
}
}
/**
* @param args the command line arguments
*/
public void unsubscribe() {
TopicConnection topicConnection = null;
try {
System.out.println("starting");
// get the initial context, refer to your app server docs for this
Hashtable<String, String> ht = new Hashtable<String, String>();
ht.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
ht.put(Context.PROVIDER_URL, "t3://myserver:7011");
Context ctx = new InitialContext(ht);
System.out.println("looking up");
// get the connection factory, and open a connection
TopicConnectionFactory qcf = (TopicConnectionFactory) ctx.lookup("myConnectionFactory");
System.out.println("createTopicConnection");
topicConnection = qcf.createTopicConnection();
// must specify, otherwise you get an IllegalStateException
topicConnection.setClientID(getClientId());
// create topic session off the connection
System.out.println("createTopicSession");
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topicSession.unsubscribe("PVTEST");
} catch (Exception e) {
e.printStackTrace();
} finally {
// close the queue connection
if (topicConnection != null) {
try {
topicConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
* @return the clientId
*/
public String getClientId() {
return clientId;
}
/**
* @param clientId the clientId to set
*/
public void setClientId(String clientId) {
this.clientId = clientId;
}
}
No comments:
Post a Comment