1: <?php
2:
3: 4: 5: 6: 7: 8: 9: 10: 11: 12:
13:
14:
15: include_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
16:
17: 18: 19: 20: 21: 22:
23: if (!function_exists('apc_fetch')) {
24: function apc_fetch($key) { return FALSE; }
25: function apc_store($key, $var, $ttl=0) { return FALSE; }
26: }
27:
28: 29: 30: 31: 32: 33: 34:
35: class TSocketPool extends TSocket {
36:
37: 38: 39:
40: private $servers_ = array();
41:
42: 43: 44: 45: 46:
47: private $numRetries_ = 1;
48:
49: 50: 51: 52: 53: 54:
55: private $retryInterval_ = 60;
56:
57: 58: 59: 60: 61:
62: private $maxConsecutiveFailures_ = 1;
63:
64: 65: 66: 67: 68:
69: private $randomize_ = TRUE;
70:
71: 72: 73: 74: 75:
76: private $alwaysTryLast_ = TRUE;
77:
78: 79: 80: 81: 82: 83: 84: 85:
86: public function __construct($hosts=array('localhost'),
87: $ports=array(9090),
88: $persist=FALSE,
89: $debugHandler=null) {
90: parent::__construct(null, 0, $persist, $debugHandler);
91:
92: if (!is_array($ports)) {
93: $port = $ports;
94: $ports = array();
95: foreach ($hosts as $key => $val) {
96: $ports[$key] = $port;
97: }
98: }
99:
100: foreach ($hosts as $key => $host) {
101: $this->servers_ []= array('host' => $host,
102: 'port' => $ports[$key]);
103: }
104: }
105:
106: 107: 108: 109: 110: 111: 112: 113:
114: public function addServer($host, $port) {
115: $this->servers_[] = array('host' => $host, 'port' => $port);
116: }
117:
118: 119: 120: 121: 122:
123: public function setNumRetries($numRetries) {
124: $this->numRetries_ = $numRetries;
125: }
126:
127: 128: 129: 130: 131:
132: public function setRetryInterval($retryInterval) {
133: $this->retryInterval_ = $retryInterval;
134: }
135:
136: 137: 138: 139: 140:
141: public function setMaxConsecutiveFailures($maxConsecutiveFailures) {
142: $this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
143: }
144:
145: 146: 147: 148: 149:
150: public function setRandomize($randomize) {
151: $this->randomize_ = $randomize;
152: }
153:
154: 155: 156: 157: 158:
159: public function setAlwaysTryLast($alwaysTryLast) {
160: $this->alwaysTryLast_ = $alwaysTryLast;
161: }
162:
163:
164: 165: 166: 167:
168: public function open() {
169:
170: if ($this->randomize_) {
171: shuffle($this->servers_);
172: }
173:
174:
175: $numServers = count($this->servers_);
176:
177: for ($i = 0; $i < $numServers; ++$i) {
178:
179:
180: extract($this->servers_[$i]);
181:
182:
183: $failtimeKey = 'thrift_failtime:'.$host.':'.$port.'~';
184:
185:
186: $lastFailtime = apc_fetch($failtimeKey);
187: if ($lastFailtime === FALSE) {
188: $lastFailtime = 0;
189: }
190:
191: $retryIntervalPassed = FALSE;
192:
193:
194: if ($lastFailtime > 0) {
195: $elapsed = time() - $lastFailtime;
196: if ($elapsed > $this->retryInterval_) {
197: $retryIntervalPassed = TRUE;
198: if ($this->debug_) {
199: call_user_func($this->debugHandler_,
200: 'TSocketPool: retryInterval '.
201: '('.$this->retryInterval_.') '.
202: 'has passed for host '.$host.':'.$port);
203: }
204: }
205: }
206:
207:
208:
209: $isLastServer = FALSE;
210: if ($this->alwaysTryLast_) {
211: $isLastServer = ($i == ($numServers - 1));
212: }
213:
214: if (($lastFailtime === 0) ||
215: ($isLastServer) ||
216: ($lastFailtime > 0 && $retryIntervalPassed)) {
217:
218:
219: $this->host_ = $host;
220: $this->port_ = $port;
221:
222:
223: for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
224: try {
225:
226: parent::open();
227:
228:
229: if ($lastFailtime > 0) {
230: apc_store($failtimeKey, 0);
231: }
232:
233:
234: return;
235:
236: } catch (TException $tx) {
237:
238: }
239: }
240:
241:
242: $consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~';
243:
244:
245: $consecfails = apc_fetch($consecfailsKey);
246: if ($consecfails === FALSE) {
247: $consecfails = 0;
248: }
249:
250:
251: $consecfails++;
252:
253:
254: if ($consecfails >= $this->maxConsecutiveFailures_) {
255: if ($this->debug_) {
256: call_user_func($this->debugHandler_,
257: 'TSocketPool: marking '.$host.':'.$port.
258: ' as down for '.$this->retryInterval_.' secs '.
259: 'after '.$consecfails.' failed attempts.');
260: }
261:
262: apc_store($failtimeKey, time());
263:
264:
265: apc_store($consecfailsKey, 0);
266: } else {
267: apc_store($consecfailsKey, $consecfails);
268: }
269: }
270: }
271:
272:
273: $error = 'TSocketPool: All hosts in pool are down. ';
274: $hosts = array();
275: foreach ($this->servers_ as $server) {
276: $hosts []= $server['host'].':'.$server['port'];
277: }
278: $hostlist = implode(',', $hosts);
279: $error .= '('.$hostlist.')';
280: if ($this->debug_) {
281: call_user_func($this->debugHandler_, $error);
282: }
283: throw new TException($error);
284: }
285: }
286:
287: ?>
288: