Skip to content

Commit

Permalink
Make PulsarConnectionFactory Serializable (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Jun 17, 2022
1 parent c580096 commit fcb209a
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 70 deletions.
4 changes: 2 additions & 2 deletions pulsar-jms-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@
<configuration>
<tasks>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
</tasks>
</configuration>
</execution>
Expand Down
4 changes: 2 additions & 2 deletions pulsar-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@
<configuration>
<tasks>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-filters-${project.version}.jar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-filters-${project.version}.jar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
</tasks>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
package com.datastax.oss.pulsar.jms;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -57,7 +63,13 @@

@Slf4j
public class PulsarConnectionFactory
implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, AutoCloseable {
implements ConnectionFactory,
QueueConnectionFactory,
TopicConnectionFactory,
AutoCloseable,
Serializable {

private static final long serialVersionUID = 1231231L;

private static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";
private static final String SHADED_PREFIX = "com.datastax.oss.pulsar.jms.shaded.";
Expand All @@ -66,35 +78,38 @@ public class PulsarConnectionFactory

private static final Set<String> clientIdentifiers = new ConcurrentSkipListSet<>();

private final Map<String, Producer<byte[]>> producers = new ConcurrentHashMap<>();
private final Set<PulsarConnection> connections = Collections.synchronizedSet(new HashSet<>());
private final List<Consumer<?>> consumers = new CopyOnWriteArrayList<>();
private final List<Reader<?>> readers = new CopyOnWriteArrayList<>();
private PulsarClient pulsarClient;
private PulsarAdmin pulsarAdmin;
private Map<String, Object> producerConfiguration;
private Map<String, Object> consumerConfiguration;
private Schema<?> consumerSchema;
private String systemNamespace = "public/default";
private String defaultClientId = null;
private boolean enableTransaction = false;
private boolean emulateTransactions = false;
private boolean enableClientSideEmulation = false;
private boolean useServerSideFiltering = false;
private boolean forceDeleteTemporaryDestinations = false;
private boolean useExclusiveSubscriptionsForSimpleConsumers = false;
private boolean acknowledgeRejectedMessages = false;
private String tckUsername = "";
private String tckPassword = "";
private String queueSubscriptionName = "jms-queue";
private SubscriptionType topicSharedSubscriptionType = SubscriptionType.Shared;
private long waitForServerStartupTimeout = 60000;
private boolean usePulsarAdmin = true;
private int precreateQueueSubscriptionConsumerQueueSize = 0;
private boolean initialized;
private boolean closed;

private Map<String, Object> configuration;
// see resetDefaultValues for final fields
private final transient Map<String, Producer<byte[]>> producers = new ConcurrentHashMap<>();
private final transient Set<PulsarConnection> connections =
Collections.synchronizedSet(new HashSet<>());
private final transient List<Consumer<?>> consumers = new CopyOnWriteArrayList<>();
private final transient List<Reader<?>> readers = new CopyOnWriteArrayList<>();

private transient PulsarClient pulsarClient;
private transient PulsarAdmin pulsarAdmin;
private transient Map<String, Object> producerConfiguration;
private transient Map<String, Object> consumerConfiguration;
private transient Schema<?> consumerSchema;
private transient String systemNamespace = "public/default";
private transient String defaultClientId = null;
private transient boolean enableTransaction = false;
private transient boolean emulateTransactions = false;
private transient boolean enableClientSideEmulation = false;
private transient boolean useServerSideFiltering = false;
private transient boolean forceDeleteTemporaryDestinations = false;
private transient boolean useExclusiveSubscriptionsForSimpleConsumers = false;
private transient boolean acknowledgeRejectedMessages = false;
private transient String tckUsername = "";
private transient String tckPassword = "";
private transient String queueSubscriptionName = "jms-queue";
private transient SubscriptionType topicSharedSubscriptionType = SubscriptionType.Shared;
private transient long waitForServerStartupTimeout = 60000;
private transient boolean usePulsarAdmin = true;
private transient int precreateQueueSubscriptionConsumerQueueSize = 0;
private transient boolean initialized;
private transient boolean closed;

private transient Map<String, Object> configuration = Collections.emptyMap();

public PulsarConnectionFactory() throws JMSException {
this(new HashMap<>());
Expand Down Expand Up @@ -132,7 +147,7 @@ public void setJsonConfiguration(String json) {
}

public synchronized Map<String, Object> getConfiguration() {
return new HashMap<>(configuration);
return Utils.deepCopyMap(configuration);
}

public synchronized void setConfiguration(Map<String, Object> configuration) {
Expand Down Expand Up @@ -192,10 +207,11 @@ private synchronized void ensureInitialized() throws JMSException {
if (closed) {
throw new IllegalStateException("This ConnectionFactory is closed");
}
Map<String, Object> configurationCopy = Utils.deepCopyMap(this.configuration);
try {

Map<String, Object> producerConfiguration =
(Map<String, Object>) configuration.remove("producerConfig");
(Map<String, Object>) configurationCopy.remove("producerConfig");
if (producerConfiguration != null) {
Object batcherBuilder = producerConfiguration.get("batcherBuilder");
if (batcherBuilder != null) {
Expand Down Expand Up @@ -223,7 +239,7 @@ private synchronized void ensureInitialized() throws JMSException {

this.consumerSchema = Schema.BYTES;
Map<String, Object> consumerConfigurationM =
(Map<String, Object>) configuration.remove("consumerConfig");
(Map<String, Object>) configurationCopy.remove("consumerConfig");
if (consumerConfigurationM != null) {
this.consumerConfiguration = new HashMap(consumerConfigurationM);
boolean useSchema =
Expand All @@ -235,27 +251,27 @@ private synchronized void ensureInitialized() throws JMSException {
this.consumerConfiguration = Collections.emptyMap();
}
this.systemNamespace =
getAndRemoveString("jms.systemNamespace", "public/default", configuration);
getAndRemoveString("jms.systemNamespace", "public/default", configurationCopy);

this.tckUsername = getAndRemoveString("jms.tckUsername", "", configuration);
this.tckPassword = getAndRemoveString("jms.tckPassword", "", configuration);
this.tckUsername = getAndRemoveString("jms.tckUsername", "", configurationCopy);
this.tckPassword = getAndRemoveString("jms.tckPassword", "", configurationCopy);

this.defaultClientId = getAndRemoveString("jms.clientId", null, configuration);
this.defaultClientId = getAndRemoveString("jms.clientId", null, configurationCopy);

this.queueSubscriptionName =
getAndRemoveString("jms.queueSubscriptionName", "jms-queue", configuration);
getAndRemoveString("jms.queueSubscriptionName", "jms-queue", configurationCopy);

this.usePulsarAdmin =
Boolean.parseBoolean(getAndRemoveString("jms.usePulsarAdmin", "true", configuration));
Boolean.parseBoolean(getAndRemoveString("jms.usePulsarAdmin", "true", configurationCopy));

this.precreateQueueSubscriptionConsumerQueueSize =
Integer.parseInt(
getAndRemoveString(
"jms.precreateQueueSubscriptionConsumerQueueSize", "0", configuration));
"jms.precreateQueueSubscriptionConsumerQueueSize", "0", configurationCopy));

final String rawTopicSharedSubscriptionType =
getAndRemoveString(
"jms.topicSharedSubscriptionType", SubscriptionType.Shared.name(), configuration);
"jms.topicSharedSubscriptionType", SubscriptionType.Shared.name(), configurationCopy);
this.topicSharedSubscriptionType =
Stream.of(SubscriptionType.values())
.filter(
Expand All @@ -278,23 +294,23 @@ private synchronized void ensureInitialized() throws JMSException {

this.waitForServerStartupTimeout =
Long.parseLong(
getAndRemoveString("jms.waitForServerStartupTimeout", "60000", configuration));
getAndRemoveString("jms.waitForServerStartupTimeout", "60000", configurationCopy));

this.enableClientSideEmulation =
Boolean.parseBoolean(
getAndRemoveString("jms.enableClientSideEmulation", "false", configuration));
getAndRemoveString("jms.enableClientSideEmulation", "false", configurationCopy));

this.useServerSideFiltering =
Boolean.parseBoolean(
getAndRemoveString("jms.useServerSideFiltering", "false", configuration));
getAndRemoveString("jms.useServerSideFiltering", "false", configurationCopy));

// in Exclusive mode Pulsar does not support delayed messages
// with this flag you force to not use Exclusive subscription and so to support
// delayed messages are well
this.useExclusiveSubscriptionsForSimpleConsumers =
Boolean.parseBoolean(
getAndRemoveString(
"jms.useExclusiveSubscriptionsForSimpleConsumers", "true", configuration));
"jms.useExclusiveSubscriptionsForSimpleConsumers", "true", configurationCopy));

// This flag is to force acknowledgement for messages that are rejected due to
// client side filtering in case of Shared subscription.
Expand All @@ -305,58 +321,61 @@ private synchronized void ensureInitialized() throws JMSException {
// it out, we acknowledge the message, this way it won't be consumed anymore.
this.acknowledgeRejectedMessages =
Boolean.parseBoolean(
getAndRemoveString("jms.acknowledgeRejectedMessages", "false", configuration));
getAndRemoveString("jms.acknowledgeRejectedMessages", "false", configurationCopy));
// default is false
this.forceDeleteTemporaryDestinations =
Boolean.parseBoolean(
getAndRemoveString("jms.forceDeleteTemporaryDestinations", "false", configuration));
getAndRemoveString(
"jms.forceDeleteTemporaryDestinations", "false", configurationCopy));

this.enableTransaction =
Boolean.parseBoolean(configuration.getOrDefault("enableTransaction", "false").toString());
Boolean.parseBoolean(
configurationCopy.getOrDefault("enableTransaction", "false").toString());

this.emulateTransactions =
Boolean.parseBoolean(
getAndRemoveString("jms.emulateTransactions", "false", configuration).toString());
getAndRemoveString("jms.emulateTransactions", "false", configurationCopy).toString());

if (emulateTransactions && enableTransaction) {
throw new IllegalStateException(
"You cannot set both enableTransaction and jms.emulateTransactions");
}

String webServiceUrl =
getAndRemoveString("webServiceUrl", "http://localhost:8080", configuration);
getAndRemoveString("webServiceUrl", "http://localhost:8080", configurationCopy);

String brokenServiceUrl = getAndRemoveString("brokerServiceUrl", "", configuration);
String brokenServiceUrl = getAndRemoveString("brokerServiceUrl", "", configurationCopy);

PulsarClient pulsarClient = null;
PulsarAdmin pulsarAdmin = null;
try {

// must be the same as
// https://pulsar.apache.org/docs/en/security-tls-keystore/#configuring-clients
String authPluginClassName = getAndRemoveString("authPlugin", "", configuration);
String authParamsString = getAndRemoveString("authParams", "", configuration);
String authPluginClassName = getAndRemoveString("authPlugin", "", configurationCopy);
String authParamsString = getAndRemoveString("authParams", "", configurationCopy);
Authentication authentication =
AuthenticationFactory.create(authPluginClassName, authParamsString);
if (log.isDebugEnabled()) {
log.debug("Authentication {}", authentication);
}
boolean tlsAllowInsecureConnection =
Boolean.parseBoolean(
getAndRemoveString("tlsAllowInsecureConnection", "false", configuration));
getAndRemoveString("tlsAllowInsecureConnection", "false", configurationCopy));

boolean tlsEnableHostnameVerification =
Boolean.parseBoolean(
getAndRemoveString("tlsEnableHostnameVerification", "false", configuration));
getAndRemoveString("tlsEnableHostnameVerification", "false", configurationCopy));
final String tlsTrustCertsFilePath =
(String) getAndRemoveString("tlsTrustCertsFilePath", "", configuration);
(String) getAndRemoveString("tlsTrustCertsFilePath", "", configurationCopy);

boolean useKeyStoreTls =
Boolean.parseBoolean(getAndRemoveString("useKeyStoreTls", "false", configuration));
String tlsTrustStoreType = getAndRemoveString("tlsTrustStoreType", "JKS", configuration);
String tlsTrustStorePath = getAndRemoveString("tlsTrustStorePath", "", configuration);
Boolean.parseBoolean(getAndRemoveString("useKeyStoreTls", "false", configurationCopy));
String tlsTrustStoreType =
getAndRemoveString("tlsTrustStoreType", "JKS", configurationCopy);
String tlsTrustStorePath = getAndRemoveString("tlsTrustStorePath", "", configurationCopy);
String tlsTrustStorePassword =
getAndRemoveString("tlsTrustStorePassword", "", configuration);
getAndRemoveString("tlsTrustStorePassword", "", configurationCopy);

pulsarAdmin =
PulsarAdmin.builder()
Expand All @@ -373,7 +392,7 @@ private synchronized void ensureInitialized() throws JMSException {

ClientBuilder clientBuilder =
PulsarClient.builder()
.loadConf(configuration)
.loadConf(configurationCopy)
.tlsTrustStorePassword(tlsTrustStorePassword)
.tlsTrustStorePath(tlsTrustStorePath)
.tlsTrustCertsFilePath(tlsTrustCertsFilePath)
Expand Down Expand Up @@ -1236,4 +1255,67 @@ public synchronized boolean isClosed() {
private synchronized int getPrecreateQueueSubscriptionConsumerQueueSize() {
return precreateQueueSubscriptionConsumerQueueSize;
}

private synchronized void writeObject(ObjectOutputStream out) throws IOException {
String serialisedConfiguration = new ObjectMapper().writeValueAsString(configuration);
if (log.isDebugEnabled()) {
log.debug("Serializing this PulsarConnectionFactory as {}", serialisedConfiguration);
}
out.writeUTF(serialisedConfiguration);
}

// this method must not be synchronizes, see RS_READOBJECT_SYNC
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
resetDefaultValues();
String readConfiguration = in.readUTF();
if (log.isDebugEnabled()) {
log.debug("Deserialize configuration as {}", configuration);
}
try {
setJsonConfiguration(readConfiguration);
} catch (Exception err) {
throw new IOException("Cannot decode JSON configuration " + configuration);
}
}

private void setFinalField(String name, Object value) {
try {
Field field = this.getClass().getDeclaredField(name);
boolean accessible = field.isAccessible();
if (!accessible) {
field.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
}
try {
field.set(this, value);
} finally {
if (!accessible) {
field.setAccessible(false);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() | Modifier.FINAL);
}
}
} catch (Exception err) {
log.error("Error while setting final field {}", name, err);
throw new RuntimeException(err);
}
}

private synchronized void resetDefaultValues() {
if (initialized) {
throw new java.lang.IllegalStateException();
}

// final fields
setFinalField("producers", new ConcurrentHashMap<>());
setFinalField("connections", Collections.synchronizedSet(new HashSet<>()));
setFinalField("consumers", new CopyOnWriteArrayList<>());
setFinalField("readers", new CopyOnWriteArrayList<>());

this.initialized = false;
this.closed = false;
}
}
Loading

0 comments on commit fcb209a

Please sign in to comment.