Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class ClientImpl {
/*
* Get the buffered event based on transaction routing algorithm
*/
struct bufferevent *routeProcedure(Procedure &proc, ScopedByteBuffer &sbb);
struct bufferevent *routeProcedure(Procedure &proc, ScopedByteBuffer &sbb, boost::shared_ptr<ProcedureCallback> callback);

/*
* Initiate connection based on pending connection instance
Expand Down
62 changes: 57 additions & 5 deletions include/ProcedureCallback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,58 @@ class ProcedureCallback {

enum AbandonReason { NOT_ABANDONED, TOO_BUSY };

typedef struct {
std::string procName;
std::string hostName;
int hostId;
int partition;
bool readonly;
bool multipart;
} InvokeInfo;

ProcedureCallback():
m_reason(NOT_ABANDONED), m_allowAbandon(true), m_info({"", "", ~0, ~0, false, false}){
}

virtual void abandon(AbandonReason reason) {
m_reason = reason;
}

// Mechanism for procedure to over-ride abandon property set in client in event of backpressure.
// @return true: allow abandoning of requests in case of back pressure
// false: don't abandon the requests in back pressure scenario.
bool allowAbandon() const {
return m_allowAbandon;
}


void invokeProcName(const std::string& n) {
m_info.procName = n;
}
void invokeHostName(const std::string& h) {
m_info.hostName = h;
}

void invokeHostId(const int id) {
m_info.hostId = id;
}

void invokePartition(const int p) {
m_info.partition = p;
}

void invokeReadonly(const bool r) {
m_info.readonly = r;
}

void invokeMultipart(const bool m) {
m_info.multipart = m;
}

const InvokeInfo& invokeInfo() const {
return m_info;
}

/*
* Invoked when a response to an invocation is available or
* the connection to the node the invocation was sent to was lost.
Expand All @@ -46,12 +98,12 @@ class ProcedureCallback {
* @return true if the event loop should break after invoking this callback, false otherwise
*/
virtual bool callback(InvocationResponse response) throw (voltdb::Exception) = 0;
virtual void abandon(AbandonReason reason) {}
// Mechanism for procedure to over-ride abandon property set in client in event of backpressure.
// @return true: allow abandoning of requests in case of back pressure
// false: don't abandon the requests in back pressure scenario.
virtual bool allowAbandon() const {return true;}
virtual ~ProcedureCallback() {}

protected:
AbandonReason m_reason;
bool m_allowAbandon;
InvokeInfo m_info;
};
}

Expand Down
2 changes: 1 addition & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ BOOST_LIBS=/usr/local/lib
LIB_NAME=libvoltdbcpp
KIT_NAME=voltdb-client-cpp-x86_64-7.1

CFLAGS=-I$(BOOST_INCLUDES) -Iinclude -D__STDC_CONSTANT_MACROS -D__STDC_LIMIT_MACROS -g3 ${OPTIMIZATION} -fPIC
CFLAGS=-I$(BOOST_INCLUDES) -Iinclude -D__STDC_CONSTANT_MACROS -D__STDC_LIMIT_MACROS -g3 ${OPTIMIZATION} -fPIC -fpermissive
PLATFORM = $(shell uname)

ifeq ($(PLATFORM),Darwin)
Expand Down
64 changes: 35 additions & 29 deletions src/ClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -925,8 +925,6 @@ class SyncCallback : public ProcedureCallback {
return true;
}

void abandon(AbandonReason reason) {}

private:
InvocationResponse *m_responseOut;
};
Expand Down Expand Up @@ -1009,17 +1007,16 @@ InvocationResponse ClientImpl::invoke(Procedure &proc) throw (Exception, NoConne
class DummyCallback : public ProcedureCallback {
public:
ProcedureCallback *m_callback;
DummyCallback(ProcedureCallback *callback) : m_callback(callback) {}
DummyCallback(ProcedureCallback *callback) : m_callback(callback) {
m_allowAbandon = m_callback->allowAbandon();
}
bool callback(InvocationResponse response) throw (Exception) {
return m_callback->callback(response);
}

void abandon(AbandonReason reason) {}

bool allowAbandon() const {
return m_callback->allowAbandon();
void abandon(AbandonReason reason) {
m_callback->abandon(reason);
}

};

void ClientImpl::invoke(Procedure &proc, ProcedureCallback *callback) throw (Exception, NoConnectionsException, UninitializedParamsException, LibEventException, ElasticModeMismatchException) {
Expand All @@ -1032,26 +1029,33 @@ bool ClientImpl::isReadOnly(const Procedure &proc) {
return (procInfo != NULL && procInfo->m_readOnly);
}

struct bufferevent *ClientImpl::routeProcedure(Procedure &proc, ScopedByteBuffer &sbb){
struct bufferevent *ClientImpl::routeProcedure(Procedure &proc, ScopedByteBuffer &sbb, boost::shared_ptr<ProcedureCallback> callback){
ProcedureInfo *procInfo = m_distributer.getProcedure(proc.getName());

//route transaction to correct event if procedure is found, transaction is single partitioned
int hostId = -1;
if (procInfo && !procInfo->m_multiPart){
const int hashedPartition = m_distributer.getHashedPartitionForParameter(sbb, procInfo->m_partitionParameter);
if (hashedPartition >= 0) {
callback->invokePartition(hashedPartition);
hostId = m_distributer.getHostIdByPartitionId(hashedPartition);
}
}
else
{
//use MIP partition instead
hostId = m_distributer.getHostIdByPartitionId(Distributer::MP_INIT_PID);
callback->invokeMultipart(true);
}
if (hostId >= 0) {
std::map<int, bufferevent*>::iterator bevEntry = m_hostIdToEvent.find(hostId);
if (bevEntry != m_hostIdToEvent.end()) {
return bevEntry->second;
//aici
bufferevent* bev = bevEntry->second;
const boost::shared_ptr<CxnContext> ctx = m_contexts[bev];
callback->invokeHostName(ctx->m_name);
callback->invokeHostId(ctx->m_hostId);
return bev;
}
}
return NULL;
Expand Down Expand Up @@ -1187,24 +1191,27 @@ void ClientImpl::invoke(Procedure &proc, boost::shared_ptr<ProcedureCallback> ca
}

bool procReadOnly = false;
struct bufferevent *routed_bev = NULL;
//route transaction to correct event if client affinity is enabled and hashinator updating is not in progress
//elastic scalability is disabled
if (m_useClientAffinity && !m_distributer.isUpdating()) {
routed_bev = routeProcedure(proc, sbb);
struct bufferevent* routed_bev = routeProcedure(proc, sbb, callback);
// Check if the routed_bev is valid and has not been removed due to lost connection
if ((routed_bev != NULL) && (m_callbacks.find(routed_bev) != m_callbacks.end())) {
bev = routed_bev;
}
} else {
// routing has failed, set invokation info from the roudrobin bev
const boost::shared_ptr<CxnContext> ctx = m_contexts[bev];
callback->invokeHostName(ctx->m_name);
callback->invokeHostId(ctx->m_hostId);
}

if (isReadOnly(proc)) {
procReadOnly = true;
}
procReadOnly = isReadOnly(proc);
callback->invokeReadonly(procReadOnly);
}

CallBackBookeeping *cbPtr = new CallBackBookeeping(callback, expirationTime, procReadOnly);
assert (cbPtr != NULL);
boost::shared_ptr<CallBackBookeeping> cb (cbPtr);
callback->invokeProcName(proc.getName());
boost::shared_ptr<CallBackBookeeping> cb(new CallBackBookeeping(callback, expirationTime, procReadOnly));
assert (cb);

BEVToCallbackMap::iterator bevFromCBMap = m_callbacks.find(bev);
if ( bevFromCBMap == m_callbacks.end()) {
Expand All @@ -1224,8 +1231,6 @@ void ClientImpl::invoke(Procedure &proc, boost::shared_ptr<ProcedureCallback> ca
if (evbuffer_get_length(evbuf) > 262144) {
m_backpressuredBevs.insert(bev);
}

return;
}

void ClientImpl::runOnce() throw (Exception, NoConnectionsException, LibEventException) {
Expand Down Expand Up @@ -1507,7 +1512,9 @@ bool ClientImpl::drain() throw (Exception, NoConnectionsException, LibEventExcep
class TopoUpdateCallback : public ProcedureCallback
{
public:
TopoUpdateCallback(Distributer *dist, ClientLogger *logger) : m_dist(dist), m_logger(logger) {}
TopoUpdateCallback(Distributer *dist, ClientLogger *logger) : m_dist(dist), m_logger(logger) {
m_allowAbandon = false;
}

bool callback(InvocationResponse response) throw (Exception)
{
Expand All @@ -1526,8 +1533,6 @@ class TopoUpdateCallback : public ProcedureCallback
return true;
}

bool allowAbandon() const {return false;}

private:
Distributer *m_dist;
ClientLogger *m_logger;
Expand All @@ -1536,7 +1541,9 @@ class TopoUpdateCallback : public ProcedureCallback
class SubscribeCallback : public ProcedureCallback
{
public:
SubscribeCallback(ClientLogger *logger) : m_logger(logger) {}
SubscribeCallback(ClientLogger *logger) : m_logger(logger) {
m_allowAbandon = false;
}

bool callback(InvocationResponse response) throw (Exception)
{
Expand All @@ -1553,7 +1560,6 @@ class SubscribeCallback : public ProcedureCallback
return true;
}

bool allowAbandon() const {return false;}
private:
ClientLogger *m_logger;
};
Expand All @@ -1564,7 +1570,9 @@ class SubscribeCallback : public ProcedureCallback
class ProcUpdateCallback : public ProcedureCallback
{
public:
ProcUpdateCallback(Distributer *dist, ClientLogger *logger) : m_dist(dist), m_logger(logger) {}
ProcUpdateCallback(Distributer *dist, ClientLogger *logger) : m_dist(dist), m_logger(logger) {
m_allowAbandon = false;
}

bool callback(InvocationResponse response) throw (Exception)
{
Expand All @@ -1584,8 +1592,6 @@ class ProcUpdateCallback : public ProcedureCallback
return true;
}

bool allowAbandon() const {return false;}

private:
Distributer *m_dist;
ClientLogger *m_logger;
Expand Down