Re: ProtocolParser Advice

View: New views
5 Messages — Rating Filter:   Alert me  

Re: ProtocolParser Advice

by John ROM :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Hi,
just found some bugs in my parser.
here is a better version:

package util;

import com.sun.grizzly.ProtocolParser;
import com.sun.grizzly.util.ByteBufferFactory;
import com.sun.grizzly.util.WorkerThread;

import java.nio.ByteBuffer;



/**
 * ProtocolParser Filter which parses a Custom Protocol
 * into a MessageParser. These IncomingMessages can then be used
 * by other Protocolchain Filters
 *
 * So basicly implement abstract method
  * "public boolean parseHeader(ByteBuffer bb)"  of
  * IncommingMessage and retrieve these message up by another above Filter in chain.
  *
 * The Messages can be arbitrary long.
 * The Protocol has the following format
 * (n params + Data Size  + Data):

 * n bytes : n number of header params
 * ----------------------------------------------
 * 4 bytes Data length  - The length of Data
 * n bytes Data         - Message
 *

 * The Protocol asumes that on average Messages are not too large (Gigs) otherwise
 * this implementation might not be effective.

 * Tries to avoid bytearray copying and therfore maps the IncomingMessageeee onto
 * the orginal Grizzly Bytebuffer.  If Messages do not fit into an default sized Grizzly
 * ByteBuffer a larger one  with enough capacity  is temporarly given to the framework.
 *
 * This class is still under construction  and for example Error Handling,
 * Timeouts, Recovery has to be greatly improved!!!
 * John Vieten
 */


public abstract class MyProtocolParser implements ProtocolParser<MyProtocolParser.MessageParser> {
    private boolean startBufferExecuted = false;
    private boolean modeWithoutReadFilter;


    protected MyProtocolParser(boolean modeWithoutReadFilter) {
        this.state = modeWithoutReadFilter ? State.HAS_MORE_BYTES_TO_PARSE : State.START;
        this.modeWithoutReadFilter = modeWithoutReadFilter;
    }

    public static final int DEFAULT_BUFFER_SIZE = 8192;
    /**
     * Handle to byteBuffer which gets filled in by grizzly
     */
    private ByteBuffer grizzlyBuffer;
    /**
     * If Message are larger than DEFAULT_BUFFER_SIZE  a new larger Grizzly ByteBuffer is created
     * This handle is used to give Grizzly back its original default ByteBuffer .
     */
    private ByteBuffer restoreHandle;

    /**
     * The length of the data depicted by Protocol Header
     */
    private int dataLength = 0;

    /**
     * Just dataLength +  Header Length
     */
    private int messageLength = 0;
    /*
      Keeps track of the bytes read by the Grizzly ReadFilter
    */
    private int trackBytesRead = 0;


    /**
     * Keeps track of the start position of the current message in the Grizzly ByteBuffer.
     * Needed because an Grizzly ByteBuffer can contain several Messages at once
     * The messageStart always reflects the start Boundary of the current Message
     * And is either zero or set when a message was fully parsed and following one is still
     * in the buffer
     */
    int messageStart = 0;

    /**
     * Holds the state of this Statemachine
     */
    private State state = null;

    /**
     * The Message given to other Filters up the chain
     */

    private MessageParser messageParser;

    public abstract MessageParser newInstance();

    public static enum State {
        START,
        EXPECTING_MORE_HEADER_BYTES,
        EXPECTING_MORE_DATA_BYTES,
        HAS_MORE_BYTES_TO_PARSE,
        MEASSAGE_PARSED,
        MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE,
    }


    public boolean isExpectingMoreData() {
        boolean result = state == State.EXPECTING_MORE_DATA_BYTES ||
                state == State.EXPECTING_MORE_HEADER_BYTES;

        return result;

    }


    public boolean hasMoreBytesToParse() {
        return state == State.HAS_MORE_BYTES_TO_PARSE;
    }

    public MessageParser getNextMessage() {

        switch (state) {
            case MEASSAGE_PARSED:
                resetState();
                break;
            case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
                state = State.HAS_MORE_BYTES_TO_PARSE;

        }
        MessageParser tmp = messageParser;
        messageParser.unattach();
        messageParser = null;
        return tmp;
    }


    public boolean hasNextMessage() {
        if (!startBufferExecuted) {
            startBuffer(((WorkerThread) Thread.currentThread()).getByteBuffer());
        }
        parseMessage();
        switch (state) {
            case MEASSAGE_PARSED:
            case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
                return true;
            default:
                return false;
        }
    }


    public void parseMessage() {
        switch (state) {
            case MEASSAGE_PARSED:
            case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
                return;
        }


        switch (state) {

            case HAS_MORE_BYTES_TO_PARSE:

            case START:
                messageParser = newInstance();

            case EXPECTING_MORE_HEADER_BYTES:
                if ((trackBytesRead - messageStart) < messageParser.getHeaderLength()) {
                    state = State.EXPECTING_MORE_HEADER_BYTES;
                    grizzlyBuffer.position(messageStart);
                     // doesn't happen often so we can compact
                    grizzlyBuffer.compact();
                    trackBytesRead=trackBytesRead-messageStart;
                    messageStart=0;
                    grizzlyBuffer.position(trackBytesRead);
                    return;
                }
                grizzlyBuffer.position(messageStart);
                try {
                    // expects a position messageStart
                    boolean isKMProtocol = messageParser.parseHeader(grizzlyBuffer);
                    if (!isKMProtocol) {
                        messageParser.setException(new Exception("Bad Startmark"));
                        state = State.MEASSAGE_PARSED;
                        return;

                    }
                } catch (Exception e) {
                    // System.out.println(e.getMessage());
                    messageParser.setException(e);
                    state = State.MEASSAGE_PARSED;
                    return;
                }

                dataLength = messageParser.getDataSize();
                messageLength = dataLength + MyIncomingMessage.HEADER_LENGTH;

                // position should by at data start
                if ((grizzlyBuffer.capacity() - grizzlyBuffer.position()) < dataLength) {
                    // Not enough room to store message so we need a bigger buffer and also some extra room for
                    // any new message chunks

                    // maybe I should do a compact to use a smaller buffer here but I am not sure about performance

                    int newCapacity = messageLength + grizzlyBuffer.position() + DEFAULT_BUFFER_SIZE;
                    grizzlyBuffer.position(0);
                    ByteBuffer newBuffer = ByteBufferFactory.allocateView(newCapacity, grizzlyBuffer.isDirect());
                    newBuffer.put(grizzlyBuffer);
                    WorkerThread workerThread = (WorkerThread) Thread.currentThread();
                    workerThread.setByteBuffer(grizzlyBuffer = newBuffer);

                }

                // Now create a sliced copy of GrizzlyBuffer so that
                // we have start and end pointers on our wanted message
                grizzlyBuffer.position(MyIncomingMessage.HEADER_LENGTH + messageStart);


                grizzlyBuffer.limit(messageLength + messageStart);

                ByteBuffer slicedHandle = grizzlyBuffer.slice();
                grizzlyBuffer.limit(grizzlyBuffer.capacity());
                // Now Buffer has enough space for message

                messageParser.setByteBuffer(slicedHandle);


            case EXPECTING_MORE_DATA_BYTES:
                int remaining = trackBytesRead - messageLength - messageStart;
                if (remaining == 0) {
                    // Ok we found a message
                    state = State.MEASSAGE_PARSED;

                } else if (remaining > 0) {

                    state = State.MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE;
                    messageStart = messageStart + messageLength;


                } else {
                    // we have to keep on ready bytes from the net
                    state = State.EXPECTING_MORE_DATA_BYTES;
                    grizzlyBuffer.position(trackBytesRead);



                }
        }

    }

    public boolean releaseBuffer() {
        boolean saveCurrentParserState = false;
        if (isExpectingMoreData()) {
            saveCurrentParserState = true;
        } else {
            if (restoreHandle == null) {

                Exception e = new Exception();
                e.printStackTrace();
                return false;
            }
            WorkerThread workerThread = (WorkerThread) Thread.currentThread();
            restoreHandle.clear();

            workerThread.setByteBuffer(restoreHandle);
            grizzlyBuffer = null;
            restoreHandle = null;
        }
        return saveCurrentParserState;

    }


    public void startBuffer(ByteBuffer bb) {
        restoreHandle = bb;
        // reflects the actual physically read byte by ReadFilter
        trackBytesRead = bb.position();


        grizzlyBuffer = bb;
        if (!isExpectingMoreData()) {
            grizzlyBuffer.position(0);
            resetState();
        }
        startBufferExecuted = true;
    }


    private void resetState() {
        state = State.START;
        dataLength = 0;
        messageLength = 0;
        messageStart = 0;
    }

     /**
     * Used with ProtocolParser to parse Messages and
     * fill an byteBuffer with the message data.
     */
    public abstract class MessageParser {

        private int dataSize;
        private ByteBuffer byteBuffer;

        public abstract boolean parseHeader(ByteBuffer bb) throws Exception;

        private byte[] bytes;



        final public ByteBuffer getByteBuffer() {
            return byteBuffer;
        }

        final public byte[] toByteArray() {
            if(hasException()) return new byte[0];
            return bytes;
        }


        final public void setByteBuffer(ByteBuffer byteBuffer) {
           // System.out.println("in s"+this.hashCode()+" "+(byteBuffer!=null));
            this.byteBuffer = byteBuffer;
        }

        public abstract int getHeaderLength();

        final public void setDataSize(int dataSize) {
            this.dataSize = dataSize;
        }

        final public int getDataSize() {
            return dataSize;
        }

        public byte[] getBytes() {
            return bytes;
        }

        public void setBytes(byte[] bytes) {
            this.bytes = bytes;
        }



        private Exception exception;

        final public Exception getException() {
            return exception;
        }

        final public void setException(Exception exception) {
            this.exception = exception;

        }

        final public boolean hasException() {
            return exception != null;
        }

        final public void unattach() {

            //System.out.println("in u"+this.hashCode()+" "+(byteBuffer!=null));
            if(hasException()) return;

            bytes = new byte[dataSize];
            byteBuffer.get(bytes);
            byteBuffer.clear();
            byteBuffer=null;

        }



    }
    public class MyIncomingMessage extends MessageParser {
    final static int HEADER_LENGTH=12;
    int START_MARK = 0x77335434;
    private boolean statusOk;



    public boolean parseHeader(ByteBuffer bb) throws Exception {


        int startmark = bb.getInt();
        int status = bb.getInt();

        int size = bb.getInt();
        if (startmark !=START_MARK) {
            return false;

        }
        statusOk =status== 1;

        setDataSize(size);

        if (getDataSize() <= 0) throw new Exception("Bad Message Length");
        return true;

    }



    public int getHeaderLength() {
        return HEADER_LENGTH;
    }


    public boolean isStatusOk() {
        return statusOk;
    }
}

}
--
Ist Ihr Browser Vista-kompatibel? Jetzt die neuesten
Browser-Versionen downloaden: http://www.gmx.net/de/go/browser

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@...
For additional commands, e-mail: users-help@...


Re: ProtocolParser Advice

by Jeanfrancois Arcand-2 :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Salut,

would you mind signing and faxing this

https://glassfish.dev.java.net/public/GovernancePolicy.html#SCA_Policy

agreement? This way I can add your code to the Grizzly distribution (as
a tutorial or if you can document the entire code, we might add it
directly to Grizzly framework). Shipping a documented version of this
class will makes peoples life easy.

We can also create a new modules calles grizzly-extra where we can put
those classes. What peoples thinks?

Thanks!!!!

-- Jeanfrancois

John ROM wrote:

> Hi,
> just found some bugs in my parser.
> here is a better version:
>
> package util;
>
> import com.sun.grizzly.ProtocolParser;
> import com.sun.grizzly.util.ByteBufferFactory;
> import com.sun.grizzly.util.WorkerThread;
>
> import java.nio.ByteBuffer;
>
>
>
> /**
>  * ProtocolParser Filter which parses a Custom Protocol
>  * into a MessageParser. These IncomingMessages can then be used
>  * by other Protocolchain Filters
>  *
>  * So basicly implement abstract method
>   * "public boolean parseHeader(ByteBuffer bb)"  of
>   * IncommingMessage and retrieve these message up by another above Filter in chain.
>   *
>  * The Messages can be arbitrary long.
>  * The Protocol has the following format
>  * (n params + Data Size  + Data):
>
>  * n bytes : n number of header params
>  * ----------------------------------------------
>  * 4 bytes Data length  - The length of Data
>  * n bytes Data         - Message
>  *
>
>  * The Protocol asumes that on average Messages are not too large (Gigs) otherwise
>  * this implementation might not be effective.
>
>  * Tries to avoid bytearray copying and therfore maps the IncomingMessageeee onto
>  * the orginal Grizzly Bytebuffer.  If Messages do not fit into an default sized Grizzly
>  * ByteBuffer a larger one  with enough capacity  is temporarly given to the framework.
>  *
>  * This class is still under construction  and for example Error Handling,
>  * Timeouts, Recovery has to be greatly improved!!!
>  * John Vieten
>  */
>
>
> public abstract class MyProtocolParser implements ProtocolParser<MyProtocolParser.MessageParser> {
>     private boolean startBufferExecuted = false;
>     private boolean modeWithoutReadFilter;
>
>
>     protected MyProtocolParser(boolean modeWithoutReadFilter) {
>         this.state = modeWithoutReadFilter ? State.HAS_MORE_BYTES_TO_PARSE : State.START;
>         this.modeWithoutReadFilter = modeWithoutReadFilter;
>     }
>
>     public static final int DEFAULT_BUFFER_SIZE = 8192;
>     /**
>      * Handle to byteBuffer which gets filled in by grizzly
>      */
>     private ByteBuffer grizzlyBuffer;
>     /**
>      * If Message are larger than DEFAULT_BUFFER_SIZE  a new larger Grizzly ByteBuffer is created
>      * This handle is used to give Grizzly back its original default ByteBuffer .
>      */
>     private ByteBuffer restoreHandle;
>
>     /**
>      * The length of the data depicted by Protocol Header
>      */
>     private int dataLength = 0;
>
>     /**
>      * Just dataLength +  Header Length
>      */
>     private int messageLength = 0;
>     /*
>       Keeps track of the bytes read by the Grizzly ReadFilter
>     */
>     private int trackBytesRead = 0;
>
>
>     /**
>      * Keeps track of the start position of the current message in the Grizzly ByteBuffer.
>      * Needed because an Grizzly ByteBuffer can contain several Messages at once
>      * The messageStart always reflects the start Boundary of the current Message
>      * And is either zero or set when a message was fully parsed and following one is still
>      * in the buffer
>      */
>     int messageStart = 0;
>
>     /**
>      * Holds the state of this Statemachine
>      */
>     private State state = null;
>
>     /**
>      * The Message given to other Filters up the chain
>      */
>
>     private MessageParser messageParser;
>
>     public abstract MessageParser newInstance();
>
>     public static enum State {
>         START,
>         EXPECTING_MORE_HEADER_BYTES,
>         EXPECTING_MORE_DATA_BYTES,
>         HAS_MORE_BYTES_TO_PARSE,
>         MEASSAGE_PARSED,
>         MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE,
>     }
>
>
>     public boolean isExpectingMoreData() {
>         boolean result = state == State.EXPECTING_MORE_DATA_BYTES ||
>                 state == State.EXPECTING_MORE_HEADER_BYTES;
>
>         return result;
>
>     }
>
>
>     public boolean hasMoreBytesToParse() {
>         return state == State.HAS_MORE_BYTES_TO_PARSE;
>     }
>
>     public MessageParser getNextMessage() {
>
>         switch (state) {
>             case MEASSAGE_PARSED:
>                 resetState();
>                 break;
>             case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
>                 state = State.HAS_MORE_BYTES_TO_PARSE;
>
>         }
>         MessageParser tmp = messageParser;
>         messageParser.unattach();
>         messageParser = null;
>         return tmp;
>     }
>
>
>     public boolean hasNextMessage() {
>         if (!startBufferExecuted) {
>             startBuffer(((WorkerThread) Thread.currentThread()).getByteBuffer());
>         }
>         parseMessage();
>         switch (state) {
>             case MEASSAGE_PARSED:
>             case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
>                 return true;
>             default:
>                 return false;
>         }
>     }
>
>
>     public void parseMessage() {
>         switch (state) {
>             case MEASSAGE_PARSED:
>             case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
>                 return;
>         }
>
>
>         switch (state) {
>
>             case HAS_MORE_BYTES_TO_PARSE:
>
>             case START:
>                 messageParser = newInstance();
>
>             case EXPECTING_MORE_HEADER_BYTES:
>                 if ((trackBytesRead - messageStart) < messageParser.getHeaderLength()) {
>                     state = State.EXPECTING_MORE_HEADER_BYTES;
>                     grizzlyBuffer.position(messageStart);
>                      // doesn't happen often so we can compact
>                     grizzlyBuffer.compact();
>                     trackBytesRead=trackBytesRead-messageStart;
>                     messageStart=0;
>                     grizzlyBuffer.position(trackBytesRead);
>                     return;
>                 }
>                 grizzlyBuffer.position(messageStart);
>                 try {
>                     // expects a position messageStart
>                     boolean isKMProtocol = messageParser.parseHeader(grizzlyBuffer);
>                     if (!isKMProtocol) {
>                         messageParser.setException(new Exception("Bad Startmark"));
>                         state = State.MEASSAGE_PARSED;
>                         return;
>
>                     }
>                 } catch (Exception e) {
>                     // System.out.println(e.getMessage());
>                     messageParser.setException(e);
>                     state = State.MEASSAGE_PARSED;
>                     return;
>                 }
>
>                 dataLength = messageParser.getDataSize();
>                 messageLength = dataLength + MyIncomingMessage.HEADER_LENGTH;
>
>                 // position should by at data start
>                 if ((grizzlyBuffer.capacity() - grizzlyBuffer.position()) < dataLength) {
>                     // Not enough room to store message so we need a bigger buffer and also some extra room for
>                     // any new message chunks
>
>                     // maybe I should do a compact to use a smaller buffer here but I am not sure about performance
>
>                     int newCapacity = messageLength + grizzlyBuffer.position() + DEFAULT_BUFFER_SIZE;
>                     grizzlyBuffer.position(0);
>                     ByteBuffer newBuffer = ByteBufferFactory.allocateView(newCapacity, grizzlyBuffer.isDirect());
>                     newBuffer.put(grizzlyBuffer);
>                     WorkerThread workerThread = (WorkerThread) Thread.currentThread();
>                     workerThread.setByteBuffer(grizzlyBuffer = newBuffer);
>
>                 }
>
>                 // Now create a sliced copy of GrizzlyBuffer so that
>                 // we have start and end pointers on our wanted message
>                 grizzlyBuffer.position(MyIncomingMessage.HEADER_LENGTH + messageStart);
>
>
>                 grizzlyBuffer.limit(messageLength + messageStart);
>
>                 ByteBuffer slicedHandle = grizzlyBuffer.slice();
>                 grizzlyBuffer.limit(grizzlyBuffer.capacity());
>                 // Now Buffer has enough space for message
>
>                 messageParser.setByteBuffer(slicedHandle);
>
>
>             case EXPECTING_MORE_DATA_BYTES:
>                 int remaining = trackBytesRead - messageLength - messageStart;
>                 if (remaining == 0) {
>                     // Ok we found a message
>                     state = State.MEASSAGE_PARSED;
>
>                 } else if (remaining > 0) {
>
>                     state = State.MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE;
>                     messageStart = messageStart + messageLength;
>
>
>                 } else {
>                     // we have to keep on ready bytes from the net
>                     state = State.EXPECTING_MORE_DATA_BYTES;
>                     grizzlyBuffer.position(trackBytesRead);
>
>
>
>                 }
>         }
>
>     }
>
>     public boolean releaseBuffer() {
>         boolean saveCurrentParserState = false;
>         if (isExpectingMoreData()) {
>             saveCurrentParserState = true;
>         } else {
>             if (restoreHandle == null) {
>
>                 Exception e = new Exception();
>                 e.printStackTrace();
>                 return false;
>             }
>             WorkerThread workerThread = (WorkerThread) Thread.currentThread();
>             restoreHandle.clear();
>
>             workerThread.setByteBuffer(restoreHandle);
>             grizzlyBuffer = null;
>             restoreHandle = null;
>         }
>         return saveCurrentParserState;
>
>     }
>
>
>     public void startBuffer(ByteBuffer bb) {
>         restoreHandle = bb;
>         // reflects the actual physically read byte by ReadFilter
>         trackBytesRead = bb.position();
>
>
>         grizzlyBuffer = bb;
>         if (!isExpectingMoreData()) {
>             grizzlyBuffer.position(0);
>             resetState();
>         }
>         startBufferExecuted = true;
>     }
>
>
>     private void resetState() {
>         state = State.START;
>         dataLength = 0;
>         messageLength = 0;
>         messageStart = 0;
>     }
>
>      /**
>      * Used with ProtocolParser to parse Messages and
>      * fill an byteBuffer with the message data.
>      */
>     public abstract class MessageParser {
>
>         private int dataSize;
>         private ByteBuffer byteBuffer;
>
>         public abstract boolean parseHeader(ByteBuffer bb) throws Exception;
>
>         private byte[] bytes;
>
>
>
>         final public ByteBuffer getByteBuffer() {
>             return byteBuffer;
>         }
>
>         final public byte[] toByteArray() {
>             if(hasException()) return new byte[0];
>             return bytes;
>         }
>
>
>         final public void setByteBuffer(ByteBuffer byteBuffer) {
>            // System.out.println("in s"+this.hashCode()+" "+(byteBuffer!=null));
>             this.byteBuffer = byteBuffer;
>         }
>
>         public abstract int getHeaderLength();
>
>         final public void setDataSize(int dataSize) {
>             this.dataSize = dataSize;
>         }
>
>         final public int getDataSize() {
>             return dataSize;
>         }
>
>         public byte[] getBytes() {
>             return bytes;
>         }
>
>         public void setBytes(byte[] bytes) {
>             this.bytes = bytes;
>         }
>
>
>
>         private Exception exception;
>
>         final public Exception getException() {
>             return exception;
>         }
>
>         final public void setException(Exception exception) {
>             this.exception = exception;
>
>         }
>
>         final public boolean hasException() {
>             return exception != null;
>         }
>
>         final public void unattach() {
>
>             //System.out.println("in u"+this.hashCode()+" "+(byteBuffer!=null));
>             if(hasException()) return;
>
>             bytes = new byte[dataSize];
>             byteBuffer.get(bytes);
>             byteBuffer.clear();
>             byteBuffer=null;
>
>         }
>
>
>
>     }
>     public class MyIncomingMessage extends MessageParser {
>     final static int HEADER_LENGTH=12;
>     int START_MARK = 0x77335434;
>     private boolean statusOk;
>
>
>
>     public boolean parseHeader(ByteBuffer bb) throws Exception {
>
>
>         int startmark = bb.getInt();
>         int status = bb.getInt();
>
>         int size = bb.getInt();
>         if (startmark !=START_MARK) {
>             return false;
>
>         }
>         statusOk =status== 1;
>
>         setDataSize(size);
>
>         if (getDataSize() <= 0) throw new Exception("Bad Message Length");
>         return true;
>
>     }
>
>
>
>     public int getHeaderLength() {
>         return HEADER_LENGTH;
>     }
>
>
>     public boolean isStatusOk() {
>         return statusOk;
>     }
> }
>
> }

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@...
For additional commands, e-mail: users-help@...


Re: ProtocolParser Advice

by Jeanfrancois Arcand-2 :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Salut,

it took a while, but I've added you class under the samples directory:

https://grizzly.dev.java.net/issues/show_bug.cgi?id=157

Next steps is to add documentation (and blogs if you can :-)) and
evaluate if we want to add it to:

+ the core classes

or

+ a new module called grizzly-extra

What peoples's think?

A+

-- Jeanfrancois

John ROM wrote:

> Hi,
> just found some bugs in my parser.
> here is a better version:
>
> package util;
>
> import com.sun.grizzly.ProtocolParser;
> import com.sun.grizzly.util.ByteBufferFactory;
> import com.sun.grizzly.util.WorkerThread;
>
> import java.nio.ByteBuffer;
>
>
>
> /**
>  * ProtocolParser Filter which parses a Custom Protocol
>  * into a MessageParser. These IncomingMessages can then be used
>  * by other Protocolchain Filters
>  *
>  * So basicly implement abstract method
>   * "public boolean parseHeader(ByteBuffer bb)"  of
>   * IncommingMessage and retrieve these message up by another above Filter in chain.
>   *
>  * The Messages can be arbitrary long.
>  * The Protocol has the following format
>  * (n params + Data Size  + Data):
>
>  * n bytes : n number of header params
>  * ----------------------------------------------
>  * 4 bytes Data length  - The length of Data
>  * n bytes Data         - Message
>  *
>
>  * The Protocol asumes that on average Messages are not too large (Gigs) otherwise
>  * this implementation might not be effective.
>
>  * Tries to avoid bytearray copying and therfore maps the IncomingMessageeee onto
>  * the orginal Grizzly Bytebuffer.  If Messages do not fit into an default sized Grizzly
>  * ByteBuffer a larger one  with enough capacity  is temporarly given to the framework.
>  *
>  * This class is still under construction  and for example Error Handling,
>  * Timeouts, Recovery has to be greatly improved!!!
>  * John Vieten
>  */
>
>
> public abstract class MyProtocolParser implements ProtocolParser<MyProtocolParser.MessageParser> {
>     private boolean startBufferExecuted = false;
>     private boolean modeWithoutReadFilter;
>
>
>     protected MyProtocolParser(boolean modeWithoutReadFilter) {
>         this.state = modeWithoutReadFilter ? State.HAS_MORE_BYTES_TO_PARSE : State.START;
>         this.modeWithoutReadFilter = modeWithoutReadFilter;
>     }
>
>     public static final int DEFAULT_BUFFER_SIZE = 8192;
>     /**
>      * Handle to byteBuffer which gets filled in by grizzly
>      */
>     private ByteBuffer grizzlyBuffer;
>     /**
>      * If Message are larger than DEFAULT_BUFFER_SIZE  a new larger Grizzly ByteBuffer is created
>      * This handle is used to give Grizzly back its original default ByteBuffer .
>      */
>     private ByteBuffer restoreHandle;
>
>     /**
>      * The length of the data depicted by Protocol Header
>      */
>     private int dataLength = 0;
>
>     /**
>      * Just dataLength +  Header Length
>      */
>     private int messageLength = 0;
>     /*
>       Keeps track of the bytes read by the Grizzly ReadFilter
>     */
>     private int trackBytesRead = 0;
>
>
>     /**
>      * Keeps track of the start position of the current message in the Grizzly ByteBuffer.
>      * Needed because an Grizzly ByteBuffer can contain several Messages at once
>      * The messageStart always reflects the start Boundary of the current Message
>      * And is either zero or set when a message was fully parsed and following one is still
>      * in the buffer
>      */
>     int messageStart = 0;
>
>     /**
>      * Holds the state of this Statemachine
>      */
>     private State state = null;
>
>     /**
>      * The Message given to other Filters up the chain
>      */
>
>     private MessageParser messageParser;
>
>     public abstract MessageParser newInstance();
>
>     public static enum State {
>         START,
>         EXPECTING_MORE_HEADER_BYTES,
>         EXPECTING_MORE_DATA_BYTES,
>         HAS_MORE_BYTES_TO_PARSE,
>         MEASSAGE_PARSED,
>         MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE,
>     }
>
>
>     public boolean isExpectingMoreData() {
>         boolean result = state == State.EXPECTING_MORE_DATA_BYTES ||
>                 state == State.EXPECTING_MORE_HEADER_BYTES;
>
>         return result;
>
>     }
>
>
>     public boolean hasMoreBytesToParse() {
>         return state == State.HAS_MORE_BYTES_TO_PARSE;
>     }
>
>     public MessageParser getNextMessage() {
>
>         switch (state) {
>             case MEASSAGE_PARSED:
>                 resetState();
>                 break;
>             case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
>                 state = State.HAS_MORE_BYTES_TO_PARSE;
>
>         }
>         MessageParser tmp = messageParser;
>         messageParser.unattach();
>         messageParser = null;
>         return tmp;
>     }
>
>
>     public boolean hasNextMessage() {
>         if (!startBufferExecuted) {
>             startBuffer(((WorkerThread) Thread.currentThread()).getByteBuffer());
>         }
>         parseMessage();
>         switch (state) {
>             case MEASSAGE_PARSED:
>             case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
>                 return true;
>             default:
>                 return false;
>         }
>     }
>
>
>     public void parseMessage() {
>         switch (state) {
>             case MEASSAGE_PARSED:
>             case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
>                 return;
>         }
>
>
>         switch (state) {
>
>             case HAS_MORE_BYTES_TO_PARSE:
>
>             case START:
>                 messageParser = newInstance();
>
>             case EXPECTING_MORE_HEADER_BYTES:
>                 if ((trackBytesRead - messageStart) < messageParser.getHeaderLength()) {
>                     state = State.EXPECTING_MORE_HEADER_BYTES;
>                     grizzlyBuffer.position(messageStart);
>                      // doesn't happen often so we can compact
>                     grizzlyBuffer.compact();
>                     trackBytesRead=trackBytesRead-messageStart;
>                     messageStart=0;
>                     grizzlyBuffer.position(trackBytesRead);
>                     return;
>                 }
>                 grizzlyBuffer.position(messageStart);
>                 try {
>                     // expects a position messageStart
>                     boolean isKMProtocol = messageParser.parseHeader(grizzlyBuffer);
>                     if (!isKMProtocol) {
>                         messageParser.setException(new Exception("Bad Startmark"));
>                         state = State.MEASSAGE_PARSED;
>                         return;
>
>                     }
>                 } catch (Exception e) {
>                     // System.out.println(e.getMessage());
>                     messageParser.setException(e);
>                     state = State.MEASSAGE_PARSED;
>                     return;
>                 }
>
>                 dataLength = messageParser.getDataSize();
>                 messageLength = dataLength + MyIncomingMessage.HEADER_LENGTH;
>
>                 // position should by at data start
>                 if ((grizzlyBuffer.capacity() - grizzlyBuffer.position()) < dataLength) {
>                     // Not enough room to store message so we need a bigger buffer and also some extra room for
>                     // any new message chunks
>
>                     // maybe I should do a compact to use a smaller buffer here but I am not sure about performance
>
>                     int newCapacity = messageLength + grizzlyBuffer.position() + DEFAULT_BUFFER_SIZE;
>                     grizzlyBuffer.position(0);
>                     ByteBuffer newBuffer = ByteBufferFactory.allocateView(newCapacity, grizzlyBuffer.isDirect());
>                     newBuffer.put(grizzlyBuffer);
>                     WorkerThread workerThread = (WorkerThread) Thread.currentThread();
>                     workerThread.setByteBuffer(grizzlyBuffer = newBuffer);
>
>                 }
>
>                 // Now create a sliced copy of GrizzlyBuffer so that
>                 // we have start and end pointers on our wanted message
>                 grizzlyBuffer.position(MyIncomingMessage.HEADER_LENGTH + messageStart);
>
>
>                 grizzlyBuffer.limit(messageLength + messageStart);
>
>                 ByteBuffer slicedHandle = grizzlyBuffer.slice();
>                 grizzlyBuffer.limit(grizzlyBuffer.capacity());
>                 // Now Buffer has enough space for message
>
>                 messageParser.setByteBuffer(slicedHandle);
>
>
>             case EXPECTING_MORE_DATA_BYTES:
>                 int remaining = trackBytesRead - messageLength - messageStart;
>                 if (remaining == 0) {
>                     // Ok we found a message
>                     state = State.MEASSAGE_PARSED;
>
>                 } else if (remaining > 0) {
>
>                     state = State.MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE;
>                     messageStart = messageStart + messageLength;
>
>
>                 } else {
>                     // we have to keep on ready bytes from the net
>                     state = State.EXPECTING_MORE_DATA_BYTES;
>                     grizzlyBuffer.position(trackBytesRead);
>
>
>
>                 }
>         }
>
>     }
>
>     public boolean releaseBuffer() {
>         boolean saveCurrentParserState = false;
>         if (isExpectingMoreData()) {
>             saveCurrentParserState = true;
>         } else {
>             if (restoreHandle == null) {
>
>                 Exception e = new Exception();
>                 e.printStackTrace();
>                 return false;
>             }
>             WorkerThread workerThread = (WorkerThread) Thread.currentThread();
>             restoreHandle.clear();
>
>             workerThread.setByteBuffer(restoreHandle);
>             grizzlyBuffer = null;
>             restoreHandle = null;
>         }
>         return saveCurrentParserState;
>
>     }
>
>
>     public void startBuffer(ByteBuffer bb) {
>         restoreHandle = bb;
>         // reflects the actual physically read byte by ReadFilter
>         trackBytesRead = bb.position();
>
>
>         grizzlyBuffer = bb;
>         if (!isExpectingMoreData()) {
>             grizzlyBuffer.position(0);
>             resetState();
>         }
>         startBufferExecuted = true;
>     }
>
>
>     private void resetState() {
>         state = State.START;
>         dataLength = 0;
>         messageLength = 0;
>         messageStart = 0;
>     }
>
>      /**
>      * Used with ProtocolParser to parse Messages and
>      * fill an byteBuffer with the message data.
>      */
>     public abstract class MessageParser {
>
>         private int dataSize;
>         private ByteBuffer byteBuffer;
>
>         public abstract boolean parseHeader(ByteBuffer bb) throws Exception;
>
>         private byte[] bytes;
>
>
>
>         final public ByteBuffer getByteBuffer() {
>             return byteBuffer;
>         }
>
>         final public byte[] toByteArray() {
>             if(hasException()) return new byte[0];
>             return bytes;
>         }
>
>
>         final public void setByteBuffer(ByteBuffer byteBuffer) {
>            // System.out.println("in s"+this.hashCode()+" "+(byteBuffer!=null));
>             this.byteBuffer = byteBuffer;
>         }
>
>         public abstract int getHeaderLength();
>
>         final public void setDataSize(int dataSize) {
>             this.dataSize = dataSize;
>         }
>
>         final public int getDataSize() {
>             return dataSize;
>         }
>
>         public byte[] getBytes() {
>             return bytes;
>         }
>
>         public void setBytes(byte[] bytes) {
>             this.bytes = bytes;
>         }
>
>
>
>         private Exception exception;
>
>         final public Exception getException() {
>             return exception;
>         }
>
>         final public void setException(Exception exception) {
>             this.exception = exception;
>
>         }
>
>         final public boolean hasException() {
>             return exception != null;
>         }
>
>         final public void unattach() {
>
>             //System.out.println("in u"+this.hashCode()+" "+(byteBuffer!=null));
>             if(hasException()) return;
>
>             bytes = new byte[dataSize];
>             byteBuffer.get(bytes);
>             byteBuffer.clear();
>             byteBuffer=null;
>
>         }
>
>
>
>     }
>     public class MyIncomingMessage extends MessageParser {
>     final static int HEADER_LENGTH=12;
>     int START_MARK = 0x77335434;
>     private boolean statusOk;
>
>
>
>     public boolean parseHeader(ByteBuffer bb) throws Exception {
>
>
>         int startmark = bb.getInt();
>         int status = bb.getInt();
>
>         int size = bb.getInt();
>         if (startmark !=START_MARK) {
>             return false;
>
>         }
>         statusOk =status== 1;
>
>         setDataSize(size);
>
>         if (getDataSize() <= 0) throw new Exception("Bad Message Length");
>         return true;
>
>     }
>
>
>
>     public int getHeaderLength() {
>         return HEADER_LENGTH;
>     }
>
>
>     public boolean isStatusOk() {
>         return statusOk;
>     }
> }
>
> }

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@...
For additional commands, e-mail: users-help@...


Parent Message unknown Re: ProtocolParser Advice

by John ROM :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Hi Jeanfrancois Arcand
Wow I actually didn't expect you to be so fast (8-:

I have a big bug in line 209
ByteBuffer newBuffer = ByteBufferFactory.allocateView(newCapacity, grizzlyBuffer.isDirect());

this line should be replaced with :

 ByteBuffer newBuffer = ByteBufferFactory.allocate(ByteBufferFactory.ByteBufferType.HEAP,newCapacity);


So in future what is the best way for me to change something in the published code.

For example I want to change the javadocs and also how the bytbuffer is treated...

Many Greetings John






--
Psssst! Schon vom neuen GMX MultiMessenger gehört?
Der kann`s mit allen: http://www.gmx.net/de/go/multimessenger

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@...
For additional commands, e-mail: users-help@...


Re: ProtocolParser Advice

by Jeanfrancois Arcand-2 :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Hi John,

John ROM wrote:

> Hi Jeanfrancois Arcand
> Wow I actually didn't expect you to be so fast (8-:
>
> I have a big bug in line 209
> ByteBuffer newBuffer = ByteBufferFactory.allocateView(newCapacity, grizzlyBuffer.isDirect());
>
> this line should be replaced with :
>
>  ByteBuffer newBuffer = ByteBufferFactory.allocate(ByteBufferFactory.ByteBufferType.HEAP,newCapacity);
>
>
> So in future what is the best way for me to change something in the published code.
>
> For example I want to change the javadocs and also how the bytbuffer is treated...

Just submit patch by doing:

svn diff > patch.txt

Since you are more and more involved with the community, a couple a
patches and I can propose you as a commiter :-) That way you can changes
things as you want :-)

A+

-- Jeanfrancois



>
> Many Greetings John
>
>
>
>
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@...
For additional commands, e-mail: users-help@...