Author: tabish
Date: Thu Nov 5 22:33:48 2009
New Revision: 833220
URL:
http://svn.apache.org/viewvc?rev=833220&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-205
Add support for message compression. Currently this can only use the built-in DeflateStream classes, so compressed messages can only be exchanged between two .NET clients talking to each other via a Broker.
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/MessageCompressionTest.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMessage.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQBytesMessageTest.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQMessageTest.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs Thu Nov 5 22:33:48 2009
@@ -20,6 +20,7 @@
using System;
using System.Collections;
using System.IO;
+using System.IO.Compression;
namespace Apache.NMS.ActiveMQ.Commands
{
@@ -30,9 +31,7 @@
private EndianBinaryReader dataIn = null;
private EndianBinaryWriter dataOut = null;
private MemoryStream outputBuffer = null;
-
- // Need this later when we add compression to store true content length.
- private long length = 0;
+ private int length = 0;
public override byte GetDataStructureType()
{
@@ -450,6 +449,27 @@
}
}
+        /// <summary>
+        /// Gets or sets the body of this bytes message as a plain byte array.
+        /// This property hides the base class Content so that access goes through
+        /// the reader / writer streams set up by InitializeReading() and
+        /// InitializeWriting().
+        /// </summary>
+        /// <remarks>
+        /// The getter returns null when the tracked body length is zero.
+        /// Assigning null to the setter switches the message into write mode
+        /// without writing any bytes.
+        /// </remarks>
+        public new byte[] Content
+        {
+            get
+            {
+                byte[] buffer = null;
+                InitializeReading();
+                if(this.length != 0)
+                {
+                    buffer = new byte[this.length];
+
+                    // A single Read() call is allowed to return fewer bytes than were
+                    // requested (a DeflateStream source commonly does), so keep reading
+                    // until the buffer is full or the stream reports end of data.
+                    int offset = 0;
+                    while(offset < buffer.Length)
+                    {
+                        int count = this.dataIn.Read(buffer, offset, buffer.Length - offset);
+                        if(count <= 0)
+                        {
+                            break;
+                        }
+
+                        offset += count;
+                    }
+                }
+                return buffer;
+            }
+
+            set
+            {
+                InitializeWriting();
+
+                // Guard against a null body rather than failing on value.Length.
+                if(value != null)
+                {
+                    this.dataOut.Write(value, 0, value.Length);
+                }
+            }
+        }
+
public void Reset()
{
StoreContent();
@@ -464,14 +484,28 @@
FailIfWriteOnlyBody();
if(this.dataIn == null)
{
- if(this.Content != null)
+ byte[] data = base.Content;
+
+ if(base.Content == null)
{
- this.length = this.Content.Length;
+ data = new byte[0];
}
-
- // TODO - Add support for Message Compression.
- MemoryStream bytesIn = new MemoryStream(this.Content, false);
- dataIn = new EndianBinaryReader(bytesIn);
+
+ Stream target = new MemoryStream(data, false);
+
+ if(this.Connection != null && this.Compressed == true)
+ {
+ EndianBinaryReader reader = new EndianBinaryReader(target);
+ this.length = reader.ReadInt32();
+
+ target = new DeflateStream(target, CompressionMode.Decompress);
+ }
+ else
+ {
+ this.length = data.Length;
+ }
+
+ this.dataIn = new EndianBinaryReader(target);
}
}
@@ -480,25 +514,138 @@
FailIfReadOnlyBody();
if(this.dataOut == null)
{
- // TODO - Add support for Message Compression.
this.outputBuffer = new MemoryStream();
- this.dataOut = new EndianBinaryWriter(outputBuffer);
+ Stream target = this.outputBuffer;
+
+ if(this.Connection != null && this.Connection.UseCompression)
+ {
+ this.length = 0;
+ this.Compressed = true;
+
+ target = new DeflateStream(target, CompressionMode.Compress);
+ target = new LengthTrackerStream(target, this);
+ }
+
+ this.dataOut = new EndianBinaryWriter(target);
}
}
private void StoreContent()
{
- if( dataOut != null)
+ if(this.dataOut != null)
{
- dataOut.Close();
- // TODO - Add support for Message Compression.
+ if(this.Compressed == true)
+ {
+ MemoryStream final = new MemoryStream();
+ EndianBinaryWriter writer = new EndianBinaryWriter(final);
+
+ this.dataOut.Close();
+ byte[] compressed = this.outputBuffer.ToArray();
+
+ writer.Write(this.length);
+ writer.Write(compressed, 0, compressed.Length);
+ writer.Close();
+
+ base.Content = final.ToArray();
+ }
+ else
+ {
+ this.dataOut.Close();
+ base.Content = outputBuffer.ToArray();
+ }
- this.Content = outputBuffer.ToArray();
this.dataOut = null;
this.outputBuffer = null;
}
}
+        /// <summary>
+        /// Pass-through stream placed in front of the compressing stream when
+        /// message compression is enabled.  Every byte written through it is
+        /// added to the owning message's length field before being forwarded, so
+        /// the uncompressed body length is known when the content is stored and
+        /// can be read back by the receiving client ahead of the body data.
+        /// All other operations are forwarded unchanged to the wrapped stream.
+        /// </summary>
+        private class LengthTrackerStream : Stream
+        {
+            // The stream that actually receives the data.
+            private Stream target;
+
+            // The message whose length field is kept up to date.
+            private ActiveMQBytesMessage owner;
+
+            public LengthTrackerStream(Stream sink, ActiveMQBytesMessage parent)
+            {
+                target = sink;
+                owner = parent;
+            }
+
+            public override bool CanRead
+            {
+                get { return target.CanRead; }
+            }
+
+            public override bool CanWrite
+            {
+                get { return target.CanWrite; }
+            }
+
+            public override bool CanSeek
+            {
+                get { return target.CanSeek; }
+            }
+
+            public override long Length
+            {
+                get { return target.Length; }
+            }
+
+            public override long Position
+            {
+                get { return target.Position; }
+                set { target.Position = value; }
+            }
+
+            public override void Write(byte[] buffer, int offset, int count)
+            {
+                // Count first, then forward to the wrapped stream.
+                owner.length += count;
+                target.Write(buffer, offset, count);
+            }
+
+            public override void WriteByte(byte value)
+            {
+                owner.length += 1;
+                target.WriteByte(value);
+            }
+
+            public override int Read(byte[] buffer, int offset, int count)
+            {
+                return target.Read(buffer, offset, count);
+            }
+
+            public override int ReadByte()
+            {
+                return target.ReadByte();
+            }
+
+            public override long Seek(long offset, SeekOrigin origin)
+            {
+                return target.Seek(offset, origin);
+            }
+
+            public override void SetLength(long value)
+            {
+                target.SetLength(value);
+            }
+
+            public override void Flush()
+            {
+                target.Flush();
+            }
+
+            public override void Close()
+            {
+                // Close the wrapped stream before releasing this wrapper.
+                target.Close();
+                base.Close();
+            }
+        }
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs Thu Nov 5 22:33:48 2009
@@ -15,6 +15,9 @@
* limitations under the License.
*/
+using System;
+using System.IO;
+using System.IO.Compression;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ.OpenWire;
@@ -62,13 +65,29 @@
{
get
{
- if(body == null)
- {
- body = PrimitiveMap.Unmarshal(Content);
- typeConverter = new PrimitiveMapInterceptor(this, body);
+ if(this.body == null)
+ {
+ if(this.Content != null && this.Content.Length > 0)
+ {
+ MemoryStream buffer = new MemoryStream(this.Content);
+ Stream source = buffer;
+
+ if(this.Connection != null && this.Compressed)
+ {
+ source = new DeflateStream(source, CompressionMode.Decompress);
+ }
+
+ this.body = PrimitiveMap.Unmarshal(source);
+ }
+ else
+ {
+ this.body = new PrimitiveMap();
+ }
+
+ this.typeConverter = new PrimitiveMapInterceptor(this, this.body);
}
-
- return typeConverter;
+
+ return this.typeConverter;
}
}
@@ -80,7 +99,20 @@
}
else
{
- Content = body.Marshal();
+ MemoryStream buffer = new MemoryStream();
+ Stream target = buffer;
+
+ if(this.Connection != null && this.Connection.UseCompression)
+ {
+ target = new DeflateStream(target, CompressionMode.Compress);
+
+ this.Compressed = true;
+ }
+
+ this.body.Marshal(target);
+ target.Close();
+
+ this.Content = buffer.ToArray();
}
Tracer.Debug("BeforeMarshalling, content is: " + Content);
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMessage.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMessage.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMessage.cs Thu Nov 5 22:33:48 2009
@@ -30,6 +30,7 @@
private MessagePropertyIntercepter propertyHelper;
private PrimitiveMap properties;
+ private Connection connection;
public event AcknowledgeHandler Acknowledger;
@@ -38,11 +39,7 @@
return (ActiveMQMessage) message;
}
- // TODO generate Equals method
- // TODO generate GetHashCode method
-
- public ActiveMQMessage()
- : base()
+ public ActiveMQMessage() : base()
{
Timestamp = DateUtils.ToJavaTimeUtc(DateTime.UtcNow);
}
@@ -97,8 +94,8 @@
public virtual void ClearBody()
{
- this.Content = null;
this.ReadOnlyBody = false;
+ this.Content = null;
}
public virtual void ClearProperties()
@@ -159,6 +156,12 @@
get { return Destination; }
set { this.Destination = ActiveMQDestination.Transform(value); }
}
+
+        /// <summary>
+        /// The Connection this message is associated with.  The message body
+        /// classes check this Connection (and its UseCompression setting) when
+        /// deciding whether to compress or decompress their content; it may be
+        /// null, in which case no compression handling is applied.
+        /// </summary>
+        public Connection Connection
+        {
+            get
+            {
+                return connection;
+            }
+
+            set
+            {
+                connection = value;
+            }
+        }
/// <summary>
/// The correlation ID used to correlate messages with conversations or long running business processes
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs Thu Nov 5 22:33:48 2009
@@ -17,6 +17,7 @@
using System;
using System.IO;
+using System.IO.Compression;
using System.Collections;
using Apache.NMS;
using Apache.NMS.Util;
@@ -882,9 +883,15 @@
FailIfWriteOnlyBody();
if(this.dataIn == null)
{
- // TODO - Add support for Message Compression.
this.byteBuffer = new MemoryStream(this.Content, false);
- dataIn = new EndianBinaryReader(byteBuffer);
+
+ Stream target = this.byteBuffer;
+ if(this.Connection != null && this.Compressed == true)
+ {
+ target = new DeflateStream(this.byteBuffer, CompressionMode.Decompress);
+ }
+
+ this.dataIn = new EndianBinaryReader(target);
}
}
@@ -893,9 +900,17 @@
FailIfReadOnlyBody();
if(this.dataOut == null)
{
- // TODO - Add support for Message Compression.
- this.byteBuffer = new MemoryStream();
- this.dataOut = new EndianBinaryWriter(byteBuffer);
+ this.byteBuffer = new MemoryStream();
+
+ Stream target = this.byteBuffer;
+ if(this.Connection != null && this.Connection.UseCompression)
+ {
+ target = new DeflateStream(this.byteBuffer, CompressionMode.Compress);
+
+ this.Compressed = true;
+ }
+
+ this.dataOut = new EndianBinaryWriter(target);
}
}
@@ -904,7 +919,6 @@
if( dataOut != null)
{
dataOut.Close();
- // TODO - Add support for Message Compression.
this.Content = byteBuffer.ToArray();
this.dataOut = null;
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs Thu Nov 5 22:33:48 2009
@@ -17,6 +17,7 @@
using System;
using System.IO;
+using System.IO.Compression;
using Apache.NMS;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ.OpenWire;
@@ -70,8 +71,13 @@
{
if(this.text == null && this.Content != null)
{
- // TODO - Handle Compression
- MemoryStream stream = new MemoryStream(this.Content);
+ Stream stream = new MemoryStream(this.Content);
+
+ if(this.Connection != null && this.Compressed == true)
+ {
+ stream = new DeflateStream(stream, CompressionMode.Decompress);
+ }
+
EndianBinaryReader reader = new EndianBinaryReader(stream);
this.text = reader.ReadString32();
this.Content = null;
@@ -99,15 +105,23 @@
if(this.Content == null && text != null)
{
byte[] data = null;
-
- // TODO - Deal with Compressoin.
// Set initial size to the size of the string the UTF-8 encode could
// result in more if there are chars that encode to multibye values.
- MemoryStream stream = new MemoryStream(text.Length);
- EndianBinaryWriter writer = new EndianBinaryWriter(stream);
+ MemoryStream buffer = new MemoryStream(text.Length);
+ Stream target = buffer;
+
+ if(this.Connection != null && this.Connection.UseCompression)
+ {
+ target = new DeflateStream(target, CompressionMode.Compress);
+
+ this.Compressed = true;
+ }
+
+ EndianBinaryWriter writer = new EndianBinaryWriter(target);
writer.WriteString32(text);
- data = stream.GetBuffer();
+ target.Close();
+ data = buffer.ToArray();
this.Content = data;
this.text = null;
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Thu Nov 5 22:33:48 2009
@@ -44,6 +44,7 @@
private bool asyncSend = false;
private bool alwaysSyncSend = false;
private bool asyncClose = true;
+ private bool useCompression = false;
private bool copyMessageOnSend = true;
private int producerWindowSize = 0;
private bool connected = false;
@@ -182,6 +183,19 @@
set { copyMessageOnSend = value; }
}
+        /// <summary>
+        /// Turns compression of Message bodies on or off for this Connection.
+        /// While enabled, the body of every message is compressed with the
+        /// Deflate algorithm.  The receiving client must also support message
+        /// compression; a client that does not will see the body in its
+        /// compressed form.
+        /// </summary>
+        public bool UseCompression
+        {
+            get
+            {
+                return useCompression;
+            }
+
+            set
+            {
+                useCompression = value;
+            }
+        }
+
public IConnectionMetaData MetaData
{
get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Thu Nov 5 22:33:48 2009
@@ -37,6 +37,7 @@
private string connectionUserName;
private string connectionPassword;
private string clientId;
+ private bool useCompression;
private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
@@ -97,15 +98,18 @@
ITransport transport = TransportFactory.CreateTransport(uri);
Connection connection = new Connection(uri, transport, info);
+ // Set the Factory level configuration to the Connection, this can be overriden by
+ // the params on the Connection URI so we do this before applying the params.
+ connection.UseCompression = this.useCompression;
+ connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+ connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
+
// Set properties on connection using parameters prefixed with "connection."
// Since this could be a composite Uri, assume the connection-specific parameters
// are associated with the outer-most specification of the composite Uri. What's nice
// is that this works with simple Uri as well.
URISupport.CompositeData c = URISupport.parseComposite(uri);
URISupport.SetProperties(connection, c.Parameters, "connection.");
-
- connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
- connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
connection.ITransport.Start();
return connection;
@@ -140,6 +144,12 @@
set { clientId = value; }
}
+        /// <summary>
+        /// Factory level setting for message body compression.  The value is
+        /// copied onto each Connection this factory creates before the
+        /// "connection." parameters of the URI are applied, so a URI parameter
+        /// can override it.
+        /// </summary>
+        public bool UseCompression
+        {
+            get
+            {
+                return useCompression;
+            }
+
+            set
+            {
+                useCompression = value;
+            }
+        }
+
public IRedeliveryPolicy RedeliveryPolicy
{
get { return this.redeliveryPolicy; }
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Thu Nov 5 22:33:48 2009
@@ -584,7 +584,7 @@
}
else if(dispatch.Message.IsExpired())
{
- Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch);
+ Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);
BeforeMessageIsConsumed(dispatch);
AfterMessageIsConsumed(dispatch, true);
@@ -596,7 +596,6 @@
}
else
{
- Tracer.DebugFormat("{0} received message: {1}", info.ConsumerId, dispatch);
return dispatch;
}
}
@@ -920,6 +919,8 @@
private ActiveMQMessage CreateActiveMQMessage(MessageDispatch dispatch)
{
ActiveMQMessage message = dispatch.Message.Clone() as ActiveMQMessage;
+
+ message.Connection = this.session.Connection;
if(this.session.IsClientAcknowledge)
{
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Thu Nov 5 22:33:48 2009
@@ -514,48 +514,48 @@
public IMessage CreateMessage()
{
ActiveMQMessage answer = new ActiveMQMessage();
- return answer;
+ return ConfigureMessage(answer) as IMessage;
}
public ITextMessage CreateTextMessage()
{
ActiveMQTextMessage answer = new ActiveMQTextMessage();
- return answer;
+ return ConfigureMessage(answer) as ITextMessage;
}
public ITextMessage CreateTextMessage(string text)
{
ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
- return answer;
+ return ConfigureMessage(answer) as ITextMessage;
}
public IMapMessage CreateMapMessage()
{
- return new ActiveMQMapMessage();
+ return ConfigureMessage(new ActiveMQMapMessage()) as IMapMessage;
}
public IBytesMessage CreateBytesMessage()
{
- return new ActiveMQBytesMessage();
+ return ConfigureMessage(new ActiveMQBytesMessage()) as IBytesMessage;
}
public IBytesMessage CreateBytesMessage(byte[] body)
{
ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
answer.Content = body;
- return answer;
+ return ConfigureMessage(answer) as IBytesMessage;
}
public IStreamMessage CreateStreamMessage()
{
- return new ActiveMQStreamMessage();
+ return ConfigureMessage(new ActiveMQStreamMessage()) as IStreamMessage;
}
public IObjectMessage CreateObjectMessage(object body)
{
ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
answer.Body = body;
- return answer;
+ return ConfigureMessage(answer) as IObjectMessage;
}
public void Commit()
@@ -804,6 +804,32 @@
}
}
}
+
+        /// <summary>
+        /// Applies the session level configuration to a newly created message:
+        /// the message is associated with this session's Connection and, when
+        /// the session is transacted, is given a no-op acknowledge handler.
+        /// </summary>
+        /// <param name="message">
+        /// The <see cref="ActiveMQMessage"/> to configure.
+        /// </param>
+        /// <returns>
+        /// The same message instance that was passed in.
+        /// </returns>
+        private ActiveMQMessage ConfigureMessage(ActiveMQMessage message)
+        {
+            message.Connection = this.connection;
+
+            if(!this.IsTransacted)
+            {
+                return message;
+            }
+
+            // Per the JMS Spec, calling Acknowledge inside a transaction must be
+            // allowed and must have no effect.
+            message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
+            return message;
+        }
+
+ /// <summary>
+ /// Prevents a message from throwing an exception if a client calls Acknowledge on
+ /// a message that is part of a transaction, either being produced or consumed. The
+ /// JMS Spec indicates that users should be able to call Acknowledge with no effect
+ /// if the message is in a transaction.
+ /// </summary>
+ /// <param name="message">
+ /// A <see cref="ActiveMQMessage"/>
+ /// </param>
+ private void DoNothingAcknowledge(ActiveMQMessage message)
+ {
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQBytesMessageTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQBytesMessageTest.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQBytesMessageTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQBytesMessageTest.cs Thu Nov 5 22:33:48 2009
@@ -30,9 +30,24 @@
{
ActiveMQBytesMessage message = new ActiveMQBytesMessage();
- Assert.IsNull( message.Content );
+ // Test that a BytesMessage is created in WriteOnly mode.
+ try
+ {
+ byte[] content = message.Content;
+ content.SetValue(0, 0);
+ Assert.Fail("Should have thrown an exception");
+ }
+ catch
+ {
+ }
+
Assert.IsTrue( !message.ReadOnlyBody );
Assert.IsTrue( !message.ReadOnlyProperties );
+
+ message.Reset();
+
+ Assert.IsNull( message.Content );
+ Assert.IsTrue( message.ReadOnlyBody );
}
[Test]
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQMessageTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQMessageTest.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQMessageTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQMessageTest.cs Thu Nov 5 22:33:48 2009
@@ -445,7 +445,6 @@
ActiveMQBytesMessage message = new ActiveMQBytesMessage();
message.ClearBody();
Assert.IsFalse(message.ReadOnlyBody);
- Assert.IsNull(message.Content);
}
[Test]
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/MessageCompressionTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/MessageCompressionTest.cs?rev=833220&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/MessageCompressionTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/MessageCompressionTest.cs Thu Nov 5 22:33:48 2009
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ;
+using Apache.NMS.ActiveMQ.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Test
+{
+    /// <summary>
+    /// Round-trip tests for message body compression.  Each test enables
+    /// UseCompression on the Connection, sends one message of a given type to a
+    /// temporary queue, receives it back and checks that it is flagged as
+    /// Compressed and that its body matches what was sent.
+    /// </summary>
+    [TestFixture]
+    public class MessageCompressionTest : NMSTestSupport
+    {
+        protected static string TEST_CLIENT_ID = "MessageCompressionTestClientId";
+        protected static string DESTINATION_NAME = "MessageCompressionTestDest";
+
+        // The following text should compress well
+        private const string TEXT = "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. ";
+
+        // Sample values, one per primitive type, written to and read back from
+        // the stream, map and bytes messages.
+        protected bool a = true;
+        protected byte b = 123;
+        protected char c = 'c';
+        protected short d = 0x1234;
+        protected int e = 0x12345678;
+        protected long f = 0x1234567812345678;
+        protected string g = "Hello World!";
+        protected bool h = false;
+        protected byte i = 0xFF;
+        protected short j = -0x1234;
+        protected int k = -0x12345678;
+        protected long l = -0x1234567812345678;
+        protected float m = 2.1F;
+        protected double n = 2.3;
+
+        /// <summary>
+        /// Sends a compressible text message and verifies the received copy is
+        /// marked Compressed and carries the original text.
+        /// </summary>
+        [Test]
+        public void TestTextMessageCompression()
+        {
+            using(Connection connection = CreateConnection(TEST_CLIENT_ID) as Connection)
+            {
+                connection.UseCompression = true;
+                connection.Start();
+
+                Assert.IsTrue(connection.UseCompression);
+
+                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                {
+                    ITextMessage message = session.CreateTextMessage(TEXT);
+
+                    IDestination destination = session.CreateTemporaryQueue();
+
+                    // Dispose the producer and consumer deterministically, as the
+                    // bytes message test does.
+                    using(IMessageProducer producer = session.CreateProducer(destination))
+                    using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                    {
+                        producer.Send(message);
+
+                        message = consumer.Receive(TimeSpan.FromMilliseconds(4000)) as ITextMessage;
+
+                        Assert.IsNotNull(message);
+                        Assert.IsTrue(((ActiveMQMessage) message).Compressed);
+                        Assert.AreEqual(TEXT, message.Text);
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Sends a stream message holding one value of each primitive type and
+        /// verifies the received copy is marked Compressed and yields the same
+        /// values in the same order.
+        /// </summary>
+        [Test]
+        public void TestStreamMessageCompression()
+        {
+            using(Connection connection = CreateConnection(TEST_CLIENT_ID) as Connection)
+            {
+                connection.UseCompression = true;
+                connection.Start();
+
+                Assert.IsTrue(connection.UseCompression);
+
+                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                {
+                    IStreamMessage message = session.CreateStreamMessage();
+
+                    IDestination destination = session.CreateTemporaryQueue();
+
+                    // Dispose the producer and consumer deterministically, as the
+                    // bytes message test does.
+                    using(IMessageProducer producer = session.CreateProducer(destination))
+                    using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                    {
+                        message.WriteBoolean(a);
+                        message.WriteByte(b);
+                        message.WriteChar(c);
+                        message.WriteInt16(d);
+                        message.WriteInt32(e);
+                        message.WriteInt64(f);
+                        message.WriteString(g);
+                        message.WriteBoolean(h);
+                        message.WriteByte(i);
+                        message.WriteInt16(j);
+                        message.WriteInt32(k);
+                        message.WriteInt64(l);
+                        message.WriteSingle(m);
+                        message.WriteDouble(n);
+
+                        producer.Send(message);
+
+                        message = consumer.Receive(TimeSpan.FromMilliseconds(4000)) as IStreamMessage;
+
+                        Assert.IsNotNull(message);
+                        Assert.IsTrue(((ActiveMQMessage) message).Compressed);
+
+                        // use generic API to access entries
+                        Assert.AreEqual(a, message.ReadBoolean(), "Stream Boolean Value: a");
+                        Assert.AreEqual(b, message.ReadByte(), "Stream Byte Value: b");
+                        Assert.AreEqual(c, message.ReadChar(), "Stream Char Value: c");
+                        Assert.AreEqual(d, message.ReadInt16(), "Stream Int16 Value: d");
+                        Assert.AreEqual(e, message.ReadInt32(), "Stream Int32 Value: e");
+                        Assert.AreEqual(f, message.ReadInt64(), "Stream Int64 Value: f");
+                        Assert.AreEqual(g, message.ReadString(), "Stream String Value: g");
+                        Assert.AreEqual(h, message.ReadBoolean(), "Stream Boolean Value: h");
+                        Assert.AreEqual(i, message.ReadByte(), "Stream Byte Value: i");
+                        Assert.AreEqual(j, message.ReadInt16(), "Stream Int16 Value: j");
+                        Assert.AreEqual(k, message.ReadInt32(), "Stream Int32 Value: k");
+                        Assert.AreEqual(l, message.ReadInt64(), "Stream Int64 Value: l");
+                        Assert.AreEqual(m, message.ReadSingle(), "Stream Single Value: m");
+                        Assert.AreEqual(n, message.ReadDouble(), "Stream Double Value: n");
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Sends a map message holding one entry of each primitive type and
+        /// verifies the received copy is marked Compressed and returns the same
+        /// value for every key.
+        /// </summary>
+        [Test]
+        public void TestMapMessageCompression()
+        {
+            using(Connection connection = CreateConnection(TEST_CLIENT_ID) as Connection)
+            {
+                connection.UseCompression = true;
+                connection.Start();
+
+                Assert.IsTrue(connection.UseCompression);
+
+                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                {
+                    IMapMessage message = session.CreateMapMessage();
+
+                    IDestination destination = session.CreateTemporaryQueue();
+
+                    // Dispose the producer and consumer deterministically, as the
+                    // bytes message test does.
+                    using(IMessageProducer producer = session.CreateProducer(destination))
+                    using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                    {
+                        message.Body["a"] = a;
+                        message.Body["b"] = b;
+                        message.Body["c"] = c;
+                        message.Body["d"] = d;
+                        message.Body["e"] = e;
+                        message.Body["f"] = f;
+                        message.Body["g"] = g;
+                        message.Body["h"] = h;
+                        message.Body["i"] = i;
+                        message.Body["j"] = j;
+                        message.Body["k"] = k;
+                        message.Body["l"] = l;
+                        message.Body["m"] = m;
+                        message.Body["n"] = n;
+
+                        producer.Send(message);
+
+                        message = consumer.Receive(TimeSpan.FromMilliseconds(4000)) as IMapMessage;
+
+                        Assert.IsNotNull(message);
+                        Assert.IsTrue(((ActiveMQMessage) message).Compressed);
+
+                        Assert.AreEqual(a, message.Body.GetBool("a"), "map entry: a");
+                        Assert.AreEqual(b, message.Body.GetByte("b"), "map entry: b");
+                        Assert.AreEqual(c, message.Body.GetChar("c"), "map entry: c");
+                        Assert.AreEqual(d, message.Body.GetShort("d"), "map entry: d");
+                        Assert.AreEqual(e, message.Body.GetInt("e"), "map entry: e");
+                        Assert.AreEqual(f, message.Body.GetLong("f"), "map entry: f");
+                        Assert.AreEqual(g, message.Body.GetString("g"), "map entry: g");
+                        Assert.AreEqual(h, message.Body.GetBool("h"), "map entry: h");
+                        Assert.AreEqual(i, message.Body.GetByte("i"), "map entry: i");
+                        Assert.AreEqual(j, message.Body.GetShort("j"), "map entry: j");
+                        Assert.AreEqual(k, message.Body.GetInt("k"), "map entry: k");
+                        Assert.AreEqual(l, message.Body.GetLong("l"), "map entry: l");
+                        Assert.AreEqual(m, message.Body.GetFloat("m"), "map entry: m");
+                        Assert.AreEqual(n, message.Body.GetDouble("n"), "map entry: n");
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Sends a bytes message holding one value of each primitive type and
+        /// verifies the received copy is marked Compressed and yields the same
+        /// values in the same order.
+        /// </summary>
+        [Test]
+        public void TestBytesMessageCompression()
+        {
+            using(Connection connection = CreateConnection(TEST_CLIENT_ID) as Connection)
+            {
+                connection.UseCompression = true;
+                connection.Start();
+
+                // Same sanity check the other compression tests perform.
+                Assert.IsTrue(connection.UseCompression);
+
+                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                {
+                    IDestination destination = session.CreateTemporaryQueue();
+                    using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                    using(IMessageProducer producer = session.CreateProducer(destination))
+                    {
+                        IBytesMessage message = session.CreateBytesMessage();
+
+                        message.WriteBoolean(a);
+                        message.WriteByte(b);
+                        message.WriteChar(c);
+                        message.WriteInt16(d);
+                        message.WriteInt32(e);
+                        message.WriteInt64(f);
+                        message.WriteString(g);
+                        message.WriteBoolean(h);
+                        message.WriteByte(i);
+                        message.WriteInt16(j);
+                        message.WriteInt32(k);
+                        message.WriteInt64(l);
+                        message.WriteSingle(m);
+                        message.WriteDouble(n);
+
+                        producer.Send(message);
+
+                        message = consumer.Receive(receiveTimeout) as IBytesMessage;
+
+                        Assert.IsNotNull(message);
+                        Assert.IsTrue(((ActiveMQMessage) message).Compressed);
+
+                        Assert.AreEqual(a, message.ReadBoolean(), "Stream Boolean Value: a");
+                        Assert.AreEqual(b, message.ReadByte(), "Stream Byte Value: b");
+                        Assert.AreEqual(c, message.ReadChar(), "Stream Char Value: c");
+                        Assert.AreEqual(d, message.ReadInt16(), "Stream Int16 Value: d");
+                        Assert.AreEqual(e, message.ReadInt32(), "Stream Int32 Value: e");
+                        Assert.AreEqual(f, message.ReadInt64(), "Stream Int64 Value: f");
+                        Assert.AreEqual(g, message.ReadString(), "Stream String Value: g");
+                        Assert.AreEqual(h, message.ReadBoolean(), "Stream Boolean Value: h");
+                        Assert.AreEqual(i, message.ReadByte(), "Stream Byte Value: i");
+                        Assert.AreEqual(j, message.ReadInt16(), "Stream Int16 Value: j");
+                        Assert.AreEqual(k, message.ReadInt32(), "Stream Int32 Value: k");
+                        Assert.AreEqual(l, message.ReadInt64(), "Stream Int64 Value: l");
+                        Assert.AreEqual(m, message.ReadSingle(), "Stream Single Value: m");
+                        Assert.AreEqual(n, message.ReadDouble(), "Stream Double Value: n");
+                    }
+                }
+            }
+        }
+
+    }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/MessageCompressionTest.cs
------------------------------------------------------------------------------
svn:eol-style = native