*** dbinc/repmgr.h 2007-10-31 10:23:52.000000000 -0700 --- dbinc/repmgr.h 2007-10-31 10:23:53.000000000 -0700 *************** *** 36,41 **** --- 36,55 ---- #endif /* + * The (arbitrary) maximum number of outgoing messages we're willing to hold, on + * a queue per connection, waiting for TCP buffer space to become available in + * the kernel. Rather than exceeding this limit, we simply discard additional + * messages (since this is always allowed by the replication protocol). + * As a special dispensation, if a message is destined for a specific remote + * site (i.e., it's not a broadcast), then we first try blocking the sending + * thread, waiting for space to become available (though we only wait a limited + * time). This is so as to be able to handle the immediate flood of (a + * potentially large number of) outgoing messages that replication generates, in + * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests. + */ + #define OUT_QUEUE_LIMIT 10 + + /* * The system value is available from sysconf(_SC_HOST_NAME_MAX). * Historically, the maximum host name was 256. */ *************** *** 47,52 **** --- 61,71 ---- #define MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20) typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1]; + /* Default timeout values, in seconds. */ + #define DB_REPMGR_DEFAULT_ACK_TIMEOUT (1 * US_PER_SEC) + #define DB_REPMGR_DEFAULT_CONNECTION_RETRY (30 * US_PER_SEC) + #define DB_REPMGR_DEFAULT_ELECTION_RETRY (10 * US_PER_SEC) + struct __repmgr_connection; typedef struct __repmgr_connection REPMGR_CONNECTION; struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE; *************** *** 171,178 **** #ifdef DB_WIN32 WSAEVENT event_object; #endif ! #define CONN_CONNECTING 0x01 /* nonblocking connect in progress */ ! #define CONN_DEFUNCT 0x02 /* socket close pending */ u_int32_t flags; /* --- 190,198 ---- #ifdef DB_WIN32 WSAEVENT event_object; #endif ! #define CONN_CONGESTED 0x01 /* msg thread wait has exceeded timeout */ ! #define CONN_CONNECTING 0x02 /* nonblocking connect in progress */ ! #define CONN_DEFUNCT 0x04 /* socket close pending */ u_int32_t flags; /* *************** *** 180,189 **** * send() function's thread. But if TCP doesn't have enough network * buffer space for us when we first try it, we instead allocate some * memory, and copy the message, and then send it as space becomes ! * available in our main select() thread. */ OUT_Q_HEADER outbound_queue; int out_queue_length; /* * Input: while we're reading a message, we keep track of what phase --- 200,215 ---- * send() function's thread. But if TCP doesn't have enough network * buffer space for us when we first try it, we instead allocate some * memory, and copy the message, and then send it as space becomes ! * available in our main select() thread. In some cases, if the queue ! * gets too long we wait until it's drained, and then append to it. ! * This condition variable's associated mutex is the normal per-repmgr ! * db_rep->mutex, because that mutex is always held anyway whenever the ! * output queue is consulted. */ OUT_Q_HEADER outbound_queue; int out_queue_length; + cond_var_t drained; + int blockers; /* ref count of msg threads waiting on us */ /* * Input: while we're reading a message, we keep track of what phase *** dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700 --- dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700 *************** *** 1420,1425 **** --- 1420,1428 ---- #define __repmgr_wake_waiting_senders __repmgr_wake_waiting_senders@DB_VERSION_UNIQUE_NAME@ #define __repmgr_await_ack __repmgr_await_ack@DB_VERSION_UNIQUE_NAME@ #define __repmgr_compute_wait_deadline __repmgr_compute_wait_deadline@DB_VERSION_UNIQUE_NAME@ + #define __repmgr_await_drain __repmgr_await_drain@DB_VERSION_UNIQUE_NAME@ + #define __repmgr_alloc_cond __repmgr_alloc_cond@DB_VERSION_UNIQUE_NAME@ + #define __repmgr_free_cond __repmgr_free_cond@DB_VERSION_UNIQUE_NAME@ #define __repmgr_init_sync __repmgr_init_sync@DB_VERSION_UNIQUE_NAME@ #define __repmgr_close_sync __repmgr_close_sync@DB_VERSION_UNIQUE_NAME@ #define __repmgr_net_init __repmgr_net_init@DB_VERSION_UNIQUE_NAME@ *** dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700 --- dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700 *************** *** 21,30 **** int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *)); void __repmgr_stash_generation __P((DB_ENV *)); int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t)); ! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *)); int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *)); ! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *, int)); ! void __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *)); int __repmgr_find_site __P((DB_ENV *, const char *, u_int)); int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *)); int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **)); --- 21,30 ---- int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *)); void __repmgr_stash_generation __P((DB_ENV *)); int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t)); ! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *, int)); int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *)); ! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *)); ! int __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *)); int __repmgr_find_site __P((DB_ENV *, const char *, u_int)); int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *)); int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **)); *************** *** 39,44 **** --- 39,47 ---- int __repmgr_wake_waiting_senders __P((DB_ENV *)); int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *)); void __repmgr_compute_wait_deadline __P((DB_ENV*, struct timespec *, db_timeout_t)); + int __repmgr_await_drain __P((DB_ENV *, REPMGR_CONNECTION *, db_timeout_t)); + int __repmgr_alloc_cond __P((cond_var_t *)); + int __repmgr_free_cond __P((cond_var_t *)); int __repmgr_init_sync __P((DB_ENV *, DB_REP *)); int __repmgr_close_sync __P((DB_ENV *)); int __repmgr_net_init __P((DB_ENV *, DB_REP *)); *** repmgr/repmgr_method.c 2007-10-31 10:23:52.000000000 -0700 --- repmgr/repmgr_method.c 2007-10-31 10:23:53.000000000 -0700 *************** *** 196,204 **** int ret; /* Set some default values. */ ! db_rep->ack_timeout = 1 * US_PER_SEC; /* 1 second */ ! db_rep->connection_retry_wait = 30 * US_PER_SEC; /* 30 seconds */ ! db_rep->election_retry_wait = 10 * US_PER_SEC; /* 10 seconds */ db_rep->config_nsites = 0; db_rep->peer = DB_EID_INVALID; db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; --- 196,204 ---- int ret; /* Set some default values. */ ! db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT; ! db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY; ! db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY; db_rep->config_nsites = 0; db_rep->peer = DB_EID_INVALID; db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; *************** *** 238,243 **** --- 238,244 ---- DB_ENV *dbenv; { DB_REP *db_rep; + REPMGR_CONNECTION *conn; int ret; db_rep = dbenv->rep_handle; *************** *** 254,259 **** --- 255,266 ---- if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0) goto unlock; + + TAILQ_FOREACH(conn, &db_rep->connections, entries) { + if (conn->blockers > 0 && + ((ret = __repmgr_signal(&conn->drained)) != 0)) + goto unlock; + } UNLOCK_MUTEX(db_rep->mutex); return (__repmgr_wake_main_thread(dbenv)); *** repmgr/repmgr_msg.c 2007-10-31 10:23:52.000000000 -0700 --- repmgr/repmgr_msg.c 2007-10-31 10:23:53.000000000 -0700 *************** *** 183,192 **** /* * Acknowledges a message. - * - * !!! - * Note that this cannot be called from the select() thread, in case we call - * __repmgr_bust_connection(..., FALSE). */ static int ack_message(dbenv, generation, lsn) --- 183,188 ---- *************** *** 227,235 **** rec2.size = 0; conn = site->ref.conn; if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK, ! &control2, &rec2)) == DB_REP_UNAVAIL) ! ret = __repmgr_bust_connection(dbenv, conn, FALSE); } UNLOCK_MUTEX(db_rep->mutex); --- 223,236 ---- rec2.size = 0; conn = site->ref.conn; + /* + * It's hard to imagine anyone would care about a lost ack if + * the path to the master is so congested as to need blocking; + * so pass "blockable" argument as FALSE. + */ if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK, ! &control2, &rec2, FALSE)) == DB_REP_UNAVAIL) ! ret = __repmgr_bust_connection(dbenv, conn); } UNLOCK_MUTEX(db_rep->mutex); *** repmgr/repmgr_net.c 2007-10-31 10:23:52.000000000 -0700 --- repmgr/repmgr_net.c 2007-10-31 10:23:53.000000000 -0700 *************** *** 63,69 **** static void setup_sending_msg __P((struct sending_msg *, u_int, const DBT *, const DBT *)); static int __repmgr_send_internal ! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *)); static int enqueue_msg __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t)); static int flatten __P((DB_ENV *, struct sending_msg *)); --- 63,69 ---- static void setup_sending_msg __P((struct sending_msg *, u_int, const DBT *, const DBT *)); static int __repmgr_send_internal ! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, int)); static int enqueue_msg __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t)); static int flatten __P((DB_ENV *, struct sending_msg *)); *************** *** 73,85 **** * __repmgr_send -- * The send function for DB_ENV->rep_set_transport. * - * !!! - * This is only ever called as the replication transport call-back, which means - * it's either on one of our message processing threads or an application - * thread. It mustn't be called from the select() thread, because we might call - * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the - * select() thread. - * * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, * PUBLIC: const DB_LSN *, int, u_int32_t)); */ --- 73,78 ---- *************** *** 126,134 **** } conn = site->ref.conn; if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE, ! control, rec)) == DB_REP_UNAVAIL && ! (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0) ret = t_ret; if (ret != 0) goto out; --- 119,128 ---- } conn = site->ref.conn; + /* Pass the "blockable" argument as TRUE. */ if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE, ! control, rec, TRUE)) == DB_REP_UNAVAIL && ! (t_ret = __repmgr_bust_connection(dbenv, conn)) != 0) ret = t_ret; if (ret != 0) goto out; *************** *** 222,228 **** if (site->state != SITE_CONNECTED) return (NULL); ! if (F_ISSET(site->ref.conn, CONN_CONNECTING)) return (NULL); return (site); } --- 216,222 ---- if (site->state != SITE_CONNECTED) return (NULL); ! if (F_ISSET(site->ref.conn, CONN_CONNECTING|CONN_DEFUNCT)) return (NULL); return (site); } *************** *** 235,244 **** * * !!! * Caller must hold dbenv->mutex. - * - * !!! - * Note that this cannot be called from the select() thread, in case we call - * __repmgr_bust_connection(..., FALSE). */ static int __repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp) --- 229,234 ---- *************** *** 268,281 **** !IS_VALID_EID(conn->eid)) continue; ! if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) { site = SITE_FROM_EID(conn->eid); nsites++; if (site->priority > 0) npeers++; } else if (ret == DB_REP_UNAVAIL) { ! if ((ret = __repmgr_bust_connection( ! dbenv, conn, FALSE)) != 0) return (ret); } else return (ret); --- 258,277 ---- !IS_VALID_EID(conn->eid)) continue; ! /* ! * Broadcast messages are either application threads committing ! * transactions, or replication status message that we can ! * afford to lose. So don't allow blocking for them (pass ! * "blockable" argument as FALSE). ! */ ! if ((ret = __repmgr_send_internal(dbenv, ! conn, &msg, FALSE)) == 0) { site = SITE_FROM_EID(conn->eid); nsites++; if (site->priority > 0) npeers++; } else if (ret == DB_REP_UNAVAIL) { ! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0) return (ret); } else return (ret); *************** *** 301,339 **** * intersperse writes that are part of two single messages. * * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, ! * PUBLIC: u_int, const DBT *, const DBT *)); */ int ! __repmgr_send_one(dbenv, conn, msg_type, control, rec) DB_ENV *dbenv; REPMGR_CONNECTION *conn; u_int msg_type; const DBT *control, *rec; { struct sending_msg msg; setup_sending_msg(&msg, msg_type, control, rec); ! return (__repmgr_send_internal(dbenv, conn, &msg)); } /* * Attempts a "best effort" to send a message on the given site. If there is an ! * excessive backlog of message already queued on the connection, we simply drop ! * this message, and still return 0 even in this case. */ static int ! __repmgr_send_internal(dbenv, conn, msg) DB_ENV *dbenv; REPMGR_CONNECTION *conn; struct sending_msg *msg; { ! #define OUT_QUEUE_LIMIT 10 /* arbitrary, for now */ REPMGR_IOVECS iovecs; SITE_STRING_BUFFER buffer; int ret; size_t nw; size_t total_written; DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING)); if (!STAILQ_EMPTY(&conn->outbound_queue)) { /* --- 297,355 ---- * intersperse writes that are part of two single messages. * * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, ! * PUBLIC: u_int, const DBT *, const DBT *, int)); */ int ! __repmgr_send_one(dbenv, conn, msg_type, control, rec, blockable) DB_ENV *dbenv; REPMGR_CONNECTION *conn; u_int msg_type; const DBT *control, *rec; + int blockable; { struct sending_msg msg; setup_sending_msg(&msg, msg_type, control, rec); ! return (__repmgr_send_internal(dbenv, conn, &msg, blockable)); } /* * Attempts a "best effort" to send a message on the given site. If there is an ! * excessive backlog of message already queued on the connection, what shall we ! * do? If the caller doesn't mind blocking, we'll wait (a limited amount of ! * time) for the queue to drain. Otherwise we'll simply drop the message. This ! * is always allowed by the replication protocol. But in the case of a ! * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we ! * almost always get a flood of messages that instantly fills our queue, so ! * blocking improves performance (by avoiding the need for the client to ! * re-request). ! * ! * How long shall we wait? We could of course create a new timeout ! * configuration type, so that the application could set it directly. But that ! * would start to overwhelm the user with too many choices to think about. We ! * already have an ACK timeout, which is the user's estimate of how long it ! * should take to send a message to the client, have it be processed, and return ! * a message back to us. We multiply that by the queue size, because that's how ! * many messages have to be swallowed up by the client before we're able to ! * start sending again (at least to a rough approximation). */ static int ! __repmgr_send_internal(dbenv, conn, msg, blockable) DB_ENV *dbenv; REPMGR_CONNECTION *conn; struct sending_msg *msg; + int blockable; { ! DB_REP *db_rep; REPMGR_IOVECS iovecs; SITE_STRING_BUFFER buffer; + db_timeout_t drain_to; int ret; size_t nw; size_t total_written; + db_rep = dbenv->rep_handle; + DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING)); if (!STAILQ_EMPTY(&conn->outbound_queue)) { /* *************** *** 344,358 **** RPRINT(dbenv, (dbenv, "msg to %s to be queued", __repmgr_format_eid_loc(dbenv->rep_handle, conn->eid, buffer))); if (conn->out_queue_length < OUT_QUEUE_LIMIT) return (enqueue_msg(dbenv, conn, msg, 0)); else { RPRINT(dbenv, (dbenv, "queue limit exceeded")); STAT(dbenv->rep_handle-> region->mstat.st_msgs_dropped++); ! return (0); } } /* * Send as much data to the site as we can, without blocking. Keep --- 360,393 ---- RPRINT(dbenv, (dbenv, "msg to %s to be queued", __repmgr_format_eid_loc(dbenv->rep_handle, conn->eid, buffer))); + if (conn->out_queue_length >= OUT_QUEUE_LIMIT && + blockable && !F_ISSET(conn, CONN_CONGESTED)) { + RPRINT(dbenv, (dbenv, + "block msg thread, await queue space")); + + if ((drain_to = db_rep->ack_timeout) == 0) + drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT; + conn->blockers++; + ret = __repmgr_await_drain(dbenv, + conn, drain_to * OUT_QUEUE_LIMIT); + conn->blockers--; + if (db_rep->finished) + return (DB_TIMEOUT); + if (ret != 0) + return (ret); + if (STAILQ_EMPTY(&conn->outbound_queue)) + goto empty; + } if (conn->out_queue_length < OUT_QUEUE_LIMIT) return (enqueue_msg(dbenv, conn, msg, 0)); else { RPRINT(dbenv, (dbenv, "queue limit exceeded")); STAT(dbenv->rep_handle-> region->mstat.st_msgs_dropped++); ! return (blockable ? DB_TIMEOUT : 0); } } + empty: /* * Send as much data to the site as we can, without blocking. Keep *************** *** 498,521 **** /* * Abandons a connection, to recover from an error. Upon entry the conn struct ! * must be on the connections list. ! * ! * If the 'do_close' flag is true, we do the whole job; the clean-up includes ! * removing the struct from the list and freeing all its memory, so upon return ! * the caller must not refer to it any further. Otherwise, we merely mark the ! * connection for clean-up later by the main thread. * * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *, ! * PUBLIC: REPMGR_CONNECTION *, int)); * * !!! * Caller holds mutex. */ int ! __repmgr_bust_connection(dbenv, conn, do_close) DB_ENV *dbenv; REPMGR_CONNECTION *conn; - int do_close; { DB_REP *db_rep; int connecting, ret, eid; --- 533,553 ---- /* * Abandons a connection, to recover from an error. Upon entry the conn struct ! * must be on the connections list. For now, just mark it as unusable; it will ! * be fully cleaned up in the top-level select thread, as soon as possible. * * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *, ! * PUBLIC: REPMGR_CONNECTION *)); * * !!! * Caller holds mutex. + * + * Must be idempotent */ int ! __repmgr_bust_connection(dbenv, conn) DB_ENV *dbenv; REPMGR_CONNECTION *conn; { DB_REP *db_rep; int connecting, ret, eid; *************** *** 526,537 **** DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); eid = conn->eid; connecting = F_ISSET(conn, CONN_CONNECTING); ! if (do_close) ! __repmgr_cleanup_connection(dbenv, conn); ! else { ! F_SET(conn, CONN_DEFUNCT); ! conn->eid = -1; ! } /* * When we first accepted the incoming connection, we set conn->eid to --- 558,566 ---- DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); eid = conn->eid; connecting = F_ISSET(conn, CONN_CONNECTING); ! ! F_SET(conn, CONN_DEFUNCT); ! conn->eid = -1; /* * When we first accepted the incoming connection, we set conn->eid to *************** *** 557,563 **** dbenv, ELECT_FAILURE_ELECTION)) != 0) return (ret); } ! } else if (!do_close) { /* * One way or another, make sure the main thread is poked, so * that we do the deferred clean-up. --- 586,592 ---- dbenv, ELECT_FAILURE_ELECTION)) != 0) return (ret); } ! } else { /* * One way or another, make sure the main thread is poked, so * that we do the deferred clean-up. *************** *** 568,577 **** } /* ! * PUBLIC: void __repmgr_cleanup_connection * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *)); */ ! void __repmgr_cleanup_connection(dbenv, conn) DB_ENV *dbenv; REPMGR_CONNECTION *conn; --- 597,610 ---- } /* ! * PUBLIC: int __repmgr_cleanup_connection * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *)); + * + * !!! + * Idempotent. This can be called repeatedly as blocking message threads (of + * which there could be multiples) wake up in case of error on the connection. */ ! int __repmgr_cleanup_connection(dbenv, conn) DB_ENV *dbenv; REPMGR_CONNECTION *conn; *************** *** 580,596 **** QUEUED_OUTPUT *out; REPMGR_FLAT *msg; DBT *dbt; db_rep = dbenv->rep_handle; ! TAILQ_REMOVE(&db_rep->connections, conn, entries); if (conn->fd != INVALID_SOCKET) { ! (void)closesocket(conn->fd); #ifdef DB_WIN32 ! (void)WSACloseEvent(conn->event_object); #endif } /* * Deallocate any input and output buffers we may have. */ --- 613,643 ---- QUEUED_OUTPUT *out; REPMGR_FLAT *msg; DBT *dbt; + int ret; db_rep = dbenv->rep_handle; ! DB_ASSERT(dbenv, F_ISSET(conn, CONN_DEFUNCT) || db_rep->finished); ! if (conn->fd != INVALID_SOCKET) { ! ret = closesocket(conn->fd); ! conn->fd = INVALID_SOCKET; ! if (ret == SOCKET_ERROR) { ! ret = net_errno; ! __db_err(dbenv, ret, "closing socket"); ! } #ifdef DB_WIN32 ! if (!WSACloseEvent(conn->event_object) && ret != 0) ! ret = net_errno; #endif + if (ret != 0) + return (ret); } + if (conn->blockers > 0) + return (__repmgr_signal(&conn->drained)); + + TAILQ_REMOVE(&db_rep->connections, conn, entries); /* * Deallocate any input and output buffers we may have. */ *************** *** 614,620 **** --- 661,669 ---- __os_free(dbenv, out); } + ret = __repmgr_free_cond(&conn->drained); __os_free(dbenv, conn); + return (ret); } static int *************** *** 1063,1069 **** while (!TAILQ_EMPTY(&db_rep->connections)) { conn = TAILQ_FIRST(&db_rep->connections); ! __repmgr_cleanup_connection(dbenv, conn); } for (i = 0; i < db_rep->site_cnt; i++) { --- 1112,1118 ---- while (!TAILQ_EMPTY(&db_rep->connections)) { conn = TAILQ_FIRST(&db_rep->connections); ! (void)__repmgr_cleanup_connection(dbenv, conn); } for (i = 0; i < db_rep->site_cnt; i++) { *** repmgr/repmgr_posix.c 2007-10-31 10:23:52.000000000 -0700 --- repmgr/repmgr_posix.c 2007-10-31 10:23:53.000000000 -0700 *************** *** 21,26 **** --- 21,28 ---- size_t __repmgr_guesstimated_max = (128 * 1024); #endif + static int __repmgr_conn_work __P((DB_ENV *, + REPMGR_CONNECTION *, fd_set *, fd_set *, int)); static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *)); /* *************** *** 189,194 **** --- 191,284 ---- } /* + * PUBLIC: int __repmgr_await_drain __P((DB_ENV *, + * PUBLIC: REPMGR_CONNECTION *, db_timeout_t)); + * + * Waits for space to become available on the connection's output queue. + * Various ways we can exit: + * + * 1. queue becomes non-full + * 2. exceed time limit + * 3. connection becomes defunct (due to error in another thread) + * 4. repmgr is shutting down + * 5. any unexpected system resource failure + * + * In cases #3 and #5 we return an error code. Caller is responsible for + * distinguishing the remaining cases if desired. + * + * !!! + * Caller must hold repmgr->mutex. + */ + int + __repmgr_await_drain(dbenv, conn, timeout) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; + db_timeout_t timeout; + { + DB_REP *db_rep; + struct timespec deadline; + int ret; + + db_rep = dbenv->rep_handle; + + __repmgr_compute_wait_deadline(dbenv, &deadline, timeout); + + ret = 0; + while (conn->out_queue_length >= OUT_QUEUE_LIMIT) { + ret = pthread_cond_timedwait(&conn->drained, + &db_rep->mutex, &deadline); + switch (ret) { + case 0: + if (db_rep->finished) + goto out; /* #4. */ + /* + * Another thread could have stumbled into an error on + * the socket while we were waiting. + */ + if (F_ISSET(conn, CONN_DEFUNCT)) { + ret = DB_REP_UNAVAIL; /* #3. */ + goto out; + } + break; + case ETIMEDOUT: + F_SET(conn, CONN_CONGESTED); + ret = 0; + goto out; /* #2. */ + default: + goto out; /* #5. */ + } + } + /* #1. */ + + out: + return (ret); + } + + /* + * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *)); + * + * Initialize a condition variable (in allocated space). + */ + int + __repmgr_alloc_cond(c) + cond_var_t *c; + { + return (pthread_cond_init(c, NULL)); + } + + /* + * PUBLIC: int __repmgr_free_cond __P((cond_var_t *)); + * + * Clean up a previously initialized condition variable. + */ + int + __repmgr_free_cond(c) + cond_var_t *c; + { + return (pthread_cond_destroy(c)); + } + + /* * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *)); * * Allocate/initialize all data necessary for thread synchronization. This *************** *** 443,449 **** REPMGR_RETRY *retry; db_timespec timeout; fd_set reads, writes; ! int ret, flow_control, maxfd, nready; u_int8_t buf[10]; /* arbitrary size */ flow_control = FALSE; --- 533,539 ---- REPMGR_RETRY *retry; db_timespec timeout; fd_set reads, writes; ! int ret, flow_control, maxfd; u_int8_t buf[10]; /* arbitrary size */ flow_control = FALSE; *************** *** 477,482 **** --- 567,575 ---- * each one. */ TAILQ_FOREACH(conn, &db_rep->connections, entries) { + if (F_ISSET(conn, CONN_DEFUNCT)) + continue; + if (F_ISSET(conn, CONN_CONNECTING)) { FD_SET((u_int)conn->fd, &reads); FD_SET((u_int)conn->fd, &writes); *************** *** 533,616 **** return (ret); } } - nready = ret; - LOCK_MUTEX(db_rep->mutex); - /* - * The first priority thing we must do is to clean up any - * pending defunct connections. Otherwise, if they have any - * lingering pending input, we get very confused if we try to - * process it. - * - * The TAILQ_FOREACH macro would be suitable here, except that - * it doesn't allow unlinking the current element, which is - * needed for cleanup_connection. - */ - for (conn = TAILQ_FIRST(&db_rep->connections); - conn != NULL; - conn = next) { - next = TAILQ_NEXT(conn, entries); - if (F_ISSET(conn, CONN_DEFUNCT)) - __repmgr_cleanup_connection(dbenv, conn); - } - if ((ret = __repmgr_retry_connections(dbenv)) != 0) goto out; - if (nready == 0) - continue; /* ! * Traverse the linked list. (Again, like TAILQ_FOREACH, except ! * that we need the ability to unlink an element along the way.) */ for (conn = TAILQ_FIRST(&db_rep->connections); conn != NULL; conn = next) { next = TAILQ_NEXT(conn, entries); ! if (F_ISSET(conn, CONN_CONNECTING)) { ! if (FD_ISSET((u_int)conn->fd, &reads) || ! FD_ISSET((u_int)conn->fd, &writes)) { ! if ((ret = finish_connecting(dbenv, ! conn)) == DB_REP_UNAVAIL) { ! if ((ret = ! __repmgr_bust_connection( ! dbenv, conn, TRUE)) != 0) ! goto out; ! } else if (ret != 0) ! goto out; ! } ! continue; ! } ! ! /* ! * Here, the site is connected, and the FD_SET's are ! * valid. ! */ ! if (FD_ISSET((u_int)conn->fd, &writes)) { ! if ((ret = __repmgr_write_some( ! dbenv, conn)) == DB_REP_UNAVAIL) { ! if ((ret = ! __repmgr_bust_connection(dbenv, ! conn, TRUE)) != 0) ! goto out; ! continue; ! } else if (ret != 0) ! goto out; ! } ! ! if (!flow_control && ! FD_ISSET((u_int)conn->fd, &reads)) { ! if ((ret = __repmgr_read_from_site(dbenv, conn)) ! == DB_REP_UNAVAIL) { ! if ((ret = ! __repmgr_bust_connection(dbenv, ! conn, TRUE)) != 0) ! goto out; ! continue; ! } else if (ret != 0) ! goto out; ! } } /* --- 626,650 ---- return (ret); } } LOCK_MUTEX(db_rep->mutex); if ((ret = __repmgr_retry_connections(dbenv)) != 0) goto out; /* ! * Examine each connection, to see what work needs to be done. ! * ! * The TAILQ_FOREACH macro would be suitable here, except that ! * it doesn't allow unlinking the current element, which is ! * needed for cleanup_connection. */ for (conn = TAILQ_FIRST(&db_rep->connections); conn != NULL; conn = next) { next = TAILQ_NEXT(conn, entries); ! if ((ret = __repmgr_conn_work(dbenv, ! conn, &reads, &writes, flow_control)) != 0) ! goto out; } /* *************** *** 637,642 **** --- 671,719 ---- } static int + __repmgr_conn_work(dbenv, conn, reads, writes, flow_control) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; + fd_set *reads, *writes; + int flow_control; + { + int ret; + u_int fd; + + if (F_ISSET(conn, CONN_DEFUNCT)) { + /* + * Deferred clean-up, from an error that happened in another + * thread, while we were sleeping in select(). + */ + return (__repmgr_cleanup_connection(dbenv, conn)); + } + + ret = 0; + fd = (u_int)conn->fd; + + if (F_ISSET(conn, CONN_CONNECTING)) { + if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes)) + ret = finish_connecting(dbenv, conn); + } else { + /* + * Here, the site is connected, and the FD_SET's are valid. + */ + if (FD_ISSET(fd, writes)) + ret = __repmgr_write_some(dbenv, conn); + + if (ret == 0 && !flow_control && FD_ISSET(fd, reads)) + ret = __repmgr_read_from_site(dbenv, conn); + } + + if (ret == DB_REP_UNAVAIL) { + if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0) + return (ret); + ret = __repmgr_cleanup_connection(dbenv, conn); + } + return (ret); + } + + static int finish_connecting(dbenv, conn) DB_ENV *dbenv; REPMGR_CONNECTION *conn; *************** *** 657,662 **** --- 734,740 ---- goto err_rpt; } + DB_ASSERT(dbenv, F_ISSET(conn, CONN_CONNECTING)); F_CLR(conn, CONN_CONNECTING); return (__repmgr_send_handshake(dbenv, conn)); *************** *** 671,690 **** "connecting to %s", __repmgr_format_site_loc(site, buffer)); /* If we've exhausted the list of possible addresses, give up. */ ! if (ADDR_LIST_NEXT(&site->net_addr) == NULL) return (DB_REP_UNAVAIL); /* * This is just like a little mini-"bust_connection", except that we * don't reschedule for later, 'cuz we're just about to try again right ! * now. * * !!! * Which means this must only be called on the select() thread, since * only there are we allowed to actually close a connection. */ DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); ! __repmgr_cleanup_connection(dbenv, conn); ret = __repmgr_connect_site(dbenv, eid); DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); return (ret); --- 749,773 ---- "connecting to %s", __repmgr_format_site_loc(site, buffer)); /* If we've exhausted the list of possible addresses, give up. */ ! if (ADDR_LIST_NEXT(&site->net_addr) == NULL) { ! STAT(db_rep->region->mstat.st_connect_fail++); return (DB_REP_UNAVAIL); + } /* * This is just like a little mini-"bust_connection", except that we * don't reschedule for later, 'cuz we're just about to try again right ! * now. (Note that we don't have to worry about message threads ! * blocking on a full output queue: that can't happen when we're only ! * just connecting.) * * !!! * Which means this must only be called on the select() thread, since * only there are we allowed to actually close a connection. */ DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); ! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0) ! return (ret); ret = __repmgr_connect_site(dbenv, eid); DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); return (ret); *** repmgr/repmgr_sel.c 2007-10-31 10:23:52.000000000 -0700 --- repmgr/repmgr_sel.c 2007-10-31 10:23:53.000000000 -0700 *************** *** 36,45 **** /* * PUBLIC: int __repmgr_accept __P((DB_ENV *)); - * - * !!! - * Only ever called in the select() thread, since we may call - * __repmgr_bust_connection(..., TRUE). */ int __repmgr_accept(dbenv) --- 36,41 ---- *************** *** 133,139 **** case 0: return (0); case DB_REP_UNAVAIL: ! return (__repmgr_bust_connection(dbenv, conn, TRUE)); default: return (ret); } --- 129,135 ---- case 0: return (0); case DB_REP_UNAVAIL: ! return (__repmgr_bust_connection(dbenv, conn)); default: return (ret); } *************** *** 254,263 **** * starting with the "current" element of its address list and trying as many * addresses as necessary until the list is exhausted. * - * !!! - * Only ever called in the select() thread, since we may call - * __repmgr_bust_connection(..., TRUE). - * * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid)); */ int --- 250,255 ---- *************** *** 332,338 **** case 0: break; case DB_REP_UNAVAIL: ! return (__repmgr_bust_connection(dbenv, con, TRUE)); default: return (ret); } --- 324,330 ---- case 0: break; case DB_REP_UNAVAIL: ! return (__repmgr_bust_connection(dbenv, con)); default: return (ret); } *************** *** 437,443 **** DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1); ! return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec)); } /* --- 429,443 ---- DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1); ! /* ! * It would of course be disastrous to block the select() thread, so ! * pass the "blockable" argument as FALSE. Fortunately blocking should ! * never be necessary here, because the hand-shake is always the first ! * thing we send. Which is a good thing, because it would be almost as ! * disastrous if we allowed ourselves to drop a handshake. ! */ ! return (__repmgr_send_one(dbenv, ! conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE)); } /* *************** *** 854,859 **** --- 854,872 ---- conn->out_queue_length--; if (--msg->ref_count <= 0) __os_free(dbenv, msg); + + /* + * We've achieved enough movement to free up at least + * one space in the outgoing queue. Wake any message + * threads that may be waiting for space. Clear the + * CONGESTED status so that when the queue reaches the + * high-water mark again, the filling thread will be + * allowed to try waiting again. + */ + F_CLR(conn, CONN_CONGESTED); + if (conn->blockers > 0 && + (ret = __repmgr_signal(&conn->drained)) != 0) + return (ret); } } *** repmgr/repmgr_util.c 2007-10-31 10:23:52.000000000 -0700 --- repmgr/repmgr_util.c 2007-10-31 10:23:53.000000000 -0700 *************** *** 103,108 **** --- 103,113 ---- db_rep = dbenv->rep_handle; if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0) return (ret); + if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) { + __os_free(dbenv, c); + return (ret); + } + c->blockers = 0; c->fd = s; c->flags = flags; *** repmgr/repmgr_windows.c 2007-10-31 10:23:52.000000000 -0700 --- repmgr/repmgr_windows.c 2007-10-31 10:23:53.000000000 -0700 *************** *** 11,16 **** --- 11,19 ---- #define __INCLUDE_NETWORKING 1 #include "db_int.h" + /* Convert time-out from microseconds to milliseconds, rounding up. */ + #define DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS) + typedef struct __ack_waiter { HANDLE event; const DB_LSN *lsnp; *************** *** 120,136 **** { DB_REP *db_rep; ACK_WAITER *me; ! DWORD ret; ! DWORD timeout; db_rep = dbenv->rep_handle; if ((ret = allocate_wait_slot(dbenv, &me)) != 0) goto err; - /* convert time-out from microseconds to milliseconds, rounding up */ timeout = db_rep->ack_timeout > 0 ? ! ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE; me->lsnp = lsnp; if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout, FALSE)) == WAIT_FAILED) { --- 123,137 ---- { DB_REP *db_rep; ACK_WAITER *me; ! DWORD ret, timeout; db_rep = dbenv->rep_handle; if ((ret = allocate_wait_slot(dbenv, &me)) != 0) goto err; timeout = db_rep->ack_timeout > 0 ? ! DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE; me->lsnp = lsnp; if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout, FALSE)) == WAIT_FAILED) { *************** *** 211,216 **** --- 212,296 ---- db_rep->waiters->first_free = slot; } + /* (See requirements described in repmgr_posix.c.) */ + int + __repmgr_await_drain(dbenv, conn, timeout) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; + db_timeout_t timeout; + { + DB_REP *db_rep; + db_timespec deadline, delta, now; + db_timeout_t t; + DWORD duration, ret; + int round_up; + + db_rep = dbenv->rep_handle; + + __os_gettime(dbenv, &deadline); + DB_TIMEOUT_TO_TIMESPEC(timeout, &delta); + timespecadd(&deadline, &delta); + + while (conn->out_queue_length >= OUT_QUEUE_LIMIT) { + if (!ResetEvent(conn->drained)) + return (GetLastError()); + + /* How long until the deadline? */ + __os_gettime(dbenv, &now); + if (timespeccmp(&now, &deadline, >=)) { + F_SET(conn, CONN_CONGESTED); + return (0); + } + delta = deadline; + timespecsub(&delta, &now); + round_up = TRUE; + DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up); + duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t); + + ret = SignalObjectAndWait(db_rep->mutex, + conn->drained, duration, FALSE); + LOCK_MUTEX(db_rep->mutex); + if (ret == WAIT_FAILED) + return (GetLastError()); + else if (ret == WAIT_TIMEOUT) { + F_SET(conn, CONN_CONGESTED); + return (0); + } else + DB_ASSERT(dbenv, ret == WAIT_OBJECT_0); + + if (db_rep->finished) + return (0); + if (F_ISSET(conn, CONN_DEFUNCT)) + return (DB_REP_UNAVAIL); + } + return (0); + } + + /* + * Creates a manual reset event, which is usually our best choice when we may + * have multiple threads waiting on a single event. + */ + int + __repmgr_alloc_cond(c) + cond_var_t *c; + { + HANDLE event; + + if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL) + return (GetLastError()); + *c = event; + return (0); + } + + int + __repmgr_free_cond(c) + cond_var_t *c; + { + if (CloseHandle(*c)) + return (0); + return (GetLastError()); + } + /* * Make resource allocation an all-or-nothing affair, outside of this and the * close_sync function. db_rep->waiters should be non-NULL iff all of these *************** *** 488,493 **** --- 568,576 ---- * don't hurt anything flow-control-wise. */ TAILQ_FOREACH(conn, &db_rep->connections, entries) { + if (F_ISSET(conn, CONN_DEFUNCT)) + continue; + if (F_ISSET(conn, CONN_CONNECTING) || !STAILQ_EMPTY(&conn->outbound_queue) || (!flow_control || !IS_VALID_EID(conn->eid))) { *************** *** 534,541 **** conn != NULL; conn = next) { next = TAILQ_NEXT(conn, entries); ! if (F_ISSET(conn, CONN_DEFUNCT)) ! __repmgr_cleanup_connection(dbenv, conn); } /* --- 617,626 ---- conn != NULL; conn = next) { next = TAILQ_NEXT(conn, entries); ! if (F_ISSET(conn, CONN_DEFUNCT) && ! (ret = __repmgr_cleanup_connection(dbenv, ! conn)) != 0) ! goto unlock; } /* *************** *** 587,597 **** return (ret); } - /* - * !!! - * Only ever called on the select() thread, since we may call - * __repmgr_bust_connection(..., TRUE). - */ static int handle_completion(dbenv, conn) DB_ENV *dbenv; --- 672,677 ---- *************** *** 651,660 **** } } ! return (0); ! ! err: if (ret == DB_REP_UNAVAIL) ! return (__repmgr_bust_connection(dbenv, conn, TRUE)); return (ret); } --- 731,742 ---- } } ! err: ! if (ret == DB_REP_UNAVAIL) { ! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0) ! return (ret); ! ret = __repmgr_cleanup_connection(dbenv, conn); ! } return (ret); } *************** *** 708,714 **** } DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); ! __repmgr_cleanup_connection(dbenv, conn); ret = __repmgr_connect_site(dbenv, eid); DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); return (ret); --- 790,797 ---- } DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); ! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0) ! return (ret); ret = __repmgr_connect_site(dbenv, eid); DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); return (ret);