diff --git a/services/azure-servicebus/pom.xml b/services/azure-servicebus/pom.xml
index d65c0169aea6..e96b1890550a 100644
--- a/services/azure-servicebus/pom.xml
+++ b/services/azure-servicebus/pom.xml
@@ -112,6 +112,11 @@
bcprov-jdk16
test
+
+ io.reactivex
+ rxjava
+ 1.0.17
+
diff --git a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContract.java b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContract.java
index 2b0c276442cd..5db61df2355d 100644
--- a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContract.java
+++ b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContract.java
@@ -50,6 +50,8 @@
public interface ServiceBusContract extends
JerseyFilterableService {
+ ServiceBusContractAsync async();
+
/**
* Sends a queue message.
*
diff --git a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContractAsync.java b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContractAsync.java
new file mode 100644
index 000000000000..0cc3006f8796
--- /dev/null
+++ b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContractAsync.java
@@ -0,0 +1,566 @@
+package com.microsoft.windowsazure.services.servicebus;
+
+import com.microsoft.windowsazure.core.pipeline.jersey.JerseyFilterableService;
+import com.microsoft.windowsazure.exception.ServiceException;
+import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage;
+import com.microsoft.windowsazure.services.servicebus.models.CreateQueueResult;
+import com.microsoft.windowsazure.services.servicebus.models.CreateRuleResult;
+import com.microsoft.windowsazure.services.servicebus.models.CreateSubscriptionResult;
+import com.microsoft.windowsazure.services.servicebus.models.CreateTopicResult;
+import com.microsoft.windowsazure.services.servicebus.models.GetQueueResult;
+import com.microsoft.windowsazure.services.servicebus.models.GetRuleResult;
+import com.microsoft.windowsazure.services.servicebus.models.GetSubscriptionResult;
+import com.microsoft.windowsazure.services.servicebus.models.GetTopicResult;
+import com.microsoft.windowsazure.services.servicebus.models.ListQueuesOptions;
+import com.microsoft.windowsazure.services.servicebus.models.ListQueuesResult;
+import com.microsoft.windowsazure.services.servicebus.models.ListRulesOptions;
+import com.microsoft.windowsazure.services.servicebus.models.ListRulesResult;
+import com.microsoft.windowsazure.services.servicebus.models.ListSubscriptionsOptions;
+import com.microsoft.windowsazure.services.servicebus.models.ListSubscriptionsResult;
+import com.microsoft.windowsazure.services.servicebus.models.ListTopicsOptions;
+import com.microsoft.windowsazure.services.servicebus.models.ListTopicsResult;
+import com.microsoft.windowsazure.services.servicebus.models.QueueInfo;
+import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageOptions;
+import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageResult;
+import com.microsoft.windowsazure.services.servicebus.models.ReceiveQueueMessageResult;
+import com.microsoft.windowsazure.services.servicebus.models.ReceiveSubscriptionMessageResult;
+import com.microsoft.windowsazure.services.servicebus.models.RuleInfo;
+import com.microsoft.windowsazure.services.servicebus.models.SubscriptionInfo;
+import com.microsoft.windowsazure.services.servicebus.models.TopicInfo;
+import com.sun.jersey.api.client.ClientResponse;
+
+import rx.Observable;
+
+public interface ServiceBusContractAsync extends
+JerseyFilterableService {
+
+/**
+* Sends a queue message.
+*
+* @param queuePath
+* A String object that represents the name of the
+* queue to which the message will be sent.
+* @param message
+* A Message object that represents the message to
+* send.
+* @return A Observable<Void> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable sendQueueMessage(String queuePath, BrokeredMessage message);
+
+/**
+* Receives a queue message.
+*
+* @param queuePath
+* A String object that represents the name of the
+* queue from which to receive the message.
+* @return A Observable<ReceiveQueueMessageResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable receiveQueueMessage(String queuePath);
+
+/**
+* Receives a queue message using the specified receive message options.
+*
+* @param queuePath
+* A String object that represents the name of the
+* queue from which to receive the message.
+* @param options
+* A ReceiveMessageOptions object that represents
+* the receive message options.
+* @return A Observable<ReceiveQueueMessageResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable receiveQueueMessage(String queuePath,
+ ReceiveMessageOptions options);
+
+/**
+* Sends a topic message.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic to which the message will be sent.
+* @param message
+* A Message object that represents the message to
+* send.
+* @return A Observable<Void> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable sendTopicMessage(String topicPath, BrokeredMessage message);
+
+/**
+* Receives a subscription message.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic to receive.
+* @param subscriptionName
+* A String object that represents the name of the
+* subscription from the message will be received.
+* @return A Observable<ReceiveSubscriptionMessageResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable receiveSubscriptionMessage(
+ String topicPath, String subscriptionName);
+
+/**
+* Receives a subscription message using the specified receive message
+* options.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic to receive.
+* @param subscriptionName
+* A String object that represents the name of the
+* subscription from the message will be received.
+* @param options
+* A ReceiveMessageOptions object that represents
+* the receive message options.
+* @return A Observable<ReceiveSubscriptionMessageResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable receiveSubscriptionMessage(
+ String topicPath, String subscriptionName,
+ ReceiveMessageOptions options);
+
+/**
+* Unlocks a message.
+*
+* @param message
+* A Message object that represents the message to
+* unlock.
+* @return A Observable<Void> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable unlockMessage(BrokeredMessage message);
+
+/**
+* Sends a message.
+*
+* @param path
+* A String object that represents the path to which
+* the message will be sent. This may be the value of a queuePath
+* or a topicPath.
+* @param message
+* A Message object that represents the message to
+* send.
+*
+* @return A Observable<Void> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable sendMessage(String path, BrokeredMessage message);
+
+/**
+* Receives a message.
+*
+* @param path
+* A String object that represents the path from
+* which a message will be received. This may either be the value
+* of queuePath or a combination of the topicPath +
+* "/subscriptions/" + subscriptionName.
+* @return A Observable<ReceiveMessageResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable receiveMessage(String path);
+
+/**
+* Receives a message using the specified receive message options.
+*
+* @param path
+* A String object that represents the path from
+* which a message will be received. This may either be the value
+* of queuePath or a combination of the topicPath +
+* "/subscriptions/" + subscriptionName.
+* @param options
+* A ReceiveMessageOptions object that represents
+* the receive message options.
+* @return A Observable<ReceiveMessageResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable receiveMessage(String path,
+ ReceiveMessageOptions options);
+
+/**
+* Deletes a message.
+*
+* @param message
+* A Message object that represents the message to
+* delete.
+* @return A Observable<Void> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable deleteMessage(BrokeredMessage message);
+
+/**
+* Creates a queue.
+*
+* @param queueInfo
+* A QueueInfo object that represents the queue to
+* create.
+* @return A Observable<CreateQueueResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable createQueue(QueueInfo queueInfo);
+
+/**
+* Deletes a queue.
+*
+* @param queuePath
+* A String object that represents the name of the
+* queue to delete.
+* @return A Observable<Void> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable deleteQueue(String queuePath);
+
+/**
+* Retrieves a queue.
+*
+* @param queuePath
+* A String object that represents the name of the
+* queue to retrieve.
+* @return A Observable<GetQueueResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable getQueue(String queuePath);
+
+/**
+* Returns a list of queues.
+*
+* @return A Observable<ListQueuesResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable listQueues();
+
+/**
+* Returns a list of queues.
+*
+* @param options
+* A ListQueueOptions object that represents the
+* options to list the queue.
+* @return A Observable<ListQueuesResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable listQueues(ListQueuesOptions options);
+
+/**
+* Updates the information of a queue.
+*
+* @param queueInfo
+* The information of a queue to be updated.
+*
+* @return A Observable<QueueInfo> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable updateQueue(QueueInfo queueInfo);
+
+/**
+* Creates a topic.
+*
+* @param topic
+* A Topic object that represents the topic to
+* create.
+* @return A Observable<CreateTopicResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable createTopic(TopicInfo topic);
+
+/**
+* Deletes a topic.
+*
+* @param topicPath
+* A String object that represents the name of the
+* queue to delete.
+* @return A Observable<Void> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable deleteTopic(String topicPath);
+
+/**
+* Retrieves a topic.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic to retrieve.
+* @return A Observable<GetTopicResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable getTopic(String topicPath);
+
+/**
+* Returns a list of topics.
+*
+* @return A Observable<ListTopicsResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable listTopics();
+
+/**
+* Returns a list of topics.
+*
+* @param options
+* A ListTopicsOptions object that represents the
+* options to list the topic.
+* @return A Observable<ListTopicsResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable listTopics(ListTopicsOptions options);
+
+/**
+* Updates a topic.
+*
+* @param topicInfo
+* A TopicInfo object that represents the topic to
+* be updated.
+*
+* @return A Observable<TopicInfo> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable updateTopic(TopicInfo topicInfo);
+
+/**
+* Creates a subscription.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic for the subscription.
+* @param subscription
+* A Subscription object that represents the
+* subscription to create.
+* @return A Observable<CreateSubscriptionResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable createSubscription(String topicPath,
+ SubscriptionInfo subscription);
+
+/**
+* Deletes a subscription.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic for the subscription.
+* @param subscriptionName
+* A String object that represents the name of the
+* subscription to delete.
+* @return A Observable<Void> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable deleteSubscription(String topicPath, String subscriptionName);
+
+/**
+* Retrieves a subscription.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic for the subscription.
+* @param subscriptionName
+* A String object that represents the name of the
+* subscription to retrieve.
+* @return A Observable<GetSubscriptionResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable getSubscription(String topicPath,
+ String subscriptionName);
+
+/**
+* Returns a list of subscriptions.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic for the subscriptions to retrieve.
+* @return A Observable<ListSubscriptionsResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable listSubscriptions(String topicPath);
+
+/**
+* Returns a list of subscriptions.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic for the subscriptions to retrieve.
+*
+* @param options
+* A ListSubscriptionsOptions object that represents
+* the options to list subscriptions.
+*
+* @return A Observable<ListSubscriptionsResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable listSubscriptions(String topicPath,
+ ListSubscriptionsOptions options);
+
+/**
+* Updates a subscription.
+*
+* @param topicName
+* A String option which represents the name of the
+* topic.
+* @param subscriptionInfo
+* A SubscriptionInfo option which represents the
+* information of the subscription.
+* @return A Observable<SubscriptionInfo> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable updateSubscription(String topicName,
+ SubscriptionInfo subscriptionInfo);
+
+/**
+* Creates a rule.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic for the subscription.
+* @param subscriptionName
+* A String object that represents the name of the
+* subscription for which the rule will be created.
+* @param rule
+* A Rule object that represents the rule to create.
+* @return A Observable<CreateRuleResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable createRule(String topicPath, String subscriptionName,
+ RuleInfo rule);
+
+/**
+* Deletes a rule.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic for the subscription.
+* @param subscriptionName
+* A String object that represents the name of the
+* subscription for which the rule will be deleted.
+* @param ruleName
+* A String object that represents the name of the
+* rule to delete.
+* @return A Observable<Void> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable deleteRule(String topicPath, String subscriptionName, String ruleName);
+
+/**
+* Retrieves a rule.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic for the subscription.
+* @param subscriptionName
+* A String object that represents the name of the
+* subscription for which the rule will be retrieved.
+* @param ruleName
+* A String object that represents the name of the
+* rule to retrieve.
+* @return A Observable<GetRuleResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable getRule(String topicPath, String subscriptionName,
+ String ruleName);
+
+/**
+* Returns a list of rules.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic for the subscription.
+* @param subscriptionName
+* A String object that represents the name of the
+* subscription whose rules are being retrieved.
+* @return A Observable<ListRulesResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable listRules(String topicPath, String subscriptionName);
+
+/**
+* Returns a list of rules.
+*
+* @param topicPath
+* A String object that represents the name of the
+* topic for the subscription.
+* @param subscriptionName
+* A String object that represents the name of the
+* subscription whose rules are being retrieved.
+* @param options
+* A ListRulesOptions object that represents the
+* options to retrieve rules.
+* @return A Observable<ListRulesResult> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable listRules(String topicPath, String subscriptionName,
+ ListRulesOptions options);
+
+/**
+* Renew queue lock.
+*
+* @param queueName
+* A String object that represents the name of the
+* queue.
+* @param messageId
+* A String object that represents the ID of the
+* message.
+* @param lockToken
+* A String object that represents the token of the
+* lock.
+* @return A Observable<ClientResponse> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable renewQueueLock(String queueName, String messageId, String lockToken)
+ throws ServiceException;
+
+/**
+* Renew subscription lock.
+*
+* @param topicName
+* A String object that represents the name of the
+* topic.
+* @param queueName
+* A String object that represents the name of the
+* queue.
+* @param messageId
+* A String object that represents the ID of the
+* message.
+* @param lockToken
+* A String object that represents the token of the
+* lock.
+* @return A Observable<ClientResponse> object that represents
+* the result.
+* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it.
+*/
+Observable renewSubscriptionLock(String topicName, String subscriptionName,
+ String messageId, String lockToken) throws ServiceException;
+}
diff --git a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusExceptionProcessor.java b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusExceptionProcessor.java
index 2b6f15f4adbb..f806f9a67500 100644
--- a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusExceptionProcessor.java
+++ b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusExceptionProcessor.java
@@ -26,6 +26,7 @@
import org.apache.commons.logging.LogFactory;
import com.microsoft.windowsazure.services.servicebus.ServiceBusContract;
+import com.microsoft.windowsazure.services.servicebus.ServiceBusContractAsync;
import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage;
import com.microsoft.windowsazure.services.servicebus.models.CreateQueueResult;
import com.microsoft.windowsazure.services.servicebus.models.CreateRuleResult;
@@ -68,6 +69,11 @@ public ServiceBusExceptionProcessor(ServiceBusRestProxy next) {
this.next = next;
}
+ @Override
+ public ServiceBusContractAsync async() {
+ return this.next.async();
+ }
+
@Override
public ServiceBusContract withFilter(ServiceFilter filter) {
return new ServiceBusExceptionProcessor(next.withFilter(filter));
diff --git a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxy.java b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxy.java
index 6f7d4a61bdca..587c39bf4ff1 100644
--- a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxy.java
+++ b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxy.java
@@ -14,26 +14,15 @@
*/
package com.microsoft.windowsazure.services.servicebus.implementation;
+import javax.inject.Inject;
+
import com.microsoft.windowsazure.core.UserAgentFilter;
-import com.microsoft.windowsazure.core.pipeline.PipelineHelpers;
import com.microsoft.windowsazure.core.pipeline.filter.ServiceRequestFilter;
import com.microsoft.windowsazure.core.pipeline.filter.ServiceResponseFilter;
-import com.microsoft.windowsazure.core.pipeline.jersey.ClientFilterAdapter;
-import com.microsoft.windowsazure.core.pipeline.jersey.ClientFilterRequestAdapter;
-import com.microsoft.windowsazure.core.pipeline.jersey.ClientFilterResponseAdapter;
import com.microsoft.windowsazure.core.pipeline.jersey.ServiceFilter;
import com.microsoft.windowsazure.exception.ServiceException;
-import java.io.InputStream;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-
-import javax.inject.Inject;
-import javax.ws.rs.core.MediaType;
-
import com.microsoft.windowsazure.services.servicebus.ServiceBusContract;
-import com.microsoft.windowsazure.services.servicebus.models.AbstractListOptions;
+import com.microsoft.windowsazure.services.servicebus.ServiceBusContractAsync;
import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage;
import com.microsoft.windowsazure.services.servicebus.models.CreateQueueResult;
import com.microsoft.windowsazure.services.servicebus.models.CreateRuleResult;
@@ -60,268 +49,116 @@
import com.microsoft.windowsazure.services.servicebus.models.SubscriptionInfo;
import com.microsoft.windowsazure.services.servicebus.models.TopicInfo;
import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.WebResource.Builder;
import com.sun.jersey.api.client.filter.ClientFilter;
-public class ServiceBusRestProxy implements ServiceBusContract {
+import rx.Observable;
- private Client channel;
- private final String uri;
- private final BrokerPropertiesMapper mapper;
- private final CustomPropertiesMapper customPropertiesMapper;
+public class ServiceBusRestProxy implements ServiceBusContract {
- private ClientFilter[] filters;
+ private ServiceBusRestProxyAsync asyncProxy;
@Inject
- public ServiceBusRestProxy(Client channel, WrapFilter authFilter,
- SasFilter sasAuthFilter,
- UserAgentFilter userAgentFilter,
- ServiceBusConnectionSettings connectionSettings,
+ public ServiceBusRestProxy(Client channel, WrapFilter authFilter, SasFilter sasAuthFilter,
+ UserAgentFilter userAgentFilter, ServiceBusConnectionSettings connectionSettings,
BrokerPropertiesMapper mapper) {
- this.channel = channel;
- this.filters = new ClientFilter[0];
- this.uri = connectionSettings.getUri();
- this.mapper = mapper;
- this.customPropertiesMapper = new CustomPropertiesMapper();
- if (connectionSettings.isSasAuthentication()) {
- channel.addFilter(sasAuthFilter);
- } else {
- channel.addFilter(authFilter);
- }
- channel.addFilter(new ClientFilterRequestAdapter(userAgentFilter));
+ asyncProxy = new ServiceBusRestProxyAsync(channel, authFilter, sasAuthFilter, userAgentFilter,
+ connectionSettings, mapper);
+ }
+
+ public ServiceBusRestProxy(Client channel, ClientFilter[] filters, String uri, BrokerPropertiesMapper mapper) {
+
+ asyncProxy = new ServiceBusRestProxyAsync(channel, filters, uri, mapper);
+ }
+
+ ServiceBusRestProxy(ServiceBusRestProxyAsync async) {
+ this.asyncProxy = async;
}
- public ServiceBusRestProxy(Client channel, ClientFilter[] filters,
- String uri, BrokerPropertiesMapper mapper) {
- this.channel = channel;
- this.filters = filters;
- this.uri = uri;
- this.mapper = mapper;
- this.customPropertiesMapper = new CustomPropertiesMapper();
+ @Override
+ public ServiceBusContractAsync async() {
+ return asyncProxy;
}
@Override
public ServiceBusContract withFilter(ServiceFilter filter) {
- ClientFilter[] newFilters = Arrays.copyOf(filters, filters.length + 1);
- newFilters[filters.length] = new ClientFilterAdapter(filter);
- return new ServiceBusRestProxy(channel, newFilters, uri, mapper);
+ return this.async().withFilter(filter);
}
@Override
- public ServiceBusContract withRequestFilterFirst(
- ServiceRequestFilter serviceRequestFilter) {
- ClientFilter[] currentFilters = filters;
- ClientFilter[] newFilters = new ClientFilter[currentFilters.length + 1];
- System.arraycopy(currentFilters, 0, newFilters, 1,
- currentFilters.length);
- newFilters[0] = new ClientFilterRequestAdapter(serviceRequestFilter);
- return new ServiceBusRestProxy(channel, newFilters, uri, mapper);
+ public ServiceBusContract withRequestFilterFirst(ServiceRequestFilter serviceRequestFilter) {
+ return this.async().withRequestFilterFirst(serviceRequestFilter);
}
@Override
- public ServiceBusContract withRequestFilterLast(
- ServiceRequestFilter serviceRequestFilter) {
- ClientFilter[] currentFilters = filters;
- ClientFilter[] newFilters = Arrays.copyOf(currentFilters,
- currentFilters.length + 1);
- newFilters[currentFilters.length] = new ClientFilterRequestAdapter(
- serviceRequestFilter);
- return new ServiceBusRestProxy(channel, newFilters, uri, mapper);
+ public ServiceBusContract withRequestFilterLast(ServiceRequestFilter serviceRequestFilter) {
+ return this.async().withRequestFilterLast(serviceRequestFilter);
}
@Override
- public ServiceBusContract withResponseFilterFirst(
- ServiceResponseFilter serviceResponseFilter) {
- ClientFilter[] currentFilters = filters;
- ClientFilter[] newFilters = new ClientFilter[currentFilters.length + 1];
- System.arraycopy(currentFilters, 0, newFilters, 1,
- currentFilters.length);
- newFilters[0] = new ClientFilterResponseAdapter(serviceResponseFilter);
- return new ServiceBusRestProxy(channel, newFilters, uri, mapper);
+ public ServiceBusContract withResponseFilterFirst(ServiceResponseFilter serviceResponseFilter) {
+ return this.async().withResponseFilterFirst(serviceResponseFilter);
}
@Override
- public ServiceBusContract withResponseFilterLast(
- ServiceResponseFilter serviceResponseFilter) {
- ClientFilter[] currentFilters = filters;
- ClientFilter[] newFilters = Arrays.copyOf(currentFilters,
- currentFilters.length + 1);
- newFilters[currentFilters.length] = new ClientFilterResponseAdapter(
- serviceResponseFilter);
- return new ServiceBusRestProxy(channel, newFilters, uri, mapper);
+ public ServiceBusContract withResponseFilterLast(ServiceResponseFilter serviceResponseFilter) {
+ return this.async().withResponseFilterLast(serviceResponseFilter);
}
public Client getChannel() {
- return channel;
+ return this.asyncProxy.getChannel();
}
public void setChannel(Client channel) {
- this.channel = channel;
- }
-
- private WebResource getResource() {
- WebResource resource = getChannel().resource(uri).queryParam(
- "api-version", "2013-07");
- for (ClientFilter filter : filters) {
- resource.addFilter(filter);
- }
- return resource;
+ this.asyncProxy.setChannel(channel);
}
@Override
- public void sendMessage(String path, BrokeredMessage message) {
- Builder request = getResource().path(path).path("messages")
- .getRequestBuilder();
-
- if (message.getContentType() != null) {
- request = request.type(message.getContentType());
- }
-
- if (message.getBrokerProperties() != null) {
- request = request.header("BrokerProperties",
- mapper.toString(message.getBrokerProperties()));
- }
-
- for (java.util.Map.Entry entry : message
- .getProperties().entrySet()) {
- request.header(entry.getKey(),
- customPropertiesMapper.toString(entry.getValue()));
- }
-
- request.post(message.getBody());
+ public void sendMessage(String path, BrokeredMessage message) throws ServiceException {
+ resolve(this.async().sendMessage(path, message));
}
@Override
- public void sendQueueMessage(String path, BrokeredMessage message)
- throws ServiceException {
+ public void sendQueueMessage(String path, BrokeredMessage message) throws ServiceException {
sendMessage(path, message);
}
@Override
- public ReceiveQueueMessageResult receiveQueueMessage(String queueName)
- throws ServiceException {
+ public ReceiveQueueMessageResult receiveQueueMessage(String queueName) throws ServiceException {
return receiveQueueMessage(queueName, ReceiveMessageOptions.DEFAULT);
}
@Override
- public ReceiveQueueMessageResult receiveQueueMessage(String queuePath,
- ReceiveMessageOptions options) throws ServiceException {
-
- WebResource resource = getResource().path(queuePath).path("messages")
- .path("head");
-
- BrokeredMessage message = receiveMessage(options, resource);
- return new ReceiveQueueMessageResult(message);
+ public ReceiveQueueMessageResult receiveQueueMessage(String queuePath, ReceiveMessageOptions options)
+ throws ServiceException {
+ return resolve(this.async().receiveQueueMessage(queuePath, options));
}
@Override
- public ReceiveMessageResult receiveMessage(String path)
- throws ServiceException {
+ public ReceiveMessageResult receiveMessage(String path) throws ServiceException {
return receiveMessage(path, ReceiveMessageOptions.DEFAULT);
}
@Override
- public ReceiveMessageResult receiveMessage(String path,
- ReceiveMessageOptions options) throws ServiceException {
+ public ReceiveMessageResult receiveMessage(String path, ReceiveMessageOptions options) throws ServiceException {
+ return resolve(this.async().receiveMessage(path, options));
+ }
- WebResource resource = getResource().path(path).path("messages")
- .path("head");
-
- BrokeredMessage message = receiveMessage(options, resource);
- return new ReceiveMessageResult(message);
- }
-
- private BrokeredMessage receiveMessage(ReceiveMessageOptions options,
- WebResource resource) {
- if (options.getTimeout() != null) {
- resource = resource.queryParam("timeout",
- Integer.toString(options.getTimeout()));
- }
-
- ClientResponse clientResult;
- if (options.isReceiveAndDelete()) {
- clientResult = resource.delete(ClientResponse.class);
- } else if (options.isPeekLock()) {
- clientResult = resource.post(ClientResponse.class, "");
- } else {
- throw new RuntimeException("Unknown ReceiveMode");
- }
-
- if (clientResult.getStatus() == 204) {
- return null;
- }
-
- BrokerProperties brokerProperties;
- if (clientResult.getHeaders().containsKey("BrokerProperties")) {
- brokerProperties = mapper.fromString(clientResult.getHeaders()
- .getFirst("BrokerProperties"));
- } else {
- brokerProperties = new BrokerProperties();
- }
-
- String location = clientResult.getHeaders().getFirst("Location");
- if (location != null) {
- brokerProperties.setLockLocation(location);
- }
-
- BrokeredMessage message = new BrokeredMessage(brokerProperties);
-
- MediaType contentType = clientResult.getType();
- if (contentType != null) {
- message.setContentType(contentType.toString());
- }
-
- Date date = clientResult.getResponseDate();
- if (date != null) {
- message.setDate(date);
- }
-
- InputStream body = clientResult.getEntityInputStream();
- if (body != null) {
- message.setBody(body);
- }
-
- for (String key : clientResult.getHeaders().keySet()) {
- Object value = clientResult.getHeaders().getFirst(key);
- try {
- value = customPropertiesMapper.fromString(value.toString());
- message.setProperty(key, value);
- } catch (ParseException e) {
- // log.warn("Unable to parse custom header", e);
- } catch (NumberFormatException e) {
- // log.warn("Unable to parse custom header", e);
- }
- }
-
- return message;
- }
-
- @Override
- public void sendTopicMessage(String topicName, BrokeredMessage message)
- throws ServiceException {
+ @Override
+ public void sendTopicMessage(String topicName, BrokeredMessage message) throws ServiceException {
sendMessage(topicName, message);
}
@Override
- public ReceiveSubscriptionMessageResult receiveSubscriptionMessage(
- String topicName, String subscriptionName) throws ServiceException {
- return receiveSubscriptionMessage(topicName, subscriptionName,
- ReceiveMessageOptions.DEFAULT);
+ public ReceiveSubscriptionMessageResult receiveSubscriptionMessage(String topicName, String subscriptionName)
+ throws ServiceException {
+ return receiveSubscriptionMessage(topicName, subscriptionName, ReceiveMessageOptions.DEFAULT);
}
@Override
- public ReceiveSubscriptionMessageResult receiveSubscriptionMessage(
- String topicName, String subscriptionName,
+ public ReceiveSubscriptionMessageResult receiveSubscriptionMessage(String topicName, String subscriptionName,
ReceiveMessageOptions options) throws ServiceException {
- WebResource resource = getResource().path(topicName)
- .path("subscriptions").path(subscriptionName).path("messages")
- .path("head");
-
- BrokeredMessage message = receiveMessage(options, resource);
- return new ReceiveSubscriptionMessageResult(message);
+ return resolve(this.async().receiveSubscriptionMessage(topicName, subscriptionName, options));
}
@Override
@@ -335,210 +172,99 @@ public void deleteMessage(BrokeredMessage message) throws ServiceException {
}
@Override
- public CreateQueueResult createQueue(QueueInfo queueInfo)
- throws ServiceException {
- Builder webResourceBuilder = getResource().path(queueInfo.getPath())
- .type("application/atom+xml;type=entry;charset=utf-8");
- if ((queueInfo.getForwardTo() != null)
- && !queueInfo.getForwardTo().isEmpty()) {
- webResourceBuilder.header("ServiceBusSupplementaryAuthorization",
- queueInfo.getForwardTo());
- }
- return new CreateQueueResult(webResourceBuilder.put(QueueInfo.class,
- queueInfo));
+ public CreateQueueResult createQueue(QueueInfo queueInfo) throws ServiceException {
+ return resolve(this.async().createQueue(queueInfo));
}
@Override
public void deleteQueue(String queuePath) throws ServiceException {
- getResource().path(queuePath).delete();
+ resolve(this.async().deleteQueue(queuePath));
}
@Override
public GetQueueResult getQueue(String queuePath) throws ServiceException {
- return new GetQueueResult(getResource().path(queuePath).get(
- QueueInfo.class));
+ return resolve(this.async().getQueue(queuePath));
}
@Override
- public ListQueuesResult listQueues(ListQueuesOptions options)
- throws ServiceException {
- Feed feed = listOptions(options,
- getResource().path("$Resources/Queues")).get(Feed.class);
- ArrayList queues = new ArrayList();
- for (Entry entry : feed.getEntries()) {
- queues.add(new QueueInfo(entry));
- }
- ListQueuesResult result = new ListQueuesResult();
- result.setItems(queues);
- return result;
+ public ListQueuesResult listQueues(ListQueuesOptions options) throws ServiceException {
+ return resolve(this.async().listQueues(options));
}
@Override
public QueueInfo updateQueue(QueueInfo queueInfo) throws ServiceException {
- Builder webResourceBuilder = getResource().path(queueInfo.getPath())
- .type("application/atom+xml;type=entry;charset=utf-8")
- .header("If-Match", "*");
- if ((queueInfo.getForwardTo() != null)
- && !queueInfo.getForwardTo().isEmpty()) {
- webResourceBuilder.header("ServiceBusSupplementaryAuthorization",
- queueInfo.getForwardTo());
- }
- return webResourceBuilder.put(QueueInfo.class, queueInfo);
- }
-
- private WebResource listOptions(AbstractListOptions> options,
- WebResource path) {
- if (options.getTop() != null) {
- path = path.queryParam("$top", options.getTop().toString());
- }
- if (options.getSkip() != null) {
- path = path.queryParam("$skip", options.getSkip().toString());
- }
- if (options.getFilter() != null) {
- path = path.queryParam("$filter", options.getFilter());
- }
- return path;
- }
-
- @Override
- public CreateTopicResult createTopic(TopicInfo entry)
- throws ServiceException {
- return new CreateTopicResult(getResource().path(entry.getPath())
- .type("application/atom+xml;type=entry;charset=utf-8")
- .put(TopicInfo.class, entry));
+ return resolve(this.async().updateQueue(queueInfo));
+ }
+
+ @Override
+ public CreateTopicResult createTopic(TopicInfo entry) throws ServiceException {
+ return resolve(this.async().createTopic(entry));
}
@Override
public void deleteTopic(String topicPath) throws ServiceException {
- getResource().path(topicPath).delete();
+ resolve(this.async().deleteTopic(topicPath));
}
@Override
public GetTopicResult getTopic(String topicPath) throws ServiceException {
- return new GetTopicResult(getResource().path(topicPath).get(
- TopicInfo.class));
+ return resolve(this.async().getTopic(topicPath));
}
@Override
- public ListTopicsResult listTopics(ListTopicsOptions options)
- throws ServiceException {
- Feed feed = listOptions(options,
- getResource().path("$Resources/Topics")).get(Feed.class);
- ArrayList topics = new ArrayList();
- for (Entry entry : feed.getEntries()) {
- topics.add(new TopicInfo(entry));
- }
- ListTopicsResult result = new ListTopicsResult();
- result.setItems(topics);
- return result;
+ public ListTopicsResult listTopics(ListTopicsOptions options) throws ServiceException {
+ return resolve(this.async().listTopics(options));
}
@Override
public TopicInfo updateTopic(TopicInfo topicInfo) throws ServiceException {
- return getResource().path(topicInfo.getPath())
- .type("application/atom+xml;type=entry;charset=utf-8")
- .header("If-Match", "*").put(TopicInfo.class, topicInfo);
+ return resolve(this.async().updateTopic(topicInfo));
}
@Override
- public CreateSubscriptionResult createSubscription(String topicPath,
- SubscriptionInfo subscriptionInfo) {
- Builder webResourceBuilder = getResource().path(topicPath)
- .path("subscriptions").path(subscriptionInfo.getName())
- .type("application/atom+xml;type=entry;charset=utf-8");
- if ((subscriptionInfo.getForwardTo() != null)
- && (!subscriptionInfo.getForwardTo().isEmpty())) {
- webResourceBuilder.header("ServiceBusSupplementaryAuthorization",
- subscriptionInfo.getForwardTo());
-
- }
- return new CreateSubscriptionResult(webResourceBuilder.put(
- SubscriptionInfo.class, subscriptionInfo));
+ public CreateSubscriptionResult createSubscription(String topicPath, SubscriptionInfo subscriptionInfo) {
+ return resolve(this.async().createSubscription(topicPath, subscriptionInfo));
}
@Override
public void deleteSubscription(String topicPath, String subscriptionName) {
- getResource().path(topicPath).path("subscriptions")
- .path(subscriptionName).delete();
+ resolve(this.async().deleteSubscription(topicPath, subscriptionName));
}
@Override
- public GetSubscriptionResult getSubscription(String topicPath,
- String subscriptionName) {
- return new GetSubscriptionResult(getResource().path(topicPath)
- .path("subscriptions").path(subscriptionName)
- .get(SubscriptionInfo.class));
+ public GetSubscriptionResult getSubscription(String topicPath, String subscriptionName) {
+ return resolve(this.async().getSubscription(topicPath, subscriptionName));
}
@Override
- public ListSubscriptionsResult listSubscriptions(String topicPath,
- ListSubscriptionsOptions options) {
- Feed feed = listOptions(options,
- getResource().path(topicPath).path("subscriptions")).get(
- Feed.class);
- ArrayList list = new ArrayList();
- for (Entry entry : feed.getEntries()) {
- list.add(new SubscriptionInfo(entry));
- }
- ListSubscriptionsResult result = new ListSubscriptionsResult();
- result.setItems(list);
- return result;
+ public ListSubscriptionsResult listSubscriptions(String topicPath, ListSubscriptionsOptions options) {
+ return resolve(this.async().listSubscriptions(topicPath, options));
}
@Override
- public SubscriptionInfo updateSubscription(String topicName,
- SubscriptionInfo subscriptionInfo) throws ServiceException {
- Builder webResourceBuilder = getResource().path(topicName)
- .path("subscriptions").path(subscriptionInfo.getName())
- .type("application/atom+xml;type=entry;charset=utf-8")
- .header("If-Match", "*");
- if ((subscriptionInfo.getForwardTo() != null)
- && !subscriptionInfo.getForwardTo().isEmpty()) {
- webResourceBuilder.header("ServiceBusSupplementaryAuthorization",
- subscriptionInfo.getForwardTo());
- }
- return webResourceBuilder.put(SubscriptionInfo.class, subscriptionInfo);
+ public SubscriptionInfo updateSubscription(String topicName, SubscriptionInfo subscriptionInfo)
+ throws ServiceException {
+ return resolve(this.async().updateSubscription(topicName, subscriptionInfo));
}
@Override
- public CreateRuleResult createRule(String topicPath,
- String subscriptionName, RuleInfo rule) {
- return new CreateRuleResult(getResource().path(topicPath)
- .path("subscriptions").path(subscriptionName).path("rules")
- .path(rule.getName())
- .type("application/atom+xml;type=entry;charset=utf-8")
- .put(RuleInfo.class, rule));
+ public CreateRuleResult createRule(String topicPath, String subscriptionName, RuleInfo rule) {
+ return resolve(this.async().createRule(topicPath, subscriptionName, rule));
}
@Override
- public void deleteRule(String topicPath, String subscriptionName,
- String ruleName) {
- getResource().path(topicPath).path("subscriptions")
- .path(subscriptionName).path("rules").path(ruleName).delete();
+ public void deleteRule(String topicPath, String subscriptionName, String ruleName) {
+ resolve(this.async().deleteRule(topicPath, subscriptionName, ruleName));
}
@Override
- public GetRuleResult getRule(String topicPath, String subscriptionName,
- String ruleName) {
- return new GetRuleResult(getResource().path(topicPath)
- .path("subscriptions").path(subscriptionName).path("rules")
- .path(ruleName).get(RuleInfo.class));
+ public GetRuleResult getRule(String topicPath, String subscriptionName, String ruleName) {
+ return resolve(this.async().getRule(topicPath, subscriptionName, ruleName));
}
@Override
- public ListRulesResult listRules(String topicPath, String subscriptionName,
- ListRulesOptions options) {
- Feed feed = listOptions(
- options,
- getResource().path(topicPath).path("subscriptions")
- .path(subscriptionName).path("rules")).get(Feed.class);
- ArrayList list = new ArrayList();
- for (Entry entry : feed.getEntries()) {
- list.add(new RuleInfo(entry));
- }
- ListRulesResult result = new ListRulesResult();
- result.setItems(list);
- return result;
+ public ListRulesResult listRules(String topicPath, String subscriptionName, ListRulesOptions options) {
+ return resolve(this.async().listRules(topicPath, subscriptionName, options));
}
@Override
@@ -552,34 +278,28 @@ public ListTopicsResult listTopics() throws ServiceException {
}
@Override
- public ListSubscriptionsResult listSubscriptions(String topicName)
- throws ServiceException {
+ public ListSubscriptionsResult listSubscriptions(String topicName) throws ServiceException {
return listSubscriptions(topicName, ListSubscriptionsOptions.DEFAULT);
}
@Override
- public ListRulesResult listRules(String topicName, String subscriptionName)
- throws ServiceException {
+ public ListRulesResult listRules(String topicName, String subscriptionName) throws ServiceException {
return listRules(topicName, subscriptionName, ListRulesOptions.DEFAULT);
}
@Override
- public void renewQueueLock(String queueName, String messageId,
- String lockToken) throws ServiceException {
- ClientResponse clientResponse = getResource().path(queueName)
- .path("messages").path(messageId).path(lockToken)
- .post(ClientResponse.class, "");
- PipelineHelpers.throwIfNotSuccess(clientResponse);
+ public void renewQueueLock(String queueName, String messageId, String lockToken) throws ServiceException {
+ resolve(this.async().renewQueueLock(queueName, messageId, lockToken));
}
@Override
- public void renewSubscriptionLock(String topicName,
- String subscriptionName, String messageId, String lockToken)
+ public void renewSubscriptionLock(String topicName, String subscriptionName, String messageId, String lockToken)
throws ServiceException {
- ClientResponse clientResponse = getResource().path(topicName)
- .path("Subscriptions").path(subscriptionName).path("messages")
- .path(messageId).path(lockToken).post(ClientResponse.class, "");
- PipelineHelpers.throwIfNotSuccess(clientResponse);
+ resolve(this.async().renewSubscriptionLock(topicName, subscriptionName, messageId, lockToken));
+ }
+
+ private T resolve(Observable o) {
+ return o.toBlocking().single();
}
}
diff --git a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxyAsync.java b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxyAsync.java
new file mode 100644
index 000000000000..cd50e664783b
--- /dev/null
+++ b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxyAsync.java
@@ -0,0 +1,643 @@
+package com.microsoft.windowsazure.services.servicebus.implementation;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.concurrent.Future;
+
+import javax.inject.Inject;
+import javax.ws.rs.core.MediaType;
+
+import com.microsoft.windowsazure.core.UserAgentFilter;
+import com.microsoft.windowsazure.core.pipeline.PipelineHelpers;
+import com.microsoft.windowsazure.core.pipeline.filter.ServiceRequestFilter;
+import com.microsoft.windowsazure.core.pipeline.filter.ServiceResponseFilter;
+import com.microsoft.windowsazure.core.pipeline.jersey.ClientFilterAdapter;
+import com.microsoft.windowsazure.core.pipeline.jersey.ClientFilterRequestAdapter;
+import com.microsoft.windowsazure.core.pipeline.jersey.ClientFilterResponseAdapter;
+import com.microsoft.windowsazure.core.pipeline.jersey.ServiceFilter;
+import com.microsoft.windowsazure.exception.ServiceException;
+import com.microsoft.windowsazure.services.servicebus.ServiceBusContract;
+import com.microsoft.windowsazure.services.servicebus.ServiceBusContractAsync;
+import com.microsoft.windowsazure.services.servicebus.models.AbstractListOptions;
+import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage;
+import com.microsoft.windowsazure.services.servicebus.models.CreateQueueResult;
+import com.microsoft.windowsazure.services.servicebus.models.CreateRuleResult;
+import com.microsoft.windowsazure.services.servicebus.models.CreateSubscriptionResult;
+import com.microsoft.windowsazure.services.servicebus.models.CreateTopicResult;
+import com.microsoft.windowsazure.services.servicebus.models.GetQueueResult;
+import com.microsoft.windowsazure.services.servicebus.models.GetRuleResult;
+import com.microsoft.windowsazure.services.servicebus.models.GetSubscriptionResult;
+import com.microsoft.windowsazure.services.servicebus.models.GetTopicResult;
+import com.microsoft.windowsazure.services.servicebus.models.ListQueuesOptions;
+import com.microsoft.windowsazure.services.servicebus.models.ListQueuesResult;
+import com.microsoft.windowsazure.services.servicebus.models.ListRulesOptions;
+import com.microsoft.windowsazure.services.servicebus.models.ListRulesResult;
+import com.microsoft.windowsazure.services.servicebus.models.ListSubscriptionsOptions;
+import com.microsoft.windowsazure.services.servicebus.models.ListSubscriptionsResult;
+import com.microsoft.windowsazure.services.servicebus.models.ListTopicsOptions;
+import com.microsoft.windowsazure.services.servicebus.models.ListTopicsResult;
+import com.microsoft.windowsazure.services.servicebus.models.QueueInfo;
+import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageOptions;
+import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageResult;
+import com.microsoft.windowsazure.services.servicebus.models.ReceiveQueueMessageResult;
+import com.microsoft.windowsazure.services.servicebus.models.ReceiveSubscriptionMessageResult;
+import com.microsoft.windowsazure.services.servicebus.models.RuleInfo;
+import com.microsoft.windowsazure.services.servicebus.models.SubscriptionInfo;
+import com.microsoft.windowsazure.services.servicebus.models.TopicInfo;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.AsyncWebResource;
+import com.sun.jersey.api.client.AsyncWebResource.Builder;
+import com.sun.jersey.api.client.filter.ClientFilter;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+public class ServiceBusRestProxyAsync implements ServiceBusContractAsync {
+
+ private Client channel;
+ private final String uri;
+ private final BrokerPropertiesMapper mapper;
+ private final CustomPropertiesMapper customPropertiesMapper;
+
+ private ClientFilter[] filters;
+
+ @Inject
+ public ServiceBusRestProxyAsync(Client channel, WrapFilter authFilter, SasFilter sasAuthFilter,
+ UserAgentFilter userAgentFilter, ServiceBusConnectionSettings connectionSettings,
+ BrokerPropertiesMapper mapper) {
+
+ this.channel = channel;
+ this.filters = new ClientFilter[0];
+ this.uri = connectionSettings.getUri();
+ this.mapper = mapper;
+ this.customPropertiesMapper = new CustomPropertiesMapper();
+ if (connectionSettings.isSasAuthentication()) {
+ channel.addFilter(sasAuthFilter);
+ } else {
+ channel.addFilter(authFilter);
+ }
+ channel.addFilter(new ClientFilterRequestAdapter(userAgentFilter));
+ }
+
+ public ServiceBusRestProxyAsync(Client channel, ClientFilter[] filters, String uri, BrokerPropertiesMapper mapper) {
+ this.channel = channel;
+ this.filters = filters;
+ this.uri = uri;
+ this.mapper = mapper;
+ this.customPropertiesMapper = new CustomPropertiesMapper();
+ }
+
+ @Override
+ public ServiceBusContract withFilter(ServiceFilter filter) {
+ ClientFilter[] newFilters = Arrays.copyOf(filters, filters.length + 1);
+ newFilters[filters.length] = new ClientFilterAdapter(filter);
+ return new ServiceBusRestProxy(new ServiceBusRestProxyAsync(channel, newFilters, uri, mapper));
+ }
+
+ @Override
+ public ServiceBusContract withRequestFilterFirst(ServiceRequestFilter serviceRequestFilter) {
+ ClientFilter[] currentFilters = filters;
+ ClientFilter[] newFilters = new ClientFilter[currentFilters.length + 1];
+ System.arraycopy(currentFilters, 0, newFilters, 1, currentFilters.length);
+ newFilters[0] = new ClientFilterRequestAdapter(serviceRequestFilter);
+ return new ServiceBusRestProxy(new ServiceBusRestProxyAsync(channel, newFilters, uri, mapper));
+ }
+
+ @Override
+ public ServiceBusContract withRequestFilterLast(ServiceRequestFilter serviceRequestFilter) {
+ ClientFilter[] currentFilters = filters;
+ ClientFilter[] newFilters = Arrays.copyOf(currentFilters, currentFilters.length + 1);
+ newFilters[currentFilters.length] = new ClientFilterRequestAdapter(serviceRequestFilter);
+ return new ServiceBusRestProxy(new ServiceBusRestProxyAsync(channel, newFilters, uri, mapper));
+ }
+
+ @Override
+ public ServiceBusContract withResponseFilterFirst(ServiceResponseFilter serviceResponseFilter) {
+ ClientFilter[] currentFilters = filters;
+ ClientFilter[] newFilters = new ClientFilter[currentFilters.length + 1];
+ System.arraycopy(currentFilters, 0, newFilters, 1, currentFilters.length);
+ newFilters[0] = new ClientFilterResponseAdapter(serviceResponseFilter);
+ return new ServiceBusRestProxy(new ServiceBusRestProxyAsync(channel, newFilters, uri, mapper));
+ }
+
+ @Override
+ public ServiceBusContract withResponseFilterLast(ServiceResponseFilter serviceResponseFilter) {
+ ClientFilter[] currentFilters = filters;
+ ClientFilter[] newFilters = Arrays.copyOf(currentFilters, currentFilters.length + 1);
+ newFilters[currentFilters.length] = new ClientFilterResponseAdapter(serviceResponseFilter);
+ return new ServiceBusRestProxy(new ServiceBusRestProxyAsync(channel, newFilters, uri, mapper));
+ }
+
+ public Client getChannel() {
+ return channel;
+ }
+
+ public void setChannel(Client channel) {
+ this.channel = channel;
+ }
+
+ private AsyncWebResource getResource() {
+ AsyncWebResource resource = getChannel().asyncResource(uri).queryParam("api-version", "2013-07");
+ for (ClientFilter filter : filters) {
+ resource.addFilter(filter);
+ }
+ return resource;
+ }
+
+ @Override
+ public Observable sendMessage(String path, BrokeredMessage message) {
+ Builder request = getResource().path(path).path("messages").getRequestBuilder();
+
+ if (message.getContentType() != null) {
+ request = request.type(message.getContentType());
+ }
+
+ if (message.getBrokerProperties() != null) {
+ request = request.header("BrokerProperties", mapper.toString(message.getBrokerProperties()));
+ }
+
+ for (java.util.Map.Entry entry : message.getProperties().entrySet()) {
+ request.header(entry.getKey(), customPropertiesMapper.toString(entry.getValue()));
+ }
+
+ Observable ret = Observable.from(request.post(Void.class, message.getBody()));
+
+ return ret;
+ }
+
+ @Override
+ public Observable sendQueueMessage(String path, BrokeredMessage message) {
+ return sendMessage(path, message);
+ }
+
+ @Override
+ public Observable receiveQueueMessage(String queueName) {
+ return receiveQueueMessage(queueName, ReceiveMessageOptions.DEFAULT);
+ }
+
+ @Override
+ public Observable receiveQueueMessage(String queuePath, ReceiveMessageOptions options) {
+
+ AsyncWebResource resource = getResource().path(queuePath).path("messages").path("head");
+
+ Observable message = receiveMessage(options, resource);
+ return message.map(new Func1() {
+ @Override
+ public ReceiveQueueMessageResult call(BrokeredMessage msg) {
+ return new ReceiveQueueMessageResult(msg);
+ }
+ });
+ }
+
+ @Override
+ public Observable receiveMessage(String path) {
+ return receiveMessage(path, ReceiveMessageOptions.DEFAULT);
+ }
+
+ @Override
+ public Observable receiveMessage(String path, ReceiveMessageOptions options) {
+ AsyncWebResource resource = getResource().path(path).path("messages").path("head");
+
+ Observable message = receiveMessage(options, resource);
+ return message.map(new Func1() {
+ @Override
+ public ReceiveMessageResult call(BrokeredMessage msg) {
+ return new ReceiveMessageResult(msg);
+ }
+ });
+ }
+
+ private Observable receiveMessage(ReceiveMessageOptions options, AsyncWebResource resource) {
+ if (options.getTimeout() != null) {
+ resource = resource.queryParam("timeout", Integer.toString(options.getTimeout()));
+ }
+
+ Future clientResult;
+ if (options.isReceiveAndDelete()) {
+ clientResult = resource.delete(ClientResponse.class);
+ } else if (options.isPeekLock()) {
+ clientResult = resource.post(ClientResponse.class, "");
+ } else {
+ throw new RuntimeException("Unknown ReceiveMode");
+ }
+
+ Observable observableResult = Observable.from(clientResult)
+ .map(new Func1() {
+ @Override
+ public BrokeredMessage call(ClientResponse clientResult) {
+ if (clientResult.getStatus() == 204) {
+ return null;
+ }
+
+ BrokerProperties brokerProperties;
+ if (clientResult.getHeaders().containsKey("BrokerProperties")) {
+ brokerProperties = mapper
+ .fromString(clientResult.getHeaders().getFirst("BrokerProperties"));
+ } else {
+ brokerProperties = new BrokerProperties();
+ }
+
+ String location = clientResult.getHeaders().getFirst("Location");
+ if (location != null) {
+ brokerProperties.setLockLocation(location);
+ }
+
+ BrokeredMessage message = new BrokeredMessage(brokerProperties);
+
+ MediaType contentType = clientResult.getType();
+ if (contentType != null) {
+ message.setContentType(contentType.toString());
+ }
+
+ Date date = clientResult.getResponseDate();
+ if (date != null) {
+ message.setDate(date);
+ }
+
+ InputStream body = clientResult.getEntityInputStream();
+ if (body != null) {
+ message.setBody(body);
+ }
+
+ for (String key : clientResult.getHeaders().keySet()) {
+ Object value = clientResult.getHeaders().getFirst(key);
+ try {
+ value = customPropertiesMapper.fromString(value.toString());
+ message.setProperty(key, value);
+ } catch (ParseException e) {
+ // log.warn("Unable to parse custom header", e);
+ } catch (NumberFormatException e) {
+ // log.warn("Unable to parse custom header", e);
+ }
+ }
+
+ return message;
+ }
+ });
+
+ return observableResult;
+ }
+
+ @Override
+ public Observable sendTopicMessage(String topicName, BrokeredMessage message) {
+ return sendMessage(topicName, message);
+ }
+
+ @Override
+ public Observable receiveSubscriptionMessage(String topicName,
+ String subscriptionName) {
+ return receiveSubscriptionMessage(topicName, subscriptionName, ReceiveMessageOptions.DEFAULT);
+ }
+
+ @Override
+ public Observable receiveSubscriptionMessage(String topicName,
+ String subscriptionName, ReceiveMessageOptions options) {
+ AsyncWebResource resource = getResource().path(topicName).path("subscriptions").path(subscriptionName)
+ .path("messages").path("head");
+
+ Observable message = receiveMessage(options, resource);
+ return message.map(new Func1() {
+ @Override
+ public ReceiveSubscriptionMessageResult call(BrokeredMessage msg) {
+ return new ReceiveSubscriptionMessageResult(msg);
+ }
+ });
+
+ }
+
+ @Override
+ public Observable unlockMessage(BrokeredMessage message) {
+ return Observable.from(getChannel().asyncResource(message.getLockLocation()).put(Void.class, ""));
+ }
+
+ @Override
+ public Observable deleteMessage(BrokeredMessage message) {
+ return Observable.from(getChannel().asyncResource(message.getLockLocation()).delete(Void.class));
+ }
+
+ @Override
+ public Observable createQueue(QueueInfo queueInfo) {
+ Builder webResourceBuilder = getResource().path(queueInfo.getPath())
+ .type("application/atom+xml;type=entry;charset=utf-8");
+ if ((queueInfo.getForwardTo() != null) && !queueInfo.getForwardTo().isEmpty()) {
+ webResourceBuilder.header("ServiceBusSupplementaryAuthorization", queueInfo.getForwardTo());
+ }
+
+ Future queueInfoResult = webResourceBuilder.put(QueueInfo.class, queueInfo);
+
+ Observable queueInfoObservable = Observable.from(queueInfoResult)
+ .map(new Func1() {
+ @Override
+ public CreateQueueResult call(QueueInfo msg) {
+ return new CreateQueueResult(msg);
+ }
+ });
+
+ return queueInfoObservable;
+ }
+
+ @Override
+ public Observable deleteQueue(String queuePath) {
+ return Observable.from(getResource().path(queuePath).delete(Void.class));
+ }
+
+ @Override
+ public Observable getQueue(String queuePath) {
+ Future queueInfoResult = getResource().path(queuePath).get(QueueInfo.class);
+
+ Observable getQueueObservable = Observable.from(queueInfoResult)
+ .map(new Func1() {
+ @Override
+ public GetQueueResult call(QueueInfo msg) {
+ return new GetQueueResult(msg);
+ }
+ });
+
+ return getQueueObservable;
+ }
+
+ @Override
+ public Observable listQueues(ListQueuesOptions options) {
+ Future feed = listOptions(options, getResource().path("$Resources/Queues")).get(Feed.class);
+
+ Observable listQueuesObservable = Observable.from(feed)
+ .map(new Func1() {
+ @Override
+ public ListQueuesResult call(Feed feed) {
+ ArrayList queues = new ArrayList();
+ for (Entry entry : feed.getEntries()) {
+ queues.add(new QueueInfo(entry));
+ }
+ ListQueuesResult result = new ListQueuesResult();
+ result.setItems(queues);
+ return result;
+ }
+ });
+
+ return listQueuesObservable;
+
+ }
+
+ @Override
+ public Observable updateQueue(QueueInfo queueInfo) {
+ Builder webResourceBuilder = getResource().path(queueInfo.getPath())
+ .type("application/atom+xml;type=entry;charset=utf-8").header("If-Match", "*");
+ if ((queueInfo.getForwardTo() != null) && !queueInfo.getForwardTo().isEmpty()) {
+ webResourceBuilder.header("ServiceBusSupplementaryAuthorization", queueInfo.getForwardTo());
+ }
+ return Observable.from(webResourceBuilder.put(QueueInfo.class, queueInfo));
+ }
+
+ private AsyncWebResource listOptions(AbstractListOptions> options, AsyncWebResource path) {
+ if (options.getTop() != null) {
+ path = path.queryParam("$top", options.getTop().toString());
+ }
+ if (options.getSkip() != null) {
+ path = path.queryParam("$skip", options.getSkip().toString());
+ }
+ if (options.getFilter() != null) {
+ path = path.queryParam("$filter", options.getFilter());
+ }
+ return path;
+ }
+
+ @Override
+ public Observable createTopic(TopicInfo entry) {
+ return Observable.from(getResource().path(entry.getPath()).type("application/atom+xml;type=entry;charset=utf-8")
+ .put(TopicInfo.class, entry)).map(new Func1() {
+ @Override
+ public CreateTopicResult call(TopicInfo info) {
+ return new CreateTopicResult(info);
+ }
+ });
+ }
+
+ @Override
+ public Observable deleteTopic(String topicPath) {
+ return Observable.from(getResource().path(topicPath).delete(Void.class));
+ }
+
+ @Override
+ public Observable getTopic(String topicPath) {
+ return Observable.from(getResource().path(topicPath).get(TopicInfo.class))
+ .map(new Func1() {
+ @Override
+ public GetTopicResult call(TopicInfo info) {
+ return new GetTopicResult(info);
+ }
+ });
+ }
+
+ @Override
+ public Observable listTopics(ListTopicsOptions options) {
+ Future feed = listOptions(options, getResource().path("$Resources/Topics")).get(Feed.class);
+
+ Observable feedObservable = Observable.from(feed).map(new Func1() {
+ @Override
+ public ListTopicsResult call(Feed feed) {
+ ArrayList topics = new ArrayList();
+ for (Entry entry : feed.getEntries()) {
+ topics.add(new TopicInfo(entry));
+ }
+ ListTopicsResult result = new ListTopicsResult();
+ result.setItems(topics);
+ return result;
+ }
+ });
+
+ return feedObservable;
+ }
+
+ @Override
+ public Observable updateTopic(TopicInfo topicInfo) {
+ return Observable
+ .from(getResource().path(topicInfo.getPath()).type("application/atom+xml;type=entry;charset=utf-8")
+ .header("If-Match", "*").put(TopicInfo.class, topicInfo));
+ }
+
+ @Override
+ public Observable createSubscription(String topicPath,
+ SubscriptionInfo subscriptionInfo) {
+ Builder webResourceBuilder = getResource().path(topicPath).path("subscriptions")
+ .path(subscriptionInfo.getName()).type("application/atom+xml;type=entry;charset=utf-8");
+ if ((subscriptionInfo.getForwardTo() != null) && (!subscriptionInfo.getForwardTo().isEmpty())) {
+ webResourceBuilder.header("ServiceBusSupplementaryAuthorization", subscriptionInfo.getForwardTo());
+
+ }
+
+ return Observable.from(webResourceBuilder.put(SubscriptionInfo.class, subscriptionInfo))
+ .map(new Func1() {
+ @Override
+ public CreateSubscriptionResult call(SubscriptionInfo info) {
+ return new CreateSubscriptionResult(info);
+ }
+ });
+ }
+
+ @Override
+ public Observable deleteSubscription(String topicPath, String subscriptionName) {
+ return Observable.from(getResource().path(topicPath).path("subscriptions").path(subscriptionName).delete(Void.class));
+ }
+
+ @Override
+ public Observable getSubscription(String topicPath, String subscriptionName) {
+
+ return Observable.from(
+ getResource().path(topicPath).path("subscriptions").path(subscriptionName).get(SubscriptionInfo.class))
+ .map(new Func1() {
+ @Override
+ public GetSubscriptionResult call(SubscriptionInfo info) {
+ return new GetSubscriptionResult(info);
+ }
+ });
+ }
+
+ @Override
+ public Observable listSubscriptions(String topicPath, ListSubscriptionsOptions options) {
+ Future feed = listOptions(options, getResource().path(topicPath).path("subscriptions")).get(Feed.class);
+
+ Observable feedObservable = Observable.from(feed)
+ .map(new Func1() {
+ @Override
+ public ListSubscriptionsResult call(Feed feed) {
+ ArrayList list = new ArrayList();
+ for (Entry entry : feed.getEntries()) {
+ list.add(new SubscriptionInfo(entry));
+ }
+ ListSubscriptionsResult result = new ListSubscriptionsResult();
+ result.setItems(list);
+ return result;
+ }
+ });
+
+ return feedObservable;
+ }
+
+ @Override
+ public Observable updateSubscription(String topicName, SubscriptionInfo subscriptionInfo) {
+ Builder webResourceBuilder = getResource().path(topicName).path("subscriptions")
+ .path(subscriptionInfo.getName()).type("application/atom+xml;type=entry;charset=utf-8")
+ .header("If-Match", "*");
+ if ((subscriptionInfo.getForwardTo() != null) && !subscriptionInfo.getForwardTo().isEmpty()) {
+ webResourceBuilder.header("ServiceBusSupplementaryAuthorization", subscriptionInfo.getForwardTo());
+ }
+ return Observable.from(webResourceBuilder.put(SubscriptionInfo.class, subscriptionInfo));
+ }
+
+ @Override
+ public Observable createRule(String topicPath, String subscriptionName, RuleInfo rule) {
+
+ Observable ret = Observable.from(getResource().path(topicPath).path("subscriptions")
+ .path(subscriptionName).path("rules").path(rule.getName())
+ .type("application/atom+xml;type=entry;charset=utf-8").put(RuleInfo.class, rule))
+ .map(new Func1() {
+ @Override
+ public CreateRuleResult call(RuleInfo info) {
+ return new CreateRuleResult(info);
+ }
+ });
+
+ return ret;
+ }
+
+ @Override
+ public Observable deleteRule(String topicPath, String subscriptionName, String ruleName) {
+ return Observable.from(getResource().path(topicPath).path("subscriptions").path(subscriptionName).path("rules")
+ .path(ruleName).delete(Void.class));
+ }
+
+ @Override
+ public Observable getRule(String topicPath, String subscriptionName, String ruleName) {
+
+ Observable resultObservable = Observable.from(getResource().path(topicPath).path("subscriptions")
+ .path(subscriptionName).path("rules").path(ruleName).get(RuleInfo.class))
+ .map(new Func1() {
+ @Override
+ public GetRuleResult call(RuleInfo info) {
+ return new GetRuleResult(info);
+ }
+ });
+
+ return resultObservable;
+ }
+
+ @Override
+ public Observable listRules(String topicPath, String subscriptionName, ListRulesOptions options) {
+ Future feed = listOptions(options,
+ getResource().path(topicPath).path("subscriptions").path(subscriptionName).path("rules"))
+ .get(Feed.class);
+
+ Observable resultObservable = Observable.from(feed).map(new Func1() {
+ @Override
+ public ListRulesResult call(Feed feed) {
+ ArrayList list = new ArrayList();
+ for (Entry entry : feed.getEntries()) {
+ list.add(new RuleInfo(entry));
+ }
+ ListRulesResult result = new ListRulesResult();
+ result.setItems(list);
+ return result;
+ }
+ });
+
+ return resultObservable;
+ }
+
+ @Override
+ public Observable listQueues() {
+ return listQueues(ListQueuesOptions.DEFAULT);
+ }
+
+ @Override
+ public Observable listTopics() {
+ return listTopics(ListTopicsOptions.DEFAULT);
+ }
+
+ @Override
+ public Observable listSubscriptions(String topicName) {
+ return listSubscriptions(topicName, ListSubscriptionsOptions.DEFAULT);
+ }
+
+ @Override
+ public Observable listRules(String topicName, String subscriptionName) {
+ return listRules(topicName, subscriptionName, ListRulesOptions.DEFAULT);
+ }
+
+ @Override
+ public Observable renewQueueLock(String queueName, String messageId, String lockToken)
+ throws ServiceException {
+ Observable clientResponse = Observable.from(getResource().path(queueName).path("messages")
+ .path(messageId).path(lockToken).post(ClientResponse.class, ""))
+ .map(new Func1() {
+ @Override
+ public ClientResponse call(ClientResponse clientResponse) {
+ PipelineHelpers.throwIfNotSuccess(clientResponse);
+ return clientResponse;
+ }
+ });
+
+ return clientResponse;
+ }
+
+ @Override
+ public Observable renewSubscriptionLock(String topicName, String subscriptionName, String messageId,
+ String lockToken) throws ServiceException {
+ Observable clientResponse = Observable
+ .from(getResource().path(topicName).path("Subscriptions").path(subscriptionName).path("messages")
+ .path(messageId).path(lockToken).post(ClientResponse.class, ""))
+ .map(new Func1() {
+ @Override
+ public ClientResponse call(ClientResponse clientResponse) {
+ PipelineHelpers.throwIfNotSuccess(clientResponse);
+ return clientResponse;
+ }
+ });
+
+ return clientResponse;
+ }
+
+}