« Return to Thread: Re: ProtocolParser Advice

Re: ProtocolParser Advice

by Jeanfrancois Arcand-2 :: Rate this Message:

Reply to Author | View in Thread

Salut,

would you mind signing and faxing this

https://glassfish.dev.java.net/public/GovernancePolicy.html#SCA_Policy

agreement? This way I can add your code to the Grizzly distribution (as
a tutorial or if you can document the entire code, we might add it
directly to Grizzly framework). Shipping a documented version of this
class will makes peoples life easy.

We can also create a new modules calles grizzly-extra where we can put
those classes. What peoples thinks?

Thanks!!!!

-- Jeanfrancois

John ROM wrote:

> Hi,
> just found some bugs in my parser.
> here is a better version:
>
> package util;
>
> import com.sun.grizzly.ProtocolParser;
> import com.sun.grizzly.util.ByteBufferFactory;
> import com.sun.grizzly.util.WorkerThread;
>
> import java.nio.ByteBuffer;
>
>
>
> /**
>  * ProtocolParser Filter which parses a Custom Protocol
>  * into a MessageParser. These IncomingMessages can then be used
>  * by other Protocolchain Filters
>  *
>  * So basicly implement abstract method
>   * "public boolean parseHeader(ByteBuffer bb)"  of
>   * IncommingMessage and retrieve these message up by another above Filter in chain.
>   *
>  * The Messages can be arbitrary long.
>  * The Protocol has the following format
>  * (n params + Data Size  + Data):
>
>  * n bytes : n number of header params
>  * ----------------------------------------------
>  * 4 bytes Data length  - The length of Data
>  * n bytes Data         - Message
>  *
>
>  * The Protocol asumes that on average Messages are not too large (Gigs) otherwise
>  * this implementation might not be effective.
>
>  * Tries to avoid bytearray copying and therfore maps the IncomingMessageeee onto
>  * the orginal Grizzly Bytebuffer.  If Messages do not fit into an default sized Grizzly
>  * ByteBuffer a larger one  with enough capacity  is temporarly given to the framework.
>  *
>  * This class is still under construction  and for example Error Handling,
>  * Timeouts, Recovery has to be greatly improved!!!
>  * John Vieten
>  */
>
>
> public abstract class MyProtocolParser implements ProtocolParser<MyProtocolParser.MessageParser> {
>     private boolean startBufferExecuted = false;
>     private boolean modeWithoutReadFilter;
>
>
>     protected MyProtocolParser(boolean modeWithoutReadFilter) {
>         this.state = modeWithoutReadFilter ? State.HAS_MORE_BYTES_TO_PARSE : State.START;
>         this.modeWithoutReadFilter = modeWithoutReadFilter;
>     }
>
>     public static final int DEFAULT_BUFFER_SIZE = 8192;
>     /**
>      * Handle to byteBuffer which gets filled in by grizzly
>      */
>     private ByteBuffer grizzlyBuffer;
>     /**
>      * If Message are larger than DEFAULT_BUFFER_SIZE  a new larger Grizzly ByteBuffer is created
>      * This handle is used to give Grizzly back its original default ByteBuffer .
>      */
>     private ByteBuffer restoreHandle;
>
>     /**
>      * The length of the data depicted by Protocol Header
>      */
>     private int dataLength = 0;
>
>     /**
>      * Just dataLength +  Header Length
>      */
>     private int messageLength = 0;
>     /*
>       Keeps track of the bytes read by the Grizzly ReadFilter
>     */
>     private int trackBytesRead = 0;
>
>
>     /**
>      * Keeps track of the start position of the current message in the Grizzly ByteBuffer.
>      * Needed because an Grizzly ByteBuffer can contain several Messages at once
>      * The messageStart always reflects the start Boundary of the current Message
>      * And is either zero or set when a message was fully parsed and following one is still
>      * in the buffer
>      */
>     int messageStart = 0;
>
>     /**
>      * Holds the state of this Statemachine
>      */
>     private State state = null;
>
>     /**
>      * The Message given to other Filters up the chain
>      */
>
>     private MessageParser messageParser;
>
>     public abstract MessageParser newInstance();
>
>     public static enum State {
>         START,
>         EXPECTING_MORE_HEADER_BYTES,
>         EXPECTING_MORE_DATA_BYTES,
>         HAS_MORE_BYTES_TO_PARSE,
>         MEASSAGE_PARSED,
>         MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE,
>     }
>
>
>     public boolean isExpectingMoreData() {
>         boolean result = state == State.EXPECTING_MORE_DATA_BYTES ||
>                 state == State.EXPECTING_MORE_HEADER_BYTES;
>
>         return result;
>
>     }
>
>
>     public boolean hasMoreBytesToParse() {
>         return state == State.HAS_MORE_BYTES_TO_PARSE;
>     }
>
>     public MessageParser getNextMessage() {
>
>         switch (state) {
>             case MEASSAGE_PARSED:
>                 resetState();
>                 break;
>             case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
>                 state = State.HAS_MORE_BYTES_TO_PARSE;
>
>         }
>         MessageParser tmp = messageParser;
>         messageParser.unattach();
>         messageParser = null;
>         return tmp;
>     }
>
>
>     public boolean hasNextMessage() {
>         if (!startBufferExecuted) {
>             startBuffer(((WorkerThread) Thread.currentThread()).getByteBuffer());
>         }
>         parseMessage();
>         switch (state) {
>             case MEASSAGE_PARSED:
>             case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
>                 return true;
>             default:
>                 return false;
>         }
>     }
>
>
>     public void parseMessage() {
>         switch (state) {
>             case MEASSAGE_PARSED:
>             case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
>                 return;
>         }
>
>
>         switch (state) {
>
>             case HAS_MORE_BYTES_TO_PARSE:
>
>             case START:
>                 messageParser = newInstance();
>
>             case EXPECTING_MORE_HEADER_BYTES:
>                 if ((trackBytesRead - messageStart) < messageParser.getHeaderLength()) {
>                     state = State.EXPECTING_MORE_HEADER_BYTES;
>                     grizzlyBuffer.position(messageStart);
>                      // doesn't happen often so we can compact
>                     grizzlyBuffer.compact();
>                     trackBytesRead=trackBytesRead-messageStart;
>                     messageStart=0;
>                     grizzlyBuffer.position(trackBytesRead);
>                     return;
>                 }
>                 grizzlyBuffer.position(messageStart);
>                 try {
>                     // expects a position messageStart
>                     boolean isKMProtocol = messageParser.parseHeader(grizzlyBuffer);
>                     if (!isKMProtocol) {
>                         messageParser.setException(new Exception("Bad Startmark"));
>                         state = State.MEASSAGE_PARSED;
>                         return;
>
>                     }
>                 } catch (Exception e) {
>                     // System.out.println(e.getMessage());
>                     messageParser.setException(e);
>                     state = State.MEASSAGE_PARSED;
>                     return;
>                 }
>
>                 dataLength = messageParser.getDataSize();
>                 messageLength = dataLength + MyIncomingMessage.HEADER_LENGTH;
>
>                 // position should by at data start
>                 if ((grizzlyBuffer.capacity() - grizzlyBuffer.position()) < dataLength) {
>                     // Not enough room to store message so we need a bigger buffer and also some extra room for
>                     // any new message chunks
>
>                     // maybe I should do a compact to use a smaller buffer here but I am not sure about performance
>
>                     int newCapacity = messageLength + grizzlyBuffer.position() + DEFAULT_BUFFER_SIZE;
>                     grizzlyBuffer.position(0);
>                     ByteBuffer newBuffer = ByteBufferFactory.allocateView(newCapacity, grizzlyBuffer.isDirect());
>                     newBuffer.put(grizzlyBuffer);
>                     WorkerThread workerThread = (WorkerThread) Thread.currentThread();
>                     workerThread.setByteBuffer(grizzlyBuffer = newBuffer);
>
>                 }
>
>                 // Now create a sliced copy of GrizzlyBuffer so that
>                 // we have start and end pointers on our wanted message
>                 grizzlyBuffer.position(MyIncomingMessage.HEADER_LENGTH + messageStart);
>
>
>                 grizzlyBuffer.limit(messageLength + messageStart);
>
>                 ByteBuffer slicedHandle = grizzlyBuffer.slice();
>                 grizzlyBuffer.limit(grizzlyBuffer.capacity());
>                 // Now Buffer has enough space for message
>
>                 messageParser.setByteBuffer(slicedHandle);
>
>
>             case EXPECTING_MORE_DATA_BYTES:
>                 int remaining = trackBytesRead - messageLength - messageStart;
>                 if (remaining == 0) {
>                     // Ok we found a message
>                     state = State.MEASSAGE_PARSED;
>
>                 } else if (remaining > 0) {
>
>                     state = State.MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE;
>                     messageStart = messageStart + messageLength;
>
>
>                 } else {
>                     // we have to keep on ready bytes from the net
>                     state = State.EXPECTING_MORE_DATA_BYTES;
>                     grizzlyBuffer.position(trackBytesRead);
>
>
>
>                 }
>         }
>
>     }
>
>     public boolean releaseBuffer() {
>         boolean saveCurrentParserState = false;
>         if (isExpectingMoreData()) {
>             saveCurrentParserState = true;
>         } else {
>             if (restoreHandle == null) {
>
>                 Exception e = new Exception();
>                 e.printStackTrace();
>                 return false;
>             }
>             WorkerThread workerThread = (WorkerThread) Thread.currentThread();
>             restoreHandle.clear();
>
>             workerThread.setByteBuffer(restoreHandle);
>             grizzlyBuffer = null;
>             restoreHandle = null;
>         }
>         return saveCurrentParserState;
>
>     }
>
>
>     public void startBuffer(ByteBuffer bb) {
>         restoreHandle = bb;
>         // reflects the actual physically read byte by ReadFilter
>         trackBytesRead = bb.position();
>
>
>         grizzlyBuffer = bb;
>         if (!isExpectingMoreData()) {
>             grizzlyBuffer.position(0);
>             resetState();
>         }
>         startBufferExecuted = true;
>     }
>
>
>     private void resetState() {
>         state = State.START;
>         dataLength = 0;
>         messageLength = 0;
>         messageStart = 0;
>     }
>
>      /**
>      * Used with ProtocolParser to parse Messages and
>      * fill an byteBuffer with the message data.
>      */
>     public abstract class MessageParser {
>
>         private int dataSize;
>         private ByteBuffer byteBuffer;
>
>         public abstract boolean parseHeader(ByteBuffer bb) throws Exception;
>
>         private byte[] bytes;
>
>
>
>         final public ByteBuffer getByteBuffer() {
>             return byteBuffer;
>         }
>
>         final public byte[] toByteArray() {
>             if(hasException()) return new byte[0];
>             return bytes;
>         }
>
>
>         final public void setByteBuffer(ByteBuffer byteBuffer) {
>            // System.out.println("in s"+this.hashCode()+" "+(byteBuffer!=null));
>             this.byteBuffer = byteBuffer;
>         }
>
>         public abstract int getHeaderLength();
>
>         final public void setDataSize(int dataSize) {
>             this.dataSize = dataSize;
>         }
>
>         final public int getDataSize() {
>             return dataSize;
>         }
>
>         public byte[] getBytes() {
>             return bytes;
>         }
>
>         public void setBytes(byte[] bytes) {
>             this.bytes = bytes;
>         }
>
>
>
>         private Exception exception;
>
>         final public Exception getException() {
>             return exception;
>         }
>
>         final public void setException(Exception exception) {
>             this.exception = exception;
>
>         }
>
>         final public boolean hasException() {
>             return exception != null;
>         }
>
>         final public void unattach() {
>
>             //System.out.println("in u"+this.hashCode()+" "+(byteBuffer!=null));
>             if(hasException()) return;
>
>             bytes = new byte[dataSize];
>             byteBuffer.get(bytes);
>             byteBuffer.clear();
>             byteBuffer=null;
>
>         }
>
>
>
>     }
>     public class MyIncomingMessage extends MessageParser {
>     final static int HEADER_LENGTH=12;
>     int START_MARK = 0x77335434;
>     private boolean statusOk;
>
>
>
>     public boolean parseHeader(ByteBuffer bb) throws Exception {
>
>
>         int startmark = bb.getInt();
>         int status = bb.getInt();
>
>         int size = bb.getInt();
>         if (startmark !=START_MARK) {
>             return false;
>
>         }
>         statusOk =status== 1;
>
>         setDataSize(size);
>
>         if (getDataSize() <= 0) throw new Exception("Bad Message Length");
>         return true;
>
>     }
>
>
>
>     public int getHeaderLength() {
>         return HEADER_LENGTH;
>     }
>
>
>     public boolean isStatusOk() {
>         return statusOk;
>     }
> }
>
> }

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@...
For additional commands, e-mail: users-help@...

 « Return to Thread: Re: ProtocolParser Advice