gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

rpc.go (5831B)


      1 // Copyright (c) 2012-2020 Ugorji Nwoke. All rights reserved.
      2 // Use of this source code is governed by a MIT license found in the LICENSE file.
      3 
      4 package codec
      5 
      6 import (
      7 	"bufio"
      8 	"errors"
      9 	"io"
     10 	"net/rpc"
     11 )
     12 
     13 var (
     14 	errRpcIsClosed = errors.New("rpc - connection has been closed")
     15 	errRpcNoConn   = errors.New("rpc - no connection")
     16 
     17 	rpcSpaceArr = [1]byte{' '}
     18 )
     19 
     20 // Rpc provides a rpc Server or Client Codec for rpc communication.
     21 type Rpc interface {
     22 	ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
     23 	ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
     24 }
     25 
     26 // RPCOptions holds options specific to rpc functionality
     27 type RPCOptions struct {
     28 	// RPCNoBuffer configures whether we attempt to buffer reads and writes during RPC calls.
     29 	//
     30 	// Set RPCNoBuffer=true to turn buffering off.
     31 	// Buffering can still be done if buffered connections are passed in, or
     32 	// buffering is configured on the handle.
     33 	RPCNoBuffer bool
     34 }
     35 
     36 // rpcCodec defines the struct members and common methods.
     37 type rpcCodec struct {
     38 	c io.Closer
     39 	r io.Reader
     40 	w io.Writer
     41 	f ioFlusher
     42 
     43 	dec *Decoder
     44 	enc *Encoder
     45 	h   Handle
     46 
     47 	cls atomicClsErr
     48 }
     49 
     50 func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
     51 	return newRPCCodec2(conn, conn, conn, h)
     52 }
     53 
     54 func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
     55 	bh := h.getBasicHandle()
     56 	// if the writer can flush, ensure we leverage it, else
     57 	// we may hang waiting on read if write isn't flushed.
     58 	// var f ioFlusher
     59 	f, ok := w.(ioFlusher)
     60 	if !bh.RPCNoBuffer {
     61 		if bh.WriterBufferSize <= 0 {
     62 			if !ok { // a flusher means there's already a buffer
     63 				bw := bufio.NewWriter(w)
     64 				f, w = bw, bw
     65 			}
     66 		}
     67 		if bh.ReaderBufferSize <= 0 {
     68 			if _, ok = w.(ioBuffered); !ok {
     69 				r = bufio.NewReader(r)
     70 			}
     71 		}
     72 	}
     73 	return rpcCodec{
     74 		c:   c,
     75 		w:   w,
     76 		r:   r,
     77 		f:   f,
     78 		h:   h,
     79 		enc: NewEncoder(w, h),
     80 		dec: NewDecoder(r, h),
     81 	}
     82 }
     83 
     84 func (c *rpcCodec) write(obj ...interface{}) (err error) {
     85 	err = c.ready()
     86 	if err != nil {
     87 		return
     88 	}
     89 	if c.f != nil {
     90 		defer func() {
     91 			flushErr := c.f.Flush()
     92 			if flushErr != nil && err == nil {
     93 				err = flushErr
     94 			}
     95 		}()
     96 	}
     97 
     98 	for _, o := range obj {
     99 		err = c.enc.Encode(o)
    100 		if err != nil {
    101 			return
    102 		}
    103 		// defensive: ensure a space is always written after each encoding,
    104 		// in case the value was a number, and encoding a value right after
    105 		// without a space will lead to invalid output.
    106 		if c.h.isJson() {
    107 			_, err = c.w.Write(rpcSpaceArr[:])
    108 			if err != nil {
    109 				return
    110 			}
    111 		}
    112 	}
    113 	return
    114 }
    115 
    116 func (c *rpcCodec) read(obj interface{}) (err error) {
    117 	err = c.ready()
    118 	if err == nil {
    119 		//If nil is passed in, we should read and discard
    120 		if obj == nil {
    121 			// return c.dec.Decode(&obj)
    122 			err = c.dec.swallowErr()
    123 		} else {
    124 			err = c.dec.Decode(obj)
    125 		}
    126 	}
    127 	return
    128 }
    129 
    130 func (c *rpcCodec) Close() (err error) {
    131 	if c.c != nil {
    132 		cls := c.cls.load()
    133 		if !cls.closed {
    134 			cls.err = c.c.Close()
    135 			cls.closed = true
    136 			c.cls.store(cls)
    137 		}
    138 		err = cls.err
    139 	}
    140 	return
    141 }
    142 
    143 func (c *rpcCodec) ready() (err error) {
    144 	if c.c == nil {
    145 		err = errRpcNoConn
    146 	} else {
    147 		cls := c.cls.load()
    148 		if cls.closed {
    149 			if err = cls.err; err == nil {
    150 				err = errRpcIsClosed
    151 			}
    152 		}
    153 	}
    154 	return
    155 }
    156 
    157 func (c *rpcCodec) ReadResponseBody(body interface{}) error {
    158 	return c.read(body)
    159 }
    160 
    161 // -------------------------------------
    162 
    163 type goRpcCodec struct {
    164 	rpcCodec
    165 }
    166 
    167 func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
    168 	return c.write(r, body)
    169 }
    170 
    171 func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
    172 	return c.write(r, body)
    173 }
    174 
    175 func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
    176 	return c.read(r)
    177 }
    178 
    179 func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
    180 	return c.read(r)
    181 }
    182 
    183 func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
    184 	return c.read(body)
    185 }
    186 
    187 // -------------------------------------
    188 
    189 // goRpc is the implementation of Rpc that uses the communication protocol
    190 // as defined in net/rpc package.
    191 type goRpc struct{}
    192 
    193 // GoRpc implements Rpc using the communication protocol defined in net/rpc package.
    194 //
    195 // Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
    196 //
    197 // For performance, you should configure WriterBufferSize and ReaderBufferSize on the handle.
    198 // This ensures we use an adequate buffer during reading and writing.
    199 // If not configured, we will internally initialize and use a buffer during reads and writes.
    200 // This can be turned off via the RPCNoBuffer option on the Handle.
    201 //
    202 //	var handle codec.JsonHandle
    203 //	handle.RPCNoBuffer = true // turns off attempt by rpc module to initialize a buffer
    204 //
    205 // Example 1: one way of configuring buffering explicitly:
    206 //
    207 //	var handle codec.JsonHandle // codec handle
    208 //	handle.ReaderBufferSize = 1024
    209 //	handle.WriterBufferSize = 1024
    210 //	var conn io.ReadWriteCloser // connection got from a socket
    211 //	var serverCodec = GoRpc.ServerCodec(conn, handle)
    212 //	var clientCodec = GoRpc.ClientCodec(conn, handle)
    213 //
    214 // Example 2: you can also explicitly create a buffered connection yourself,
    215 // and not worry about configuring the buffer sizes in the Handle.
    216 //
    217 //	var handle codec.Handle     // codec handle
    218 //	var conn io.ReadWriteCloser // connection got from a socket
    219 //	var bufconn = struct {      // bufconn here is a buffered io.ReadWriteCloser
    220 //	    io.Closer
    221 //	    *bufio.Reader
    222 //	    *bufio.Writer
    223 //	}{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
    224 //	var serverCodec = GoRpc.ServerCodec(bufconn, handle)
    225 //	var clientCodec = GoRpc.ClientCodec(bufconn, handle)
    226 var GoRpc goRpc
    227 
    228 func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
    229 	return &goRpcCodec{newRPCCodec(conn, h)}
    230 }
    231 
    232 func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
    233 	return &goRpcCodec{newRPCCodec(conn, h)}
    234 }