Simplify http::Agent methods and use multipart upload

Should speed-up uploads by updating metadata and contents at the same time.
Also should fix most 412 errors because of the update atomicity.
pull/40/head
Vitaliy Filippov 2015-10-09 12:31:19 +03:00
parent 679fa0eec6
commit 887da88c14
15 changed files with 338 additions and 290 deletions

View File

@ -1,6 +1,6 @@
# Grive2 0.4.1
# Grive2 0.4.2-dev
4 Aug 2015, Vitaliy Filippov
10 Oct 2015, Vitaliy Filippov
http://yourcmc.ru/wiki/Grive2
@ -28,7 +28,7 @@ You need the following libraries:
- libcurl
- libstdc++
- libgcrypt
- Boost (Boost filesystem, program_options and regex are required)
- Boost (Boost filesystem, program_options, regex, unit_test_framework and system are required)
- expat
There are also some optional dependencies:
@ -62,7 +62,10 @@ Enjoy!
### Grive2 v0.4.2 (unreleased)
- Exclude files by perl regexp
- Option to exclude files by perl regexp
- Reimplemented HTTP response logging for debug purposes
- Use multipart uploads (update metadata and contents at the same time) for improved perfomance & stability
- Bug fixes
### Grive2 v0.4.1

View File

@ -75,7 +75,7 @@ void Syncer1::DeleteRemote( Resource *res )
m_http->Get( res->SelfHref(), &xml, hdr ) ;
AssignIDs( res, Entry1( xml.Response() ) ) ;
m_http->Custom( "DELETE", res->SelfHref(), &str, hdr ) ;
m_http->Request( "DELETE", res->SelfHref(), NULL, &str, hdr ) ;
}
catch ( Exception& e )
{

View File

@ -32,6 +32,8 @@
#include "util/OS.hh"
#include "util/log/Log.hh"
#include "util/StringStream.hh"
#include "util/ConcatStream.hh"
#include <boost/exception/all.hpp>
@ -87,14 +89,19 @@ bool Syncer2::Create( Resource *res )
return Upload( res );
}
std::string to_string( uint64_t n )
{
std::ostringstream s;
s << n;
return s.str();
}
bool Syncer2::Upload( Resource *res )
{
Val meta;
meta.Add( "title", Val( res->Name() ) );
if ( res->IsFolder() )
{
meta.Add( "mimeType", Val( mime_types::folder ) );
}
if ( !res->Parent()->IsRoot() )
{
Val parent;
@ -107,8 +114,9 @@ bool Syncer2::Upload( Resource *res )
Val valr ;
// Issue metadata update request
if ( res->IsFolder() )
{
// Only issue metadata update request
http::Header hdr2 ;
hdr2.Add( "Content-Type: application/json" );
http::ValResponse vrsp ;
@ -120,35 +128,34 @@ bool Syncer2::Upload( Resource *res )
valr = vrsp.Response();
assert( !( valr["id"].Str().empty() ) );
}
if ( !res->IsFolder() )
else
{
while ( true )
{
File file( res->Path() ) ;
std::ostringstream xcontent_len ;
xcontent_len << "Content-Length: " << file.Size() ;
File file( res->Path() ) ;
ConcatStream multipart ;
StringStream p1(
"--file_contents\r\nContent-Type: application/json; charset=utf-8\r\n\r\n" + json_meta +
"\r\n--file_contents\r\nContent-Type: application/octet-stream\r\nContent-Length: " + to_string( file.Size() ) +
"\r\n\r\n"
);
StringStream p2("\r\n--file_contents--\r\n");
multipart.Append( &p1 );
multipart.Append( &file );
multipart.Append( &p2 );
http::Header hdr ;
hdr.Add( "Content-Type: application/octet-stream" ) ;
hdr.Add( xcontent_len.str() ) ;
if ( valr.Has( "etag" ) )
hdr.Add( "If-Match: " + valr["etag"].Str() ) ;
http::Header hdr ;
if ( !res->ETag().empty() )
hdr.Add( "If-Match: " + res->ETag() ) ;
hdr.Add( "Content-Type: multipart/related; boundary=\"file_contents\"" );
hdr.Add( "Content-Length: " + to_string( multipart.Size() ) );
http::ValResponse vrsp;
long http_code = m_http->Put( upload_base + "/" + valr["id"].Str() + "?uploadType=media", &file, &vrsp, hdr ) ;
if ( http_code == 410 || http_code == 412 )
{
Log( "request failed with %1%, body: %2%. retrying whole upload in 5s", http_code, m_http->LastError(), log::warning ) ;
os::Sleep( 5 );
}
else
{
valr = vrsp.Response() ;
assert( !( valr["id"].Str().empty() ) );
break ;
}
}
http::ValResponse vrsp;
m_http->Request(
res->ResourceID().empty() ? "POST" : "PUT",
upload_base + ( res->ResourceID().empty() ? "" : "/" + res->ResourceID() ) + "?uploadType=multipart",
&multipart, &vrsp, hdr
) ;
valr = vrsp.Response() ;
assert( !( valr["id"].Str().empty() ) );
}
Entry2 responseEntry = Entry2( valr ) ;

View File

@ -0,0 +1,67 @@
/*
Convenience wrapper methods for various kinds of HTTP requests
Copyright (C) 2015 Vitaliy Filippov
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation version 2
of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "Agent.hh"
#include "Header.hh"
#include "util/StringStream.hh"
namespace gr {
namespace http {
long Agent::Put(
const std::string& url,
const std::string& data,
DataStream *dest,
const Header& hdr )
{
StringStream s( data );
return Request( "PUT", url, &s, dest, hdr );
}
long Agent::Put(
const std::string& url,
File *file,
DataStream *dest,
const Header& hdr )
{
return Request( "PUT", url, (SeekStream*)file, dest, hdr );
}
long Agent::Get(
const std::string& url,
DataStream *dest,
const Header& hdr )
{
return Request( "GET", url, NULL, dest, hdr );
}
long Agent::Post(
const std::string& url,
const std::string& data,
DataStream *dest,
const Header& hdr )
{
Header h( hdr ) ;
StringStream s( data );
h.Add( "Content-Type: application/x-www-form-urlencoded" );
return Request( "POST", url, &s, dest, h );
}
} } // end of namespace

View File

@ -21,10 +21,11 @@
#include <string>
#include "ResponseLog.hh"
#include "util/Types.hh"
namespace gr {
class DataStream ;
class SeekStream ;
class File ;
namespace http {
@ -43,28 +44,29 @@ public :
const std::string& url,
const std::string& data,
DataStream *dest,
const Header& hdr ) = 0 ;
const Header& hdr ) ;
virtual long Put(
const std::string& url,
File *file,
DataStream *dest,
const Header& hdr ) = 0 ;
const Header& hdr ) ;
virtual long Get(
const std::string& url,
DataStream *dest,
const Header& hdr ) = 0 ;
const Header& hdr ) ;
virtual long Post(
const std::string& url,
const std::string& data,
DataStream *dest,
const Header& hdr ) = 0 ;
const Header& hdr ) ;
virtual long Custom(
virtual long Request(
const std::string& method,
const std::string& url,
SeekStream *in,
DataStream *dest,
const Header& hdr ) = 0 ;

View File

@ -46,35 +46,15 @@ namespace {
using namespace gr::http ;
using namespace gr ;
std::size_t ReadStringCallback( void *ptr, std::size_t size, std::size_t nmemb, std::string *data )
{
assert( ptr != 0 ) ;
assert( data != 0 ) ;
std::size_t count = std::min( size * nmemb, data->size() ) ;
if ( count > 0 )
{
std::memcpy( ptr, &(*data)[0], count ) ;
data->erase( 0, count ) ;
}
return count ;
}
std::size_t ReadFileCallback( void *ptr, std::size_t size, std::size_t nmemb, File *file )
std::size_t ReadFileCallback( void *ptr, std::size_t size, std::size_t nmemb, SeekStream *file )
{
assert( ptr != 0 ) ;
assert( file != 0 ) ;
std::size_t count = std::min(
static_cast<std::size_t>(size * nmemb),
static_cast<std::size_t>(file->Size() - file->Tell()) ) ;
assert( count <= std::numeric_limits<std::size_t>::max() ) ;
if ( count > 0 )
file->Read( static_cast<char*>(ptr), static_cast<std::size_t>(count) ) ;
return count ;
if ( size*nmemb > 0 )
return file->Read( static_cast<char*>(ptr), size*nmemb ) ;
return 0 ;
}
} // end of local namespace
@ -214,96 +194,27 @@ long CurlAgent::ExecCurl(
return http_code ;
}
long CurlAgent::Put(
const std::string& url,
const std::string& data,
DataStream *dest,
const Header& hdr )
{
Trace("HTTP PUT \"%1%\"", url ) ;
Init() ;
CURL *curl = m_pimpl->curl ;
std::string put_data = data ;
// set common options
::curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L ) ;
::curl_easy_setopt(curl, CURLOPT_READFUNCTION, &ReadStringCallback ) ;
::curl_easy_setopt(curl, CURLOPT_READDATA , &put_data ) ;
::curl_easy_setopt(curl, CURLOPT_INFILESIZE, put_data.size() ) ;
return ExecCurl( url, dest, hdr ) ;
}
long CurlAgent::Put(
long CurlAgent::Request(
const std::string& method,
const std::string& url,
File *file,
SeekStream *in,
DataStream *dest,
const Header& hdr )
{
assert( file != 0 ) ;
Trace("HTTP %1% \"%2%\"", method, url ) ;
Trace("HTTP PUT \"%1%\"", url ) ;
Init() ;
CURL *curl = m_pimpl->curl ;
// set common options
::curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L ) ;
::curl_easy_setopt(curl, CURLOPT_READFUNCTION, &ReadFileCallback ) ;
::curl_easy_setopt(curl, CURLOPT_READDATA , file ) ;
::curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(file->Size()) ) ;
return ExecCurl( url, dest, hdr ) ;
}
long CurlAgent::Get(
const std::string& url,
DataStream *dest,
const Header& hdr )
{
Trace("HTTP GET \"%1%\"", url ) ;
Init() ;
// set get specific options
::curl_easy_setopt(m_pimpl->curl, CURLOPT_HTTPGET, 1L);
return ExecCurl( url, dest, hdr ) ;
}
long CurlAgent::Post(
const std::string& url,
const std::string& post_data,
DataStream *dest,
const Header& hdr )
{
Trace("HTTP POST \"%1%\" with \"%2%\"", url, post_data ) ;
Init() ;
CURL *curl = m_pimpl->curl ;
// set post specific options
::curl_easy_setopt(curl, CURLOPT_POST, 1L);
::curl_easy_setopt(curl, CURLOPT_POSTFIELDS, &post_data[0] ) ;
::curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, post_data.size() ) ;
return ExecCurl( url, dest, hdr ) ;
}
long CurlAgent::Custom(
const std::string& method,
const std::string& url,
DataStream *dest,
const Header& hdr )
{
Trace("HTTP %2% \"%1%\"", url, method ) ;
Init() ;
CURL *curl = m_pimpl->curl ;
::curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, method.c_str() );
// ::curl_easy_setopt(curl, CURLOPT_VERBOSE, 1 );
if ( in )
{
::curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L ) ;
::curl_easy_setopt(curl, CURLOPT_READFUNCTION, &ReadFileCallback ) ;
::curl_easy_setopt(curl, CURLOPT_READDATA , in ) ;
::curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>( in->Size() ) ) ;
}
return ExecCurl( url, dest, hdr ) ;
}

View File

@ -44,35 +44,13 @@ public :
ResponseLog* GetLog() const ;
void SetLog( ResponseLog *log ) ;
long Put(
long Request(
const std::string& method,
const std::string& url,
const std::string& post_data,
DataStream *dest,
const Header& hdr ) ;
long Put(
const std::string& url,
File *file,
SeekStream *in,
DataStream *dest,
const Header& hdr ) ;
long Get(
const std::string& url,
DataStream *dest,
const Header& hdr ) ;
long Post(
const std::string& url,
const std::string& data,
DataStream *dest,
const Header& hdr ) ;
long Custom(
const std::string& method,
const std::string& url,
DataStream *dest,
const Header& hdr ) ;
std::string LastError() const ;
std::string LastErrorHeaders() const ;
@ -84,14 +62,14 @@ public :
private :
static std::size_t HeaderCallback( void *ptr, size_t size, size_t nmemb, CurlAgent *pthis ) ;
static std::size_t Receive( void* ptr, size_t size, size_t nmemb, CurlAgent *pthis ) ;
long ExecCurl(
const std::string& url,
DataStream *dest,
const Header& hdr ) ;
void Init() ;
private :
struct Impl ;
std::auto_ptr<Impl> m_pimpl ;

View File

@ -47,98 +47,29 @@ void AuthAgent::SetLog( http::ResponseLog *log )
return m_agent->SetLog( log );
}
Header AuthAgent::AppendHeader( const Header& hdr ) const
http::Header AuthAgent::AppendHeader( const http::Header& hdr ) const
{
Header h(hdr) ;
http::Header h(hdr) ;
h.Add( "Authorization: Bearer " + m_auth.AccessToken() ) ;
h.Add( "GData-Version: 3.0" ) ;
return h ;
}
long AuthAgent::Put(
const std::string& url,
const std::string& data,
DataStream *dest,
const Header& hdr )
{
long response;
Header auth;
do
{
auth = AppendHeader( hdr );
response = m_agent->Put( url, data, dest, auth );
} while ( CheckRetry( response ) );
return CheckHttpResponse( response, url, auth );
}
long AuthAgent::Put(
const std::string& url,
File *file,
DataStream *dest,
const Header& hdr )
{
long response;
Header auth;
while ( true )
{
auth = AppendHeader( hdr );
response = m_agent->Put( url, file, dest, auth );
if ( !CheckRetry( response ) )
break;
file->Seek( 0, SEEK_SET );
}
// On 410 Gone or 412 Precondition failed, recovery may be possible so don't
// throw an exception
if ( response == 410 || response == 412 )
return response;
return CheckHttpResponse( response, url, auth );
}
long AuthAgent::Get(
const std::string& url,
DataStream *dest,
const Header& hdr )
{
long response;
Header auth;
do
{
auth = AppendHeader( hdr );
response = m_agent->Get( url, dest, auth );
} while ( CheckRetry( response ) );
return CheckHttpResponse( response, url, auth );
}
long AuthAgent::Post(
const std::string& url,
const std::string& data,
DataStream *dest,
const Header& hdr )
{
long response;
Header auth;
do
{
auth = AppendHeader( hdr );
response = m_agent->Post( url, data, dest, auth );
} while ( CheckRetry( response ) );
return CheckHttpResponse( response, url, auth );
}
long AuthAgent::Custom(
long AuthAgent::Request(
const std::string& method,
const std::string& url,
SeekStream *in,
DataStream *dest,
const Header& hdr )
const http::Header& hdr )
{
long response;
Header auth;
do
{
auth = AppendHeader( hdr );
response = m_agent->Custom( method, url, dest, auth );
if ( in )
in->Seek( 0, 0 );
response = m_agent->Request( method, url, in, dest, auth );
} while ( CheckRetry( response ) );
return CheckHttpResponse( response, url, auth );
}

View File

@ -39,32 +39,10 @@ public :
http::ResponseLog* GetLog() const ;
void SetLog( http::ResponseLog *log ) ;
long Put(
const std::string& url,
const std::string& data,
DataStream *dest,
const http::Header& hdr ) ;
long Put(
const std::string& url,
File* file,
DataStream *dest,
const http::Header& hdr ) ;
long Get(
const std::string& url,
DataStream *dest,
const http::Header& hdr ) ;
long Post(
const std::string& url,
const std::string& data,
DataStream *dest,
const http::Header& hdr ) ;
long Custom(
long Request(
const std::string& method,
const std::string& url,
SeekStream *in,
DataStream *dest,
const http::Header& hdr ) ;

View File

@ -0,0 +1,93 @@
/*
A stream representing a concatenation of several underlying streams
Copyright (C) 2015 Vitaliy Filippov
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation version 2
of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "ConcatStream.hh"
namespace gr {
ConcatStream::ConcatStream() :
m_cur( 0 ), m_size( 0 ), m_pos( 0 )
{
}
std::size_t ConcatStream::Read( char *data, std::size_t size )
{
std::size_t done = 0, l;
while ( done < size && m_cur < m_streams.size() )
{
l = m_streams[m_cur]->Read( data+done, size-done );
if ( !l )
{
m_cur++;
if ( m_cur < m_streams.size() )
m_streams[m_cur]->Seek( 0, 0 );
}
done += l;
m_pos += l;
}
return done ;
}
std::size_t ConcatStream::Write( const char *data, std::size_t size )
{
return 0 ;
}
off_t ConcatStream::Seek( off_t offset, int whence )
{
if ( whence == 1 )
offset += m_pos;
else if ( whence == 2 )
offset += Size();
if ( offset > Size() )
offset = Size();
m_cur = 0;
m_pos = offset;
if ( m_streams.size() )
{
while ( offset > m_streams[m_cur]->Size() )
{
offset -= m_streams[m_cur]->Size();
m_cur++;
}
m_streams[m_cur]->Seek( offset, 0 );
}
return m_pos ;
}
off_t ConcatStream::Tell() const
{
return m_pos ;
}
u64_t ConcatStream::Size() const
{
return m_size ;
}
void ConcatStream::Append( SeekStream *stream )
{
if ( stream )
{
m_streams.push_back( stream );
m_size += stream->Size();
}
}
} // end of namespace

View File

@ -0,0 +1,48 @@
/*
A stream representing a concatenation of several underlying streams
Copyright (C) 2015 Vitaliy Filippov
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation version 2
of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#pragma once
#include "DataStream.hh"
#include <vector>
namespace gr {
class ConcatStream : public SeekStream
{
public :
ConcatStream() ;
std::size_t Read( char *data, std::size_t size ) ;
std::size_t Write( const char *data, std::size_t size ) ;
off_t Seek( off_t offset, int whence ) ;
off_t Tell() const ;
u64_t Size() const ;
void Append( SeekStream *stream ) ;
private :
std::vector<SeekStream*> m_streams ;
off_t m_size, m_pos ;
int m_cur ;
} ;
} // end of namespace

View File

@ -20,6 +20,7 @@
#pragma once
#include <cstddef>
#include "util/Types.hh"
namespace gr {
@ -47,9 +48,13 @@ public :
virtual std::size_t Write( const char *data, std::size_t size ) = 0 ;
} ;
/// Stream for /dev/null, i.e. read and writing nothing
DataStream* DevNull() ;
class SeekStream: public DataStream
{
public :
virtual off_t Seek( off_t offset, int whence ) = 0 ;
virtual off_t Tell() const = 0 ;
virtual u64_t Size() const = 0 ;
} ;
} // end of namespace

View File

@ -35,7 +35,7 @@ namespace gr {
It is a simple wrapper around the UNIX file descriptor. It will
throw exceptions (i.e. Error) when it encounters errors.
*/
class File : public DataStream
class File : public SeekStream
{
public :
/// File specific errors. It often includes

View File

@ -27,33 +27,55 @@ namespace
{
// the max size of the cached string. this is to prevent allocating
// too much memory if client sends a line too long (i.e. DOS attack)
const std::size_t capacity = 1024 ;
const std::size_t capacity = 4096 ;
}
StringStream::StringStream( const std::string& init ) :
m_str( init )
// FIXME avoid copy
m_str( init ), m_pos( 0 )
{
}
/// Read `size` bytes from the stream. Those bytes will be removed from
/// the underlying string by calling `std::string::erase()`. Therefore, it is
/// not a good idea to call Read() to read byte-by-byte.
/// Read `size` bytes from the stream.
std::size_t StringStream::Read( char *data, std::size_t size )
{
// wow! no need to count count == 0
std::size_t count = std::min( m_str.size(), size ) ;
std::copy( m_str.begin(), m_str.begin() + count, data ) ;
m_str.erase( 0, count ) ;
std::size_t count = std::min( m_str.size()-m_pos, size ) ;
std::copy( m_str.begin() + m_pos, m_str.begin() + m_pos + count, data ) ;
m_pos += count ;
return count ;
}
std::size_t StringStream::Write( const char *data, std::size_t size )
{
std::size_t count = std::min( size, capacity - m_str.size() ) ;
m_str.insert( m_str.end(), data, data+count ) ;
m_str.replace( m_str.begin() + m_pos, m_str.begin() + m_pos + count, data, data+count ) ;
m_pos += count ;
return count ;
}
off_t StringStream::Seek( off_t offset, int whence )
{
if ( whence == 1 )
offset += m_pos;
else if ( whence == 2 )
offset += Size();
if ( offset > Size() )
offset = Size();
m_pos = (size_t)offset;
return m_pos;
}
off_t StringStream::Tell() const
{
return m_pos;
}
u64_t StringStream::Size() const
{
return m_str.size();
}
const std::string& StringStream::Str() const
{
return m_str ;

View File

@ -35,10 +35,8 @@ namespace gr {
is to prevent DOS attacks in which the client sends a lot of bytes before
the delimiter (e.g. new-line characters) and the server is forced to hold
all of them in memory.
The limit is current 1024 bytes.
*/
class StringStream : public DataStream
class StringStream : public SeekStream
{
public :
explicit StringStream( const std::string& init = std::string() ) ;
@ -46,11 +44,16 @@ public :
std::size_t Read( char *data, std::size_t size ) ;
std::size_t Write( const char *data, std::size_t size ) ;
off_t Seek( off_t offset, int whence ) ;
off_t Tell() const ;
u64_t Size() const ;
const std::string& Str() const ;
void Str( const std::string& str ) ;
private :
std::string m_str ;
std::size_t m_pos ;
} ;
} // end of namespace