"Fossies" - the Fresh Open Source Software Archive

Member "sphinx-2.2.11-release/mysqlse/ha_sphinx.cc" (19 Jul 2016, 93117 Bytes) of package /linux/www/sphinx-2.2.11-release.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "ha_sphinx.cc" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 2.2.10-release_vs_2.2.11-release.

    1 //
    2 // $Id$
    3 //
    4 
    5 //
    6 // Copyright (c) 2001-2016, Andrew Aksyonoff
    7 // Copyright (c) 2008-2016, Sphinx Technologies Inc
    8 // All rights reserved
    9 //
   10 // This program is free software; you can redistribute it and/or modify
   11 // it under the terms of the GNU General Public License. You should have
   12 // received a copy of the GPL license along with this program; if you
   13 // did not, you can find it at http://www.gnu.org/
   14 //
   15 
   16 #ifdef USE_PRAGMA_IMPLEMENTATION
   17 #pragma implementation // gcc: Class implementation
   18 #endif
   19 
   20 #if _MSC_VER>=1400
   21 #define _CRT_SECURE_NO_DEPRECATE 1
   22 #define _CRT_NONSTDC_NO_DEPRECATE 1
   23 #endif
   24 
   25 #include <mysql_version.h>
   26 
   27 #if MYSQL_VERSION_ID>=50515
   28 #include "sql_class.h"
   29 #include "sql_array.h"
   30 #elif MYSQL_VERSION_ID>50100
   31 #include "mysql_priv.h"
   32 #include <mysql/plugin.h>
   33 #else
   34 #include "../mysql_priv.h"
   35 #endif
   36 
   37 #include <mysys_err.h>
   38 #include <my_sys.h>
   39 #include <mysql.h> // include client for INSERT table (sort of redoing federated..)
   40 
   41 #ifndef __WIN__
   42     // UNIX-specific
   43     #include <my_net.h>
   44     #include <netdb.h>
   45     #include <sys/un.h>
   46 
   47     #define RECV_FLAGS  MSG_WAITALL
   48 
   49     #define sphSockClose(_sock) ::close(_sock)
   50 #else
   51     // Windows-specific
   52     #include <io.h>
   53     #define strcasecmp  stricmp
   54     #define snprintf    _snprintf
   55 
   56     #define RECV_FLAGS  0
   57 
   58     #define sphSockClose(_sock) ::closesocket(_sock)
   59 #endif
   60 
   61 #include <ctype.h>
   62 #include "ha_sphinx.h"
   63 
   64 #ifndef MSG_WAITALL
   65 #define MSG_WAITALL 0
   66 #endif
   67 
   68 #if _MSC_VER>=1400
   69 #pragma warning(push,4)
   70 #endif
   71 
   72 /////////////////////////////////////////////////////////////////////////////
   73 
   74 /// there might be issues with min() on different platforms (eg. Gentoo, they say)
   75 #define Min(a,b) ((a)<(b)?(a):(b))
   76 
   77 /// unaligned RAM accesses are forbidden on SPARC
   78 #if defined(sparc) || defined(__sparc__)
   79 #define UNALIGNED_RAM_ACCESS 0
   80 #else
   81 #define UNALIGNED_RAM_ACCESS 1
   82 #endif
   83 
   84 
   85 #if UNALIGNED_RAM_ACCESS
   86 
   87 /// pass-through wrapper
   88 template < typename T > inline T sphUnalignedRead ( const T & tRef )
   89 {
   90     return tRef;
   91 }
   92 
   93 /// pass-through wrapper
   94 template < typename T > void sphUnalignedWrite ( void * pPtr, const T & tVal )
   95 {
   96     *(T*)pPtr = tVal;
   97 }
   98 
   99 #else
  100 
  101 /// unaligned read wrapper for some architectures (eg. SPARC)
  102 template < typename T >
  103 inline T sphUnalignedRead ( const T & tRef )
  104 {
  105     T uTmp;
  106     byte * pSrc = (byte *) &tRef;
  107     byte * pDst = (byte *) &uTmp;
  108     for ( int i=0; i<(int)sizeof(T); i++ )
  109         *pDst++ = *pSrc++;
  110     return uTmp;
  111 }
  112 
  113 /// unaligned write wrapper for some architectures (eg. SPARC)
  114 template < typename T >
  115 void sphUnalignedWrite ( void * pPtr, const T & tVal )
  116 {
  117     byte * pDst = (byte *) pPtr;
  118     byte * pSrc = (byte *) &tVal;
  119     for ( int i=0; i<(int)sizeof(T); i++ )
  120         *pDst++ = *pSrc++;
  121 }
  122 
  123 #endif
  124 
  125 #if MYSQL_VERSION_ID>=50515
  126 
  127 #define sphinx_hash_init my_hash_init
  128 #define sphinx_hash_free my_hash_free
  129 #define sphinx_hash_search my_hash_search
  130 #define sphinx_hash_delete my_hash_delete
  131 
  132 #else
  133 
  134 #define sphinx_hash_init hash_init
  135 #define sphinx_hash_free hash_free
  136 #define sphinx_hash_search hash_search
  137 #define sphinx_hash_delete hash_delete
  138 
  139 #endif
  140 
  141 /////////////////////////////////////////////////////////////////////////////
  142 
  143 // FIXME! make this all dynamic
  144 #define SPHINXSE_MAX_FILTERS        32
  145 
  146 #define SPHINXAPI_DEFAULT_HOST      "127.0.0.1"
  147 #define SPHINXAPI_DEFAULT_PORT      9312
  148 #define SPHINXAPI_DEFAULT_INDEX     "*"
  149 
  150 #define SPHINXQL_DEFAULT_PORT       9306
  151 
  152 #define SPHINXSE_SYSTEM_COLUMNS     3
  153 
  154 #define SPHINXSE_MAX_ALLOC          (16*1024*1024)
  155 #define SPHINXSE_MAX_KEYWORDSTATS   4096
  156 
  157 #define SPHINXSE_VERSION            "2.2.11-release"
  158 
  159 // FIXME? the following is cut-n-paste from sphinx.h and searchd.cpp
  160 // cut-n-paste is somewhat simpler that adding dependencies however..
  161 
  162 enum
  163 {
  164     SPHINX_SEARCHD_PROTO    = 1,
  165     SEARCHD_COMMAND_SEARCH  = 0,
  166     VER_COMMAND_SEARCH      = 0x119,
  167 };
  168 
  169 /// search query sorting orders
  170 enum ESphSortOrder
  171 {
  172     SPH_SORT_RELEVANCE      = 0,    ///< sort by document relevance desc, then by date
  173     SPH_SORT_ATTR_DESC      = 1,    ///< sort by document date desc, then by relevance desc
  174     SPH_SORT_ATTR_ASC       = 2,    ///< sort by document date asc, then by relevance desc
  175     SPH_SORT_TIME_SEGMENTS  = 3,    ///< sort by time segments (hour/day/week/etc) desc, then by relevance desc
  176     SPH_SORT_EXTENDED       = 4,    ///< sort by SQL-like expression (eg. "@relevance DESC, price ASC, @id DESC")
  177     SPH_SORT_EXPR           = 5,    ///< sort by expression
  178 
  179     SPH_SORT_TOTAL
  180 };
  181 
  182 /// search query matching mode
  183 enum ESphMatchMode
  184 {
  185     SPH_MATCH_ALL = 0,          ///< match all query words
  186     SPH_MATCH_ANY,              ///< match any query word
  187     SPH_MATCH_PHRASE,           ///< match this exact phrase
  188     SPH_MATCH_BOOLEAN,          ///< match this boolean query
  189     SPH_MATCH_EXTENDED,         ///< match this extended query
  190     SPH_MATCH_FULLSCAN,         ///< match all document IDs w/o fulltext query, apply filters
  191     SPH_MATCH_EXTENDED2,        ///< extended engine V2
  192 
  193     SPH_MATCH_TOTAL
  194 };
  195 
  196 /// search query relevance ranking mode
  197 enum ESphRankMode
  198 {
  199     SPH_RANK_PROXIMITY_BM25     = 0,    ///< default mode, phrase proximity major factor and BM25 minor one
  200     SPH_RANK_BM25               = 1,    ///< statistical mode, BM25 ranking only (faster but worse quality)
  201     SPH_RANK_NONE               = 2,    ///< no ranking, all matches get a weight of 1
  202     SPH_RANK_WORDCOUNT          = 3,    ///< simple word-count weighting, rank is a weighted sum of per-field keyword occurence counts
  203     SPH_RANK_PROXIMITY          = 4,    ///< phrase proximity
  204     SPH_RANK_MATCHANY           = 5,    ///< emulate old match-any weighting
  205     SPH_RANK_FIELDMASK          = 6,    ///< sets bits where there were matches
  206     SPH_RANK_SPH04              = 7,    ///< codename SPH04, phrase proximity + bm25 + head/exact boost
  207     SPH_RANK_EXPR               = 8,    ///< expression based ranker
  208 
  209     SPH_RANK_TOTAL,
  210     SPH_RANK_DEFAULT            = SPH_RANK_PROXIMITY_BM25
  211 };
  212 
  213 /// search query grouping mode
  214 enum ESphGroupBy
  215 {
  216     SPH_GROUPBY_DAY     = 0,    ///< group by day
  217     SPH_GROUPBY_WEEK    = 1,    ///< group by week
  218     SPH_GROUPBY_MONTH   = 2,    ///< group by month
  219     SPH_GROUPBY_YEAR    = 3,    ///< group by year
  220     SPH_GROUPBY_ATTR    = 4     ///< group by attribute value
  221 };
  222 
  223 /// known attribute types
  224 enum
  225 {
  226     SPH_ATTR_NONE       = 0,            ///< not an attribute at all
  227     SPH_ATTR_INTEGER    = 1,            ///< this attr is just an integer
  228     SPH_ATTR_TIMESTAMP  = 2,            ///< this attr is a timestamp
  229     SPH_ATTR_ORDINAL    = 3,            ///< this attr is an ordinal string number (integer at search time, specially handled at indexing time)
  230     SPH_ATTR_BOOL       = 4,            ///< this attr is a boolean bit field
  231     SPH_ATTR_FLOAT      = 5,
  232     SPH_ATTR_BIGINT     = 6,
  233     SPH_ATTR_STRING     = 7,            ///< string (binary; in-memory)
  234 
  235     SPH_ATTR_UINT32SET      = 0x40000001UL, ///< this attr is multiple int32 values (0 or more)
  236     SPH_ATTR_UINT64SET      = 0x40000002UL  ///< this attr is multiple int64 values (0 or more)
  237 };
  238 
  239 /// known answers
  240 enum
  241 {
  242     SEARCHD_OK      = 0,    ///< general success, command-specific reply follows
  243     SEARCHD_ERROR   = 1,    ///< general failure, error message follows
  244     SEARCHD_RETRY   = 2,    ///< temporary failure, error message follows, client should retry later
  245     SEARCHD_WARNING = 3     ///< general success, warning message and command-specific reply follow
  246 };
  247 
  248 //////////////////////////////////////////////////////////////////////////////
  249 
  250 #define SPHINX_DEBUG_OUTPUT     0
  251 #define SPHINX_DEBUG_CALLS      0
  252 
  253 #include <stdarg.h>
  254 
  255 #if SPHINX_DEBUG_OUTPUT
  256 inline void SPH_DEBUG ( const char * format, ... )
  257 {
  258     va_list ap;
  259     va_start ( ap, format );
  260     fprintf ( stderr, "SphinxSE: " );
  261     vfprintf ( stderr, format, ap );
  262     fprintf ( stderr, "\n" );
  263     va_end ( ap );
  264 }
  265 #else
  266 inline void SPH_DEBUG ( const char *, ... ) {}
  267 #endif
  268 
  269 #if SPHINX_DEBUG_CALLS
  270 
  271 #define SPH_ENTER_FUNC() { SPH_DEBUG ( "enter %s", __FUNCTION__ ); }
  272 #define SPH_ENTER_METHOD() { SPH_DEBUG ( "enter %s(this=%08x)", __FUNCTION__, this ); }
  273 #define SPH_RET(_arg) { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return _arg; }
  274 #define SPH_VOID_RET() { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return; }
  275 
  276 #else
  277 
  278 #define SPH_ENTER_FUNC()
  279 #define SPH_ENTER_METHOD()
  280 #define SPH_RET(_arg) { return(_arg); }
  281 #define SPH_VOID_RET() { return; }
  282 
  283 #endif
  284 
  285 
  286 #define SafeDelete(_arg)        { if ( _arg ) delete ( _arg );      (_arg) = NULL; }
  287 #define SafeDeleteArray(_arg)   { if ( _arg ) delete [] ( _arg );   (_arg) = NULL; }
  288 
  289 //////////////////////////////////////////////////////////////////////////////
  290 
  291 /// per-table structure that will be shared among all open Sphinx SE handlers
  292 struct CSphSEShare
  293 {
  294     pthread_mutex_t m_tMutex;
  295     THR_LOCK        m_tLock;
  296 
  297     char *          m_sTable;
  298     char *          m_sScheme;      ///< our connection string
  299     char *          m_sHost;        ///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
  300     char *          m_sSocket;      ///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
  301     char *          m_sIndex;       ///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
  302     ushort          m_iPort;
  303     bool            m_bSphinxQL;    ///< is this read-only SphinxAPI table, or write-only SphinxQL table?
  304     uint            m_iTableNameLen;
  305     uint            m_iUseCount;
  306 #if MYSQL_VERSION_ID<50610
  307     CHARSET_INFO *  m_pTableQueryCharset;
  308 #else   
  309     const CHARSET_INFO *    m_pTableQueryCharset;
  310 #endif  
  311 
  312     int                 m_iTableFields;
  313     char **             m_sTableField;
  314     enum_field_types *  m_eTableFieldType;
  315 
  316     CSphSEShare ()
  317         : m_sTable ( NULL )
  318         , m_sScheme ( NULL )
  319         , m_sHost ( NULL )
  320         , m_sSocket ( NULL )
  321         , m_sIndex ( NULL )
  322         , m_iPort ( 0 )
  323         , m_bSphinxQL ( false )
  324         , m_iTableNameLen ( 0 )
  325         , m_iUseCount ( 1 )
  326         , m_pTableQueryCharset ( NULL )
  327 
  328         , m_iTableFields ( 0 )
  329         , m_sTableField ( NULL )
  330         , m_eTableFieldType ( NULL )
  331     {
  332         thr_lock_init ( &m_tLock );
  333         pthread_mutex_init ( &m_tMutex, MY_MUTEX_INIT_FAST );
  334     }
  335 
  336     ~CSphSEShare ()
  337     {
  338         pthread_mutex_destroy ( &m_tMutex );
  339         thr_lock_delete ( &m_tLock );
  340 
  341         SafeDeleteArray ( m_sTable );
  342         SafeDeleteArray ( m_sScheme );
  343         ResetTable ();
  344     }
  345 
  346     void ResetTable ()
  347     {
  348         for ( int i=0; i<m_iTableFields; i++ )
  349             SafeDeleteArray ( m_sTableField[i] );
  350         SafeDeleteArray ( m_sTableField );
  351         SafeDeleteArray ( m_eTableFieldType );
  352     }
  353 };
  354 
  355 /// schema attribute
  356 struct CSphSEAttr
  357 {
  358     char *          m_sName;        ///< attribute name (received from Sphinx)
  359     uint32          m_uType;        ///< attribute type (received from Sphinx)
  360     int             m_iField;       ///< field index in current table (-1 if none)
  361 
  362     CSphSEAttr()
  363         : m_sName ( NULL )
  364         , m_uType ( SPH_ATTR_NONE )
  365         , m_iField ( -1 )
  366     {}
  367 
  368     ~CSphSEAttr ()
  369     {
  370         SafeDeleteArray ( m_sName );
  371     }
  372 };
  373 
  374 /// word stats
  375 struct CSphSEWordStats
  376 {
  377     char *          m_sWord;
  378     int             m_iDocs;
  379     int             m_iHits;
  380 
  381     CSphSEWordStats ()
  382         : m_sWord ( NULL )
  383         , m_iDocs ( 0 )
  384         , m_iHits ( 0 )
  385     {}
  386 
  387     ~CSphSEWordStats ()
  388     {
  389         SafeDeleteArray ( m_sWord );
  390     }
  391 };
  392 
  393 /// request stats
  394 struct CSphSEStats
  395 {
  396 public:
  397     int                 m_iMatchesTotal;
  398     int                 m_iMatchesFound;
  399     int                 m_iQueryMsec;
  400     int                 m_iWords;
  401     CSphSEWordStats *   m_dWords;
  402     bool                m_bLastError;
  403     char                m_sLastMessage[1024];
  404 
  405     CSphSEStats()
  406         : m_dWords ( NULL )
  407     {
  408         Reset ();
  409     }
  410 
  411     void Reset ()
  412     {
  413         m_iMatchesTotal = 0;
  414         m_iMatchesFound = 0;
  415         m_iQueryMsec = 0;
  416         m_iWords = 0;
  417         SafeDeleteArray ( m_dWords );
  418         m_bLastError = false;
  419         m_sLastMessage[0] = '\0';
  420     }
  421 
  422     ~CSphSEStats()
  423     {
  424         Reset ();
  425     }
  426 };
  427 
  428 /// thread local storage
  429 struct CSphSEThreadTable
  430 {
  431     static const int    MAX_QUERY_LEN   = 262144; // 256k should be enough, right?
  432 
  433     bool                m_bStats;
  434     CSphSEStats         m_tStats;
  435 
  436     bool                m_bQuery;
  437     char                m_sQuery[MAX_QUERY_LEN];
  438 
  439 #if MYSQL_VERSION_ID<50610
  440     CHARSET_INFO *      m_pQueryCharset;
  441 #else
  442     const CHARSET_INFO *        m_pQueryCharset;
  443 #endif  
  444 
  445     bool                m_bReplace;     ///< are we doing an INSERT or REPLACE
  446 
  447     bool                m_bCondId;      ///< got a value from condition pushdown
  448     longlong            m_iCondId;      ///< value acquired from id=value condition pushdown
  449     bool                m_bCondDone;    ///< index_read() is now over
  450 
  451     const ha_sphinx *   m_pHandler;
  452     CSphSEThreadTable * m_pTableNext;
  453 
  454     CSphSEThreadTable ( const ha_sphinx * pHandler )
  455         : m_bStats ( false )
  456         , m_bQuery ( false )
  457         , m_pQueryCharset ( NULL )
  458         , m_bReplace ( false )
  459         , m_bCondId ( false )
  460         , m_iCondId ( 0 )
  461         , m_bCondDone ( false )
  462         , m_pHandler ( pHandler )
  463         , m_pTableNext ( NULL )
  464     {}
  465 };
  466 
  467 
  468 struct CSphTLS
  469 {
  470     CSphSEThreadTable * m_pHeadTable;
  471 
  472     explicit CSphTLS ( const ha_sphinx * pHandler )
  473     {
  474         m_pHeadTable = new CSphSEThreadTable ( pHandler );
  475     }
  476 
  477     ~CSphTLS()
  478     {
  479         CSphSEThreadTable * pCur = m_pHeadTable;
  480         while ( pCur )
  481         {
  482             CSphSEThreadTable * pNext = pCur->m_pTableNext;
  483             SafeDelete ( pCur );
  484             pCur = pNext;
  485         }
  486     }
  487 };
  488 
  489 
  490 /// filter types
  491 enum ESphFilter
  492 {
  493     SPH_FILTER_VALUES       = 0,    ///< filter by integer values set
  494     SPH_FILTER_RANGE        = 1,    ///< filter by integer range
  495     SPH_FILTER_FLOATRANGE   = 2     ///< filter by float range
  496 };
  497 
  498 
  499 /// search query filter
  500 struct CSphSEFilter
  501 {
  502 public:
  503     ESphFilter      m_eType;
  504     char *          m_sAttrName;
  505     longlong        m_uMinValue;
  506     longlong        m_uMaxValue;
  507     float           m_fMinValue;
  508     float           m_fMaxValue;
  509     int             m_iValues;
  510     longlong *      m_pValues;
  511     int             m_bExclude;
  512 
  513 public:
  514     CSphSEFilter ()
  515         : m_eType ( SPH_FILTER_VALUES )
  516         , m_sAttrName ( NULL )
  517         , m_uMinValue ( 0 )
  518         , m_uMaxValue ( UINT_MAX )
  519         , m_fMinValue ( 0.0f )
  520         , m_fMaxValue ( 0.0f )
  521         , m_iValues ( 0 )
  522         , m_pValues ( NULL )
  523         , m_bExclude ( 0 )
  524     {
  525     }
  526 
  527     ~CSphSEFilter ()
  528     {
  529         SafeDeleteArray ( m_pValues );
  530     }
  531 };
  532 
  533 
  534 /// float vs dword conversion
  535 inline uint32 sphF2DW ( float f )   { union { float f; uint32 d; } u; u.f = f; return u.d; }
  536 
  537 /// dword vs float conversion
  538 inline float sphDW2F ( uint32 d )   { union { float f; uint32 d; } u; u.d = d; return u.f; }
  539 
  540 
  541 /// client-side search query
  542 struct CSphSEQuery
  543 {
  544 public:
  545     const char *    m_sHost;
  546     int             m_iPort;
  547 
  548 private:
  549     char *          m_sQueryBuffer;
  550 
  551     const char *    m_sIndex;
  552     int             m_iOffset;
  553     int             m_iLimit;
  554 
  555     bool            m_bQuery;
  556     char *          m_sQuery;
  557     uint32 *        m_pWeights;
  558     int             m_iWeights;
  559     ESphMatchMode   m_eMode;
  560     ESphRankMode    m_eRanker;
  561     char *          m_sRankExpr;
  562     ESphSortOrder   m_eSort;
  563     char *          m_sSortBy;
  564     int             m_iMaxMatches;
  565     int             m_iMaxQueryTime;
  566     uint32          m_iMinID;
  567     uint32          m_iMaxID;
  568 
  569     int             m_iFilters;
  570     CSphSEFilter    m_dFilters[SPHINXSE_MAX_FILTERS];
  571 
  572     ESphGroupBy     m_eGroupFunc;
  573     char *          m_sGroupBy;
  574     char *          m_sGroupSortBy;
  575     int             m_iCutoff;
  576     int             m_iRetryCount;
  577     int             m_iRetryDelay;
  578     char *          m_sGroupDistinct;                           ///< points to query buffer; do NOT delete
  579     int             m_iIndexWeights;
  580     char *          m_sIndexWeight[SPHINXSE_MAX_FILTERS];       ///< points to query buffer; do NOT delete
  581     int             m_iIndexWeight[SPHINXSE_MAX_FILTERS];
  582     int             m_iFieldWeights;
  583     char *          m_sFieldWeight[SPHINXSE_MAX_FILTERS];       ///< points to query buffer; do NOT delete
  584     int             m_iFieldWeight[SPHINXSE_MAX_FILTERS];
  585 
  586     bool            m_bGeoAnchor;
  587     char *          m_sGeoLatAttr;
  588     char *          m_sGeoLongAttr;
  589     float           m_fGeoLatitude;
  590     float           m_fGeoLongitude;
  591 
  592     char *          m_sComment;
  593     char *          m_sSelect;
  594 
  595     struct Override_t
  596     {
  597         union Value_t
  598         {
  599             uint32      m_uValue;
  600             longlong    m_iValue64;
  601             float       m_fValue;
  602         };
  603         char *                      m_sName; ///< points to query buffer
  604         int                         m_iType;
  605         Dynamic_array<ulonglong>    m_dIds;
  606         Dynamic_array<Value_t>      m_dValues;
  607     };
  608     Dynamic_array<Override_t *> m_dOverrides;
  609 
  610 public:
  611     char            m_sParseError[256];
  612 
  613 public:
  614     CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex );
  615     ~CSphSEQuery ();
  616 
  617     bool            Parse ();
  618     int             BuildRequest ( char ** ppBuffer );
  619 
  620 protected:
  621     char *          m_pBuf;
  622     char *          m_pCur;
  623     int             m_iBufLeft;
  624     bool            m_bBufOverrun;
  625 
  626     template < typename T > int ParseArray ( T ** ppValues, const char * sValue );
  627     bool            ParseField ( char * sField );
  628 
  629     void            SendBytes ( const void * pBytes, int iBytes );
  630     void            SendWord ( short int v )        { v = ntohs(v); SendBytes ( &v, sizeof(v) ); }
  631     void            SendInt ( int v )               { v = ntohl(v); SendBytes ( &v, sizeof(v) ); }
  632     void            SendDword ( uint v )            { v = ntohl(v) ;SendBytes ( &v, sizeof(v) ); }
  633     void            SendUint64 ( ulonglong v )      { SendDword ( (uint)(v>>32) ); SendDword ( (uint)(v&0xFFFFFFFFUL) ); }
  634     void            SendString ( const char * v )   { int iLen = strlen(v); SendDword(iLen); SendBytes ( v, iLen ); }
  635     void            SendFloat ( float v )           { SendDword ( sphF2DW(v) ); }
  636 };
  637 
  638 template int CSphSEQuery::ParseArray<uint32> ( uint32 **, const char * );
  639 template int CSphSEQuery::ParseArray<longlong> ( longlong **, const char * );
  640 
  641 //////////////////////////////////////////////////////////////////////////////
  642 
  643 #if MYSQL_VERSION_ID>50100
  644 
  645 #if MYSQL_VERSION_ID<50114
  646 #error Sphinx SE requires MySQL 5.1.14 or higher if compiling for 5.1.x series!
  647 #endif
  648 
  649 static handler *    sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root );
  650 static int          sphinx_init_func ( void * p );
  651 static int          sphinx_close_connection ( handlerton * hton, THD * thd );
  652 static int          sphinx_panic ( handlerton * hton, enum ha_panic_function flag );
  653 static bool         sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print, enum ha_stat_type stat_type );
  654 
  655 #else
  656 
  657 static bool         sphinx_init_func_for_handlerton ();
  658 static int          sphinx_close_connection ( THD * thd );
  659 bool                sphinx_show_status ( THD * thd );
  660 
  661 #endif // >50100
  662 
  663 //////////////////////////////////////////////////////////////////////////////
  664 
  665 static const char   sphinx_hton_name[]      = "SPHINX";
  666 static const char   sphinx_hton_comment[]   = "Sphinx storage engine " SPHINXSE_VERSION;
  667 
  668 #if MYSQL_VERSION_ID<50100
  669 handlerton sphinx_hton =
  670 {
  671     #ifdef MYSQL_HANDLERTON_INTERFACE_VERSION
  672     MYSQL_HANDLERTON_INTERFACE_VERSION,
  673     #endif
  674     sphinx_hton_name,
  675     SHOW_OPTION_YES,
  676     sphinx_hton_comment,
  677     DB_TYPE_SPHINX_DB,
  678     sphinx_init_func_for_handlerton,
  679     0,                          // slot
  680     0,                          // savepoint size
  681     sphinx_close_connection,    // close_connection
  682     NULL,   // savepoint
  683     NULL,   // rollback to savepoint
  684     NULL,   // release savepoint
  685     NULL,   // commit
  686     NULL,   // rollback
  687     NULL,   // prepare
  688     NULL,   // recover
  689     NULL,   // commit_by_xid
  690     NULL,   // rollback_by_xid
  691     NULL,   // create_cursor_read_view
  692     NULL,   // set_cursor_read_view
  693     NULL,   // close_cursor_read_view
  694     HTON_CAN_RECREATE
  695 };
  696 #else
  697 static handlerton * sphinx_hton_ptr = NULL;
  698 #endif
  699 
  700 //////////////////////////////////////////////////////////////////////////////
  701 
  702 // variables for Sphinx shared methods
  703 pthread_mutex_t     sphinx_mutex;       // mutex to init the hash
  704 static int          sphinx_init = 0;    // flag whether the hash was initialized
  705 static HASH         sphinx_open_tables; // hash used to track open tables
  706 
  707 //////////////////////////////////////////////////////////////////////////////
  708 // INITIALIZATION AND SHUTDOWN
  709 //////////////////////////////////////////////////////////////////////////////
  710 
  711 // hashing function
  712 #if MYSQL_VERSION_ID>=50120
  713 typedef size_t GetKeyLength_t;
  714 #else
  715 typedef uint GetKeyLength_t;
  716 #endif
  717 
  718 static byte * sphinx_get_key ( const byte * pSharePtr, GetKeyLength_t * pLength, my_bool )
  719 {
  720     CSphSEShare * pShare = (CSphSEShare *) pSharePtr;
  721     *pLength = (size_t) pShare->m_iTableNameLen;
  722     return (byte*) pShare->m_sTable;
  723 }
  724 
  725 #if MYSQL_VERSION_ID<50100
  726 static int sphinx_init_func ( void * ) // to avoid unused arg warning
  727 #else
  728 static int sphinx_init_func ( void * p )
  729 #endif
  730 {
  731     SPH_ENTER_FUNC();
  732     if ( !sphinx_init )
  733     {
  734         sphinx_init = 1;
  735         void ( pthread_mutex_init ( &sphinx_mutex, MY_MUTEX_INIT_FAST ) );
  736         sphinx_hash_init ( &sphinx_open_tables, system_charset_info, 32, 0, 0,
  737             sphinx_get_key, 0, 0 );
  738 
  739         #if MYSQL_VERSION_ID > 50100
  740         handlerton * hton = (handlerton*) p;
  741         hton->state = SHOW_OPTION_YES;
  742         hton->db_type = DB_TYPE_FIRST_DYNAMIC;
  743         hton->create = sphinx_create_handler;
  744         hton->close_connection = sphinx_close_connection;
  745         hton->show_status = sphinx_show_status;
  746         hton->panic = sphinx_panic;
  747         hton->flags = HTON_CAN_RECREATE;
  748         #endif
  749     }
  750     SPH_RET(0);
  751 }
  752 
  753 
  754 #if MYSQL_VERSION_ID<50100
  755 static bool sphinx_init_func_for_handlerton ()
  756 {
  757     return sphinx_init_func ( &sphinx_hton );
  758 }
  759 #endif
  760 
  761 
  762 #if MYSQL_VERSION_ID>50100
  763 
  764 static int sphinx_close_connection ( handlerton * hton, THD * thd )
  765 {
  766     // deallocate common handler data
  767     SPH_ENTER_FUNC();
  768     void ** tmp = thd_ha_data ( thd, hton );
  769     CSphTLS * pTls = (CSphTLS *) (*tmp);
  770     SafeDelete ( pTls );
  771     *tmp = NULL;
  772     SPH_RET(0);
  773 }
  774 
  775 
  776 static int sphinx_done_func ( void * )
  777 {
  778     SPH_ENTER_FUNC();
  779 
  780     int error = 0;
  781     if ( sphinx_init )
  782     {
  783         sphinx_init = 0;
  784         if ( sphinx_open_tables.records )
  785             error = 1;
  786         sphinx_hash_free ( &sphinx_open_tables );
  787         pthread_mutex_destroy ( &sphinx_mutex );
  788     }
  789 
  790     SPH_RET(0);
  791 }
  792 
  793 
  794 static int sphinx_panic ( handlerton * hton, enum ha_panic_function )
  795 {
  796     return sphinx_done_func ( hton );
  797 }
  798 
  799 #else
  800 
  801 static int sphinx_close_connection ( THD * thd )
  802 {
  803     // deallocate common handler data
  804     SPH_ENTER_FUNC();
  805     CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
  806     SafeDelete ( pTls );
  807     thd->ha_data[sphinx_hton.slot] = NULL;
  808     SPH_RET(0);
  809 }
  810 
  811 #endif // >50100
  812 
  813 //////////////////////////////////////////////////////////////////////////////
  814 // SHOW STATUS
  815 //////////////////////////////////////////////////////////////////////////////
  816 
  817 #if MYSQL_VERSION_ID>50100
  818 static bool sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print,
  819     enum ha_stat_type )
  820 #else
  821 bool sphinx_show_status ( THD * thd )
  822 #endif
  823 {
  824     SPH_ENTER_FUNC();
  825 
  826 #if MYSQL_VERSION_ID<50100
  827     Protocol * protocol = thd->protocol;
  828     List<Item> field_list;
  829 #endif
  830 
  831     char buf1[IO_SIZE];
  832     uint buf1len;
  833     char buf2[IO_SIZE];
  834     uint buf2len = 0;
  835     String words;
  836 
  837     buf1[0] = '\0';
  838     buf2[0] = '\0';
  839 
  840 
  841 #if MYSQL_VERSION_ID>50100
  842     // 5.1.x style stats
  843     CSphTLS * pTls = (CSphTLS*) ( *thd_ha_data ( thd, hton ) );
  844 
  845 #define LOC_STATS(_key,_keylen,_val,_vallen) \
  846     stat_print ( thd, sphinx_hton_name, strlen(sphinx_hton_name), _key, _keylen, _val, _vallen );
  847 
  848 #else
  849     // 5.0.x style stats
  850     if ( have_sphinx_db!=SHOW_OPTION_YES )
  851     {
  852         my_message ( ER_NOT_SUPPORTED_YET,
  853             "failed to call SHOW SPHINX STATUS: --skip-sphinx was specified",
  854             MYF(0) );
  855         SPH_RET(TRUE);
  856     }
  857     CSphTLS * pTls = (CSphTLS*) thd->ha_data[sphinx_hton.slot];
  858 
  859     field_list.push_back ( new Item_empty_string ( "Type", 10 ) );
  860     field_list.push_back ( new Item_empty_string ( "Name", FN_REFLEN ) );
  861     field_list.push_back ( new Item_empty_string ( "Status", 10 ) );
  862     if ( protocol->send_fields ( &field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF ) )
  863         SPH_RET(TRUE);
  864 
  865 #define LOC_STATS(_key,_keylen,_val,_vallen) \
  866     protocol->prepare_for_resend (); \
  867     protocol->store ( "SPHINX", 6, system_charset_info ); \
  868     protocol->store ( _key, _keylen, system_charset_info ); \
  869     protocol->store ( _val, _vallen, system_charset_info ); \
  870     if ( protocol->write() ) \
  871         SPH_RET(TRUE);
  872 
  873 #endif
  874 
  875 
  876     // show query stats
  877     if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
  878     {
  879         const CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
  880         buf1len = my_snprintf ( buf1, sizeof(buf1),
  881             "total: %d, total found: %d, time: %d, words: %d",
  882             pStats->m_iMatchesTotal, pStats->m_iMatchesFound, pStats->m_iQueryMsec, pStats->m_iWords );
  883 
  884         LOC_STATS ( "stats", 5, buf1, buf1len );
  885 
  886         if ( pStats->m_iWords )
  887         {
  888             for ( int i=0; i<pStats->m_iWords; i++ )
  889             {
  890                 CSphSEWordStats & tWord = pStats->m_dWords[i];
  891                 buf2len = my_snprintf ( buf2, sizeof(buf2), "%s%s:%d:%d ",
  892                     buf2, tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
  893             }
  894 
  895             // convert it if we can
  896             const char * sWord = buf2;
  897             int iWord = buf2len;
  898 
  899             String sBuf3;
  900             if ( pTls->m_pHeadTable->m_pQueryCharset )
  901             {
  902                 uint iErrors;
  903                 sBuf3.copy ( buf2, buf2len, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
  904                 sWord = sBuf3.c_ptr();
  905                 iWord = sBuf3.length();
  906             }
  907 
  908             LOC_STATS ( "words", 5, sWord, iWord );
  909         }
  910     }
  911 
  912     // show last error or warning (either in addition to stats, or on their own)
  913     if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_tStats.m_sLastMessage && pTls->m_pHeadTable->m_tStats.m_sLastMessage[0] )
  914     {
  915         const char * sMessageType = pTls->m_pHeadTable->m_tStats.m_bLastError ? "error" : "warning";
  916 
  917         LOC_STATS (
  918             sMessageType, strlen ( sMessageType ),
  919             pTls->m_pHeadTable->m_tStats.m_sLastMessage, strlen ( pTls->m_pHeadTable->m_tStats.m_sLastMessage ) );
  920 
  921     } else
  922     {
  923         // well, nothing to show just yet
  924 #if MYSQL_VERSION_ID < 50100
  925         LOC_STATS ( "stats", 5, "no query has been executed yet", sizeof("no query has been executed yet")-1 );
  926 #endif
  927     }
  928 
  929 #if MYSQL_VERSION_ID < 50100
  930     send_eof(thd);
  931 #endif
  932 
  933     SPH_RET(FALSE);
  934 }
  935 
  936 //////////////////////////////////////////////////////////////////////////////
  937 // HELPERS
  938 //////////////////////////////////////////////////////////////////////////////
  939 
  940 static char * sphDup ( const char * sSrc, int iLen=-1 )
  941 {
  942     if ( !sSrc )
  943         return NULL;
  944 
  945     if ( iLen<0 )
  946         iLen = strlen(sSrc);
  947 
  948     char * sRes = new char [ 1+iLen ];
  949     memcpy ( sRes, sSrc, iLen );
  950     sRes[iLen] = '\0';
  951     return sRes;
  952 }
  953 
  954 
  955 static void sphLogError ( const char * sFmt, ... )
  956 {
  957     // emit timestamp
  958 #ifdef __WIN__
  959     SYSTEMTIME t;
  960     GetLocalTime ( &t );
  961 
  962     fprintf ( stderr, "%02d%02d%02d %2d:%02d:%02d SphinxSE: internal error: ",
  963         (int)t.wYear % 100, (int)t.wMonth, (int)t.wDay,
  964         (int)t.wHour, (int)t.wMinute, (int)t.wSecond );
  965 #else
  966     // Unix version
  967     time_t tStamp;
  968     time ( &tStamp );
  969 
  970     struct tm * pParsed;
  971 #ifdef HAVE_LOCALTIME_R
  972     struct tm tParsed;
  973     localtime_r ( &tStamp, &tParsed );
  974     pParsed = &tParsed;
  975 #else
  976     pParsed = localtime ( &tStamp );
  977 #endif // HAVE_LOCALTIME_R
  978 
  979     fprintf ( stderr, "%02d%02d%02d %2d:%02d:%02d SphinxSE: internal error: ",
  980         pParsed->tm_year % 100, pParsed->tm_mon + 1, pParsed->tm_mday,
  981         pParsed->tm_hour, pParsed->tm_min, pParsed->tm_sec);
  982 #endif // __WIN__
  983 
  984     // emit message
  985     va_list ap;
  986     va_start ( ap, sFmt );
  987     vfprintf ( stderr, sFmt, ap );
  988     va_end ( ap );
  989 
  990     // emit newline
  991     fprintf ( stderr, "\n" );
  992 }
  993 
  994 
  995 
  996 // the following scheme variants are recognized
  997 //
  998 // sphinx://host[:port]/index
  999 // sphinxql://host[:port]/index
 1000 // unix://unix/domain/socket[:index]
 1001 static bool ParseUrl ( CSphSEShare * share, TABLE * table, bool bCreate )
 1002 {
 1003     SPH_ENTER_FUNC();
 1004 
 1005     if ( share )
 1006     {
 1007         // check incoming stuff
 1008         if ( !table )
 1009         {
 1010             sphLogError ( "table==NULL in ParseUrl()" );
 1011             return false;
 1012         }
 1013         if ( !table->s )
 1014         {
 1015             sphLogError ( "(table->s)==NULL in ParseUrl()" );
 1016             return false;
 1017         }
 1018 
 1019         // free old stuff
 1020         share->ResetTable ();
 1021 
 1022         // fill new stuff
 1023         share->m_iTableFields = table->s->fields;
 1024         if ( share->m_iTableFields )
 1025         {
 1026             share->m_sTableField = new char * [ share->m_iTableFields ];
 1027             share->m_eTableFieldType = new enum_field_types [ share->m_iTableFields ];
 1028 
 1029             for ( int i=0; i<share->m_iTableFields; i++ )
 1030             {
 1031                 share->m_sTableField[i] = sphDup ( table->field[i]->field_name );
 1032                 share->m_eTableFieldType[i] = table->field[i]->type();
 1033             }
 1034         }
 1035     }
 1036 
 1037     // defaults
 1038     bool bOk = true;
 1039     bool bQL = false;
 1040     char * sScheme = NULL;
 1041     char * sHost = SPHINXAPI_DEFAULT_HOST;
 1042     char * sIndex = SPHINXAPI_DEFAULT_INDEX;
 1043     int iPort = SPHINXAPI_DEFAULT_PORT;
 1044 
 1045     // parse connection string, if any
 1046     while ( table->s->connect_string.length!=0 )
 1047     {
 1048         sScheme = sphDup ( table->s->connect_string.str, table->s->connect_string.length );
 1049 
 1050         sHost = strstr ( sScheme, "://" );
 1051         if ( !sHost )
 1052         {
 1053             bOk = false;
 1054             break;
 1055         }
 1056         sHost[0] = '\0';
 1057         sHost += 3;
 1058 
 1059         /////////////////////////////
 1060         // sphinxapi via unix socket
 1061         /////////////////////////////
 1062 
 1063         if ( !strcmp ( sScheme, "unix" ) )
 1064         {
 1065             sHost--; // reuse last slash
 1066             iPort = 0;
 1067             if (!( sIndex = strrchr ( sHost, ':' ) ))
 1068                 sIndex = SPHINXAPI_DEFAULT_INDEX;
 1069             else
 1070             {
 1071                 *sIndex++ = '\0';
 1072                 if ( !*sIndex )
 1073                     sIndex = SPHINXAPI_DEFAULT_INDEX;
 1074             }
 1075             bOk = true;
 1076             break;
 1077         }
 1078 
 1079         /////////////////////
 1080         // sphinxapi via tcp
 1081         /////////////////////
 1082 
 1083         if ( !strcmp ( sScheme, "sphinx" ) )
 1084         {
 1085             char * sPort = strchr ( sHost, ':' );
 1086             if ( sPort )
 1087             {
 1088                 *sPort++ = '\0';
 1089                 if ( *sPort )
 1090                 {
 1091                     sIndex = strchr ( sPort, '/' );
 1092                     if ( sIndex )
 1093                         *sIndex++ = '\0';
 1094                     else
 1095                         sIndex = SPHINXAPI_DEFAULT_INDEX;
 1096 
 1097                     iPort = atoi(sPort);
 1098                     if ( !iPort )
 1099                         iPort = SPHINXAPI_DEFAULT_PORT;
 1100                 }
 1101             } else
 1102             {
 1103                 sIndex = strchr ( sHost, '/' );
 1104                 if ( sIndex )
 1105                     *sIndex++ = '\0';
 1106                 else
 1107                     sIndex = SPHINXAPI_DEFAULT_INDEX;
 1108             }
 1109             bOk = true;
 1110             break;
 1111         }
 1112 
 1113         ////////////
 1114         // sphinxql
 1115         ////////////
 1116 
 1117         if ( !strcmp ( sScheme, "sphinxql" ) )
 1118         {
 1119             bQL = true;
 1120             iPort = SPHINXQL_DEFAULT_PORT;
 1121 
 1122             // handle port
 1123             char * sPort = strchr ( sHost, ':' );
 1124             sIndex = sHost; // starting point for index name search
 1125 
 1126             if ( sPort )
 1127             {
 1128                 *sPort++ = '\0';
 1129                 sIndex = sPort;
 1130 
 1131                 iPort = atoi(sPort);
 1132                 if ( !iPort )
 1133                 {
 1134                     bOk = false; // invalid port; can report ER_FOREIGN_DATA_STRING_INVALID
 1135                     break;
 1136                 }
 1137             }
 1138 
 1139             // find index
 1140             sIndex = strchr ( sIndex, '/' );
 1141             if ( sIndex )
 1142                 *sIndex++ = '\0';
 1143 
 1144             // final checks
 1145             // host and index names are required
 1146             bOk = ( sHost && *sHost && sIndex && *sIndex );
 1147             break;
 1148         }
 1149 
 1150         // unknown case
 1151         bOk = false;
 1152         break;
 1153     }
 1154 
 1155     if ( !bOk )
 1156     {
 1157         my_error ( bCreate ? ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE : ER_FOREIGN_DATA_STRING_INVALID,
 1158             MYF(0), table->s->connect_string );
 1159     } else
 1160     {
 1161         if ( share )
 1162         {
 1163             SafeDeleteArray ( share->m_sScheme );
 1164             share->m_sScheme = sScheme;
 1165             share->m_sHost = sHost;
 1166             share->m_sIndex = sIndex;
 1167             share->m_iPort = (ushort)iPort;
 1168             share->m_bSphinxQL = bQL;
 1169         }
 1170     }
 1171     if ( !bOk && !share )
 1172         SafeDeleteArray ( sScheme );
 1173 
 1174     SPH_RET(bOk);
 1175 }
 1176 
 1177 
 1178 // Example of simple lock controls. The "share" it creates is structure we will
 1179 // pass to each sphinx handler. Do you have to have one of these? Well, you have
 1180 // pieces that are used for locking, and they are needed to function.
 1181 static CSphSEShare * get_share ( const char * table_name, TABLE * table )
 1182 {
 1183     SPH_ENTER_FUNC();
 1184     pthread_mutex_lock ( &sphinx_mutex );
 1185 
 1186     CSphSEShare * pShare = NULL;
 1187     for ( ;; )
 1188     {
 1189         // check if we already have this share
 1190 #if MYSQL_VERSION_ID>=50120
 1191         pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const uchar *) table_name, strlen(table_name) );
 1192 #else
 1193 #ifdef __WIN__
 1194         pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const byte *) table_name, strlen(table_name) );
 1195 #else
 1196         pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, table_name, strlen(table_name) );
 1197 #endif // win
 1198 #endif // pre-5.1.20
 1199 
 1200         if ( pShare )
 1201         {
 1202             pShare->m_iUseCount++;
 1203             break;
 1204         }
 1205 
 1206         // try to allocate new share
 1207         pShare = new CSphSEShare ();
 1208         if ( !pShare )
 1209             break;
 1210 
 1211         // try to setup it
 1212         if ( !ParseUrl ( pShare, table, false ) )
 1213         {
 1214             SafeDelete ( pShare );
 1215             break;
 1216         }
 1217 
 1218         if ( !pShare->m_bSphinxQL )
 1219             pShare->m_pTableQueryCharset = table->field[2]->charset();
 1220 
 1221         // try to hash it
 1222         pShare->m_iTableNameLen = strlen(table_name);
 1223         pShare->m_sTable = sphDup ( table_name );
 1224         if ( my_hash_insert ( &sphinx_open_tables, (const byte *)pShare ) )
 1225         {
 1226             SafeDelete ( pShare );
 1227             break;
 1228         }
 1229 
 1230         // all seems fine
 1231         break;
 1232     }
 1233 
 1234     pthread_mutex_unlock ( &sphinx_mutex );
 1235     SPH_RET(pShare);
 1236 }
 1237 
 1238 
 1239 // Free lock controls. We call this whenever we close a table. If the table had
 1240 // the last reference to the share then we free memory associated with it.
 1241 static int free_share ( CSphSEShare * pShare )
 1242 {
 1243     SPH_ENTER_FUNC();
 1244     pthread_mutex_lock ( &sphinx_mutex );
 1245 
 1246     if ( !--pShare->m_iUseCount )
 1247     {
 1248         sphinx_hash_delete ( &sphinx_open_tables, (byte *)pShare );
 1249         SafeDelete ( pShare );
 1250     }
 1251 
 1252     pthread_mutex_unlock ( &sphinx_mutex );
 1253     SPH_RET(0);
 1254 }
 1255 
 1256 
 1257 #if MYSQL_VERSION_ID>50100
 1258 static handler * sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root )
 1259 {
 1260     sphinx_hton_ptr = hton;
 1261     return new ( mem_root ) ha_sphinx ( hton, table );
 1262 }
 1263 #endif
 1264 
 1265 //////////////////////////////////////////////////////////////////////////////
 1266 // CLIENT-SIDE REQUEST STUFF
 1267 //////////////////////////////////////////////////////////////////////////////
 1268 
 1269 CSphSEQuery::CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex )
 1270     : m_sHost ( "" )
 1271     , m_iPort ( 0 )
 1272     , m_sIndex ( sIndex ? sIndex : "*" )
 1273     , m_iOffset ( 0 )
 1274     , m_iLimit ( 20 )
 1275     , m_bQuery ( false )
 1276     , m_sQuery ( "" )
 1277     , m_pWeights ( NULL )
 1278     , m_iWeights ( 0 )
 1279     , m_eMode ( SPH_MATCH_ALL )
 1280     , m_eRanker ( SPH_RANK_PROXIMITY_BM25 )
 1281     , m_sRankExpr ( NULL )
 1282     , m_eSort ( SPH_SORT_RELEVANCE )
 1283     , m_sSortBy ( "" )
 1284     , m_iMaxMatches ( 1000 )
 1285     , m_iMaxQueryTime ( 0 )
 1286     , m_iMinID ( 0 )
 1287     , m_iMaxID ( 0 )
 1288     , m_iFilters ( 0 )
 1289     , m_eGroupFunc ( SPH_GROUPBY_DAY )
 1290     , m_sGroupBy ( "" )
 1291     , m_sGroupSortBy ( "@group desc" )
 1292     , m_iCutoff ( 0 )
 1293     , m_iRetryCount ( 0 )
 1294     , m_iRetryDelay ( 0 )
 1295     , m_sGroupDistinct ( "" )
 1296     , m_iIndexWeights ( 0 )
 1297     , m_iFieldWeights ( 0 )
 1298     , m_bGeoAnchor ( false )
 1299     , m_sGeoLatAttr ( "" )
 1300     , m_sGeoLongAttr ( "" )
 1301     , m_fGeoLatitude ( 0.0f )
 1302     , m_fGeoLongitude ( 0.0f )
 1303     , m_sComment ( "" )
 1304     , m_sSelect ( "*" )
 1305 
 1306     , m_pBuf ( NULL )
 1307     , m_pCur ( NULL )
 1308     , m_iBufLeft ( 0 )
 1309     , m_bBufOverrun ( false )
 1310 {
 1311     m_sQueryBuffer = new char [ iLength+2 ];
 1312     memcpy ( m_sQueryBuffer, sQuery, iLength );
 1313     m_sQueryBuffer[iLength] = ';';
 1314     m_sQueryBuffer[iLength+1] = '\0';
 1315 }
 1316 
 1317 
 1318 CSphSEQuery::~CSphSEQuery ()
 1319 {
 1320     SPH_ENTER_METHOD();
 1321     SafeDeleteArray ( m_sQueryBuffer );
 1322     SafeDeleteArray ( m_pWeights );
 1323     SafeDeleteArray ( m_pBuf );
 1324     for ( int i=0; i<m_dOverrides.elements(); i++ )
 1325         SafeDelete ( m_dOverrides.at(i) );
 1326     SPH_VOID_RET();
 1327 }
 1328 
 1329 
 1330 template < typename T >
 1331 int CSphSEQuery::ParseArray ( T ** ppValues, const char * sValue )
 1332 {
 1333     SPH_ENTER_METHOD();
 1334 
 1335     assert ( ppValues );
 1336     assert ( !(*ppValues) );
 1337 
 1338     const char * pValue;
 1339     bool bPrevDigit = false;
 1340     int iValues = 0;
 1341 
 1342     // count the values
 1343     for ( pValue=sValue; *pValue; pValue++ )
 1344     {
 1345         bool bDigit = (*pValue)>='0' && (*pValue)<='9';
 1346         if ( bDigit && !bPrevDigit )
 1347             iValues++;
 1348         bPrevDigit = bDigit;
 1349     }
 1350     if ( !iValues )
 1351         SPH_RET(0);
 1352 
 1353     // extract the values
 1354     T * pValues = new T [ iValues ];
 1355     *ppValues = pValues;
 1356 
 1357     int iIndex = 0, iSign = 1;
 1358     T uValue = 0;
 1359 
 1360     bPrevDigit = false;
 1361     for ( pValue=sValue ;; pValue++ )
 1362     {
 1363         bool bDigit = (*pValue)>='0' && (*pValue)<='9';
 1364 
 1365         if ( bDigit )
 1366         {
 1367             if ( !bPrevDigit )
 1368                 uValue = 0;
 1369             uValue = uValue*10 + ( (*pValue)-'0' );
 1370         } else if ( bPrevDigit )
 1371         {
 1372             assert ( iIndex<iValues );
 1373             pValues [ iIndex++ ] = uValue * iSign;
 1374             iSign = 1;
 1375         } else if ( *pValue=='-' )
 1376             iSign = -1;
 1377 
 1378         bPrevDigit = bDigit;
 1379         if ( !*pValue )
 1380             break;
 1381     }
 1382 
 1383     SPH_RET ( iValues );
 1384 }
 1385 
 1386 
 1387 static char * chop ( char * s )
 1388 {
 1389     while ( *s && isspace(*s) )
 1390         s++;
 1391 
 1392     char * p = s + strlen(s);
 1393     while ( p>s && isspace ( p[-1] ) )
 1394         p--;
 1395     *p = '\0';
 1396 
 1397     return s;
 1398 }
 1399 
 1400 
 1401 static bool myisattr ( char c )
 1402 {
 1403     return
 1404         ( c>='0' && c<='9' ) ||
 1405         ( c>='a' && c<='z' ) ||
 1406         ( c>='A' && c<='Z' ) ||
 1407         c=='_';
 1408 }
 1409 
 1410 static bool myismagic ( char c )
 1411 {
 1412     return c=='@';
 1413 }
 1414 
 1415 
 1416 bool CSphSEQuery::ParseField ( char * sField )
 1417 {
 1418     SPH_ENTER_METHOD();
 1419 
 1420     // look for option name/value separator
 1421     char * sValue = strchr ( sField, '=' );
 1422     if ( !sValue || sValue==sField || sValue[-1]=='\\' )
 1423     {
 1424         // by default let's assume it's just query
 1425         if ( sField[0] )
 1426         {
 1427             if ( m_bQuery )
 1428             {
 1429                 snprintf ( m_sParseError, sizeof(m_sParseError), "search query already specified; '%s' is redundant", sField );
 1430                 SPH_RET(false);
 1431             } else
 1432             {
 1433                 m_sQuery = sField;
 1434                 m_bQuery = true;
 1435 
 1436                 // unescape only 1st one
 1437                 char *s = sField, *d = sField;
 1438                 int iSlashes = 0;
 1439                 while ( *s )
 1440                 {
 1441                     iSlashes = ( *s=='\\' ) ? iSlashes+1 : 0;
 1442                     if ( ( iSlashes%2 )==0 ) *d++ = *s;
 1443                     s++;
 1444                 }
 1445                 *d = '\0';
 1446             }
 1447         }
 1448         SPH_RET(true);
 1449     }
 1450 
 1451     // split
 1452     *sValue++ = '\0';
 1453     sValue = chop ( sValue );
 1454     int iValue = atoi ( sValue );
 1455 
 1456     // handle options
 1457     char * sName = chop ( sField );
 1458 
 1459     if ( !strcmp ( sName, "query" ) )           m_sQuery = sValue;
 1460     else if ( !strcmp ( sName, "host" ) )       m_sHost = sValue;
 1461     else if ( !strcmp ( sName, "port" ) )       m_iPort = iValue;
 1462     else if ( !strcmp ( sName, "index" ) )      m_sIndex = sValue;
 1463     else if ( !strcmp ( sName, "offset" ) )     m_iOffset = iValue;
 1464     else if ( !strcmp ( sName, "limit" ) )      m_iLimit = iValue;
 1465     else if ( !strcmp ( sName, "weights" ) )    m_iWeights = ParseArray<uint32> ( &m_pWeights, sValue );
 1466     else if ( !strcmp ( sName, "minid" ) )      m_iMinID = iValue;
 1467     else if ( !strcmp ( sName, "maxid" ) )      m_iMaxID = iValue;
 1468     else if ( !strcmp ( sName, "maxmatches" ) ) m_iMaxMatches = iValue;
 1469     else if ( !strcmp ( sName, "maxquerytime" ) )   m_iMaxQueryTime = iValue;
 1470     else if ( !strcmp ( sName, "groupsort" ) )  m_sGroupSortBy = sValue;
 1471     else if ( !strcmp ( sName, "distinct" ) )   m_sGroupDistinct = sValue;
 1472     else if ( !strcmp ( sName, "cutoff" ) )     m_iCutoff = iValue;
 1473     else if ( !strcmp ( sName, "comment" ) )    m_sComment = sValue;
 1474     else if ( !strcmp ( sName, "select" ) )     m_sSelect = sValue;
 1475 
 1476     else if ( !strcmp ( sName, "mode" ) )
 1477     {
 1478         m_eMode = SPH_MATCH_ALL;
 1479         if ( !strcmp ( sValue, "any" ) )            m_eMode = SPH_MATCH_ANY;
 1480         else if ( !strcmp ( sValue, "phrase" ) )    m_eMode = SPH_MATCH_PHRASE;
 1481         else if ( !strcmp ( sValue, "boolean" ) )   m_eMode = SPH_MATCH_BOOLEAN;
 1482         else if ( !strcmp ( sValue, "ext" ) )       m_eMode = SPH_MATCH_EXTENDED;
 1483         else if ( !strcmp ( sValue, "extended" ) )  m_eMode = SPH_MATCH_EXTENDED;
 1484         else if ( !strcmp ( sValue, "ext2" ) )      m_eMode = SPH_MATCH_EXTENDED2;
 1485         else if ( !strcmp ( sValue, "extended2" ) ) m_eMode = SPH_MATCH_EXTENDED2;
 1486         else if ( !strcmp ( sValue, "all" ) )       m_eMode = SPH_MATCH_ALL;
 1487         else if ( !strcmp ( sValue, "fullscan" ) )  m_eMode = SPH_MATCH_FULLSCAN;
 1488         else
 1489         {
 1490             snprintf ( m_sParseError, sizeof(m_sParseError), "unknown matching mode '%s'", sValue );
 1491             SPH_RET(false);
 1492         }
 1493     } else if ( !strcmp ( sName, "ranker" ) )
 1494     {
 1495         m_eRanker = SPH_RANK_PROXIMITY_BM25;
 1496         if ( !strcmp ( sValue, "proximity_bm25" ) ) m_eRanker = SPH_RANK_PROXIMITY_BM25;
 1497         else if ( !strcmp ( sValue, "bm25" ) )      m_eRanker = SPH_RANK_BM25;
 1498         else if ( !strcmp ( sValue, "none" ) )      m_eRanker = SPH_RANK_NONE;
 1499         else if ( !strcmp ( sValue, "wordcount" ) ) m_eRanker = SPH_RANK_WORDCOUNT;
 1500         else if ( !strcmp ( sValue, "proximity" ) ) m_eRanker = SPH_RANK_PROXIMITY;
 1501         else if ( !strcmp ( sValue, "matchany" ) )  m_eRanker = SPH_RANK_MATCHANY;
 1502         else if ( !strcmp ( sValue, "fieldmask" ) ) m_eRanker = SPH_RANK_FIELDMASK;
 1503         else if ( !strcmp ( sValue, "sph04" ) )     m_eRanker = SPH_RANK_SPH04;
 1504         else if ( !strncmp ( sValue, "expr:", 5 ) )
 1505         {
 1506             m_eRanker = SPH_RANK_EXPR;
 1507             m_sRankExpr = sValue+5;
 1508         } else
 1509         {
 1510             snprintf ( m_sParseError, sizeof(m_sParseError), "unknown ranking mode '%s'", sValue );
 1511             SPH_RET(false);
 1512         }
 1513     } else if ( !strcmp ( sName, "sort" ) )
 1514     {
 1515         static const struct
 1516         {
 1517             const char *    m_sName;
 1518             ESphSortOrder   m_eSort;
 1519         } dSortModes[] =
 1520         {
 1521             { "relevance",      SPH_SORT_RELEVANCE },
 1522             { "attr_desc:",     SPH_SORT_ATTR_DESC },
 1523             { "attr_asc:",      SPH_SORT_ATTR_ASC },
 1524             { "time_segments:", SPH_SORT_TIME_SEGMENTS },
 1525             { "extended:",      SPH_SORT_EXTENDED },
 1526             { "expr:",          SPH_SORT_EXPR }
 1527         };
 1528 
 1529         int i;
 1530         const int nModes = sizeof(dSortModes)/sizeof(dSortModes[0]);
 1531         for ( i=0; i<nModes; i++ )
 1532             if ( !strncmp ( sValue, dSortModes[i].m_sName, strlen ( dSortModes[i].m_sName ) ) )
 1533         {
 1534             m_eSort = dSortModes[i].m_eSort;
 1535             m_sSortBy = sValue + strlen ( dSortModes[i].m_sName );
 1536             break;
 1537         }
 1538         if ( i==nModes )
 1539         {
 1540             snprintf ( m_sParseError, sizeof(m_sParseError), "unknown sorting mode '%s'", sValue );
 1541             SPH_RET(false);
 1542         }
 1543 
 1544     } else if ( !strcmp ( sName, "groupby" ) )
 1545     {
 1546         static const struct
 1547         {
 1548             const char *    m_sName;
 1549             ESphGroupBy     m_eFunc;
 1550         } dGroupModes[] =
 1551         {
 1552             { "day:",   SPH_GROUPBY_DAY },
 1553             { "week:",  SPH_GROUPBY_WEEK },
 1554             { "month:", SPH_GROUPBY_MONTH },
 1555             { "year:",  SPH_GROUPBY_YEAR },
 1556             { "attr:",  SPH_GROUPBY_ATTR },
 1557         };
 1558 
 1559         int i;
 1560         const int nModes = sizeof(dGroupModes)/sizeof(dGroupModes[0]);
 1561         for ( i=0; i<nModes; i++ )
 1562             if ( !strncmp ( sValue, dGroupModes[i].m_sName, strlen ( dGroupModes[i].m_sName ) ) )
 1563         {
 1564             m_eGroupFunc = dGroupModes[i].m_eFunc;
 1565             m_sGroupBy = sValue + strlen ( dGroupModes[i].m_sName );
 1566             break;
 1567         }
 1568         if ( i==nModes )
 1569         {
 1570             snprintf ( m_sParseError, sizeof(m_sParseError), "unknown groupby mode '%s'", sValue );
 1571             SPH_RET(false);
 1572         }
 1573 
 1574     } else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
 1575         ( !strcmp ( sName, "range" ) || !strcmp ( sName, "!range" ) || !strcmp ( sName, "floatrange" ) || !strcmp ( sName, "!floatrange" ) ) )
 1576     {
 1577         for ( ;; )
 1578         {
 1579             char * p = sName;
 1580             CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
 1581             tFilter.m_bExclude = ( *p=='!' ); if ( tFilter.m_bExclude ) p++;
 1582             tFilter.m_eType = ( *p=='f' ) ? SPH_FILTER_FLOATRANGE : SPH_FILTER_RANGE;
 1583 
 1584             if (!( p = strchr ( sValue, ',' ) ))
 1585                 break;
 1586             *p++ = '\0';
 1587 
 1588             tFilter.m_sAttrName = chop ( sValue );
 1589             sValue = p;
 1590 
 1591             if (!( p = strchr ( sValue, ',' ) ))
 1592                 break;
 1593             *p++ = '\0';
 1594 
 1595             if ( tFilter.m_eType==SPH_FILTER_RANGE )
 1596             {
 1597                 tFilter.m_uMinValue = strtoll ( sValue, NULL, 10 );
 1598                 tFilter.m_uMaxValue = strtoll ( p, NULL, 10 );
 1599             } else
 1600             {
 1601                 tFilter.m_fMinValue = (float)atof(sValue);
 1602                 tFilter.m_fMaxValue = (float)atof(p);
 1603             }
 1604 
 1605             // all ok
 1606             m_iFilters++;
 1607             break;
 1608         }
 1609 
 1610     } else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
 1611         ( !strcmp ( sName, "filter" ) || !strcmp ( sName, "!filter" ) ) )
 1612     {
 1613         for ( ;; )
 1614         {
 1615             CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
 1616             tFilter.m_eType = SPH_FILTER_VALUES;
 1617             tFilter.m_bExclude = ( strcmp ( sName, "!filter" )==0 );
 1618 
 1619             // get the attr name
 1620             while ( (*sValue) && !( myisattr(*sValue) || myismagic(*sValue) ) )
 1621                 sValue++;
 1622             if ( !*sValue )
 1623                 break;
 1624 
 1625             tFilter.m_sAttrName = sValue;
 1626             while ( (*sValue) && ( myisattr(*sValue) || myismagic(*sValue) ) )
 1627                 sValue++;
 1628             if ( !*sValue )
 1629                 break;
 1630             *sValue++ = '\0';
 1631 
 1632             // get the values
 1633             tFilter.m_iValues = ParseArray<longlong> ( &tFilter.m_pValues, sValue );
 1634             if ( !tFilter.m_iValues )
 1635             {
 1636                 assert ( !tFilter.m_pValues );
 1637                 break;
 1638             }
 1639 
 1640             // all ok
 1641             m_iFilters++;
 1642             break;
 1643         }
 1644 
 1645     } else if ( !strcmp ( sName, "indexweights" ) || !strcmp ( sName, "fieldweights" ) )
 1646     {
 1647         bool bIndex = !strcmp ( sName, "indexweights" );
 1648         int * pCount = bIndex ? &m_iIndexWeights : &m_iFieldWeights;
 1649         char ** pNames = bIndex ? &m_sIndexWeight[0] : &m_sFieldWeight[0];
 1650         int * pWeights = bIndex ? &m_iIndexWeight[0] : &m_iFieldWeight[0];
 1651 
 1652         *pCount = 0;
 1653 
 1654         char * p = sValue;
 1655         while ( *p && *pCount<SPHINXSE_MAX_FILTERS )
 1656         {
 1657             // extract attr name
 1658             if ( !myisattr(*p) )
 1659             {
 1660                 snprintf ( m_sParseError, sizeof(m_sParseError), "%s: index name expected near '%s'", sName, p );
 1661                 SPH_RET(false);
 1662             }
 1663 
 1664             pNames[*pCount] = p;
 1665             while ( myisattr(*p) ) p++;
 1666 
 1667             if ( *p!=',' )
 1668             {
 1669                 snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
 1670                 SPH_RET(false);
 1671             }
 1672             *p++ = '\0';
 1673 
 1674             // extract attr value
 1675             char * sVal = p;
 1676             while ( isdigit(*p) ) p++;
 1677             if ( p==sVal )
 1678             {
 1679                 snprintf ( m_sParseError, sizeof(m_sParseError), "%s: integer weight expected near '%s'", sName, sVal );
 1680                 SPH_RET(false);
 1681             }
 1682             pWeights[*pCount] = atoi(sVal);
 1683             (*pCount)++;
 1684 
 1685             if ( !*p )
 1686                 break;
 1687             if ( *p!=',' )
 1688             {
 1689                 snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
 1690                 SPH_RET(false);
 1691             }
 1692             p++;
 1693         }
 1694 
 1695     } else if ( !strcmp ( sName, "geoanchor" ) )
 1696     {
 1697         m_bGeoAnchor = false;
 1698         for ( ;; )
 1699         {
 1700             char * sLat = sValue;
 1701             char * p = sValue;
 1702 
 1703             if (!( p = strchr ( p, ',' ) )) break; *p++ = '\0';
 1704             char * sLong = p;
 1705 
 1706             if (!( p = strchr ( p, ',' ) )) break; *p++ = '\0';
 1707             char * sLatVal = p;
 1708 
 1709             if (!( p = strchr ( p, ',' ) )) break; *p++ = '\0';
 1710             char * sLongVal = p;
 1711 
 1712             m_sGeoLatAttr = chop(sLat);
 1713             m_sGeoLongAttr = chop(sLong);
 1714             m_fGeoLatitude = (float)atof ( sLatVal );
 1715             m_fGeoLongitude = (float)atof ( sLongVal );
 1716             m_bGeoAnchor = true;
 1717             break;
 1718         }
 1719         if ( !m_bGeoAnchor )
 1720         {
 1721             snprintf ( m_sParseError, sizeof(m_sParseError), "geoanchor: parse error, not enough comma-separated arguments" );
 1722             SPH_RET(false);
 1723         }
 1724     } else if ( !strcmp ( sName, "override" ) ) // name,type,id:value,id:value,...
 1725     {
 1726         char * sName = NULL;
 1727         int iType = 0;
 1728         CSphSEQuery::Override_t * pOverride = NULL;
 1729 
 1730         // get name and type
 1731         char * sRest = sValue;
 1732         for ( ;; )
 1733         {
 1734             sName = sRest;
 1735             if ( !*sName )
 1736                 break;
 1737             if (!( sRest = strchr ( sRest, ',' ) ))
 1738                 break;
 1739             *sRest++ = '\0';
 1740             char * sType = sRest;
 1741             if (!( sRest = strchr ( sRest, ',' ) ))
 1742                 break;
 1743 
 1744             static const struct
 1745             {
 1746                 const char *    m_sName;
 1747                 int             m_iType;
 1748             }
 1749             dAttrTypes[] =
 1750             {
 1751                 { "int",        SPH_ATTR_INTEGER },
 1752                 { "timestamp",  SPH_ATTR_TIMESTAMP },
 1753                 { "bool",       SPH_ATTR_BOOL },
 1754                 { "float",      SPH_ATTR_FLOAT },
 1755                 { "bigint",     SPH_ATTR_BIGINT }
 1756             };
 1757             for ( int i=0; i<sizeof(dAttrTypes)/sizeof(*dAttrTypes); i++ )
 1758                 if ( !strncmp ( sType, dAttrTypes[i].m_sName, sRest - sType ) )
 1759             {
 1760                 iType = dAttrTypes[i].m_iType;
 1761                 break;
 1762             }
 1763             break;
 1764         }
 1765 
 1766         // fail
 1767         if ( !sName || !*sName || !iType )
 1768         {
 1769             snprintf ( m_sParseError, sizeof(m_sParseError), "override: malformed query" );
 1770             SPH_RET(false);
 1771         }
 1772 
 1773         // grab id:value pairs
 1774         sRest++;
 1775         while ( sRest )
 1776         {
 1777             char * sId = sRest;
 1778             if (!( sRest = strchr ( sRest, ':' ) )) break; *sRest++ = '\0';
 1779             if (!( sRest - sId )) break;
 1780 
 1781             char * sValue = sRest;
 1782             if ( ( sRest = strchr ( sRest, ',' ) )!=NULL )
 1783                 *sRest++ = '\0';
 1784             if ( !*sValue )
 1785                 break;
 1786 
 1787             if ( !pOverride )
 1788             {
 1789                 pOverride = new CSphSEQuery::Override_t;
 1790                 pOverride->m_sName = chop(sName);
 1791                 pOverride->m_iType = iType;
 1792                 m_dOverrides.append ( pOverride );
 1793             }
 1794 
 1795             ulonglong uId = strtoull ( sId, NULL, 10 );
 1796             CSphSEQuery::Override_t::Value_t tValue;
 1797             if ( iType==SPH_ATTR_FLOAT )
 1798                 tValue.m_fValue = (float)atof(sValue);
 1799             else if ( iType==SPH_ATTR_BIGINT )
 1800                 tValue.m_iValue64 = strtoll ( sValue, NULL, 10 );
 1801             else
 1802                 tValue.m_uValue = (uint32)strtoul ( sValue, NULL, 10 );
 1803 
 1804             pOverride->m_dIds.append ( uId );
 1805             pOverride->m_dValues.append ( tValue );
 1806         }
 1807 
 1808         if ( !pOverride )
 1809         {
 1810             snprintf ( m_sParseError, sizeof(m_sParseError), "override: id:value mapping expected" );
 1811             SPH_RET(false);
 1812         }
 1813         SPH_RET(true);
 1814     } else
 1815     {
 1816         snprintf ( m_sParseError, sizeof(m_sParseError), "unknown parameter '%s'", sName );
 1817         SPH_RET(false);
 1818     }
 1819 
 1820     // !COMMIT handle syntax errors
 1821 
 1822     SPH_RET(true);
 1823 }
 1824 
 1825 
 1826 bool CSphSEQuery::Parse ()
 1827 {
 1828     SPH_ENTER_METHOD();
 1829     SPH_DEBUG ( "query [[ %s ]]", m_sQueryBuffer );
 1830 
 1831     m_bQuery = false;
 1832     char * pCur = m_sQueryBuffer;
 1833     char * pNext = pCur;
 1834 
 1835     while ( ( pNext = strchr ( pNext, ';' ) )!=NULL )
 1836     {
 1837         // handle escaped semicolons
 1838         if ( pNext>m_sQueryBuffer && pNext[-1]=='\\' && pNext[1]!='\0' )
 1839         {
 1840             pNext++;
 1841             continue;
 1842         }
 1843 
 1844         // handle semicolon-separated clauses
 1845         *pNext++ = '\0';
 1846         if ( !ParseField ( pCur ) )
 1847             SPH_RET(false);
 1848         pCur = pNext;
 1849     }
 1850 
 1851     SPH_DEBUG ( "q [[ %s ]]", m_sQuery );
 1852 
 1853     SPH_RET(true);
 1854 }
 1855 
 1856 
 1857 void CSphSEQuery::SendBytes ( const void * pBytes, int iBytes )
 1858 {
 1859     SPH_ENTER_METHOD();
 1860     if ( m_iBufLeft<iBytes )
 1861     {
 1862         m_bBufOverrun = true;
 1863         SPH_VOID_RET();
 1864     }
 1865 
 1866     memcpy ( m_pCur, pBytes, iBytes );
 1867 
 1868     m_pCur += iBytes;
 1869     m_iBufLeft -= iBytes;
 1870     SPH_VOID_RET();
 1871 }
 1872 
 1873 
 1874 int CSphSEQuery::BuildRequest ( char ** ppBuffer )
 1875 {
 1876     SPH_ENTER_METHOD();
 1877 
 1878     // calc request length
 1879     int iReqSize = 128 + 4*m_iWeights
 1880         + strlen ( m_sSortBy )
 1881         + strlen ( m_sQuery )
 1882         + strlen ( m_sIndex )
 1883         + strlen ( m_sGroupBy )
 1884         + strlen ( m_sGroupSortBy )
 1885         + strlen ( m_sGroupDistinct )
 1886         + strlen ( m_sComment )
 1887         + strlen ( m_sSelect );
 1888     if ( m_eRanker==SPH_RANK_EXPR )
 1889         iReqSize += 4 + strlen(m_sRankExpr);
 1890     for ( int i=0; i<m_iFilters; i++ )
 1891     {
 1892         const CSphSEFilter & tFilter = m_dFilters[i];
 1893         iReqSize += 12 + strlen ( tFilter.m_sAttrName ); // string attr-name; int type; int exclude-flag
 1894         switch ( tFilter.m_eType )
 1895         {
 1896             case SPH_FILTER_VALUES:     iReqSize += 4 + 8*tFilter.m_iValues; break;
 1897             case SPH_FILTER_RANGE:      iReqSize += 16; break;
 1898             case SPH_FILTER_FLOATRANGE: iReqSize += 8; break;
 1899         }
 1900     }
 1901     if ( m_bGeoAnchor ) // 1.14+
 1902         iReqSize += 16 + strlen ( m_sGeoLatAttr ) + strlen ( m_sGeoLongAttr );
 1903     for ( int i=0; i<m_iIndexWeights; i++ ) // 1.15+
 1904         iReqSize += 8 + strlen(m_sIndexWeight[i] );
 1905     for ( int i=0; i<m_iFieldWeights; i++ ) // 1.18+
 1906         iReqSize += 8 + strlen(m_sFieldWeight[i] );
 1907     // overrides
 1908     iReqSize += 4;
 1909     for ( int i=0; i<m_dOverrides.elements(); i++ )
 1910     {
 1911         CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
 1912         const uint32 uSize = pOverride->m_iType==SPH_ATTR_BIGINT ? 16 : 12; // id64 + value
 1913         iReqSize += strlen ( pOverride->m_sName ) + 12 + uSize*pOverride->m_dIds.elements();
 1914     }
 1915     // select
 1916     iReqSize += 4;
 1917 
 1918     m_iBufLeft = 0;
 1919     SafeDeleteArray ( m_pBuf );
 1920 
 1921     m_pBuf = new char [ iReqSize ];
 1922     if ( !m_pBuf )
 1923         SPH_RET(-1);
 1924 
 1925     m_pCur = m_pBuf;
 1926     m_iBufLeft = iReqSize;
 1927     m_bBufOverrun = false;
 1928     (*ppBuffer) = m_pBuf;
 1929 
 1930     // build request
 1931     SendWord ( SEARCHD_COMMAND_SEARCH ); // command id
 1932     SendWord ( VER_COMMAND_SEARCH ); // command version
 1933     SendInt ( iReqSize-8 ); // packet body length
 1934     SendInt ( 0 ); // its a client
 1935 
 1936     SendInt ( 1 ); // number of queries
 1937     SendInt ( m_iOffset );
 1938     SendInt ( m_iLimit );
 1939     SendInt ( m_eMode );
 1940     SendInt ( m_eRanker ); // 1.16+
 1941     if ( m_eRanker==SPH_RANK_EXPR )
 1942         SendString ( m_sRankExpr );
 1943     SendInt ( m_eSort );
 1944     SendString ( m_sSortBy ); // sort attr
 1945     SendString ( m_sQuery ); // query
 1946     SendInt ( m_iWeights );
 1947     for ( int j=0; j<m_iWeights; j++ )
 1948         SendInt ( m_pWeights[j] ); // weights
 1949     SendString ( m_sIndex ); // indexes
 1950     SendInt ( 1 ); // id64 range follows
 1951     SendUint64 ( m_iMinID ); // id/ts ranges
 1952     SendUint64 ( m_iMaxID );
 1953 
 1954     SendInt ( m_iFilters );
 1955     for ( int j=0; j<m_iFilters; j++ )
 1956     {
 1957         const CSphSEFilter & tFilter = m_dFilters[j];
 1958         SendString ( tFilter.m_sAttrName );
 1959         SendInt ( tFilter.m_eType );
 1960 
 1961         switch ( tFilter.m_eType )
 1962         {
 1963             case SPH_FILTER_VALUES:
 1964                 SendInt ( tFilter.m_iValues );
 1965                 for ( int k=0; k<tFilter.m_iValues; k++ )
 1966                     SendUint64 ( tFilter.m_pValues[k] );
 1967                 break;
 1968 
 1969             case SPH_FILTER_RANGE:
 1970                 SendUint64 ( tFilter.m_uMinValue );
 1971                 SendUint64 ( tFilter.m_uMaxValue );
 1972                 break;
 1973 
 1974             case SPH_FILTER_FLOATRANGE:
 1975                 SendFloat ( tFilter.m_fMinValue );
 1976                 SendFloat ( tFilter.m_fMaxValue );
 1977                 break;
 1978         }
 1979 
 1980         SendInt ( tFilter.m_bExclude );
 1981     }
 1982 
 1983     SendInt ( m_eGroupFunc );
 1984     SendString ( m_sGroupBy );
 1985     SendInt ( m_iMaxMatches );
 1986     SendString ( m_sGroupSortBy );
 1987     SendInt ( m_iCutoff ); // 1.9+
 1988     SendInt ( m_iRetryCount ); // 1.10+
 1989     SendInt ( m_iRetryDelay );
 1990     SendString ( m_sGroupDistinct ); // 1.11+
 1991     SendInt ( m_bGeoAnchor ); // 1.14+
 1992     if ( m_bGeoAnchor )
 1993     {
 1994         SendString ( m_sGeoLatAttr );
 1995         SendString ( m_sGeoLongAttr );
 1996         SendFloat ( m_fGeoLatitude );
 1997         SendFloat ( m_fGeoLongitude );
 1998     }
 1999     SendInt ( m_iIndexWeights ); // 1.15+
 2000     for ( int i=0; i<m_iIndexWeights; i++ )
 2001     {
 2002         SendString ( m_sIndexWeight[i] );
 2003         SendInt ( m_iIndexWeight[i] );
 2004     }
 2005     SendInt ( m_iMaxQueryTime ); // 1.17+
 2006     SendInt ( m_iFieldWeights ); // 1.18+
 2007     for ( int i=0; i<m_iFieldWeights; i++ )
 2008     {
 2009         SendString ( m_sFieldWeight[i] );
 2010         SendInt ( m_iFieldWeight[i] );
 2011     }
 2012     SendString ( m_sComment );
 2013 
 2014     // overrides
 2015     SendInt ( m_dOverrides.elements() );
 2016     for ( int i=0; i<m_dOverrides.elements(); i++ )
 2017     {
 2018         CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
 2019         SendString ( pOverride->m_sName );
 2020         SendDword ( pOverride->m_iType );
 2021         SendInt ( pOverride->m_dIds.elements() );
 2022         for ( int j=0; j<pOverride->m_dIds.elements(); j++ )
 2023         {
 2024             SendUint64 ( pOverride->m_dIds.at(j) );
 2025             if ( pOverride->m_iType==SPH_ATTR_FLOAT )
 2026                 SendFloat ( pOverride->m_dValues.at(j).m_fValue );
 2027             else if ( pOverride->m_iType==SPH_ATTR_BIGINT )
 2028                 SendUint64 ( pOverride->m_dValues.at(j).m_iValue64 );
 2029             else
 2030                 SendDword ( pOverride->m_dValues.at(j).m_uValue );
 2031         }
 2032     }
 2033 
 2034     // select
 2035     SendString ( m_sSelect );
 2036 
 2037     // detect buffer overruns and underruns, and report internal error
 2038     if ( m_bBufOverrun || m_iBufLeft!=0 || m_pCur-m_pBuf!=iReqSize )
 2039         SPH_RET(-1);
 2040 
 2041     // all fine
 2042     SPH_RET ( iReqSize );
 2043 }
 2044 
 2045 //////////////////////////////////////////////////////////////////////////////
 2046 // SPHINX HANDLER
 2047 //////////////////////////////////////////////////////////////////////////////
 2048 
 2049 static const char * ha_sphinx_exts[] = { NullS };
 2050 
 2051 
 2052 #if MYSQL_VERSION_ID<50100
 2053 ha_sphinx::ha_sphinx ( TABLE_ARG * table )
 2054     : handler ( &sphinx_hton, table )
 2055 #else
 2056 ha_sphinx::ha_sphinx ( handlerton * hton, TABLE_ARG * table )
 2057     : handler ( hton, table )
 2058 #endif
 2059     , m_pShare ( NULL )
 2060     , m_iMatchesTotal ( 0 )
 2061     , m_iCurrentPos ( 0 )
 2062     , m_pCurrentKey ( NULL )
 2063     , m_iCurrentKeyLen ( 0 )
 2064     , m_pResponse ( NULL )
 2065     , m_pResponseEnd ( NULL )
 2066     , m_pCur ( NULL )
 2067     , m_bUnpackError ( false )
 2068     , m_iFields ( 0 )
 2069     , m_dFields ( NULL )
 2070     , m_iAttrs ( 0 )
 2071     , m_dAttrs ( NULL )
 2072     , m_bId64 ( 0 )
 2073     , m_dUnboundFields ( NULL )
 2074 {
 2075     SPH_ENTER_METHOD();
 2076     if ( current_thd )
 2077         current_thd->variables.engine_condition_pushdown = true;
 2078     SPH_VOID_RET();
 2079 }
 2080 
 2081 
 2082 // If frm_error() is called then we will use this to to find out what file extentions
 2083 // exist for the storage engine. This is also used by the default rename_table and
 2084 // delete_table method in handler.cc.
 2085 const char ** ha_sphinx::bas_ext() const
 2086 {
 2087     return ha_sphinx_exts;
 2088 }
 2089 
 2090 
 2091 // Used for opening tables. The name will be the name of the file.
 2092 // A table is opened when it needs to be opened. For instance
 2093 // when a request comes in for a select on the table (tables are not
 2094 // open and closed for each request, they are cached).
 2095 //
 2096 // Called from handler.cc by handler::ha_open(). The server opens all tables by
 2097 // calling ha_open() which then calls the handler specific open().
 2098 int ha_sphinx::open ( const char * name, int, uint )
 2099 {
 2100     SPH_ENTER_METHOD();
 2101     m_pShare = get_share ( name, table );
 2102     if ( !m_pShare )
 2103         SPH_RET(1);
 2104 
 2105     thr_lock_data_init ( &m_pShare->m_tLock, &m_tLock, NULL );
 2106 
 2107     #if MYSQL_VERSION_ID>50100
 2108     void ** tmp = thd_ha_data ( table->in_use, ht );
 2109     if ( *tmp )
 2110     {
 2111         CSphTLS * pTls = (CSphTLS *)( *tmp );
 2112         SafeDelete ( pTls );
 2113         *tmp = NULL;
 2114     }   
 2115     #else
 2116     if ( table->in_use->ha_data [ sphinx_hton.slot ] )
 2117     {
 2118         CSphTLS * pTls = (CSphTLS *)( table->in_use->ha_data [ sphinx_hton.slot ] );
 2119         SafeDelete ( pTls );
 2120         table->in_use->ha_data [ sphinx_hton.slot ] = NULL;
 2121     }
 2122     #endif
 2123 
 2124     SPH_RET(0);
 2125 }
 2126 
 2127 
 2128 int ha_sphinx::Connect ( const char * sHost, ushort uPort )
 2129 {
 2130     struct sockaddr_in sin;
 2131 #ifndef __WIN__
 2132     struct sockaddr_un saun;
 2133 #endif
 2134 
 2135     int iDomain = 0;
 2136     int iSockaddrSize = 0;
 2137     struct sockaddr * pSockaddr = NULL;
 2138 
 2139     in_addr_t ip_addr;
 2140 
 2141     if ( uPort )
 2142     {
 2143         iDomain = AF_INET;
 2144         iSockaddrSize = sizeof(sin);
 2145         pSockaddr = (struct sockaddr *) &sin;
 2146 
 2147         memset ( &sin, 0, sizeof(sin) );
 2148         sin.sin_family = AF_INET;
 2149         sin.sin_port = htons(uPort);
 2150 
 2151         // prepare host address
 2152         if ( (int)( ip_addr = inet_addr(sHost) )!=(int)INADDR_NONE )
 2153         {
 2154             memcpy ( &sin.sin_addr, &ip_addr, sizeof(ip_addr) );
 2155         } else
 2156         {
 2157             int tmp_errno;
 2158             bool bError = false;
 2159 
 2160 #if MYSQL_VERSION_ID>=50515
 2161             struct addrinfo * hp = NULL;
 2162             tmp_errno = getaddrinfo ( sHost, NULL, NULL, &hp );
 2163             if ( tmp_errno!=0 || !hp || !hp->ai_addr )
 2164             {
 2165                 bError = true;
 2166                 if ( hp )
 2167                     freeaddrinfo ( hp );
 2168             }
 2169 #else
 2170             struct hostent tmp_hostent, *hp;
 2171             char buff2 [ GETHOSTBYNAME_BUFF_SIZE ];
 2172             hp = my_gethostbyname_r ( sHost, &tmp_hostent, buff2, sizeof(buff2), &tmp_errno );
 2173             if ( !hp )
 2174             {
 2175                 my_gethostbyname_r_free();
 2176                 bError = true;
 2177             }
 2178 #endif
 2179 
 2180             if ( bError )
 2181             {
 2182                 char sError[256];
 2183                 my_snprintf ( sError, sizeof(sError), "failed to resolve searchd host (name=%s)", sHost );
 2184 
 2185                 my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
 2186                 SPH_RET(-1);
 2187             }
 2188 
 2189 #if MYSQL_VERSION_ID>=50515
 2190             memcpy ( &sin.sin_addr, &( (struct sockaddr_in *)hp->ai_addr )->sin_addr, sizeof(sin.sin_addr) );
 2191             freeaddrinfo ( hp );
 2192 #else
 2193             memcpy ( &sin.sin_addr, hp->h_addr, Min ( sizeof(sin.sin_addr), (size_t)hp->h_length ) );
 2194             my_gethostbyname_r_free();
 2195 #endif
 2196         }
 2197     } else
 2198     {
 2199 #ifndef __WIN__
 2200         iDomain = AF_UNIX;
 2201         iSockaddrSize = sizeof(saun);
 2202         pSockaddr = (struct sockaddr *) &saun;
 2203 
 2204         memset ( &saun, 0, sizeof(saun) );
 2205         saun.sun_family = AF_UNIX;
 2206         strncpy ( saun.sun_path, sHost, sizeof(saun.sun_path)-1 );
 2207 #else
 2208         my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "UNIX sockets are not supported on Windows" );
 2209         SPH_RET(-1);
 2210 #endif
 2211     }
 2212 
 2213     char sError[512];
 2214     int iSocket = socket ( iDomain, SOCK_STREAM, 0 );
 2215 
 2216     if ( iSocket<0 )
 2217     {
 2218         my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "failed to create client socket" );
 2219         SPH_RET(-1);
 2220     }
 2221 
 2222     if ( connect ( iSocket, pSockaddr, iSockaddrSize )<0 )
 2223     {
 2224         sphSockClose ( iSocket );
 2225         my_snprintf ( sError, sizeof(sError), "failed to connect to searchd (host=%s, errno=%d, port=%d)",
 2226             sHost, errno, (int)uPort );
 2227         my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
 2228         SPH_RET(-1);
 2229     }
 2230 
 2231     return iSocket;
 2232 }
 2233 
 2234 
 2235 int ha_sphinx::ConnectAPI ( const char * sQueryHost, int iQueryPort )
 2236 {
 2237     SPH_ENTER_METHOD();
 2238 
 2239     const char * sHost = ( sQueryHost && *sQueryHost ) ? sQueryHost : m_pShare->m_sHost;
 2240     ushort uPort = iQueryPort ? (ushort)iQueryPort : m_pShare->m_iPort;
 2241 
 2242     int iSocket = Connect ( sHost, uPort );
 2243     if ( iSocket<0 )
 2244         SPH_RET ( iSocket );
 2245 
 2246     char sError[512];
 2247 
 2248     int version;
 2249     if ( ::recv ( iSocket, (char *)&version, sizeof(version), 0 )!=sizeof(version) )
 2250     {
 2251         sphSockClose ( iSocket );
 2252         my_snprintf ( sError, sizeof(sError), "failed to receive searchd version (host=%s, port=%d)",
 2253             sHost, (int)uPort );
 2254         my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
 2255         SPH_RET(-1);
 2256     }
 2257 
 2258     uint uClientVersion = htonl ( SPHINX_SEARCHD_PROTO );
 2259     if ( ::send ( iSocket, (char*)&uClientVersion, sizeof(uClientVersion), 0 )!=sizeof(uClientVersion) )
 2260     {
 2261         sphSockClose ( iSocket );
 2262         my_snprintf ( sError, sizeof(sError), "failed to send client version (host=%s, port=%d)",
 2263             sHost, (int)uPort );
 2264         my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
 2265         SPH_RET(-1);
 2266     }
 2267 
 2268     SPH_RET ( iSocket );
 2269 }
 2270 
 2271 
 2272 // Closes a table. We call the free_share() function to free any resources
 2273 // that we have allocated in the "shared" structure.
 2274 //
 2275 // Called from sql_base.cc, sql_select.cc, and table.cc.
 2276 // In sql_select.cc it is only used to close up temporary tables or during
 2277 // the process where a temporary table is converted over to being a
 2278 // myisam table.
 2279 // For sql_base.cc look at close_data_tables().
 2280 int ha_sphinx::close()
 2281 {
 2282     SPH_ENTER_METHOD();
 2283     SPH_RET ( free_share ( m_pShare ) );
 2284 }
 2285 
 2286 
 2287 int ha_sphinx::HandleMysqlError ( MYSQL * pConn, int iErrCode )
 2288 {
 2289     CSphSEThreadTable * pTable = GetTls ();
 2290     if ( pTable )
 2291     {
 2292         strncpy ( pTable->m_tStats.m_sLastMessage, mysql_error ( pConn ), sizeof ( pTable->m_tStats.m_sLastMessage ) );
 2293         pTable->m_tStats.m_bLastError = true;
 2294     }
 2295 
 2296     mysql_close ( pConn );
 2297 
 2298     my_error ( iErrCode, MYF(0), pTable->m_tStats.m_sLastMessage );
 2299     return -1;
 2300 }
 2301 
 2302 
 2303 int ha_sphinx::extra ( enum ha_extra_function op )
 2304 {
 2305     CSphSEThreadTable * pTable = GetTls();
 2306     if ( pTable )
 2307     {
 2308         if ( op==HA_EXTRA_WRITE_CAN_REPLACE )
 2309             pTable->m_bReplace = true;
 2310         else if ( op==HA_EXTRA_WRITE_CANNOT_REPLACE )
 2311             pTable->m_bReplace = false;
 2312     }
 2313     return 0;
 2314 }
 2315 
 2316 
 2317 int ha_sphinx::write_row ( byte * )
 2318 {
 2319     SPH_ENTER_METHOD();
 2320     if ( !m_pShare || !m_pShare->m_bSphinxQL )
 2321         SPH_RET ( HA_ERR_WRONG_COMMAND );
 2322 
 2323     // SphinxQL inserts only, pretty much similar to abandoned federated
 2324     char sQueryBuf[1024];
 2325     char sValueBuf[1024];
 2326 
 2327     String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
 2328     String sValue ( sValueBuf, sizeof(sQueryBuf), &my_charset_bin );
 2329     sQuery.length ( 0 );
 2330     sValue.length ( 0 );
 2331 
 2332     CSphSEThreadTable * pTable = GetTls ();
 2333     sQuery.append ( pTable && pTable->m_bReplace ? "REPLACE INTO " : "INSERT INTO " );
 2334     sQuery.append ( m_pShare->m_sIndex );
 2335     sQuery.append ( " (" );
 2336 
 2337     for ( Field ** ppField = table->field; *ppField; ppField++ )
 2338     {
 2339         sQuery.append ( (*ppField)->field_name );
 2340         if ( ppField[1] )
 2341             sQuery.append ( ", " );
 2342     }
 2343     sQuery.append ( ") VALUES (" );
 2344 
 2345     for ( Field ** ppField = table->field; *ppField; ppField++ )
 2346     {
 2347         if ( (*ppField)->is_null() )
 2348         {
 2349             sQuery.append ( "''" );
 2350 
 2351         } else
 2352         {
 2353             if ( (*ppField)->type()==MYSQL_TYPE_TIMESTAMP )
 2354             {
 2355                 Item_field * pWrap = new Item_field ( *ppField ); // autofreed by query arena, I assume
 2356                 Item_func_unix_timestamp * pConv = new Item_func_unix_timestamp ( pWrap );
 2357                 pConv->quick_fix_field();
 2358                 unsigned int uTs = (unsigned int) pConv->val_int();
 2359 
 2360                 snprintf ( sValueBuf, sizeof(sValueBuf), "'%u'", uTs );
 2361                 sQuery.append ( sValueBuf );
 2362 
 2363             } else
 2364             {
 2365                 (*ppField)->val_str ( &sValue );
 2366                 
 2367                 int iLen = sValue.length();
 2368                 bool bMva = ( iLen>1 && sValue.ptr()[0]=='(' && sValue.ptr()[iLen-1]==')' );
 2369                 
 2370                 if ( !bMva )
 2371                     sQuery.append ( "'" );
 2372                 sValue.print ( &sQuery );
 2373                 if ( !bMva )
 2374                     sQuery.append ( "'" );
 2375                 
 2376                 sValue.length(0);
 2377             }
 2378         }
 2379 
 2380         if ( ppField[1] )
 2381             sQuery.append ( ", " );
 2382     }
 2383     sQuery.append ( ")" );
 2384 
 2385     // FIXME? pretty inefficient to reconnect every time under high load,
 2386     // but this was intentionally written for a low load scenario..
 2387     MYSQL * pConn = mysql_init ( NULL );
 2388     if ( !pConn )
 2389         SPH_RET ( ER_OUT_OF_RESOURCES );
 2390 
 2391     unsigned int uTimeout = 1;
 2392     mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
 2393 
 2394     if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
 2395         SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );
 2396 
 2397     if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
 2398         SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );
 2399 
 2400     // all ok!
 2401     mysql_close ( pConn );
 2402     SPH_RET(0);
 2403 }
 2404 
 2405 
 2406 static inline bool IsIntegerFieldType ( enum_field_types eType )
 2407 {
 2408     return eType==MYSQL_TYPE_LONG || eType==MYSQL_TYPE_LONGLONG;
 2409 }
 2410 
 2411 
 2412 static inline bool IsIDField ( Field * pField )
 2413 {
 2414     enum_field_types eType = pField->type();
 2415 
 2416     if ( eType==MYSQL_TYPE_LONGLONG )
 2417         return true;
 2418 
 2419     if ( eType==MYSQL_TYPE_LONG && ((Field_num*)pField)->unsigned_flag )
 2420         return true;
 2421 
 2422     return false;
 2423 }
 2424 
 2425 
 2426 int ha_sphinx::delete_row ( const byte * )
 2427 {
 2428     SPH_ENTER_METHOD();
 2429     if ( !m_pShare || !m_pShare->m_bSphinxQL )
 2430         SPH_RET ( HA_ERR_WRONG_COMMAND );
 2431 
 2432     char sQueryBuf[1024];
 2433     String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
 2434     sQuery.length ( 0 );
 2435 
 2436     sQuery.append ( "DELETE FROM " );
 2437     sQuery.append ( m_pShare->m_sIndex );
 2438     sQuery.append ( " WHERE id=" );
 2439 
 2440     char sValue[32];
 2441     snprintf ( sValue, sizeof(sValue), "%lld", table->field[0]->val_int() );
 2442     sQuery.append ( sValue );
 2443 
 2444     // FIXME? pretty inefficient to reconnect every time under high load,
 2445     // but this was intentionally written for a low load scenario..
 2446     MYSQL * pConn = mysql_init ( NULL );
 2447     if ( !pConn )
 2448         SPH_RET ( ER_OUT_OF_RESOURCES );
 2449 
 2450     unsigned int uTimeout = 1;
 2451     mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
 2452 
 2453     if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
 2454         SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );
 2455 
 2456     if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
 2457         SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );
 2458 
 2459     // all ok!
 2460     mysql_close ( pConn );
 2461     SPH_RET(0);
 2462 }
 2463 
 2464 
 2465 int ha_sphinx::update_row ( const byte *, byte * )
 2466 {
 2467     SPH_ENTER_METHOD();
 2468     SPH_RET ( HA_ERR_WRONG_COMMAND );
 2469 }
 2470 
 2471 
 2472 // keynr is key (index) number
 2473 // sorted is 1 if result MUST be sorted according to index
 2474 int ha_sphinx::index_init ( uint keynr, bool )
 2475 {
 2476     SPH_ENTER_METHOD();
 2477     active_index = keynr;
 2478 
 2479     CSphSEThreadTable * pTable = GetTls();
 2480     if ( pTable )
 2481         pTable->m_bCondDone = false;
 2482 
 2483     SPH_RET(0);
 2484 }
 2485 
 2486 
 2487 int ha_sphinx::index_end()
 2488 {
 2489     SPH_ENTER_METHOD();
 2490     SPH_RET(0);
 2491 }
 2492 
 2493 
 2494 bool ha_sphinx::CheckResponcePtr ( int iLen )
 2495 {
 2496     if ( m_pCur+iLen>m_pResponseEnd )
 2497     {
 2498         m_pCur = m_pResponseEnd;
 2499         m_bUnpackError = true;
 2500         return false;
 2501     }
 2502 
 2503     return true;
 2504 }
 2505 
 2506 
 2507 uint32 ha_sphinx::UnpackDword ()
 2508 {
 2509     if ( !CheckResponcePtr ( sizeof(uint32) ) ) // NOLINT
 2510     {
 2511         return 0;
 2512     }
 2513 
 2514     uint32 uRes = ntohl ( sphUnalignedRead ( *(uint32*)m_pCur ) );
 2515     m_pCur += sizeof(uint32); // NOLINT
 2516     return uRes;
 2517 }
 2518 
 2519 
 2520 char * ha_sphinx::UnpackString ()
 2521 {
 2522     uint32 iLen = UnpackDword ();
 2523     if ( !iLen )
 2524         return NULL;
 2525 
 2526     if ( !CheckResponcePtr ( iLen ) )
 2527     {
 2528         return NULL;
 2529     }
 2530 
 2531     char * sRes = new char [ 1+iLen ];
 2532     memcpy ( sRes, m_pCur, iLen );
 2533     sRes[iLen] = '\0';
 2534     m_pCur += iLen;
 2535     return sRes;
 2536 }
 2537 
 2538 
 2539 static inline const char * FixNull ( const char * s )
 2540 {
 2541     return s ? s : "(null)";
 2542 }
 2543 
 2544 
 2545 bool ha_sphinx::UnpackSchema ()
 2546 {
 2547     SPH_ENTER_METHOD();
 2548 
 2549     // cleanup
 2550     if ( m_dFields )
 2551         for ( int i=0; i<(int)m_iFields; i++ )
 2552             SafeDeleteArray ( m_dFields[i] );
 2553     SafeDeleteArray ( m_dFields );
 2554 
 2555     // unpack network packet
 2556     uint32 uStatus = UnpackDword ();
 2557     char * sMessage = NULL;
 2558 
 2559     if ( uStatus!=SEARCHD_OK )
 2560     {
 2561         sMessage = UnpackString ();
 2562         CSphSEThreadTable * pTable = GetTls ();
 2563         if ( pTable )
 2564         {
 2565             strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof(pTable->m_tStats.m_sLastMessage) );
 2566             pTable->m_tStats.m_bLastError = ( uStatus==SEARCHD_ERROR );
 2567         }
 2568 
 2569         if ( uStatus==SEARCHD_ERROR )
 2570         {
 2571             char sError[1024];
 2572             my_snprintf ( sError, sizeof(sError), "searchd error: %s", sMessage );
 2573             my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
 2574             SafeDeleteArray ( sMessage );
 2575             SPH_RET ( false );
 2576         }
 2577     }
 2578 
 2579     m_iFields = UnpackDword ();
 2580     m_dFields = new char * [ m_iFields ];
 2581     if ( !m_dFields )
 2582     {
 2583         my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (fields alloc error)" );
 2584         SPH_RET(false);
 2585     }
 2586 
 2587     for ( uint32 i=0; i<m_iFields; i++ )
 2588         m_dFields[i] = UnpackString ();
 2589 
 2590     SafeDeleteArray ( m_dAttrs );
 2591     m_iAttrs = UnpackDword ();
 2592     m_dAttrs = new CSphSEAttr [ m_iAttrs ];
 2593     if ( !m_dAttrs )
 2594     {
 2595         for ( int i=0; i<(int)m_iFields; i++ )
 2596             SafeDeleteArray ( m_dFields[i] );
 2597         SafeDeleteArray ( m_dFields );
 2598         my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (attrs alloc error)" );
 2599         SPH_RET(false);
 2600     }
 2601 
 2602     for ( uint32 i=0; i<m_iAttrs; i++ )
 2603     {
 2604         m_dAttrs[i].m_sName = UnpackString ();
 2605         m_dAttrs[i].m_uType = UnpackDword ();
 2606         if ( m_bUnpackError ) // m_sName may be null
 2607             break;
 2608 
 2609         m_dAttrs[i].m_iField = -1;
 2610         for ( int j=SPHINXSE_SYSTEM_COLUMNS; j<m_pShare->m_iTableFields; j++ )
 2611         {
 2612             const char * sTableField = m_pShare->m_sTableField[j];
 2613             const char * sAttrField = m_dAttrs[i].m_sName;
 2614             if ( m_dAttrs[i].m_sName[0]=='@' )
 2615             {
 2616                 const char * sAtPrefix = "_sph_";
 2617                 if ( strncmp ( sTableField, sAtPrefix, strlen(sAtPrefix) ) )
 2618                     continue;
 2619                 sTableField += strlen(sAtPrefix);
 2620                 sAttrField++;
 2621             }
 2622 
 2623             if ( !strcasecmp ( sAttrField, sTableField ) )
 2624             {
 2625                 // we're almost good, but
 2626                 // let's enforce that timestamp columns can only receive timestamp attributes
 2627                 if ( m_pShare->m_eTableFieldType[j]!=MYSQL_TYPE_TIMESTAMP || m_dAttrs[i].m_uType==SPH_ATTR_TIMESTAMP )
 2628                     m_dAttrs[i].m_iField = j;
 2629                 break;
 2630             }
 2631         }
 2632     }
 2633 
 2634     m_iMatchesTotal = UnpackDword ();
 2635 
 2636     m_bId64 = UnpackDword ();
 2637     if ( m_bId64 && m_pShare->m_eTableFieldType[0]!=MYSQL_TYPE_LONGLONG )
 2638     {
 2639         my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: 1st column must be bigint to accept 64-bit DOCIDs" );
 2640         SPH_RET(false);
 2641     }
 2642 
 2643     // network packet unpacked; build unbound fields map
 2644     SafeDeleteArray ( m_dUnboundFields );
 2645     m_dUnboundFields = new int [ m_pShare->m_iTableFields ];
 2646 
 2647     for ( int i=0; i<m_pShare->m_iTableFields; i++ )
 2648     {
 2649         if ( i<SPHINXSE_SYSTEM_COLUMNS )
 2650             m_dUnboundFields[i] = SPH_ATTR_NONE;
 2651 
 2652         else if ( m_pShare->m_eTableFieldType[i]==MYSQL_TYPE_TIMESTAMP )
 2653             m_dUnboundFields[i] = SPH_ATTR_TIMESTAMP;
 2654 
 2655         else
 2656             m_dUnboundFields[i] = SPH_ATTR_INTEGER;
 2657     }
 2658 
 2659     for ( uint32 i=0; i<m_iAttrs; i++ )
 2660         if ( m_dAttrs[i].m_iField>=0 )
 2661             m_dUnboundFields [ m_dAttrs[i].m_iField ] = SPH_ATTR_NONE;
 2662 
 2663     if ( m_bUnpackError )
 2664         my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (unpack error)" );
 2665 
 2666     SPH_RET ( !m_bUnpackError );
 2667 }
 2668 
 2669 
 2670 bool ha_sphinx::UnpackStats ( CSphSEStats * pStats )
 2671 {
 2672     assert ( pStats );
 2673 
 2674     char * pCurSave = m_pCur;
 2675     for ( uint i=0; i<m_iMatchesTotal && m_pCur<m_pResponseEnd-sizeof(uint32); i++ ) // NOLINT
 2676     {
 2677         m_pCur += m_bId64 ? 12 : 8; // skip id+weight
 2678         for ( uint32 i=0; i<m_iAttrs && m_pCur<m_pResponseEnd-sizeof(uint32); i++ ) // NOLINT
 2679         {
 2680             if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
 2681             {
 2682                 // skip MVA list
 2683                 uint32 uCount = UnpackDword ();
 2684                 m_pCur += uCount*4;
 2685             } else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING )
 2686             {
 2687                 uint32 iLen = UnpackDword();
 2688                 m_pCur += iLen;
 2689             } else // skip normal value
 2690                 m_pCur += m_dAttrs[i].m_uType==SPH_ATTR_BIGINT ? 8 : 4;
 2691         }
 2692     }
 2693 
 2694     pStats->m_iMatchesTotal = UnpackDword ();
 2695     pStats->m_iMatchesFound = UnpackDword ();
 2696     pStats->m_iQueryMsec = UnpackDword ();
 2697     pStats->m_iWords = UnpackDword ();
 2698 
 2699     if ( m_bUnpackError )
 2700         return false;
 2701 
 2702     SafeDeleteArray ( pStats->m_dWords );
 2703     if ( pStats->m_iWords<0 || pStats->m_iWords>=SPHINXSE_MAX_KEYWORDSTATS )
 2704         return false;
 2705     pStats->m_dWords = new CSphSEWordStats [ pStats->m_iWords ];
 2706     if ( !pStats->m_dWords )
 2707         return false;
 2708 
 2709     for ( int i=0; i<pStats->m_iWords; i++ )
 2710     {
 2711         CSphSEWordStats & tWord = pStats->m_dWords[i];
 2712         tWord.m_sWord = UnpackString ();
 2713         tWord.m_iDocs = UnpackDword ();
 2714         tWord.m_iHits = UnpackDword ();
 2715     }
 2716 
 2717     if ( m_bUnpackError )
 2718         return false;
 2719 
 2720     m_pCur = pCurSave;
 2721     return true;
 2722 }
 2723 
 2724 
 2725 /// condition pushdown implementation, to properly intercept WHERE clauses on my columns
 2726 #if MYSQL_VERSION_ID<50610
 2727 const COND * ha_sphinx::cond_push ( const COND * cond )
 2728 #else
 2729 const Item * ha_sphinx::cond_push ( const Item *cond )
 2730 #endif
 2731 {
 2732     // catch the simplest case: query_column="some text"
 2733     for ( ;; )
 2734     {
 2735         if ( cond->type()!=Item::FUNC_ITEM )
 2736             break;
 2737 
 2738         Item_func * condf = (Item_func *)cond;
 2739         if ( condf->functype()!=Item_func::EQ_FUNC || condf->argument_count()!=2 )
 2740             break;
 2741 
 2742         // get my tls
 2743         CSphSEThreadTable * pTable = GetTls ();
 2744         if ( !pTable )
 2745             break;
 2746 
 2747         Item ** args = condf->arguments();
 2748         if ( !m_pShare->m_bSphinxQL )
 2749         {
 2750             // on non-QL tables, intercept query=value condition for SELECT
 2751             if (!( args[0]->type()==Item::FIELD_ITEM && args[1]->type()==Item::STRING_ITEM ))
 2752                 break;
 2753 
 2754             Item_field * pField = (Item_field *) args[0];
 2755             if ( pField->field->field_index!=2 ) // FIXME! magic key index
 2756                 break;
 2757 
 2758             // copy the query, and let know that we intercepted this condition
 2759             Item_string * pString = (Item_string *) args[1];
 2760             pTable->m_bQuery = true;
 2761             strncpy ( pTable->m_sQuery, pString->str_value.c_ptr(), sizeof(pTable->m_sQuery) );
 2762             pTable->m_sQuery[sizeof(pTable->m_sQuery)-1] = '\0';
 2763             pTable->m_pQueryCharset = pString->str_value.charset();
 2764 
 2765         } else
 2766         {
 2767             if (!( args[0]->type()==Item::FIELD_ITEM && args[1]->type()==Item::INT_ITEM ))
 2768                 break;
 2769 
 2770             // on QL tables, intercept id=value condition for DELETE
 2771             Item_field * pField = (Item_field *) args[0];
 2772             if ( pField->field->field_index!=0 ) // FIXME! magic key index
 2773                 break;
 2774 
 2775             Item_int * pVal = (Item_int *) args[1];
 2776             pTable->m_iCondId = pVal->val_int();
 2777             pTable->m_bCondId = true;
 2778         }
 2779 
 2780         // we intercepted this condition
 2781         return NULL;
 2782     }
 2783 
 2784     // don't change anything
 2785     return cond;
 2786 }
 2787 
 2788 
 2789 /// condition popup
 2790 void ha_sphinx::cond_pop ()
 2791 {
 2792     CSphSEThreadTable * pTable = GetTls ();
 2793     if ( pTable )
 2794         pTable->m_bQuery = false;
 2795 }
 2796 
 2797 
 2798 /// get TLS (maybe allocate it, too)
 2799 CSphSEThreadTable * ha_sphinx::GetTls()
 2800 {
 2801     SPH_ENTER_METHOD()
 2802     // where do we store that pointer in today's version?
 2803     CSphTLS ** ppTls;
 2804 #if MYSQL_VERSION_ID>50100
 2805     ppTls = (CSphTLS**) thd_ha_data ( table->in_use, ht );
 2806 #else
 2807     ppTls = (CSphTLS**) &current_thd->ha_data[sphinx_hton.slot];
 2808 #endif // >50100
 2809 
 2810     CSphSEThreadTable * pTable = NULL;
 2811     // allocate if needed
 2812     if ( !*ppTls )
 2813     {
 2814         *ppTls = new CSphTLS ( this );
 2815         pTable = (*ppTls)->m_pHeadTable;
 2816     } else
 2817     {
 2818         pTable = (*ppTls)->m_pHeadTable;
 2819     }
 2820 
 2821     while ( pTable && pTable->m_pHandler!=this )
 2822         pTable = pTable->m_pTableNext;
 2823 
 2824     if ( !pTable )
 2825     {
 2826         pTable = new CSphSEThreadTable ( this );
 2827         pTable->m_pTableNext = (*ppTls)->m_pHeadTable;
 2828         (*ppTls)->m_pHeadTable = pTable;
 2829     }
 2830 
 2831     // errors will be handled by caller
 2832     return pTable;
 2833 }
 2834 
 2835 
 2836 // Positions an index cursor to the index specified in the handle. Fetches the
 2837 // row if available. If the key value is null, begin at the first key of the
 2838 // index.
 2839 int ha_sphinx::index_read ( byte * buf, const byte * key, uint key_len, enum ha_rkey_function )
 2840 {
 2841     SPH_ENTER_METHOD();
 2842     char sError[256];
 2843 
 2844     // set new data for thd->ha_data, it is used in show_status
 2845     CSphSEThreadTable * pTable = GetTls();
 2846     if ( !pTable )
 2847     {
 2848         my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: TLS malloc() failed" );
 2849         SPH_RET ( HA_ERR_END_OF_FILE );
 2850     }
 2851     pTable->m_tStats.Reset ();
 2852 
 2853     // sphinxql table, just return the key once
 2854     if ( m_pShare->m_bSphinxQL )
 2855     {
 2856         // over and out
 2857         if ( pTable->m_bCondDone )
 2858             SPH_RET ( HA_ERR_END_OF_FILE );
 2859 
 2860         // return a value from pushdown, if any
 2861         if ( pTable->m_bCondId )
 2862         {
 2863             table->field[0]->store ( pTable->m_iCondId, 1 );
 2864             pTable->m_bCondDone = true;
 2865             SPH_RET(0);
 2866         }
 2867 
 2868         // return a value from key
 2869         longlong iRef = 0;
 2870         if ( key_len==4 )
 2871             iRef = uint4korr ( key );
 2872         else if ( key_len==8 )
 2873             iRef = uint8korr ( key );
 2874         else
 2875         {
 2876             my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unexpected key length" );
 2877             SPH_RET ( HA_ERR_END_OF_FILE );
 2878         }
 2879 
 2880         table->field[0]->store ( iRef, 1 );
 2881         pTable->m_bCondDone = true;
 2882         SPH_RET(0);
 2883     }
 2884 
 2885     // parse query
 2886     if ( pTable->m_bQuery )
 2887     {
 2888         // we have a query from condition pushdown
 2889         m_pCurrentKey = (const byte *) pTable->m_sQuery;
 2890         m_iCurrentKeyLen = strlen(pTable->m_sQuery);
 2891     } else
 2892     {
 2893         // just use the key (might be truncated)
 2894         m_pCurrentKey = key+HA_KEY_BLOB_LENGTH;
 2895         m_iCurrentKeyLen = uint2korr(key); // or maybe key_len?
 2896         pTable->m_pQueryCharset = m_pShare ? m_pShare->m_pTableQueryCharset : NULL;
 2897     }
 2898 
 2899     CSphSEQuery q ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, m_pShare->m_sIndex );
 2900     if ( !q.Parse () )
 2901     {
 2902         my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), q.m_sParseError );
 2903         SPH_RET ( HA_ERR_END_OF_FILE );
 2904     }
 2905 
 2906     // do connect
 2907     int iSocket = ConnectAPI ( q.m_sHost, q.m_iPort );
 2908     if ( iSocket<0 )
 2909         SPH_RET ( HA_ERR_END_OF_FILE );
 2910 
 2911     // my buffer
 2912     char * pBuffer; // will be free by CSphSEQuery dtor; do NOT free manually
 2913     int iReqLen = q.BuildRequest ( &pBuffer );
 2914 
 2915     if ( iReqLen<=0 )
 2916     {
 2917         my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: q.BuildRequest() failed" );
 2918         SPH_RET ( HA_ERR_END_OF_FILE );
 2919     }
 2920 
 2921     // send request
 2922     ::send ( iSocket, pBuffer, iReqLen, 0 );
 2923 
 2924     // receive reply
 2925     char sHeader[8];
 2926     int iGot = ::recv ( iSocket, sHeader, sizeof(sHeader), RECV_FLAGS );
 2927     if ( iGot!=sizeof(sHeader) )
 2928     {
 2929         my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "failed to receive response header (searchd went away?)" );
 2930         SPH_RET ( HA_ERR_END_OF_FILE );
 2931     }
 2932 
 2933     short int uRespStatus = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[0] ) ) );
 2934     short int uRespVersion = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[2] ) ) );
 2935     uint uRespLength = ntohl ( sphUnalignedRead ( *(uint *)( &sHeader[4] ) ) );
 2936     SPH_DEBUG ( "got response header (status=%d version=%d length=%d)",
 2937         uRespStatus, uRespVersion, uRespLength );
 2938 
 2939     SafeDeleteArray ( m_pResponse );
 2940     if ( uRespLength<=SPHINXSE_MAX_ALLOC )
 2941         m_pResponse = new char [ uRespLength+1 ];
 2942 
 2943     if ( !m_pResponse )
 2944     {
 2945         my_snprintf ( sError, sizeof(sError), "bad searchd response length (length=%u)", uRespLength );
 2946         my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
 2947         SPH_RET ( HA_ERR_END_OF_FILE );
 2948     }
 2949 
 2950     int iRecvLength = 0;
 2951     while ( iRecvLength<(int)uRespLength )
 2952     {
 2953         int iRecv = ::recv ( iSocket, m_pResponse+iRecvLength, uRespLength-iRecvLength, RECV_FLAGS );
 2954         if ( iRecv<0 )
 2955             break;
 2956         iRecvLength += iRecv;
 2957     }
 2958 
 2959     ::closesocket ( iSocket );
 2960     iSocket = -1;
 2961 
 2962     if ( iRecvLength!=(int)uRespLength )
 2963     {
 2964         my_snprintf ( sError, sizeof(sError), "net read error (expected=%d, got=%d)", uRespLength, iRecvLength );
 2965         my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
 2966         SPH_RET ( HA_ERR_END_OF_FILE );
 2967     }
 2968 
 2969     // we'll have a message, at least
 2970     pTable->m_bStats = true;
 2971 
 2972     // parse reply
 2973     m_iCurrentPos = 0;
 2974     m_pCur = m_pResponse;
 2975     m_pResponseEnd = m_pResponse + uRespLength;
 2976     m_bUnpackError = false;
 2977 
 2978     if ( uRespStatus!=SEARCHD_OK )
 2979     {
 2980         char * sMessage = UnpackString ();
 2981         if ( !sMessage )
 2982         {
 2983             my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "no valid response from searchd (status=%d, resplen=%d)",
 2984                 uRespStatus, uRespLength );
 2985             SPH_RET ( HA_ERR_END_OF_FILE );
 2986         }
 2987 
 2988         strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof(pTable->m_tStats.m_sLastMessage) );
 2989         SafeDeleteArray ( sMessage );
 2990 
 2991         if ( uRespStatus!=SEARCHD_WARNING )
 2992         {
 2993             my_snprintf ( sError, sizeof(sError), "searchd error: %s", pTable->m_tStats.m_sLastMessage );
 2994             my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
 2995 
 2996             pTable->m_tStats.m_bLastError = true;
 2997             SPH_RET ( HA_ERR_END_OF_FILE );
 2998         }
 2999     }
 3000 
 3001     if ( !UnpackSchema () )
 3002         SPH_RET ( HA_ERR_END_OF_FILE );
 3003 
 3004     if ( !UnpackStats ( &pTable->m_tStats ) )
 3005     {
 3006         my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackStats() failed" );
 3007         SPH_RET ( HA_ERR_END_OF_FILE );
 3008     }
 3009 
 3010     SPH_RET ( get_rec ( buf, key, key_len ) );
 3011 }
 3012 
 3013 
 3014 // Positions an index cursor to the index specified in key. Fetches the
 3015 // row if any. This is only used to read whole keys.
 3016 int ha_sphinx::index_read_idx ( byte *, uint, const byte *, uint, enum ha_rkey_function )
 3017 {
 3018     SPH_ENTER_METHOD();
 3019     SPH_RET ( HA_ERR_WRONG_COMMAND );
 3020 }
 3021 
 3022 
 3023 // Used to read forward through the index.
 3024 int ha_sphinx::index_next ( byte * buf )
 3025 {
 3026     SPH_ENTER_METHOD();
 3027     SPH_RET ( get_rec ( buf, m_pCurrentKey, m_iCurrentKeyLen ) );
 3028 }
 3029 
 3030 
 3031 int ha_sphinx::index_next_same ( byte * buf, const byte * key, uint keylen )
 3032 {
 3033     SPH_ENTER_METHOD();
 3034     SPH_RET ( get_rec ( buf, key, keylen ) );
 3035 }
 3036 
 3037 #ifndef PRIi64
 3038 #define PRIi64 "lld"
 3039 #endif
 3040 
 3041 #define INT64_FMT "%" PRIi64
 3042 
 3043 int ha_sphinx::get_rec ( byte * buf, const byte *, uint )
 3044 {
 3045     SPH_ENTER_METHOD();
 3046 
 3047     if ( m_iCurrentPos>=m_iMatchesTotal )
 3048     {
 3049         SafeDeleteArray ( m_pResponse );
 3050         SPH_RET ( HA_ERR_END_OF_FILE );
 3051     }
 3052 
 3053     #if MYSQL_VERSION_ID>50100
 3054     my_bitmap_map * org_bitmap = dbug_tmp_use_all_columns ( table, table->write_set );
 3055     #endif
 3056     Field ** field = table->field;
 3057 
 3058     // unpack and return the match
 3059     longlong uMatchID = UnpackDword ();
 3060     if ( m_bId64 )
 3061         uMatchID = ( uMatchID<<32 ) + UnpackDword();
 3062     uint32 uMatchWeight = UnpackDword ();
 3063 
 3064     field[0]->store ( uMatchID, 1 );
 3065     field[1]->store ( uMatchWeight, 1 );
 3066     field[2]->store ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, &my_charset_bin );
 3067 
 3068     for ( uint32 i=0; i<m_iAttrs; i++ )
 3069     {
 3070         longlong iValue64 = 0;
 3071         uint32 uValue = UnpackDword ();
 3072         if ( m_dAttrs[i].m_uType==SPH_ATTR_BIGINT )
 3073             iValue64 = ( (longlong)uValue<<32 ) | UnpackDword();
 3074         if ( m_dAttrs[i].m_iField<0 )
 3075         {
 3076             // skip MVA or String
 3077             if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
 3078             {
 3079                 for ( ; uValue>0 && !m_bUnpackError; uValue-- )
 3080                     UnpackDword();
 3081             } else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING && CheckResponcePtr ( uValue ) )
 3082             {
 3083                 m_pCur += uValue;
 3084             }
 3085             continue;
 3086         }
 3087 
 3088         Field * af = field [ m_dAttrs[i].m_iField ];
 3089         switch ( m_dAttrs[i].m_uType )
 3090         {
 3091             case SPH_ATTR_INTEGER:
 3092             case SPH_ATTR_ORDINAL:
 3093             case SPH_ATTR_BOOL:
 3094                 af->store ( uValue, 1 );
 3095                 break;
 3096 
 3097             case SPH_ATTR_FLOAT:
 3098                 af->store ( sphDW2F(uValue) );
 3099                 break;
 3100 
 3101             case SPH_ATTR_TIMESTAMP:
 3102                 if ( af->type()==MYSQL_TYPE_TIMESTAMP )
 3103                     longstore ( af->ptr, uValue ); // because store() does not accept timestamps
 3104                 else
 3105                     af->store ( uValue, 1 );
 3106                 break;
 3107 
 3108             case SPH_ATTR_BIGINT:
 3109                 af->store ( iValue64, 0 );
 3110                 break;
 3111 
 3112             case SPH_ATTR_STRING:
 3113                 if ( !uValue )
 3114                     af->store ( "", 0, &my_charset_bin );
 3115                 else if ( CheckResponcePtr ( uValue ) )
 3116                 {
 3117                     af->store ( m_pCur, uValue, &my_charset_bin );
 3118                     m_pCur += uValue;
 3119                 }
 3120                 break;
 3121 
 3122             case SPH_ATTR_UINT64SET:
 3123             case SPH_ATTR_UINT32SET :
 3124                 if ( uValue<=0 )
 3125                 {
 3126                     // shortcut, empty MVA set
 3127                     af->store ( "", 0, &my_charset_bin );
 3128 
 3129                 } else
 3130                 {
 3131                     // convert MVA set to comma-separated string
 3132                     char sBuf[1024]; // FIXME! magic size
 3133                     char * pCur = sBuf;
 3134 
 3135                     if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET )
 3136                     {
 3137                         for ( ; uValue>0 && !m_bUnpackError; uValue-- )
 3138                         {
 3139                             uint32 uEntry = UnpackDword ();
 3140                             if ( pCur < sBuf+sizeof(sBuf)-16 ) // 10 chars per 32bit value plus some safety bytes
 3141                             {
 3142                                 snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, "%u", uEntry );
 3143                                 while ( *pCur ) pCur++;
 3144                                 if ( uValue>1 )
 3145                                     *pCur++ = ','; // non-trailing commas
 3146                             }
 3147                         }
 3148                     } else
 3149                     {
 3150                         for ( ; uValue>0 && !m_bUnpackError; uValue-=2 )
 3151                         {
 3152                             longlong uEntry = UnpackDword ();
 3153                             uEntry = uEntry<<32;
 3154                             uEntry += UnpackDword();
 3155                             if ( pCur < sBuf+sizeof(sBuf)-24 ) // 20 chars per 64bit value plus some safety bytes
 3156                             {
 3157                                 snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, INT64_FMT, uEntry );
 3158                                 while ( *pCur ) pCur++;
 3159                                 if ( uValue>2 )
 3160                                     *pCur++ = ','; // non-trailing commas
 3161                             }
 3162                         }
 3163                     }
 3164 
 3165                     af->store ( sBuf, pCur-sBuf, &my_charset_bin );
 3166                 }
 3167                 break;
 3168 
 3169             default:
 3170                 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unhandled attr type" );
 3171                 SafeDeleteArray ( m_pResponse );
 3172                 SPH_RET ( HA_ERR_END_OF_FILE );
 3173         }
 3174     }
 3175 
 3176     if ( m_bUnpackError )
 3177     {
 3178         my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: response unpacker failed" );
 3179         SafeDeleteArray ( m_pResponse );
 3180         SPH_RET ( HA_ERR_END_OF_FILE );
 3181     }
 3182 
 3183     // zero out unmapped fields
 3184     for ( int i=SPHINXSE_SYSTEM_COLUMNS; i<(int)table->s->fields; i++ )
 3185         if ( m_dUnboundFields[i]!=SPH_ATTR_NONE )
 3186             switch ( m_dUnboundFields[i] )
 3187     {
 3188         case SPH_ATTR_INTEGER:      table->field[i]->store ( 0, 1 ); break;
 3189         case SPH_ATTR_TIMESTAMP:    longstore ( table->field[i]->ptr, 0 ); break;
 3190         default:
 3191             my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0),
 3192                 "INTERNAL ERROR: unhandled unbound field type %d", m_dUnboundFields[i] );
 3193             SafeDeleteArray ( m_pResponse );
 3194             SPH_RET ( HA_ERR_END_OF_FILE );
 3195     }
 3196 
 3197     memset ( buf, 0, table->s->null_bytes );
 3198     m_iCurrentPos++;
 3199 
 3200     #if MYSQL_VERSION_ID > 50100
 3201     dbug_tmp_restore_column_map ( table->write_set, org_bitmap );
 3202     #endif
 3203 
 3204     SPH_RET(0);
 3205 }
 3206 
 3207 
 3208 // Used to read backwards through the index.
 3209 int ha_sphinx::index_prev ( byte * )
 3210 {
 3211     SPH_ENTER_METHOD();
 3212     SPH_RET ( HA_ERR_WRONG_COMMAND );
 3213 }
 3214 
 3215 
 3216 // index_first() asks for the first key in the index.
 3217 //
 3218 // Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
 3219 // and sql_select.cc.
 3220 int ha_sphinx::index_first ( byte * )
 3221 {
 3222     SPH_ENTER_METHOD();
 3223     SPH_RET ( HA_ERR_END_OF_FILE );
 3224 }
 3225 
 3226 // index_last() asks for the last key in the index.
 3227 //
 3228 // Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
 3229 // and sql_select.cc.
 3230 int ha_sphinx::index_last ( byte * )
 3231 {
 3232     SPH_ENTER_METHOD();
 3233     SPH_RET ( HA_ERR_WRONG_COMMAND );
 3234 }
 3235 
 3236 
 3237 int ha_sphinx::rnd_init ( bool )
 3238 {
 3239     SPH_ENTER_METHOD();
 3240     SPH_RET(0);
 3241 }
 3242 
 3243 
 3244 int ha_sphinx::rnd_end()
 3245 {
 3246     SPH_ENTER_METHOD();
 3247     SPH_RET(0);
 3248 }
 3249 
 3250 
 3251 int ha_sphinx::rnd_next ( byte * )
 3252 {
 3253     SPH_ENTER_METHOD();
 3254     SPH_RET ( HA_ERR_END_OF_FILE );
 3255 }
 3256 
 3257 
 3258 void ha_sphinx::position ( const byte * )
 3259 {
 3260     SPH_ENTER_METHOD();
 3261     SPH_VOID_RET();
 3262 }
 3263 
 3264 
 3265 // This is like rnd_next, but you are given a position to use
 3266 // to determine the row. The position will be of the type that you stored in
 3267 // ref. You can use ha_get_ptr(pos,ref_length) to retrieve whatever key
 3268 // or position you saved when position() was called.
 3269 // Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
 3270 int ha_sphinx::rnd_pos ( byte *, byte * )
 3271 {
 3272     SPH_ENTER_METHOD();
 3273     SPH_RET ( HA_ERR_WRONG_COMMAND );
 3274 }
 3275 
 3276 
 3277 #if MYSQL_VERSION_ID>=50030
 3278 int ha_sphinx::info ( uint )
 3279 #else
 3280 void ha_sphinx::info ( uint )
 3281 #endif
 3282 {
 3283     SPH_ENTER_METHOD();
 3284 
 3285     if ( table->s->keys>0 )
 3286         table->key_info[0].rec_per_key[0] = 1;
 3287 
 3288     #if MYSQL_VERSION_ID>50100
 3289     stats.records = 20;
 3290     #else
 3291     records = 20;
 3292     #endif
 3293 
 3294 #if MYSQL_VERSION_ID>=50030
 3295     SPH_RET(0);
 3296 #else
 3297     SPH_VOID_RET();
 3298 #endif
 3299 }
 3300 
 3301 
 3302 int ha_sphinx::reset ()
 3303 {
 3304     SPH_ENTER_METHOD();
 3305     CSphSEThreadTable * pTable = GetTls ();
 3306     if ( pTable )
 3307         pTable->m_bQuery = false;
 3308     SPH_RET(0);
 3309 }
 3310 
 3311 
 3312 int ha_sphinx::delete_all_rows()
 3313 {
 3314     SPH_ENTER_METHOD();
 3315     SPH_RET ( HA_ERR_WRONG_COMMAND );
 3316 }
 3317 
 3318 
 3319 // First you should go read the section "locking functions for mysql" in
 3320 // lock.cc to understand this.
 3321 // This create a lock on the table. If you are implementing a storage engine
 3322 // that can handle transacations look at ha_berkely.cc to see how you will
 3323 // want to go about doing this. Otherwise you should consider calling flock()
 3324 // here.
 3325 //
 3326 // Called from lock.cc by lock_external() and unlock_external(). Also called
 3327 // from sql_table.cc by copy_data_between_tables().
 3328 int ha_sphinx::external_lock ( THD *, int )
 3329 {
 3330     SPH_ENTER_METHOD();
 3331     SPH_RET(0);
 3332 }
 3333 
 3334 
 3335 THR_LOCK_DATA ** ha_sphinx::store_lock ( THD *, THR_LOCK_DATA ** to,
 3336     enum thr_lock_type lock_type )
 3337 {
 3338     SPH_ENTER_METHOD();
 3339 
 3340     if ( lock_type!=TL_IGNORE && m_tLock.type==TL_UNLOCK )
 3341         m_tLock.type = lock_type;
 3342 
 3343     *to++ = &m_tLock;
 3344     SPH_RET(to);
 3345 }
 3346 
 3347 
 3348 int ha_sphinx::delete_table ( const char * )
 3349 {
 3350     SPH_ENTER_METHOD();
 3351     SPH_RET(0);
 3352 }
 3353 
 3354 
 3355 // Renames a table from one name to another from alter table call.
 3356 //
 3357 // If you do not implement this, the default rename_table() is called from
 3358 // handler.cc and it will delete all files with the file extentions returned
 3359 // by bas_ext().
 3360 //
 3361 // Called from sql_table.cc by mysql_rename_table().
 3362 int ha_sphinx::rename_table ( const char *, const char * )
 3363 {
 3364     SPH_ENTER_METHOD();
 3365     SPH_RET(0);
 3366 }
 3367 
 3368 
 3369 // Given a starting key, and an ending key estimate the number of rows that
 3370 // will exist between the two. end_key may be empty which in case determine
 3371 // if start_key matches any rows.
 3372 //
 3373 // Called from opt_range.cc by check_quick_keys().
 3374 ha_rows ha_sphinx::records_in_range ( uint, key_range *, key_range * )
 3375 {
 3376     SPH_ENTER_METHOD();
 3377     SPH_RET(3); // low number to force index usage
 3378 }
 3379 
 3380 #if MYSQL_VERSION_ID < 50610
 3381 #define user_defined_key_parts key_parts
 3382 #endif
 3383 
 3384 // create() is called to create a database. The variable name will have the name
 3385 // of the table. When create() is called you do not need to worry about opening
 3386 // the table. Also, the FRM file will have already been created so adjusting
 3387 // create_info will not do you any good. You can overwrite the frm file at this
 3388 // point if you wish to change the table definition, but there are no methods
 3389 // currently provided for doing that.
 3390 //
 3391 // Called from handle.cc by ha_create_table().
 3392 int ha_sphinx::create ( const char * name, TABLE * table, HA_CREATE_INFO * )
 3393 {
 3394     SPH_ENTER_METHOD();
 3395     char sError[256];
 3396 
 3397     CSphSEShare tInfo;
 3398     if ( !ParseUrl ( &tInfo, table, true ) )
 3399         SPH_RET(-1);
 3400 
 3401     // check SphinxAPI table
 3402     for ( ; !tInfo.m_bSphinxQL; )
 3403     {
 3404         // check system fields (count and types)
 3405         if ( table->s->fields<SPHINXSE_SYSTEM_COLUMNS )
 3406         {
 3407             my_snprintf ( sError, sizeof(sError), "%s: there MUST be at least %d columns",
 3408                 name, SPHINXSE_SYSTEM_COLUMNS );
 3409             break;
 3410         }
 3411 
 3412         if ( !IsIDField ( table->field[0] ) )
 3413         {
 3414             my_snprintf ( sError, sizeof(sError), "%s: 1st column (docid) MUST be unsigned integer or bigint", name );
 3415             break;
 3416         }
 3417 
 3418         if ( !IsIntegerFieldType ( table->field[1]->type() ) )
 3419         {
 3420             my_snprintf ( sError, sizeof(sError), "%s: 2nd column (weight) MUST be integer or bigint", name );
 3421             break;
 3422         }
 3423 
 3424         enum_field_types f2 = table->field[2]->type();
 3425         if ( f2!=MYSQL_TYPE_VARCHAR
 3426             && f2!=MYSQL_TYPE_BLOB && f2!=MYSQL_TYPE_MEDIUM_BLOB && f2!=MYSQL_TYPE_LONG_BLOB && f2!=MYSQL_TYPE_TINY_BLOB )
 3427         {
 3428             my_snprintf ( sError, sizeof(sError), "%s: 3rd column (search query) MUST be varchar or text", name );
 3429             break;
 3430         }
 3431 
 3432         // check attributes
 3433         int i;
 3434         for ( i=3; i<(int)table->s->fields; i++ )
 3435         {
 3436             enum_field_types eType = table->field[i]->type();
 3437             if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
 3438             {
 3439                 my_snprintf ( sError, sizeof(sError), "%s: %dth column (attribute %s) MUST be integer, bigint, timestamp, varchar, or float",
 3440                     name, i+1, table->field[i]->field_name );
 3441                 break;
 3442             }
 3443         }
 3444 
 3445         if ( i!=(int)table->s->fields )
 3446             break;
 3447 
 3448         // check index
 3449         if (
 3450             table->s->keys!=1 ||
 3451             table->key_info[0].user_defined_key_parts!=1 ||
 3452             strcasecmp ( table->key_info[0].key_part[0].field->field_name, table->field[2]->field_name ) )
 3453         {
 3454             my_snprintf ( sError, sizeof(sError), "%s: there must be an index on '%s' column",
 3455                 name, table->field[2]->field_name );
 3456             break;
 3457         }
 3458 
 3459         // all good
 3460         sError[0] = '\0';
 3461         break;
 3462     }
 3463 
 3464     // check SphinxQL table
 3465     for ( ; tInfo.m_bSphinxQL; )
 3466     {
 3467         sError[0] = '\0';
 3468 
 3469         // check that 1st column is id, is of int type, and has an index
 3470         if ( strcmp ( table->field[0]->field_name, "id" ) )
 3471         {
 3472             my_snprintf ( sError, sizeof(sError), "%s: 1st column must be called 'id'", name );
 3473             break;
 3474         }
 3475 
 3476         if ( !IsIDField ( table->field[0] ) )
 3477         {
 3478             my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be INT UNSIGNED or BIGINT", name );
 3479             break;
 3480         }
 3481 
 3482         // check index
 3483         if (
 3484             table->s->keys!=1 ||
 3485             table->key_info[0].user_defined_key_parts!=1 ||
 3486             strcasecmp ( table->key_info[0].key_part[0].field->field_name, "id" ) )
 3487         {
 3488             my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be indexed", name );
 3489             break;
 3490         }
 3491 
 3492         // check column types
 3493         for ( int i=1; i<(int)table->s->fields; i++ )
 3494         {
 3495             enum_field_types eType = table->field[i]->type();
 3496             if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
 3497             {
 3498                 my_snprintf ( sError, sizeof(sError), "%s: column %d(%s) is of unsupported type (use int/bigint/timestamp/varchar/float)",
 3499                     name, i+1, table->field[i]->field_name );
 3500                 break;
 3501             }
 3502         }
 3503         if ( sError[0] )
 3504             break;
 3505 
 3506         // all good
 3507         break;
 3508     }
 3509 
 3510     // report and bail
 3511     if ( sError[0] )
 3512     {
 3513         my_error ( ER_CANT_CREATE_TABLE, MYF(0), sError, -1 );
 3514         SPH_RET(-1);
 3515     }
 3516 
 3517     SPH_RET(0);
 3518 }
 3519 
 3520 // show functions
 3521 
 3522 #if MYSQL_VERSION_ID<50100
 3523 #define SHOW_VAR_FUNC_BUFF_SIZE 1024
 3524 #endif
 3525 
 3526 CSphSEStats * sphinx_get_stats ( THD * thd, SHOW_VAR * out )
 3527 {
 3528 #if MYSQL_VERSION_ID>50100
 3529     if ( sphinx_hton_ptr )
 3530     {
 3531         CSphTLS * pTls = (CSphTLS *) *thd_ha_data ( thd, sphinx_hton_ptr );
 3532 
 3533         if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
 3534             return &pTls->m_pHeadTable->m_tStats;
 3535     }
 3536 #else
 3537     CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
 3538     if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
 3539         return &pTls->m_pHeadTable->m_tStats;
 3540 #endif
 3541 
 3542     out->type = SHOW_CHAR;
 3543     out->value = "";
 3544     return 0;
 3545 }
 3546 
 3547 int sphinx_showfunc_total ( THD * thd, SHOW_VAR * out, char * )
 3548 {
 3549     CSphSEStats * pStats = sphinx_get_stats ( thd, out );
 3550     if ( pStats )
 3551     {
 3552         out->type = SHOW_INT;
 3553         out->value = (char *) &pStats->m_iMatchesTotal;
 3554     }
 3555     return 0;
 3556 }
 3557 
 3558 int sphinx_showfunc_total_found ( THD * thd, SHOW_VAR * out, char * )
 3559 {
 3560     CSphSEStats * pStats = sphinx_get_stats ( thd, out );
 3561     if ( pStats )
 3562     {
 3563         out->type = SHOW_INT;
 3564         out->value = (char *) &pStats->m_iMatchesFound;
 3565     }
 3566     return 0;
 3567 }
 3568 
 3569 int sphinx_showfunc_time ( THD * thd, SHOW_VAR * out, char * )
 3570 {
 3571     CSphSEStats * pStats = sphinx_get_stats ( thd, out );
 3572     if ( pStats )
 3573     {
 3574         out->type = SHOW_INT;
 3575         out->value = (char *) &pStats->m_iQueryMsec;
 3576     }
 3577     return 0;
 3578 }
 3579 
 3580 int sphinx_showfunc_word_count ( THD * thd, SHOW_VAR * out, char * )
 3581 {
 3582     CSphSEStats * pStats = sphinx_get_stats ( thd, out );
 3583     if ( pStats )
 3584     {
 3585         out->type = SHOW_INT;
 3586         out->value = (char *) &pStats->m_iWords;
 3587     }
 3588     return 0;
 3589 }
 3590 
 3591 int sphinx_showfunc_words ( THD * thd, SHOW_VAR * out, char * sBuffer )
 3592 {
 3593 #if MYSQL_VERSION_ID>50100
 3594     if ( sphinx_hton_ptr )
 3595     {
 3596         CSphTLS * pTls = (CSphTLS *) *thd_ha_data ( thd, sphinx_hton_ptr );
 3597 #else
 3598     {
 3599         CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
 3600 #endif
 3601         if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
 3602         {
 3603             CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
 3604             if ( pStats && pStats->m_iWords )
 3605             {
 3606                 uint uBuffLen = 0;
 3607 
 3608                 out->type = SHOW_CHAR;
 3609                 out->value = sBuffer;
 3610 
 3611                 // the following is partially based on code in sphinx_show_status()
 3612                 sBuffer[0] = 0;
 3613                 for ( int i=0; i<pStats->m_iWords; i++ )
 3614                 {
 3615                     CSphSEWordStats & tWord = pStats->m_dWords[i];
 3616                     uBuffLen = my_snprintf ( sBuffer, SHOW_VAR_FUNC_BUFF_SIZE, "%s%s:%d:%d ", sBuffer,
 3617                         tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
 3618                 }
 3619 
 3620                 if ( uBuffLen > 0 )
 3621                 {
 3622                     // trim last space
 3623                     sBuffer [ --uBuffLen ] = 0;
 3624 
 3625                     if ( pTls->m_pHeadTable->m_pQueryCharset )
 3626                     {
 3627                         // String::c_ptr() will nul-terminate the buffer.
 3628                         //
 3629                         // NOTE: It's not entirely clear whether this conversion is necessary at all.
 3630 
 3631                         String sConvert;
 3632                         uint iErrors;
 3633                         sConvert.copy ( sBuffer, uBuffLen, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
 3634                         memcpy ( sBuffer, sConvert.c_ptr(), sConvert.length() + 1 );
 3635                     }
 3636                 }
 3637 
 3638                 return 0;
 3639             }
 3640         }
 3641     }
 3642 
 3643     out->type = SHOW_CHAR;
 3644     out->value = "";
 3645     return 0;
 3646 }
 3647 
 3648 int sphinx_showfunc_error ( THD * thd, SHOW_VAR * out, char * )
 3649 {
 3650     CSphSEStats * pStats = sphinx_get_stats ( thd, out );
 3651     if ( pStats && pStats->m_bLastError )
 3652     {
 3653         out->type = SHOW_CHAR;
 3654         out->value = pStats->m_sLastMessage;
 3655     }
 3656     return 0;
 3657 }
 3658 
 3659 #if MYSQL_VERSION_ID>50100
 3660 struct st_mysql_storage_engine sphinx_storage_engine =
 3661 {
 3662     MYSQL_HANDLERTON_INTERFACE_VERSION
 3663 };
 3664 
 3665 struct st_mysql_show_var sphinx_status_vars[] =
 3666 {
 3667     {"sphinx_total",        (char *)sphinx_showfunc_total,          SHOW_FUNC},
 3668     {"sphinx_total_found",  (char *)sphinx_showfunc_total_found,    SHOW_FUNC},
 3669     {"sphinx_time",         (char *)sphinx_showfunc_time,           SHOW_FUNC},
 3670     {"sphinx_word_count",   (char *)sphinx_showfunc_word_count,     SHOW_FUNC},
 3671     {"sphinx_words",        (char *)sphinx_showfunc_words,          SHOW_FUNC},
 3672     {"sphinx_error",        (char *)sphinx_showfunc_error,          SHOW_FUNC},
 3673     {0, 0, (enum_mysql_show_type)0}
 3674 };
 3675 
 3676 
 3677 mysql_declare_plugin(sphinx)
 3678 {
 3679     MYSQL_STORAGE_ENGINE_PLUGIN,
 3680     &sphinx_storage_engine,
 3681     sphinx_hton_name,
 3682     "Sphinx developers",
 3683     sphinx_hton_comment,
 3684     PLUGIN_LICENSE_GPL,
 3685     sphinx_init_func, // Plugin Init
 3686     sphinx_done_func, // Plugin Deinit
 3687     0x0001, // 0.1
 3688     sphinx_status_vars,
 3689     NULL,
 3690     NULL
 3691 }
 3692 mysql_declare_plugin_end;
 3693 
 3694 #endif // >50100
 3695 
 3696 //
 3697 // $Id$
 3698 //