Implementing TCP Traffic Control

Материал из Wiki.crossplatform.ru

Перейти к: навигация, поиск
Image:qt-logo_new.png Image:qq-title-article.png
Qt Quarterly | Выпуск 17 | Документация

by Andreas Aardal Hanssen
TCP traffic control (also known as "rate control" or "trafficthrottling") is about managing the network bandwidth available toyour application. This means limiting the number of concurrentconnections and the upload and download rates for eachconnection.

Содержание

In this article, we will show how you can implement your ownrate-controlled socket by subclassing QTcpSocket. Ourimplementation is a simplified version of the traffic controller fromQt 4.1's Torrent example (located in examples/network/torrent).

[Download source code]

[править] Who Needs Traffic Control Anyway?

Three Levels of Traffic Control

There are several places that traffic control can be applied:

  • In hardware: Routers, switches, rate controlling blackboxes, and your host machine's network card all play a role incontrolling network traffic. Quality of Service hardware, which isvery often used by ISPs, allows the provision of different bandwidthsto different connections, and can allow certain connectionsto bypass network congestion.
  • In the operating system: You could have different bandwidthsfor different users. Operating systems play an important role whendealing with concurrent transfers, especially when it comes tofairness.
  • In the application: Your application can limit the number ofsimultaneous connections, and provide a means for the user to pauseand resume each transfer. Excessive concurrent downloading greatlyburdens the TCP congestion control in the network,which can lead to a low download rate, regardless of howadvanced your hardware is.

Many of the applications we use every day implement some sort of TCPtraffic control: games use traffic control for downloading patches,operating systems use it for downloading security updates, and in webbrowsers with transfer dialogs, traffic control is absolutelynecessary to avoid network congestion.

For applications that download large datasets, limiting aconnection's download rate is also very important. Withouttraffic control, your application might end up stealing most of thebandwidth, slowing down everyone else using the samenetwork link. While fetching a huge email attachment, you couldannoy coworkers waiting for an important web transaction to complete.

With traffic control, if you download a 4 GB DVD ISO image fileduring office hours, you can specify a maximum download rate of20 KB/s. Your image file will arrive at a steady pace while therest of the bandwidth remains available to everyone else.

There are also benefits to limiting your upload rate. Uploadlimiting is crucial for connections with a low general uploadbandwidth (ADSL connections often have a 1:10 ratio between uploadand download rates). While uploading at full speed, you willoften see that a concurrent download rate will drop significantly. Inthat case, limiting your upload rate from 25 KB/s to 15 KB/s willusually allow downloading at full speed again. This happens because acongested TCP upload pipe prevents your host from sending TCP ACKpackets for the download.

But that's enough chit-chat. Let's write some code! Ourimplementation consists of two classes: RcTcpSocket inherits QTcpSocket, and RateController is a lean QObjectsubclass. One RateController controls the upload and downloadrates for a group of RcTcpSocket connections. Let's look atRateController first.

[править] Implementing the Rate Controller

What's important in a traffic controller?

  • It should be reliable and fair, so that it doesn't starve anyconnections, and can distribute bandwidth evenly. If you've got 149 KB/son one connection and 1 KB/s on another, limiting the total download rateto 50KB/s should allow both connections to continue downloading at asteady pace.
  • It should minimize the impact on your maximum download speed. Who wantsa traffic controller that shaves 10% off your bandwidth?
  • It should handle changing of the limits in real time, as well asallowing connections to be added and removed on the fly.

Let's start with the class definition:

class RateController : public QObject
{
    Q_OBJECT
 
public:
    RateController(QObject *parent = 0)
        : QObject(parent), transferScheduled(false) { }
 
    void addSocket(RcTcpSocket *socket);
    void removeSocket(RcTcpSocket *socket);
 
    int uploadLimit() const { return upLimit; }
    int downloadLimit() const { return downLimit; }
    void setUploadLimit(int bytesPerSecond)
        { upLimit = bytesPerSecond; }
    void setDownloadLimit(int bytesPerSecond);
 
public slots:
    void transfer();
    void scheduleTransfer();
 
private:
    QTimer stopWatch;
    QSet<RcTcpSocket *> sockets;
    int upLimit;
    int downLimit;
    bool transferScheduled;
};

The RateController class contains a list of RcTcpSockets.Once you add a socket by calling addSocket(), RateControllerwill start managing this socket's transfer rate by calling thetransfer() slot repeatedly.

All sockets managed by the sameRateController will have an accumulated maximum upload anddownload rate that you can set by calling setUploadLimit() andsetDownloadLimit(). These functions can be calledat any time to influence RateController's behavior.

void RateController::addSocket(RcTcpSocket *socket)
{
    connect(socket, SIGNAL(readyToTransfer()), this, SLOT(transfer()));
    socket->setReadBufferSize(downLimit * 2);
    sockets.insert(socket);
    scheduleTransfer();
}

In addSocket(), we connect the RcTcpSocket::readyToTransfer()signal to our RateController::transfer() slot. RcTcpSocketuses this signal to notify the rate controller when it has data ready totransfer.

Next, we limit the socket's read buffer size. This is necessary to prevent QTcpSocket from downloading at maximum capacity;once the read buffer is full, QTcpSocket willpause downloading. By setting this buffer to be twice as large as the alloweddownload limit, we will help keep the download rate steady.

Finally, we add the socket to an internal QSet andcall scheduleTransfer(); we'll get back to this later.

void RateController::removeSocket(RcTcpSocket *socket)
{
    disconnect(socket, SIGNAL(readyToTransfer()), this, SLOT(transfer()));
    socket->setReadBufferSize(0);
    sockets.remove(socket);
}

The removeSocket() function simply undoes what we did inaddSocket(). The special zero value passed tosetReadBufferSize() removes any limit on the read buffer size.

void RateController::setDownloadLimit(int bytesPerSecond)
{
    downLimit = bytesPerSecond;
    foreach (RcTcpSocket *socket, sockets)
        socket->setReadBufferSize(downLimit * 2);
}

In setDownloadLimit(), we update all registered sockets' read buffersizes to allow real-time adjustments to the download limit.

void RateController::scheduleTransfer()
{
    if (transferScheduled)
        return;
    transferScheduled = true;
    QTimer::singleShot(50, this, SLOT(transfer()));
}

The scheduleTransfer() slot ensures that transfer() is calledat most once every 50 milliseconds, simplifying our code significantly– if every signal that notifies data transfer activity is connected tothis slot, the transfer() slot will eventually be called.For network protocols that require very low round-trip times,a shorter interval can be used, at the expense of some CPU.

Let's move on to the transfer() slot, which is where all thetraffic throttling goes on:

void RateController::transfer()
{
    transferScheduled = false;
    int msecs = 1000;
    if (!stopWatch.isNull())
        msecs = qMin(msecs, stopWatch.elapsed());

We start by resetting transferScheduled to false, allowingthis slot to be called again (if transferScheduled is true,scheduleTransfer() returns immediately). Then we check how longit is since the last time this slot was called using the class'sinternal stopWatch timer.

The first time transfer() is called, thetimer will be null, so we will default to 1000 milliseconds.

    qint64 bytesToWrite = (upLimit * msecs) / 1000;
    qint64 bytesToRead = (downLimit * msecs) / 1000;
    if (bytesToWrite == 0 &amp;&amp; bytesToRead == 0) {
        scheduleTransfer();
        return;
    }

Each time transfer() is called, we take the full upload anddownload limits of the rate controller, reducing them to shares of thefull 1 second transfer window, to find out how much data wecan read and write. If the slot is called too soon, and there is nodata to read or write, we schedule another transfer() call andreturn.

    QSet<RcTcpSocket *> pendingSockets;
    foreach (RcTcpSocket *client, sockets) {
        if (client->canTransferMore())
        pendingSockets.insert(client);
    }
    if (pendingSockets.isEmpty())
        return;
    stopWatch.start();

Now, we put together a list of all monitored sockets that are readyto transfer data. If this list is empty (i.e., no sockets cantransfer anything), we return. Otherwise, we start or restart thestopWatch timer because we are about to start transferring data.

    bool canTransferMore;
    do {
        canTransferMore = false;
        qint64 writeChunk = qMax(qint64(1), bytesToWrite / pendingSockets.size());
        qint64 readChunk = qMax(qint64(1), bytesToRead / pendingSockets.size());
 
        QSetIterator<RcTcpSocket *> it(pendingSockets);
        while (it.hasNext() &amp;&amp; (bytesToWrite > 0 || bytesToRead > 0)) {
            RcTcpSocket *socket = it.next();

Our data transfer step consists of an outer loop that runs as longas there is still data left to transfer on any socket, and an innerloop that runs through all sockets that can transfer data.Before we enter the inner loop, we take the total bytes to read andwrite, divide them by the number of pending sockets, and the resultis the maximum chunk of data each socket can write this time.

            bool dataTransferred = false;
            qint64 available = qMin(readChunk,
                   socket->networkBytesAvailable());
            if (available > 0) {
                qint64 readBytes = socket->readFromNetwork(
                       qMin(available, bytesToRead));
                if (readBytes > 0) {
                    bytesToRead -= readBytes;
                    dataTransferred = true;
                }
            }

We will handle reading first. The socket is asked to read the minimumof readChunk, bytesToRead, and the number of bytes that areavailable for reading from the socket. If the socket was able to readanything, we update bytesToRead and set dataTransferred tonotify that the socket was able to transfer data.

            if (upLimit * 2 > socket->bytesToWrite()) {
                qint64 chunkSize = qMin(writeChunk, bytesToWrite);
                qint64 toWrite = qMin(chkSize,
                       upLimit * 2 - socket->bytesToWrite());
                if (toWrite > 0) {
                    qint64 writtenBytes = socket->writeToNetwork(toWrite);
                    if (writtenBytes > 0) {
                        bytesToWrite -= writtenBytes;
                        dataTransferred = true;
                    }
                }
            }

We control writing with the same pattern, but in addition wemake sure that the socket doesn't have more than twice its uploadlimit pending already. This is to prevent the socket's outgoingbuffer from growing too much when the connection is congested. Again,if we transferred some data, we set dataTransferred.

            if (dataTransferred &amp;&amp; socket->canTransferMore())
                canTransferMore = true;
            else
                pendingSockets.remove(socket);
        }
    } while (canTransferMore
             &amp;&amp; (bytesToWrite > 0 || bytesToRead > 0)
             &amp;&amp; !pendingSockets.isEmpty());
 
    if (canTransferMore)
        scheduleTransfer();
}

Finally, if the socket was unable to transfer any data, it is removedfrom the list of pending sockets. Otherwise, if more data can betransferred by any socket, the outer loop will restart and continueuntil there is nothing more to transfer. At the end oftransfer(), we schedule another call if any data can betransferred.

[править] Implementing the Rate-Controlled Socket

RcTcpSocket is a subclass of QTcpSocket that stores incomingand outgoing data in two buffers, instead of operating directly onthe network. Outgoing data is first stored in a buffer, untilRateController calls writeToNetwork() with anappropriate number of bytes. Similarly, incoming data is available tothe user of RcTcpSocket only after readFromNetwork()is called.

class RcTcpSocket : public QTcpSocket
{
    Q_OBJECT
 
public:
    RcTcpSocket(QObject *parent = 0);
 
    bool canReadLine() const
        { return incoming.contains('\n'); }
 
    qint64 writeToNetwork(qint64 maxLen);
    qint64 readFromNetwork(qint64 maxLen);
 
    bool canTransferMore() const;
    qint64 bytesAvailable() const;
    qint64 networkBytesAvailable() const
        { return QTcpSocket::bytesAvailable(); }
 
signals:
    void readyToTransfer();
 
protected:
    qint64 readData(char *data, qint64 maxLen);
    qint64 readLineData(char *data, qint64 maxLen);
    qint64 writeData(const char *data, qint64 len);
 
private:
    QByteArray outgoing;
    QByteArray incoming;
};

Here's the constructor:

RcTcpSocket::RcTcpSocket(QObject *parent)
    : QTcpSocket(parent)
{
    connect(this, SIGNAL(readyRead()), this, SIGNAL(readyToTransfer()));
    connect(this, SIGNAL(connected()), this, SIGNAL(readyToTransfer()));
}

We connect the connected() and readyRead() signals toreadyToTransfer(). This allows us to notify RateControllerwhen we are ready to read from or write to the network.

qint64 RcTcpSocket::writeToNetwork(qint64 maxLen)
{
    qint64 bytesWritten =
            QTcpSocket::writeData(outgoing.data(),
                   qMin(maxLen, qint64(outgoing.size())));
    if (bytesWritten <= 0)
        return bytesWritten;
    outgoing.remove(0, bytesWritten);
    return bytesWritten;
}

The writeToNetwork() function writes as much as it can, up to amaximum of maxLen bytes, from its outgoing buffer and onto thenetwork.

qint64 RcTcpSocket::readFromNetwork(qint64 maxLen)
{
    int oldSize = incoming.size();
    incoming.resize(incoming.size() + maxLen);
    qint64 bytesRead = QTcpSocket::readData(
           incoming.data() + oldSize, maxLen);
    incoming.resize(bytesRead <= 0 ? oldSize : oldSize + bytesRead);
    if (bytesRead > 0)
        emit readyRead();
    return bytesRead;
}

The readFromNetwork() function works similarily towriteToNetwork(). We useQTcpSocket::readData() to read asmuch as we can, up to a limit of maxLen bytes, from the network.Then we store the result in the socket's incoming buffer. We emitreadyRead() to notify that bytes have arrived.

bool RcTcpSocket::canTransferMore() const
{
    return !incoming.isEmpty()
           || !outgoing.isEmpty()
           || QTcpSocket::bytesAvailable() > 0;
}

The canTransferMore() function returns true if either of thebuffers contain data or if there's data available on the socket.

qint64 RcTcpSocket::bytesAvailable() const
{
    if (state() != ConnectedState) {
        QByteArray buffer;
        buffer.resize(QTcpSocket::bytesAvailable());
        RcTcpSocket *that = const_cast<RcTcpSocket *>(this);
        that->QTcpSocket::readData(buffer.data(), buffer.size());
        that->incoming += buffer;
    }
    return incoming.size();
}

The bytesAvailable() function overrides the QTcpSocketversion of the function. If called in ConnectedState, it willsimply return the number of bytes in the incoming buffer. Otherwise,it will cast away the const in the this pointer and read allpending data from QTcpSocket's buffer intoits own buffer and then return the size. We'll get back to this in the nextparagraph.

qint64 RcTcpSocket::readData(char *data, qint64 maxLen)
{
    int bytesRead = qMin<int>(maxLen, incoming.size());
    memcpy(data, incoming.constData(), bytesRead);
    incoming.remove(0, bytesRead);
 
    if (state() != ConnectedState) {
        QByteArray buffer;
        buffer.resize(QTcpSocket::bytesAvailable());
        QTcpSocket::readData(buffer.data(), buffer.size());
        incoming += buffer;
    }
    return qint64(bytesRead);
}

By default, in our readData() implementation, we simply pop dataoff our incoming buffer. However, if the connection is not open, weapply the same block of code as we used in bytesAvailable(). When QTcpSocket emits disconnected() orerror(RemoteHostClosed), you can normally assume that a singlereadAll() will give you all the pending data. However, the ratecontroller is not likely to have transferred all data before thesocket connection closed. We have solved this problem by reading allpending data into the incoming buffer if the connection is no longeropen.

qint64 RcTcpSocket::readLineData(char *data, qint64 maxLen)
{
    return QIODevice::readLineData(data, maxLen);
}

We must remember to reimplement readLineData() as well;otherwise, QTcpSocket's own implementation will be called, butthat function operates directly on the network, bypassing our buffer.Our implementation calls QIODevice::readLineData(), a defaultimplementation of a "read line" algorithm that in turn calls ourreadData() implementation. As you can see from the classdefinition, we have also reimplementedRcTcpSocket::canReadLine(), which simply searches for a '"\\"n'in the incoming buffer.

qint64 RcTcpSocket::writeData(const char *data, qint64 len)
{
    int oldSize = outgoing.size();
    outgoing.resize(oldSize + len);
    memcpy(outgoing.data() + oldSize, data, len);
    emit readyToTransfer();
    return len;
}

Our writeData() reimplementation appends the data to the outgoingbuffer, and emits readyToTransfer() so that RateControllercan make sure the data is transferred to the network.

And that completes this example. We'll finish off with main(), todemonstrate how our new rate controlled socket is used to fetchTrolltech's home page with a maximum download rate of 2 KB/s:

int main(int argc, char **argv)
{
    QCoreApplication app(argc, argv);
 
    RcTcpSocket socket;
    socket.connectToHost("www.trolltech.com", 80);
    socket.write("GET / HTTP/1.0\r\n\r\n");
 
    RateController controller;
    controller.setUploadLimit(512);
    controller.setDownloadLimit(2048);
    controller.addSocket(&amp;socket);
 
    return app.exec();
}

Copyright © 2006 Trolltech Trademarks