diff --git a/services/azure-servicebus/pom.xml b/services/azure-servicebus/pom.xml index d65c0169aea6..e96b1890550a 100644 --- a/services/azure-servicebus/pom.xml +++ b/services/azure-servicebus/pom.xml @@ -112,6 +112,11 @@ bcprov-jdk16 test + + io.reactivex + rxjava + 1.0.17 + diff --git a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContract.java b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContract.java index 2b0c276442cd..5db61df2355d 100644 --- a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContract.java +++ b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContract.java @@ -50,6 +50,8 @@ public interface ServiceBusContract extends JerseyFilterableService { + ServiceBusContractAsync async(); + /** * Sends a queue message. * diff --git a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContractAsync.java b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContractAsync.java new file mode 100644 index 000000000000..0cc3006f8796 --- /dev/null +++ b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/ServiceBusContractAsync.java @@ -0,0 +1,566 @@ +package com.microsoft.windowsazure.services.servicebus; + +import com.microsoft.windowsazure.core.pipeline.jersey.JerseyFilterableService; +import com.microsoft.windowsazure.exception.ServiceException; +import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage; +import com.microsoft.windowsazure.services.servicebus.models.CreateQueueResult; +import com.microsoft.windowsazure.services.servicebus.models.CreateRuleResult; +import com.microsoft.windowsazure.services.servicebus.models.CreateSubscriptionResult; +import com.microsoft.windowsazure.services.servicebus.models.CreateTopicResult; +import com.microsoft.windowsazure.services.servicebus.models.GetQueueResult; +import com.microsoft.windowsazure.services.servicebus.models.GetRuleResult; +import com.microsoft.windowsazure.services.servicebus.models.GetSubscriptionResult; +import com.microsoft.windowsazure.services.servicebus.models.GetTopicResult; +import com.microsoft.windowsazure.services.servicebus.models.ListQueuesOptions; +import com.microsoft.windowsazure.services.servicebus.models.ListQueuesResult; +import com.microsoft.windowsazure.services.servicebus.models.ListRulesOptions; +import com.microsoft.windowsazure.services.servicebus.models.ListRulesResult; +import com.microsoft.windowsazure.services.servicebus.models.ListSubscriptionsOptions; +import com.microsoft.windowsazure.services.servicebus.models.ListSubscriptionsResult; +import com.microsoft.windowsazure.services.servicebus.models.ListTopicsOptions; +import com.microsoft.windowsazure.services.servicebus.models.ListTopicsResult; +import com.microsoft.windowsazure.services.servicebus.models.QueueInfo; +import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageOptions; +import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageResult; +import com.microsoft.windowsazure.services.servicebus.models.ReceiveQueueMessageResult; +import com.microsoft.windowsazure.services.servicebus.models.ReceiveSubscriptionMessageResult; +import com.microsoft.windowsazure.services.servicebus.models.RuleInfo; +import com.microsoft.windowsazure.services.servicebus.models.SubscriptionInfo; +import com.microsoft.windowsazure.services.servicebus.models.TopicInfo; +import com.sun.jersey.api.client.ClientResponse; + +import rx.Observable; + +public interface ServiceBusContractAsync extends +JerseyFilterableService { + +/** +* Sends a queue message. +* +* @param queuePath +* A String object that represents the name of the +* queue to which the message will be sent. +* @param message +* A Message object that represents the message to +* send. +* @return A Observable<Void> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable sendQueueMessage(String queuePath, BrokeredMessage message); + +/** +* Receives a queue message. +* +* @param queuePath +* A String object that represents the name of the +* queue from which to receive the message. +* @return A Observable<ReceiveQueueMessageResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable receiveQueueMessage(String queuePath); + +/** +* Receives a queue message using the specified receive message options. +* +* @param queuePath +* A String object that represents the name of the +* queue from which to receive the message. +* @param options +* A ReceiveMessageOptions object that represents +* the receive message options. +* @return A Observable<ReceiveQueueMessageResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable receiveQueueMessage(String queuePath, + ReceiveMessageOptions options); + +/** +* Sends a topic message. +* +* @param topicPath +* A String object that represents the name of the +* topic to which the message will be sent. +* @param message +* A Message object that represents the message to +* send. +* @return A Observable<Void> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable sendTopicMessage(String topicPath, BrokeredMessage message); + +/** +* Receives a subscription message. +* +* @param topicPath +* A String object that represents the name of the +* topic to receive. +* @param subscriptionName +* A String object that represents the name of the +* subscription from the message will be received. +* @return A Observable<ReceiveSubscriptionMessageResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable receiveSubscriptionMessage( + String topicPath, String subscriptionName); + +/** +* Receives a subscription message using the specified receive message +* options. +* +* @param topicPath +* A String object that represents the name of the +* topic to receive. +* @param subscriptionName +* A String object that represents the name of the +* subscription from the message will be received. +* @param options +* A ReceiveMessageOptions object that represents +* the receive message options. +* @return A Observable<ReceiveSubscriptionMessageResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable receiveSubscriptionMessage( + String topicPath, String subscriptionName, + ReceiveMessageOptions options); + +/** +* Unlocks a message. +* +* @param message +* A Message object that represents the message to +* unlock. +* @return A Observable<Void> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable unlockMessage(BrokeredMessage message); + +/** +* Sends a message. +* +* @param path +* A String object that represents the path to which +* the message will be sent. This may be the value of a queuePath +* or a topicPath. +* @param message +* A Message object that represents the message to +* send. +* +* @return A Observable<Void> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable sendMessage(String path, BrokeredMessage message); + +/** +* Receives a message. +* +* @param path +* A String object that represents the path from +* which a message will be received. This may either be the value +* of queuePath or a combination of the topicPath + +* "/subscriptions/" + subscriptionName. +* @return A Observable<ReceiveMessageResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable receiveMessage(String path); + +/** +* Receives a message using the specified receive message options. +* +* @param path +* A String object that represents the path from +* which a message will be received. This may either be the value +* of queuePath or a combination of the topicPath + +* "/subscriptions/" + subscriptionName. +* @param options +* A ReceiveMessageOptions object that represents +* the receive message options. +* @return A Observable<ReceiveMessageResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable receiveMessage(String path, + ReceiveMessageOptions options); + +/** +* Deletes a message. +* +* @param message +* A Message object that represents the message to +* delete. +* @return A Observable<Void> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable deleteMessage(BrokeredMessage message); + +/** +* Creates a queue. +* +* @param queueInfo +* A QueueInfo object that represents the queue to +* create. +* @return A Observable<CreateQueueResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable createQueue(QueueInfo queueInfo); + +/** +* Deletes a queue. +* +* @param queuePath +* A String object that represents the name of the +* queue to delete. +* @return A Observable<Void> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable deleteQueue(String queuePath); + +/** +* Retrieves a queue. +* +* @param queuePath +* A String object that represents the name of the +* queue to retrieve. +* @return A Observable<GetQueueResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable getQueue(String queuePath); + +/** +* Returns a list of queues. +* +* @return A Observable<ListQueuesResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable listQueues(); + +/** +* Returns a list of queues. +* +* @param options +* A ListQueueOptions object that represents the +* options to list the queue. +* @return A Observable<ListQueuesResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable listQueues(ListQueuesOptions options); + +/** +* Updates the information of a queue. +* +* @param queueInfo +* The information of a queue to be updated. +* +* @return A Observable<QueueInfo> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable updateQueue(QueueInfo queueInfo); + +/** +* Creates a topic. +* +* @param topic +* A Topic object that represents the topic to +* create. +* @return A Observable<CreateTopicResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable createTopic(TopicInfo topic); + +/** +* Deletes a topic. +* +* @param topicPath +* A String object that represents the name of the +* queue to delete. +* @return A Observable<Void> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable deleteTopic(String topicPath); + +/** +* Retrieves a topic. +* +* @param topicPath +* A String object that represents the name of the +* topic to retrieve. +* @return A Observable<GetTopicResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable getTopic(String topicPath); + +/** +* Returns a list of topics. +* +* @return A Observable<ListTopicsResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable listTopics(); + +/** +* Returns a list of topics. +* +* @param options +* A ListTopicsOptions object that represents the +* options to list the topic. +* @return A Observable<ListTopicsResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable listTopics(ListTopicsOptions options); + +/** +* Updates a topic. +* +* @param topicInfo +* A TopicInfo object that represents the topic to +* be updated. +* +* @return A Observable<TopicInfo> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable updateTopic(TopicInfo topicInfo); + +/** +* Creates a subscription. +* +* @param topicPath +* A String object that represents the name of the +* topic for the subscription. +* @param subscription +* A Subscription object that represents the +* subscription to create. +* @return A Observable<CreateSubscriptionResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable createSubscription(String topicPath, + SubscriptionInfo subscription); + +/** +* Deletes a subscription. +* +* @param topicPath +* A String object that represents the name of the +* topic for the subscription. +* @param subscriptionName +* A String object that represents the name of the +* subscription to delete. +* @return A Observable<Void> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable deleteSubscription(String topicPath, String subscriptionName); + +/** +* Retrieves a subscription. +* +* @param topicPath +* A String object that represents the name of the +* topic for the subscription. +* @param subscriptionName +* A String object that represents the name of the +* subscription to retrieve. +* @return A Observable<GetSubscriptionResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable getSubscription(String topicPath, + String subscriptionName); + +/** +* Returns a list of subscriptions. +* +* @param topicPath +* A String object that represents the name of the +* topic for the subscriptions to retrieve. +* @return A Observable<ListSubscriptionsResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable listSubscriptions(String topicPath); + +/** +* Returns a list of subscriptions. +* +* @param topicPath +* A String object that represents the name of the +* topic for the subscriptions to retrieve. +* +* @param options +* A ListSubscriptionsOptions object that represents +* the options to list subscriptions. +* +* @return A Observable<ListSubscriptionsResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable listSubscriptions(String topicPath, + ListSubscriptionsOptions options); + +/** +* Updates a subscription. +* +* @param topicName +* A String option which represents the name of the +* topic. +* @param subscriptionInfo +* A SubscriptionInfo option which represents the +* information of the subscription. +* @return A Observable<SubscriptionInfo> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable updateSubscription(String topicName, + SubscriptionInfo subscriptionInfo); + +/** +* Creates a rule. +* +* @param topicPath +* A String object that represents the name of the +* topic for the subscription. +* @param subscriptionName +* A String object that represents the name of the +* subscription for which the rule will be created. +* @param rule +* A Rule object that represents the rule to create. +* @return A Observable<CreateRuleResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable createRule(String topicPath, String subscriptionName, + RuleInfo rule); + +/** +* Deletes a rule. +* +* @param topicPath +* A String object that represents the name of the +* topic for the subscription. +* @param subscriptionName +* A String object that represents the name of the +* subscription for which the rule will be deleted. +* @param ruleName +* A String object that represents the name of the +* rule to delete. +* @return A Observable<Void> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable deleteRule(String topicPath, String subscriptionName, String ruleName); + +/** +* Retrieves a rule. +* +* @param topicPath +* A String object that represents the name of the +* topic for the subscription. +* @param subscriptionName +* A String object that represents the name of the +* subscription for which the rule will be retrieved. +* @param ruleName +* A String object that represents the name of the +* rule to retrieve. +* @return A Observable<GetRuleResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable getRule(String topicPath, String subscriptionName, + String ruleName); + +/** +* Returns a list of rules. +* +* @param topicPath +* A String object that represents the name of the +* topic for the subscription. +* @param subscriptionName +* A String object that represents the name of the +* subscription whose rules are being retrieved. +* @return A Observable<ListRulesResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable listRules(String topicPath, String subscriptionName); + +/** +* Returns a list of rules. +* +* @param topicPath +* A String object that represents the name of the +* topic for the subscription. +* @param subscriptionName +* A String object that represents the name of the +* subscription whose rules are being retrieved. +* @param options +* A ListRulesOptions object that represents the +* options to retrieve rules. +* @return A Observable<ListRulesResult> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable listRules(String topicPath, String subscriptionName, + ListRulesOptions options); + +/** +* Renew queue lock. +* +* @param queueName +* A String object that represents the name of the +* queue. +* @param messageId +* A String object that represents the ID of the +* message. +* @param lockToken +* A String object that represents the token of the +* lock. +* @return A Observable<ClientResponse> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable renewQueueLock(String queueName, String messageId, String lockToken) + throws ServiceException; + +/** +* Renew subscription lock. +* +* @param topicName +* A String object that represents the name of the +* topic. +* @param queueName +* A String object that represents the name of the +* queue. +* @param messageId +* A String object that represents the ID of the +* message. +* @param lockToken +* A String object that represents the token of the +* lock. +* @return A Observable<ClientResponse> object that represents +* the result. +* If a service exception is encountered the returned Observable invokes onError passing ServiceException into it. +*/ +Observable renewSubscriptionLock(String topicName, String subscriptionName, + String messageId, String lockToken) throws ServiceException; +} diff --git a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusExceptionProcessor.java b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusExceptionProcessor.java index 2b6f15f4adbb..f806f9a67500 100644 --- a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusExceptionProcessor.java +++ b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusExceptionProcessor.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory; import com.microsoft.windowsazure.services.servicebus.ServiceBusContract; +import com.microsoft.windowsazure.services.servicebus.ServiceBusContractAsync; import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage; import com.microsoft.windowsazure.services.servicebus.models.CreateQueueResult; import com.microsoft.windowsazure.services.servicebus.models.CreateRuleResult; @@ -68,6 +69,11 @@ public ServiceBusExceptionProcessor(ServiceBusRestProxy next) { this.next = next; } + @Override + public ServiceBusContractAsync async() { + return this.next.async(); + } + @Override public ServiceBusContract withFilter(ServiceFilter filter) { return new ServiceBusExceptionProcessor(next.withFilter(filter)); diff --git a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxy.java b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxy.java index 6f7d4a61bdca..587c39bf4ff1 100644 --- a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxy.java +++ b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxy.java @@ -14,26 +14,15 @@ */ package com.microsoft.windowsazure.services.servicebus.implementation; +import javax.inject.Inject; + import com.microsoft.windowsazure.core.UserAgentFilter; -import com.microsoft.windowsazure.core.pipeline.PipelineHelpers; import com.microsoft.windowsazure.core.pipeline.filter.ServiceRequestFilter; import com.microsoft.windowsazure.core.pipeline.filter.ServiceResponseFilter; -import com.microsoft.windowsazure.core.pipeline.jersey.ClientFilterAdapter; -import com.microsoft.windowsazure.core.pipeline.jersey.ClientFilterRequestAdapter; -import com.microsoft.windowsazure.core.pipeline.jersey.ClientFilterResponseAdapter; import com.microsoft.windowsazure.core.pipeline.jersey.ServiceFilter; import com.microsoft.windowsazure.exception.ServiceException; -import java.io.InputStream; -import java.text.ParseException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; - -import javax.inject.Inject; -import javax.ws.rs.core.MediaType; - import com.microsoft.windowsazure.services.servicebus.ServiceBusContract; -import com.microsoft.windowsazure.services.servicebus.models.AbstractListOptions; +import com.microsoft.windowsazure.services.servicebus.ServiceBusContractAsync; import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage; import com.microsoft.windowsazure.services.servicebus.models.CreateQueueResult; import com.microsoft.windowsazure.services.servicebus.models.CreateRuleResult; @@ -60,268 +49,116 @@ import com.microsoft.windowsazure.services.servicebus.models.SubscriptionInfo; import com.microsoft.windowsazure.services.servicebus.models.TopicInfo; import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.WebResource.Builder; import com.sun.jersey.api.client.filter.ClientFilter; -public class ServiceBusRestProxy implements ServiceBusContract { +import rx.Observable; - private Client channel; - private final String uri; - private final BrokerPropertiesMapper mapper; - private final CustomPropertiesMapper customPropertiesMapper; +public class ServiceBusRestProxy implements ServiceBusContract { - private ClientFilter[] filters; + private ServiceBusRestProxyAsync asyncProxy; @Inject - public ServiceBusRestProxy(Client channel, WrapFilter authFilter, - SasFilter sasAuthFilter, - UserAgentFilter userAgentFilter, - ServiceBusConnectionSettings connectionSettings, + public ServiceBusRestProxy(Client channel, WrapFilter authFilter, SasFilter sasAuthFilter, + UserAgentFilter userAgentFilter, ServiceBusConnectionSettings connectionSettings, BrokerPropertiesMapper mapper) { - this.channel = channel; - this.filters = new ClientFilter[0]; - this.uri = connectionSettings.getUri(); - this.mapper = mapper; - this.customPropertiesMapper = new CustomPropertiesMapper(); - if (connectionSettings.isSasAuthentication()) { - channel.addFilter(sasAuthFilter); - } else { - channel.addFilter(authFilter); - } - channel.addFilter(new ClientFilterRequestAdapter(userAgentFilter)); + asyncProxy = new ServiceBusRestProxyAsync(channel, authFilter, sasAuthFilter, userAgentFilter, + connectionSettings, mapper); + } + + public ServiceBusRestProxy(Client channel, ClientFilter[] filters, String uri, BrokerPropertiesMapper mapper) { + + asyncProxy = new ServiceBusRestProxyAsync(channel, filters, uri, mapper); + } + + ServiceBusRestProxy(ServiceBusRestProxyAsync async) { + this.asyncProxy = async; } - public ServiceBusRestProxy(Client channel, ClientFilter[] filters, - String uri, BrokerPropertiesMapper mapper) { - this.channel = channel; - this.filters = filters; - this.uri = uri; - this.mapper = mapper; - this.customPropertiesMapper = new CustomPropertiesMapper(); + @Override + public ServiceBusContractAsync async() { + return asyncProxy; } @Override public ServiceBusContract withFilter(ServiceFilter filter) { - ClientFilter[] newFilters = Arrays.copyOf(filters, filters.length + 1); - newFilters[filters.length] = new ClientFilterAdapter(filter); - return new ServiceBusRestProxy(channel, newFilters, uri, mapper); + return this.async().withFilter(filter); } @Override - public ServiceBusContract withRequestFilterFirst( - ServiceRequestFilter serviceRequestFilter) { - ClientFilter[] currentFilters = filters; - ClientFilter[] newFilters = new ClientFilter[currentFilters.length + 1]; - System.arraycopy(currentFilters, 0, newFilters, 1, - currentFilters.length); - newFilters[0] = new ClientFilterRequestAdapter(serviceRequestFilter); - return new ServiceBusRestProxy(channel, newFilters, uri, mapper); + public ServiceBusContract withRequestFilterFirst(ServiceRequestFilter serviceRequestFilter) { + return this.async().withRequestFilterFirst(serviceRequestFilter); } @Override - public ServiceBusContract withRequestFilterLast( - ServiceRequestFilter serviceRequestFilter) { - ClientFilter[] currentFilters = filters; - ClientFilter[] newFilters = Arrays.copyOf(currentFilters, - currentFilters.length + 1); - newFilters[currentFilters.length] = new ClientFilterRequestAdapter( - serviceRequestFilter); - return new ServiceBusRestProxy(channel, newFilters, uri, mapper); + public ServiceBusContract withRequestFilterLast(ServiceRequestFilter serviceRequestFilter) { + return this.async().withRequestFilterLast(serviceRequestFilter); } @Override - public ServiceBusContract withResponseFilterFirst( - ServiceResponseFilter serviceResponseFilter) { - ClientFilter[] currentFilters = filters; - ClientFilter[] newFilters = new ClientFilter[currentFilters.length + 1]; - System.arraycopy(currentFilters, 0, newFilters, 1, - currentFilters.length); - newFilters[0] = new ClientFilterResponseAdapter(serviceResponseFilter); - return new ServiceBusRestProxy(channel, newFilters, uri, mapper); + public ServiceBusContract withResponseFilterFirst(ServiceResponseFilter serviceResponseFilter) { + return this.async().withResponseFilterFirst(serviceResponseFilter); } @Override - public ServiceBusContract withResponseFilterLast( - ServiceResponseFilter serviceResponseFilter) { - ClientFilter[] currentFilters = filters; - ClientFilter[] newFilters = Arrays.copyOf(currentFilters, - currentFilters.length + 1); - newFilters[currentFilters.length] = new ClientFilterResponseAdapter( - serviceResponseFilter); - return new ServiceBusRestProxy(channel, newFilters, uri, mapper); + public ServiceBusContract withResponseFilterLast(ServiceResponseFilter serviceResponseFilter) { + return this.async().withResponseFilterLast(serviceResponseFilter); } public Client getChannel() { - return channel; + return this.asyncProxy.getChannel(); } public void setChannel(Client channel) { - this.channel = channel; - } - - private WebResource getResource() { - WebResource resource = getChannel().resource(uri).queryParam( - "api-version", "2013-07"); - for (ClientFilter filter : filters) { - resource.addFilter(filter); - } - return resource; + this.asyncProxy.setChannel(channel); } @Override - public void sendMessage(String path, BrokeredMessage message) { - Builder request = getResource().path(path).path("messages") - .getRequestBuilder(); - - if (message.getContentType() != null) { - request = request.type(message.getContentType()); - } - - if (message.getBrokerProperties() != null) { - request = request.header("BrokerProperties", - mapper.toString(message.getBrokerProperties())); - } - - for (java.util.Map.Entry entry : message - .getProperties().entrySet()) { - request.header(entry.getKey(), - customPropertiesMapper.toString(entry.getValue())); - } - - request.post(message.getBody()); + public void sendMessage(String path, BrokeredMessage message) throws ServiceException { + resolve(this.async().sendMessage(path, message)); } @Override - public void sendQueueMessage(String path, BrokeredMessage message) - throws ServiceException { + public void sendQueueMessage(String path, BrokeredMessage message) throws ServiceException { sendMessage(path, message); } @Override - public ReceiveQueueMessageResult receiveQueueMessage(String queueName) - throws ServiceException { + public ReceiveQueueMessageResult receiveQueueMessage(String queueName) throws ServiceException { return receiveQueueMessage(queueName, ReceiveMessageOptions.DEFAULT); } @Override - public ReceiveQueueMessageResult receiveQueueMessage(String queuePath, - ReceiveMessageOptions options) throws ServiceException { - - WebResource resource = getResource().path(queuePath).path("messages") - .path("head"); - - BrokeredMessage message = receiveMessage(options, resource); - return new ReceiveQueueMessageResult(message); + public ReceiveQueueMessageResult receiveQueueMessage(String queuePath, ReceiveMessageOptions options) + throws ServiceException { + return resolve(this.async().receiveQueueMessage(queuePath, options)); } @Override - public ReceiveMessageResult receiveMessage(String path) - throws ServiceException { + public ReceiveMessageResult receiveMessage(String path) throws ServiceException { return receiveMessage(path, ReceiveMessageOptions.DEFAULT); } @Override - public ReceiveMessageResult receiveMessage(String path, - ReceiveMessageOptions options) throws ServiceException { + public ReceiveMessageResult receiveMessage(String path, ReceiveMessageOptions options) throws ServiceException { + return resolve(this.async().receiveMessage(path, options)); + } - WebResource resource = getResource().path(path).path("messages") - .path("head"); - - BrokeredMessage message = receiveMessage(options, resource); - return new ReceiveMessageResult(message); - } - - private BrokeredMessage receiveMessage(ReceiveMessageOptions options, - WebResource resource) { - if (options.getTimeout() != null) { - resource = resource.queryParam("timeout", - Integer.toString(options.getTimeout())); - } - - ClientResponse clientResult; - if (options.isReceiveAndDelete()) { - clientResult = resource.delete(ClientResponse.class); - } else if (options.isPeekLock()) { - clientResult = resource.post(ClientResponse.class, ""); - } else { - throw new RuntimeException("Unknown ReceiveMode"); - } - - if (clientResult.getStatus() == 204) { - return null; - } - - BrokerProperties brokerProperties; - if (clientResult.getHeaders().containsKey("BrokerProperties")) { - brokerProperties = mapper.fromString(clientResult.getHeaders() - .getFirst("BrokerProperties")); - } else { - brokerProperties = new BrokerProperties(); - } - - String location = clientResult.getHeaders().getFirst("Location"); - if (location != null) { - brokerProperties.setLockLocation(location); - } - - BrokeredMessage message = new BrokeredMessage(brokerProperties); - - MediaType contentType = clientResult.getType(); - if (contentType != null) { - message.setContentType(contentType.toString()); - } - - Date date = clientResult.getResponseDate(); - if (date != null) { - message.setDate(date); - } - - InputStream body = clientResult.getEntityInputStream(); - if (body != null) { - message.setBody(body); - } - - for (String key : clientResult.getHeaders().keySet()) { - Object value = clientResult.getHeaders().getFirst(key); - try { - value = customPropertiesMapper.fromString(value.toString()); - message.setProperty(key, value); - } catch (ParseException e) { - // log.warn("Unable to parse custom header", e); - } catch (NumberFormatException e) { - // log.warn("Unable to parse custom header", e); - } - } - - return message; - } - - @Override - public void sendTopicMessage(String topicName, BrokeredMessage message) - throws ServiceException { + @Override + public void sendTopicMessage(String topicName, BrokeredMessage message) throws ServiceException { sendMessage(topicName, message); } @Override - public ReceiveSubscriptionMessageResult receiveSubscriptionMessage( - String topicName, String subscriptionName) throws ServiceException { - return receiveSubscriptionMessage(topicName, subscriptionName, - ReceiveMessageOptions.DEFAULT); + public ReceiveSubscriptionMessageResult receiveSubscriptionMessage(String topicName, String subscriptionName) + throws ServiceException { + return receiveSubscriptionMessage(topicName, subscriptionName, ReceiveMessageOptions.DEFAULT); } @Override - public ReceiveSubscriptionMessageResult receiveSubscriptionMessage( - String topicName, String subscriptionName, + public ReceiveSubscriptionMessageResult receiveSubscriptionMessage(String topicName, String subscriptionName, ReceiveMessageOptions options) throws ServiceException { - WebResource resource = getResource().path(topicName) - .path("subscriptions").path(subscriptionName).path("messages") - .path("head"); - - BrokeredMessage message = receiveMessage(options, resource); - return new ReceiveSubscriptionMessageResult(message); + return resolve(this.async().receiveSubscriptionMessage(topicName, subscriptionName, options)); } @Override @@ -335,210 +172,99 @@ public void deleteMessage(BrokeredMessage message) throws ServiceException { } @Override - public CreateQueueResult createQueue(QueueInfo queueInfo) - throws ServiceException { - Builder webResourceBuilder = getResource().path(queueInfo.getPath()) - .type("application/atom+xml;type=entry;charset=utf-8"); - if ((queueInfo.getForwardTo() != null) - && !queueInfo.getForwardTo().isEmpty()) { - webResourceBuilder.header("ServiceBusSupplementaryAuthorization", - queueInfo.getForwardTo()); - } - return new CreateQueueResult(webResourceBuilder.put(QueueInfo.class, - queueInfo)); + public CreateQueueResult createQueue(QueueInfo queueInfo) throws ServiceException { + return resolve(this.async().createQueue(queueInfo)); } @Override public void deleteQueue(String queuePath) throws ServiceException { - getResource().path(queuePath).delete(); + resolve(this.async().deleteQueue(queuePath)); } @Override public GetQueueResult getQueue(String queuePath) throws ServiceException { - return new GetQueueResult(getResource().path(queuePath).get( - QueueInfo.class)); + return resolve(this.async().getQueue(queuePath)); } @Override - public ListQueuesResult listQueues(ListQueuesOptions options) - throws ServiceException { - Feed feed = listOptions(options, - getResource().path("$Resources/Queues")).get(Feed.class); - ArrayList queues = new ArrayList(); - for (Entry entry : feed.getEntries()) { - queues.add(new QueueInfo(entry)); - } - ListQueuesResult result = new ListQueuesResult(); - result.setItems(queues); - return result; + public ListQueuesResult listQueues(ListQueuesOptions options) throws ServiceException { + return resolve(this.async().listQueues(options)); } @Override public QueueInfo updateQueue(QueueInfo queueInfo) throws ServiceException { - Builder webResourceBuilder = getResource().path(queueInfo.getPath()) - .type("application/atom+xml;type=entry;charset=utf-8") - .header("If-Match", "*"); - if ((queueInfo.getForwardTo() != null) - && !queueInfo.getForwardTo().isEmpty()) { - webResourceBuilder.header("ServiceBusSupplementaryAuthorization", - queueInfo.getForwardTo()); - } - return webResourceBuilder.put(QueueInfo.class, queueInfo); - } - - private WebResource listOptions(AbstractListOptions options, - WebResource path) { - if (options.getTop() != null) { - path = path.queryParam("$top", options.getTop().toString()); - } - if (options.getSkip() != null) { - path = path.queryParam("$skip", options.getSkip().toString()); - } - if (options.getFilter() != null) { - path = path.queryParam("$filter", options.getFilter()); - } - return path; - } - - @Override - public CreateTopicResult createTopic(TopicInfo entry) - throws ServiceException { - return new CreateTopicResult(getResource().path(entry.getPath()) - .type("application/atom+xml;type=entry;charset=utf-8") - .put(TopicInfo.class, entry)); + return resolve(this.async().updateQueue(queueInfo)); + } + + @Override + public CreateTopicResult createTopic(TopicInfo entry) throws ServiceException { + return resolve(this.async().createTopic(entry)); } @Override public void deleteTopic(String topicPath) throws ServiceException { - getResource().path(topicPath).delete(); + resolve(this.async().deleteTopic(topicPath)); } @Override public GetTopicResult getTopic(String topicPath) throws ServiceException { - return new GetTopicResult(getResource().path(topicPath).get( - TopicInfo.class)); + return resolve(this.async().getTopic(topicPath)); } @Override - public ListTopicsResult listTopics(ListTopicsOptions options) - throws ServiceException { - Feed feed = listOptions(options, - getResource().path("$Resources/Topics")).get(Feed.class); - ArrayList topics = new ArrayList(); - for (Entry entry : feed.getEntries()) { - topics.add(new TopicInfo(entry)); - } - ListTopicsResult result = new ListTopicsResult(); - result.setItems(topics); - return result; + public ListTopicsResult listTopics(ListTopicsOptions options) throws ServiceException { + return resolve(this.async().listTopics(options)); } @Override public TopicInfo updateTopic(TopicInfo topicInfo) throws ServiceException { - return getResource().path(topicInfo.getPath()) - .type("application/atom+xml;type=entry;charset=utf-8") - .header("If-Match", "*").put(TopicInfo.class, topicInfo); + return resolve(this.async().updateTopic(topicInfo)); } @Override - public CreateSubscriptionResult createSubscription(String topicPath, - SubscriptionInfo subscriptionInfo) { - Builder webResourceBuilder = getResource().path(topicPath) - .path("subscriptions").path(subscriptionInfo.getName()) - .type("application/atom+xml;type=entry;charset=utf-8"); - if ((subscriptionInfo.getForwardTo() != null) - && (!subscriptionInfo.getForwardTo().isEmpty())) { - webResourceBuilder.header("ServiceBusSupplementaryAuthorization", - subscriptionInfo.getForwardTo()); - - } - return new CreateSubscriptionResult(webResourceBuilder.put( - SubscriptionInfo.class, subscriptionInfo)); + public CreateSubscriptionResult createSubscription(String topicPath, SubscriptionInfo subscriptionInfo) { + return resolve(this.async().createSubscription(topicPath, subscriptionInfo)); } @Override public void deleteSubscription(String topicPath, String subscriptionName) { - getResource().path(topicPath).path("subscriptions") - .path(subscriptionName).delete(); + resolve(this.async().deleteSubscription(topicPath, subscriptionName)); } @Override - public GetSubscriptionResult getSubscription(String topicPath, - String subscriptionName) { - return new GetSubscriptionResult(getResource().path(topicPath) - .path("subscriptions").path(subscriptionName) - .get(SubscriptionInfo.class)); + public GetSubscriptionResult getSubscription(String topicPath, String subscriptionName) { + return resolve(this.async().getSubscription(topicPath, subscriptionName)); } @Override - public ListSubscriptionsResult listSubscriptions(String topicPath, - ListSubscriptionsOptions options) { - Feed feed = listOptions(options, - getResource().path(topicPath).path("subscriptions")).get( - Feed.class); - ArrayList list = new ArrayList(); - for (Entry entry : feed.getEntries()) { - list.add(new SubscriptionInfo(entry)); - } - ListSubscriptionsResult result = new ListSubscriptionsResult(); - result.setItems(list); - return result; + public ListSubscriptionsResult listSubscriptions(String topicPath, ListSubscriptionsOptions options) { + return resolve(this.async().listSubscriptions(topicPath, options)); } @Override - public SubscriptionInfo updateSubscription(String topicName, - SubscriptionInfo subscriptionInfo) throws ServiceException { - Builder webResourceBuilder = getResource().path(topicName) - .path("subscriptions").path(subscriptionInfo.getName()) - .type("application/atom+xml;type=entry;charset=utf-8") - .header("If-Match", "*"); - if ((subscriptionInfo.getForwardTo() != null) - && !subscriptionInfo.getForwardTo().isEmpty()) { - webResourceBuilder.header("ServiceBusSupplementaryAuthorization", - subscriptionInfo.getForwardTo()); - } - return webResourceBuilder.put(SubscriptionInfo.class, subscriptionInfo); + public SubscriptionInfo updateSubscription(String topicName, SubscriptionInfo subscriptionInfo) + throws ServiceException { + return resolve(this.async().updateSubscription(topicName, subscriptionInfo)); } @Override - public CreateRuleResult createRule(String topicPath, - String subscriptionName, RuleInfo rule) { - return new CreateRuleResult(getResource().path(topicPath) - .path("subscriptions").path(subscriptionName).path("rules") - .path(rule.getName()) - .type("application/atom+xml;type=entry;charset=utf-8") - .put(RuleInfo.class, rule)); + public CreateRuleResult createRule(String topicPath, String subscriptionName, RuleInfo rule) { + return resolve(this.async().createRule(topicPath, subscriptionName, rule)); } @Override - public void deleteRule(String topicPath, String subscriptionName, - String ruleName) { - getResource().path(topicPath).path("subscriptions") - .path(subscriptionName).path("rules").path(ruleName).delete(); + public void deleteRule(String topicPath, String subscriptionName, String ruleName) { + resolve(this.async().deleteRule(topicPath, subscriptionName, ruleName)); } @Override - public GetRuleResult getRule(String topicPath, String subscriptionName, - String ruleName) { - return new GetRuleResult(getResource().path(topicPath) - .path("subscriptions").path(subscriptionName).path("rules") - .path(ruleName).get(RuleInfo.class)); + public GetRuleResult getRule(String topicPath, String subscriptionName, String ruleName) { + return resolve(this.async().getRule(topicPath, subscriptionName, ruleName)); } @Override - public ListRulesResult listRules(String topicPath, String subscriptionName, - ListRulesOptions options) { - Feed feed = listOptions( - options, - getResource().path(topicPath).path("subscriptions") - .path(subscriptionName).path("rules")).get(Feed.class); - ArrayList list = new ArrayList(); - for (Entry entry : feed.getEntries()) { - list.add(new RuleInfo(entry)); - } - ListRulesResult result = new ListRulesResult(); - result.setItems(list); - return result; + public ListRulesResult listRules(String topicPath, String subscriptionName, ListRulesOptions options) { + return resolve(this.async().listRules(topicPath, subscriptionName, options)); } @Override @@ -552,34 +278,28 @@ public ListTopicsResult listTopics() throws ServiceException { } @Override - public ListSubscriptionsResult listSubscriptions(String topicName) - throws ServiceException { + public ListSubscriptionsResult listSubscriptions(String topicName) throws ServiceException { return listSubscriptions(topicName, ListSubscriptionsOptions.DEFAULT); } @Override - public ListRulesResult listRules(String topicName, String subscriptionName) - throws ServiceException { + public ListRulesResult listRules(String topicName, String subscriptionName) throws ServiceException { return listRules(topicName, subscriptionName, ListRulesOptions.DEFAULT); } @Override - public void renewQueueLock(String queueName, String messageId, - String lockToken) throws ServiceException { - ClientResponse clientResponse = getResource().path(queueName) - .path("messages").path(messageId).path(lockToken) - .post(ClientResponse.class, ""); - PipelineHelpers.throwIfNotSuccess(clientResponse); + public void renewQueueLock(String queueName, String messageId, String lockToken) throws ServiceException { + resolve(this.async().renewQueueLock(queueName, messageId, lockToken)); } @Override - public void renewSubscriptionLock(String topicName, - String subscriptionName, String messageId, String lockToken) + public void renewSubscriptionLock(String topicName, String subscriptionName, String messageId, String lockToken) throws ServiceException { - ClientResponse clientResponse = getResource().path(topicName) - .path("Subscriptions").path(subscriptionName).path("messages") - .path(messageId).path(lockToken).post(ClientResponse.class, ""); - PipelineHelpers.throwIfNotSuccess(clientResponse); + resolve(this.async().renewSubscriptionLock(topicName, subscriptionName, messageId, lockToken)); + } + + private T resolve(Observable o) { + return o.toBlocking().single(); } } diff --git a/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxyAsync.java b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxyAsync.java new file mode 100644 index 000000000000..cd50e664783b --- /dev/null +++ b/services/azure-servicebus/src/main/java/com/microsoft/windowsazure/services/servicebus/implementation/ServiceBusRestProxyAsync.java @@ -0,0 +1,643 @@ +package com.microsoft.windowsazure.services.servicebus.implementation; + +import java.io.InputStream; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.concurrent.Future; + +import javax.inject.Inject; +import javax.ws.rs.core.MediaType; + +import com.microsoft.windowsazure.core.UserAgentFilter; +import com.microsoft.windowsazure.core.pipeline.PipelineHelpers; +import com.microsoft.windowsazure.core.pipeline.filter.ServiceRequestFilter; +import com.microsoft.windowsazure.core.pipeline.filter.ServiceResponseFilter; +import com.microsoft.windowsazure.core.pipeline.jersey.ClientFilterAdapter; +import com.microsoft.windowsazure.core.pipeline.jersey.ClientFilterRequestAdapter; +import com.microsoft.windowsazure.core.pipeline.jersey.ClientFilterResponseAdapter; +import com.microsoft.windowsazure.core.pipeline.jersey.ServiceFilter; +import com.microsoft.windowsazure.exception.ServiceException; +import com.microsoft.windowsazure.services.servicebus.ServiceBusContract; +import com.microsoft.windowsazure.services.servicebus.ServiceBusContractAsync; +import com.microsoft.windowsazure.services.servicebus.models.AbstractListOptions; +import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage; +import com.microsoft.windowsazure.services.servicebus.models.CreateQueueResult; +import com.microsoft.windowsazure.services.servicebus.models.CreateRuleResult; +import com.microsoft.windowsazure.services.servicebus.models.CreateSubscriptionResult; +import com.microsoft.windowsazure.services.servicebus.models.CreateTopicResult; +import com.microsoft.windowsazure.services.servicebus.models.GetQueueResult; +import com.microsoft.windowsazure.services.servicebus.models.GetRuleResult; +import com.microsoft.windowsazure.services.servicebus.models.GetSubscriptionResult; +import com.microsoft.windowsazure.services.servicebus.models.GetTopicResult; +import com.microsoft.windowsazure.services.servicebus.models.ListQueuesOptions; +import com.microsoft.windowsazure.services.servicebus.models.ListQueuesResult; +import com.microsoft.windowsazure.services.servicebus.models.ListRulesOptions; +import com.microsoft.windowsazure.services.servicebus.models.ListRulesResult; +import com.microsoft.windowsazure.services.servicebus.models.ListSubscriptionsOptions; +import com.microsoft.windowsazure.services.servicebus.models.ListSubscriptionsResult; +import com.microsoft.windowsazure.services.servicebus.models.ListTopicsOptions; +import com.microsoft.windowsazure.services.servicebus.models.ListTopicsResult; +import com.microsoft.windowsazure.services.servicebus.models.QueueInfo; +import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageOptions; +import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageResult; +import com.microsoft.windowsazure.services.servicebus.models.ReceiveQueueMessageResult; +import com.microsoft.windowsazure.services.servicebus.models.ReceiveSubscriptionMessageResult; +import com.microsoft.windowsazure.services.servicebus.models.RuleInfo; +import com.microsoft.windowsazure.services.servicebus.models.SubscriptionInfo; +import com.microsoft.windowsazure.services.servicebus.models.TopicInfo; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.AsyncWebResource; +import com.sun.jersey.api.client.AsyncWebResource.Builder; +import com.sun.jersey.api.client.filter.ClientFilter; + +import rx.Observable; +import rx.functions.Func1; + +public class ServiceBusRestProxyAsync implements ServiceBusContractAsync { + + private Client channel; + private final String uri; + private final BrokerPropertiesMapper mapper; + private final CustomPropertiesMapper customPropertiesMapper; + + private ClientFilter[] filters; + + @Inject + public ServiceBusRestProxyAsync(Client channel, WrapFilter authFilter, SasFilter sasAuthFilter, + UserAgentFilter userAgentFilter, ServiceBusConnectionSettings connectionSettings, + BrokerPropertiesMapper mapper) { + + this.channel = channel; + this.filters = new ClientFilter[0]; + this.uri = connectionSettings.getUri(); + this.mapper = mapper; + this.customPropertiesMapper = new CustomPropertiesMapper(); + if (connectionSettings.isSasAuthentication()) { + channel.addFilter(sasAuthFilter); + } else { + channel.addFilter(authFilter); + } + channel.addFilter(new ClientFilterRequestAdapter(userAgentFilter)); + } + + public ServiceBusRestProxyAsync(Client channel, ClientFilter[] filters, String uri, BrokerPropertiesMapper mapper) { + this.channel = channel; + this.filters = filters; + this.uri = uri; + this.mapper = mapper; + this.customPropertiesMapper = new CustomPropertiesMapper(); + } + + @Override + public ServiceBusContract withFilter(ServiceFilter filter) { + ClientFilter[] newFilters = Arrays.copyOf(filters, filters.length + 1); + newFilters[filters.length] = new ClientFilterAdapter(filter); + return new ServiceBusRestProxy(new ServiceBusRestProxyAsync(channel, newFilters, uri, mapper)); + } + + @Override + public ServiceBusContract withRequestFilterFirst(ServiceRequestFilter serviceRequestFilter) { + ClientFilter[] currentFilters = filters; + ClientFilter[] newFilters = new ClientFilter[currentFilters.length + 1]; + System.arraycopy(currentFilters, 0, newFilters, 1, currentFilters.length); + newFilters[0] = new ClientFilterRequestAdapter(serviceRequestFilter); + return new ServiceBusRestProxy(new ServiceBusRestProxyAsync(channel, newFilters, uri, mapper)); + } + + @Override + public ServiceBusContract withRequestFilterLast(ServiceRequestFilter serviceRequestFilter) { + ClientFilter[] currentFilters = filters; + ClientFilter[] newFilters = Arrays.copyOf(currentFilters, currentFilters.length + 1); + newFilters[currentFilters.length] = new ClientFilterRequestAdapter(serviceRequestFilter); + return new ServiceBusRestProxy(new ServiceBusRestProxyAsync(channel, newFilters, uri, mapper)); + } + + @Override + public ServiceBusContract withResponseFilterFirst(ServiceResponseFilter serviceResponseFilter) { + ClientFilter[] currentFilters = filters; + ClientFilter[] newFilters = new ClientFilter[currentFilters.length + 1]; + System.arraycopy(currentFilters, 0, newFilters, 1, currentFilters.length); + newFilters[0] = new ClientFilterResponseAdapter(serviceResponseFilter); + return new ServiceBusRestProxy(new ServiceBusRestProxyAsync(channel, newFilters, uri, mapper)); + } + + @Override + public ServiceBusContract withResponseFilterLast(ServiceResponseFilter serviceResponseFilter) { + ClientFilter[] currentFilters = filters; + ClientFilter[] newFilters = Arrays.copyOf(currentFilters, currentFilters.length + 1); + newFilters[currentFilters.length] = new ClientFilterResponseAdapter(serviceResponseFilter); + return new ServiceBusRestProxy(new ServiceBusRestProxyAsync(channel, newFilters, uri, mapper)); + } + + public Client getChannel() { + return channel; + } + + public void setChannel(Client channel) { + this.channel = channel; + } + + private AsyncWebResource getResource() { + AsyncWebResource resource = getChannel().asyncResource(uri).queryParam("api-version", "2013-07"); + for (ClientFilter filter : filters) { + resource.addFilter(filter); + } + return resource; + } + + @Override + public Observable sendMessage(String path, BrokeredMessage message) { + Builder request = getResource().path(path).path("messages").getRequestBuilder(); + + if (message.getContentType() != null) { + request = request.type(message.getContentType()); + } + + if (message.getBrokerProperties() != null) { + request = request.header("BrokerProperties", mapper.toString(message.getBrokerProperties())); + } + + for (java.util.Map.Entry entry : message.getProperties().entrySet()) { + request.header(entry.getKey(), customPropertiesMapper.toString(entry.getValue())); + } + + Observable ret = Observable.from(request.post(Void.class, message.getBody())); + + return ret; + } + + @Override + public Observable sendQueueMessage(String path, BrokeredMessage message) { + return sendMessage(path, message); + } + + @Override + public Observable receiveQueueMessage(String queueName) { + return receiveQueueMessage(queueName, ReceiveMessageOptions.DEFAULT); + } + + @Override + public Observable receiveQueueMessage(String queuePath, ReceiveMessageOptions options) { + + AsyncWebResource resource = getResource().path(queuePath).path("messages").path("head"); + + Observable message = receiveMessage(options, resource); + return message.map(new Func1() { + @Override + public ReceiveQueueMessageResult call(BrokeredMessage msg) { + return new ReceiveQueueMessageResult(msg); + } + }); + } + + @Override + public Observable receiveMessage(String path) { + return receiveMessage(path, ReceiveMessageOptions.DEFAULT); + } + + @Override + public Observable receiveMessage(String path, ReceiveMessageOptions options) { + AsyncWebResource resource = getResource().path(path).path("messages").path("head"); + + Observable message = receiveMessage(options, resource); + return message.map(new Func1() { + @Override + public ReceiveMessageResult call(BrokeredMessage msg) { + return new ReceiveMessageResult(msg); + } + }); + } + + private Observable receiveMessage(ReceiveMessageOptions options, AsyncWebResource resource) { + if (options.getTimeout() != null) { + resource = resource.queryParam("timeout", Integer.toString(options.getTimeout())); + } + + Future clientResult; + if (options.isReceiveAndDelete()) { + clientResult = resource.delete(ClientResponse.class); + } else if (options.isPeekLock()) { + clientResult = resource.post(ClientResponse.class, ""); + } else { + throw new RuntimeException("Unknown ReceiveMode"); + } + + Observable observableResult = Observable.from(clientResult) + .map(new Func1() { + @Override + public BrokeredMessage call(ClientResponse clientResult) { + if (clientResult.getStatus() == 204) { + return null; + } + + BrokerProperties brokerProperties; + if (clientResult.getHeaders().containsKey("BrokerProperties")) { + brokerProperties = mapper + .fromString(clientResult.getHeaders().getFirst("BrokerProperties")); + } else { + brokerProperties = new BrokerProperties(); + } + + String location = clientResult.getHeaders().getFirst("Location"); + if (location != null) { + brokerProperties.setLockLocation(location); + } + + BrokeredMessage message = new BrokeredMessage(brokerProperties); + + MediaType contentType = clientResult.getType(); + if (contentType != null) { + message.setContentType(contentType.toString()); + } + + Date date = clientResult.getResponseDate(); + if (date != null) { + message.setDate(date); + } + + InputStream body = clientResult.getEntityInputStream(); + if (body != null) { + message.setBody(body); + } + + for (String key : clientResult.getHeaders().keySet()) { + Object value = clientResult.getHeaders().getFirst(key); + try { + value = customPropertiesMapper.fromString(value.toString()); + message.setProperty(key, value); + } catch (ParseException e) { + // log.warn("Unable to parse custom header", e); + } catch (NumberFormatException e) { + // log.warn("Unable to parse custom header", e); + } + } + + return message; + } + }); + + return observableResult; + } + + @Override + public Observable sendTopicMessage(String topicName, BrokeredMessage message) { + return sendMessage(topicName, message); + } + + @Override + public Observable receiveSubscriptionMessage(String topicName, + String subscriptionName) { + return receiveSubscriptionMessage(topicName, subscriptionName, ReceiveMessageOptions.DEFAULT); + } + + @Override + public Observable receiveSubscriptionMessage(String topicName, + String subscriptionName, ReceiveMessageOptions options) { + AsyncWebResource resource = getResource().path(topicName).path("subscriptions").path(subscriptionName) + .path("messages").path("head"); + + Observable message = receiveMessage(options, resource); + return message.map(new Func1() { + @Override + public ReceiveSubscriptionMessageResult call(BrokeredMessage msg) { + return new ReceiveSubscriptionMessageResult(msg); + } + }); + + } + + @Override + public Observable unlockMessage(BrokeredMessage message) { + return Observable.from(getChannel().asyncResource(message.getLockLocation()).put(Void.class, "")); + } + + @Override + public Observable deleteMessage(BrokeredMessage message) { + return Observable.from(getChannel().asyncResource(message.getLockLocation()).delete(Void.class)); + } + + @Override + public Observable createQueue(QueueInfo queueInfo) { + Builder webResourceBuilder = getResource().path(queueInfo.getPath()) + .type("application/atom+xml;type=entry;charset=utf-8"); + if ((queueInfo.getForwardTo() != null) && !queueInfo.getForwardTo().isEmpty()) { + webResourceBuilder.header("ServiceBusSupplementaryAuthorization", queueInfo.getForwardTo()); + } + + Future queueInfoResult = webResourceBuilder.put(QueueInfo.class, queueInfo); + + Observable queueInfoObservable = Observable.from(queueInfoResult) + .map(new Func1() { + @Override + public CreateQueueResult call(QueueInfo msg) { + return new CreateQueueResult(msg); + } + }); + + return queueInfoObservable; + } + + @Override + public Observable deleteQueue(String queuePath) { + return Observable.from(getResource().path(queuePath).delete(Void.class)); + } + + @Override + public Observable getQueue(String queuePath) { + Future queueInfoResult = getResource().path(queuePath).get(QueueInfo.class); + + Observable getQueueObservable = Observable.from(queueInfoResult) + .map(new Func1() { + @Override + public GetQueueResult call(QueueInfo msg) { + return new GetQueueResult(msg); + } + }); + + return getQueueObservable; + } + + @Override + public Observable listQueues(ListQueuesOptions options) { + Future feed = listOptions(options, getResource().path("$Resources/Queues")).get(Feed.class); + + Observable listQueuesObservable = Observable.from(feed) + .map(new Func1() { + @Override + public ListQueuesResult call(Feed feed) { + ArrayList queues = new ArrayList(); + for (Entry entry : feed.getEntries()) { + queues.add(new QueueInfo(entry)); + } + ListQueuesResult result = new ListQueuesResult(); + result.setItems(queues); + return result; + } + }); + + return listQueuesObservable; + + } + + @Override + public Observable updateQueue(QueueInfo queueInfo) { + Builder webResourceBuilder = getResource().path(queueInfo.getPath()) + .type("application/atom+xml;type=entry;charset=utf-8").header("If-Match", "*"); + if ((queueInfo.getForwardTo() != null) && !queueInfo.getForwardTo().isEmpty()) { + webResourceBuilder.header("ServiceBusSupplementaryAuthorization", queueInfo.getForwardTo()); + } + return Observable.from(webResourceBuilder.put(QueueInfo.class, queueInfo)); + } + + private AsyncWebResource listOptions(AbstractListOptions options, AsyncWebResource path) { + if (options.getTop() != null) { + path = path.queryParam("$top", options.getTop().toString()); + } + if (options.getSkip() != null) { + path = path.queryParam("$skip", options.getSkip().toString()); + } + if (options.getFilter() != null) { + path = path.queryParam("$filter", options.getFilter()); + } + return path; + } + + @Override + public Observable createTopic(TopicInfo entry) { + return Observable.from(getResource().path(entry.getPath()).type("application/atom+xml;type=entry;charset=utf-8") + .put(TopicInfo.class, entry)).map(new Func1() { + @Override + public CreateTopicResult call(TopicInfo info) { + return new CreateTopicResult(info); + } + }); + } + + @Override + public Observable deleteTopic(String topicPath) { + return Observable.from(getResource().path(topicPath).delete(Void.class)); + } + + @Override + public Observable getTopic(String topicPath) { + return Observable.from(getResource().path(topicPath).get(TopicInfo.class)) + .map(new Func1() { + @Override + public GetTopicResult call(TopicInfo info) { + return new GetTopicResult(info); + } + }); + } + + @Override + public Observable listTopics(ListTopicsOptions options) { + Future feed = listOptions(options, getResource().path("$Resources/Topics")).get(Feed.class); + + Observable feedObservable = Observable.from(feed).map(new Func1() { + @Override + public ListTopicsResult call(Feed feed) { + ArrayList topics = new ArrayList(); + for (Entry entry : feed.getEntries()) { + topics.add(new TopicInfo(entry)); + } + ListTopicsResult result = new ListTopicsResult(); + result.setItems(topics); + return result; + } + }); + + return feedObservable; + } + + @Override + public Observable updateTopic(TopicInfo topicInfo) { + return Observable + .from(getResource().path(topicInfo.getPath()).type("application/atom+xml;type=entry;charset=utf-8") + .header("If-Match", "*").put(TopicInfo.class, topicInfo)); + } + + @Override + public Observable createSubscription(String topicPath, + SubscriptionInfo subscriptionInfo) { + Builder webResourceBuilder = getResource().path(topicPath).path("subscriptions") + .path(subscriptionInfo.getName()).type("application/atom+xml;type=entry;charset=utf-8"); + if ((subscriptionInfo.getForwardTo() != null) && (!subscriptionInfo.getForwardTo().isEmpty())) { + webResourceBuilder.header("ServiceBusSupplementaryAuthorization", subscriptionInfo.getForwardTo()); + + } + + return Observable.from(webResourceBuilder.put(SubscriptionInfo.class, subscriptionInfo)) + .map(new Func1() { + @Override + public CreateSubscriptionResult call(SubscriptionInfo info) { + return new CreateSubscriptionResult(info); + } + }); + } + + @Override + public Observable deleteSubscription(String topicPath, String subscriptionName) { + return Observable.from(getResource().path(topicPath).path("subscriptions").path(subscriptionName).delete(Void.class)); + } + + @Override + public Observable getSubscription(String topicPath, String subscriptionName) { + + return Observable.from( + getResource().path(topicPath).path("subscriptions").path(subscriptionName).get(SubscriptionInfo.class)) + .map(new Func1() { + @Override + public GetSubscriptionResult call(SubscriptionInfo info) { + return new GetSubscriptionResult(info); + } + }); + } + + @Override + public Observable listSubscriptions(String topicPath, ListSubscriptionsOptions options) { + Future feed = listOptions(options, getResource().path(topicPath).path("subscriptions")).get(Feed.class); + + Observable feedObservable = Observable.from(feed) + .map(new Func1() { + @Override + public ListSubscriptionsResult call(Feed feed) { + ArrayList list = new ArrayList(); + for (Entry entry : feed.getEntries()) { + list.add(new SubscriptionInfo(entry)); + } + ListSubscriptionsResult result = new ListSubscriptionsResult(); + result.setItems(list); + return result; + } + }); + + return feedObservable; + } + + @Override + public Observable updateSubscription(String topicName, SubscriptionInfo subscriptionInfo) { + Builder webResourceBuilder = getResource().path(topicName).path("subscriptions") + .path(subscriptionInfo.getName()).type("application/atom+xml;type=entry;charset=utf-8") + .header("If-Match", "*"); + if ((subscriptionInfo.getForwardTo() != null) && !subscriptionInfo.getForwardTo().isEmpty()) { + webResourceBuilder.header("ServiceBusSupplementaryAuthorization", subscriptionInfo.getForwardTo()); + } + return Observable.from(webResourceBuilder.put(SubscriptionInfo.class, subscriptionInfo)); + } + + @Override + public Observable createRule(String topicPath, String subscriptionName, RuleInfo rule) { + + Observable ret = Observable.from(getResource().path(topicPath).path("subscriptions") + .path(subscriptionName).path("rules").path(rule.getName()) + .type("application/atom+xml;type=entry;charset=utf-8").put(RuleInfo.class, rule)) + .map(new Func1() { + @Override + public CreateRuleResult call(RuleInfo info) { + return new CreateRuleResult(info); + } + }); + + return ret; + } + + @Override + public Observable deleteRule(String topicPath, String subscriptionName, String ruleName) { + return Observable.from(getResource().path(topicPath).path("subscriptions").path(subscriptionName).path("rules") + .path(ruleName).delete(Void.class)); + } + + @Override + public Observable getRule(String topicPath, String subscriptionName, String ruleName) { + + Observable resultObservable = Observable.from(getResource().path(topicPath).path("subscriptions") + .path(subscriptionName).path("rules").path(ruleName).get(RuleInfo.class)) + .map(new Func1() { + @Override + public GetRuleResult call(RuleInfo info) { + return new GetRuleResult(info); + } + }); + + return resultObservable; + } + + @Override + public Observable listRules(String topicPath, String subscriptionName, ListRulesOptions options) { + Future feed = listOptions(options, + getResource().path(topicPath).path("subscriptions").path(subscriptionName).path("rules")) + .get(Feed.class); + + Observable resultObservable = Observable.from(feed).map(new Func1() { + @Override + public ListRulesResult call(Feed feed) { + ArrayList list = new ArrayList(); + for (Entry entry : feed.getEntries()) { + list.add(new RuleInfo(entry)); + } + ListRulesResult result = new ListRulesResult(); + result.setItems(list); + return result; + } + }); + + return resultObservable; + } + + @Override + public Observable listQueues() { + return listQueues(ListQueuesOptions.DEFAULT); + } + + @Override + public Observable listTopics() { + return listTopics(ListTopicsOptions.DEFAULT); + } + + @Override + public Observable listSubscriptions(String topicName) { + return listSubscriptions(topicName, ListSubscriptionsOptions.DEFAULT); + } + + @Override + public Observable listRules(String topicName, String subscriptionName) { + return listRules(topicName, subscriptionName, ListRulesOptions.DEFAULT); + } + + @Override + public Observable renewQueueLock(String queueName, String messageId, String lockToken) + throws ServiceException { + Observable clientResponse = Observable.from(getResource().path(queueName).path("messages") + .path(messageId).path(lockToken).post(ClientResponse.class, "")) + .map(new Func1() { + @Override + public ClientResponse call(ClientResponse clientResponse) { + PipelineHelpers.throwIfNotSuccess(clientResponse); + return clientResponse; + } + }); + + return clientResponse; + } + + @Override + public Observable renewSubscriptionLock(String topicName, String subscriptionName, String messageId, + String lockToken) throws ServiceException { + Observable clientResponse = Observable + .from(getResource().path(topicName).path("Subscriptions").path(subscriptionName).path("messages") + .path(messageId).path(lockToken).post(ClientResponse.class, "")) + .map(new Func1() { + @Override + public ClientResponse call(ClientResponse clientResponse) { + PipelineHelpers.throwIfNotSuccess(clientResponse); + return clientResponse; + } + }); + + return clientResponse; + } + +}