rpc.go (5831B)
1 // Copyright (c) 2012-2020 Ugorji Nwoke. All rights reserved. 2 // Use of this source code is governed by a MIT license found in the LICENSE file. 3 4 package codec 5 6 import ( 7 "bufio" 8 "errors" 9 "io" 10 "net/rpc" 11 ) 12 13 var ( 14 errRpcIsClosed = errors.New("rpc - connection has been closed") 15 errRpcNoConn = errors.New("rpc - no connection") 16 17 rpcSpaceArr = [1]byte{' '} 18 ) 19 20 // Rpc provides a rpc Server or Client Codec for rpc communication. 21 type Rpc interface { 22 ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec 23 ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec 24 } 25 26 // RPCOptions holds options specific to rpc functionality 27 type RPCOptions struct { 28 // RPCNoBuffer configures whether we attempt to buffer reads and writes during RPC calls. 29 // 30 // Set RPCNoBuffer=true to turn buffering off. 31 // Buffering can still be done if buffered connections are passed in, or 32 // buffering is configured on the handle. 33 RPCNoBuffer bool 34 } 35 36 // rpcCodec defines the struct members and common methods. 37 type rpcCodec struct { 38 c io.Closer 39 r io.Reader 40 w io.Writer 41 f ioFlusher 42 43 dec *Decoder 44 enc *Encoder 45 h Handle 46 47 cls atomicClsErr 48 } 49 50 func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec { 51 return newRPCCodec2(conn, conn, conn, h) 52 } 53 54 func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec { 55 bh := h.getBasicHandle() 56 // if the writer can flush, ensure we leverage it, else 57 // we may hang waiting on read if write isn't flushed. 58 // var f ioFlusher 59 f, ok := w.(ioFlusher) 60 if !bh.RPCNoBuffer { 61 if bh.WriterBufferSize <= 0 { 62 if !ok { // a flusher means there's already a buffer 63 bw := bufio.NewWriter(w) 64 f, w = bw, bw 65 } 66 } 67 if bh.ReaderBufferSize <= 0 { 68 if _, ok = w.(ioBuffered); !ok { 69 r = bufio.NewReader(r) 70 } 71 } 72 } 73 return rpcCodec{ 74 c: c, 75 w: w, 76 r: r, 77 f: f, 78 h: h, 79 enc: NewEncoder(w, h), 80 dec: NewDecoder(r, h), 81 } 82 } 83 84 func (c *rpcCodec) write(obj ...interface{}) (err error) { 85 err = c.ready() 86 if err != nil { 87 return 88 } 89 if c.f != nil { 90 defer func() { 91 flushErr := c.f.Flush() 92 if flushErr != nil && err == nil { 93 err = flushErr 94 } 95 }() 96 } 97 98 for _, o := range obj { 99 err = c.enc.Encode(o) 100 if err != nil { 101 return 102 } 103 // defensive: ensure a space is always written after each encoding, 104 // in case the value was a number, and encoding a value right after 105 // without a space will lead to invalid output. 106 if c.h.isJson() { 107 _, err = c.w.Write(rpcSpaceArr[:]) 108 if err != nil { 109 return 110 } 111 } 112 } 113 return 114 } 115 116 func (c *rpcCodec) read(obj interface{}) (err error) { 117 err = c.ready() 118 if err == nil { 119 //If nil is passed in, we should read and discard 120 if obj == nil { 121 // return c.dec.Decode(&obj) 122 err = c.dec.swallowErr() 123 } else { 124 err = c.dec.Decode(obj) 125 } 126 } 127 return 128 } 129 130 func (c *rpcCodec) Close() (err error) { 131 if c.c != nil { 132 cls := c.cls.load() 133 if !cls.closed { 134 cls.err = c.c.Close() 135 cls.closed = true 136 c.cls.store(cls) 137 } 138 err = cls.err 139 } 140 return 141 } 142 143 func (c *rpcCodec) ready() (err error) { 144 if c.c == nil { 145 err = errRpcNoConn 146 } else { 147 cls := c.cls.load() 148 if cls.closed { 149 if err = cls.err; err == nil { 150 err = errRpcIsClosed 151 } 152 } 153 } 154 return 155 } 156 157 func (c *rpcCodec) ReadResponseBody(body interface{}) error { 158 return c.read(body) 159 } 160 161 // ------------------------------------- 162 163 type goRpcCodec struct { 164 rpcCodec 165 } 166 167 func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error { 168 return c.write(r, body) 169 } 170 171 func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error { 172 return c.write(r, body) 173 } 174 175 func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error { 176 return c.read(r) 177 } 178 179 func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error { 180 return c.read(r) 181 } 182 183 func (c *goRpcCodec) ReadRequestBody(body interface{}) error { 184 return c.read(body) 185 } 186 187 // ------------------------------------- 188 189 // goRpc is the implementation of Rpc that uses the communication protocol 190 // as defined in net/rpc package. 191 type goRpc struct{} 192 193 // GoRpc implements Rpc using the communication protocol defined in net/rpc package. 194 // 195 // Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered. 196 // 197 // For performance, you should configure WriterBufferSize and ReaderBufferSize on the handle. 198 // This ensures we use an adequate buffer during reading and writing. 199 // If not configured, we will internally initialize and use a buffer during reads and writes. 200 // This can be turned off via the RPCNoBuffer option on the Handle. 201 // 202 // var handle codec.JsonHandle 203 // handle.RPCNoBuffer = true // turns off attempt by rpc module to initialize a buffer 204 // 205 // Example 1: one way of configuring buffering explicitly: 206 // 207 // var handle codec.JsonHandle // codec handle 208 // handle.ReaderBufferSize = 1024 209 // handle.WriterBufferSize = 1024 210 // var conn io.ReadWriteCloser // connection got from a socket 211 // var serverCodec = GoRpc.ServerCodec(conn, handle) 212 // var clientCodec = GoRpc.ClientCodec(conn, handle) 213 // 214 // Example 2: you can also explicitly create a buffered connection yourself, 215 // and not worry about configuring the buffer sizes in the Handle. 216 // 217 // var handle codec.Handle // codec handle 218 // var conn io.ReadWriteCloser // connection got from a socket 219 // var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser 220 // io.Closer 221 // *bufio.Reader 222 // *bufio.Writer 223 // }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)} 224 // var serverCodec = GoRpc.ServerCodec(bufconn, handle) 225 // var clientCodec = GoRpc.ClientCodec(bufconn, handle) 226 var GoRpc goRpc 227 228 func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec { 229 return &goRpcCodec{newRPCCodec(conn, h)} 230 } 231 232 func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec { 233 return &goRpcCodec{newRPCCodec(conn, h)} 234 }