"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/jrd/recsrc/HashJoin.cpp" between
Firebird-3.0.2.32703-0.tar.bz2 and Firebird-3.0.4.33054-0.tar.bz2

About: Firebird is a relational database offering many ANSI SQL standard features.

HashJoin.cpp  (Firebird-3.0.2.32703-0.tar.bz2):HashJoin.cpp  (Firebird-3.0.4.33054-0.tar.bz2)
skipping to change at line 26 skipping to change at line 26
* Copyright (c) 2009 Dmitry Yemanov <dimitr@firebirdsql.org> * Copyright (c) 2009 Dmitry Yemanov <dimitr@firebirdsql.org>
* and all contributors signed below. * and all contributors signed below.
* *
* All Rights Reserved. * All Rights Reserved.
* Contributor(s): ______________________________________. * Contributor(s): ______________________________________.
*/ */
#include "firebird.h" #include "firebird.h"
#include "../common/classes/Hash.h" #include "../common/classes/Hash.h"
#include "../jrd/jrd.h" #include "../jrd/jrd.h"
#include "../jrd/btr.h"
#include "../jrd/req.h" #include "../jrd/req.h"
#include "../jrd/intl.h" #include "../jrd/intl.h"
#include "../jrd/cmp_proto.h" #include "../jrd/cmp_proto.h"
#include "../jrd/evl_proto.h" #include "../jrd/evl_proto.h"
#include "../jrd/mov_proto.h" #include "../jrd/mov_proto.h"
#include "../jrd/intl_proto.h" #include "../jrd/intl_proto.h"
#include "../jrd/Collation.h"
#include "RecordSource.h" #include "RecordSource.h"
#include <stdlib.h>
using namespace Firebird; using namespace Firebird;
using namespace Jrd; using namespace Jrd;
// ---------------------- // ----------------------
// Data access: hash join // Data access: hash join
// ---------------------- // ----------------------
namespace
{
typedef int (*qsort_compare_callback)(const void* a1, const void* a2, voi
d* arg);
struct qsort_ctx_data
{
void* arg;
qsort_compare_callback compare;
};
#if defined(DARWIN) // || defined(FREEBSD)
#undef HAVE_QSORT_R
#endif
#ifndef HAVE_QSORT_R
#if defined(WIN_NT) || defined(DARWIN) || defined(FREEBSD)
int qsort_ctx_arg_swap(void* arg, const void* a1, const void* a2)
{
struct qsort_ctx_data* ss = (struct qsort_ctx_data*) arg;
return (ss->compare)(a1, a2, ss->arg);
}
#endif
#endif
#define USE_QSORT_CTX
void qsort_ctx(void* base, size_t count, size_t width, qsort_compare_call
back compare, void* arg)
{
#ifdef HAVE_QSORT_R
qsort_r(base, count, width, compare, arg);
#else
#if defined(WIN_NT)
struct qsort_ctx_data tmp = {arg, compare};
qsort_s(base, count, width, &qsort_ctx_arg_swap, &tmp);
#elif defined(DARWIN) || defined(FREEBSD)
struct qsort_ctx_data tmp = {arg, compare};
qsort_r(base, count, width, &tmp, &qsort_ctx_arg_swap);
#else
#undef USE_QSORT_CTX
#endif
#endif
}
} // namespace
// NS: FIXME - Why use static hash table here??? Hash table shall support dynami c resizing // NS: FIXME - Why use static hash table here??? Hash table shall support dynami c resizing
static const size_t HASH_SIZE = 1009; static const ULONG HASH_SIZE = 1009;
static const size_t COLLISION_PREALLOCATE_SIZE = 32; // 256 KB static const ULONG BUCKET_PREALLOCATE_SIZE = 32; // 256 bytes per slot
static const size_t KEYBUF_PREALLOCATE_SIZE = 64 * 1024; // 64 KB
static const size_t KEYBUF_SIZE_LIMIT = 1024 * 1024 * 1024; // 1 GB
class HashJoin::HashTable : public PermanentStorage class HashJoin::HashTable : public PermanentStorage
{ {
struct Collision
{
#ifdef USE_QSORT_CTX
Collision()
: offset(0), position(0)
{}
Collision(void* /*ctx*/, ULONG off, ULONG pos)
: offset(off), position(pos)
{}
#else
Collision()
: context(NULL), offset(0), position(0)
{}
Collision(void* ctx, ULONG off, ULONG pos)
: context(ctx), offset(off), position(pos)
{}
void* context;
#endif
ULONG offset;
ULONG position;
};
class CollisionList class CollisionList
{ {
static const FB_SIZE_T INVALID_ITERATOR = FB_SIZE_T(~0); static const FB_SIZE_T INVALID_ITERATOR = FB_SIZE_T(~0);
public: struct Entry
CollisionList(MemoryPool& pool, const KeyBuffer* keyBuffer, ULONG {
itemLength) Entry()
: m_collisions(pool, COLLISION_PREALLOCATE_SIZE), : hash(0), position(0)
m_keyBuffer(keyBuffer), m_itemLength(itemLength), m_ite {}
rator(INVALID_ITERATOR)
{}
#ifdef USE_QSORT_CTX
static int compare(const void* p1, const void* p2, void* arg)
#else
static int compare(const void* p1, const void* p2)
#endif
{
const Collision* const c1 = static_cast<const Collision*>
(p1);
const Collision* const c2 = static_cast<const Collision*>
(p2);
#ifndef USE_QSORT_CTX
fb_assert(c1->context == c2->context);
#endif
const CollisionList* const collisions =
#ifdef USE_QSORT_CTX
static_cast<const CollisionList*>(arg);
#else
static_cast<const CollisionList*>(c1->context);
#endif
const UCHAR* const baseAddress = collisions->m_keyBuffer-
>begin();
const UCHAR* const ptr1 = baseAddress + c1->offset; Entry(ULONG h, ULONG pos)
const UCHAR* const ptr2 = baseAddress + c2->offset; : hash(h), position(pos)
{}
return memcmp(ptr1, ptr2, collisions->m_itemLength); static const ULONG generate(const Entry& item)
{
return item.hash;
}
ULONG hash;
ULONG position;
};
public:
CollisionList(MemoryPool& pool)
: m_collisions(pool, BUCKET_PREALLOCATE_SIZE),
m_iterator(INVALID_ITERATOR)
{
m_collisions.setSortMode(FB_ARRAY_SORT_MANUAL);
} }
void sort() void sort()
{ {
Collision* const base = m_collisions.begin(); m_collisions.sort();
const size_t count = m_collisions.getCount();
const size_t width = sizeof(Collision);
#ifdef USE_QSORT_CTX
qsort_ctx(base, count, width, compare, this);
#else
qsort(base, count, width, compare);
#endif
} }
void add(const Collision& collision) void add(ULONG hash, ULONG position)
{ {
m_collisions.add(collision); m_collisions.add(Entry(hash, position));
} }
bool locate(ULONG length, const UCHAR* data) bool locate(ULONG hash)
{ {
const UCHAR* const ptr1 = data; if (m_collisions.find(hash, m_iterator))
const ULONG len1 = length; return true;
const ULONG len2 = m_itemLength;
const ULONG minLen = MIN(len1, len2);
FB_SIZE_T highBound = m_collisions.getCount(), lowBound =
0;
const UCHAR* const baseAddress = m_keyBuffer->begin();
while (highBound > lowBound)
{
const FB_SIZE_T temp = (highBound + lowBound) >>
1;
const UCHAR* const ptr2 = baseAddress + m_collisi
ons[temp].offset;
const int result = memcmp(ptr1, ptr2, minLen);
if (result > 0 || (!result && len1 > len2))
lowBound = temp + 1;
else
highBound = temp;
}
if (highBound >= m_collisions.getCount() ||
lowBound >= m_collisions.getCount())
{
m_iterator = INVALID_ITERATOR;
return false;
}
const UCHAR* const ptr2 = baseAddress + m_collisions[lowB
ound].offset;
if (memcmp(ptr1, ptr2, minLen))
{
m_iterator = INVALID_ITERATOR;
return false;
}
m_iterator = lowBound; m_iterator = INVALID_ITERATOR;
return true; return false;
} }
bool iterate(ULONG length, const UCHAR* data, ULONG& position) bool iterate(ULONG hash, ULONG& position)
{ {
if (m_iterator >= m_collisions.getCount()) if (m_iterator >= m_collisions.getCount())
return false; return false;
const Collision& collision = m_collisions[m_iterator++]; const Entry& collision = m_collisions[m_iterator++];
const UCHAR* const baseAddress = m_keyBuffer->begin();
const UCHAR* const ptr1 = data; if (hash != collision.hash)
const ULONG len1 = length;
const UCHAR* const ptr2 = baseAddress + collision.offset;
const ULONG len2 = m_itemLength;
const ULONG minLen = MIN(len1, len2);
if (memcmp(ptr1, ptr2, minLen))
{ {
m_iterator = INVALID_ITERATOR; m_iterator = INVALID_ITERATOR;
return false; return false;
} }
position = collision.position; position = collision.position;
return true; return true;
} }
private: private:
Array<Collision> m_collisions; SortedArray<Entry, EmptyStorage<Entry>, ULONG, Entry> m_collision
const KeyBuffer* const m_keyBuffer; s;
const ULONG m_itemLength;
FB_SIZE_T m_iterator; FB_SIZE_T m_iterator;
}; };
public: public:
HashTable(MemoryPool& pool, size_t streamCount, unsigned int tableSize = HASH_SIZE) HashTable(MemoryPool& pool, ULONG streamCount, ULONG tableSize = HASH_SIZ E)
: PermanentStorage(pool), m_streamCount(streamCount), : PermanentStorage(pool), m_streamCount(streamCount),
m_tableSize(tableSize), m_slot(0) m_tableSize(tableSize), m_slot(0)
{ {
m_collisions = FB_NEW_POOL(pool) CollisionList*[streamCount * tab leSize]; m_collisions = FB_NEW_POOL(pool) CollisionList*[streamCount * tab leSize];
memset(m_collisions, 0, streamCount * tableSize * sizeof(Collisio nList*)); memset(m_collisions, 0, streamCount * tableSize * sizeof(Collisio nList*));
} }
~HashTable() ~HashTable()
{ {
for (size_t i = 0; i < m_streamCount * m_tableSize; i++) for (ULONG i = 0; i < m_streamCount * m_tableSize; i++)
delete m_collisions[i]; delete m_collisions[i];
delete[] m_collisions; delete[] m_collisions;
} }
void put(size_t stream, void put(ULONG stream, ULONG hash, ULONG position)
ULONG keyLength, const KeyBuffer* keyBuffer,
ULONG offset, ULONG position)
{ {
const unsigned int slot = const ULONG slot = hash % m_tableSize;
InternalHash::hash(keyLength, keyBuffer->begin() + offset
, m_tableSize);
fb_assert(stream < m_streamCount); fb_assert(stream < m_streamCount);
fb_assert(slot < m_tableSize); fb_assert(slot < m_tableSize);
CollisionList* collisions = m_collisions[stream * m_tableSize + s lot]; CollisionList* collisions = m_collisions[stream * m_tableSize + s lot];
if (!collisions) if (!collisions)
{ {
collisions = FB_NEW_POOL(getPool()) CollisionList(getPool (), keyBuffer, keyLength); collisions = FB_NEW_POOL(getPool()) CollisionList(getPool ());
m_collisions[stream * m_tableSize + slot] = collisions; m_collisions[stream * m_tableSize + slot] = collisions;
} }
collisions->add(Collision(collisions, offset, position)); collisions->add(hash, position);
} }
bool setup(ULONG length, const UCHAR* data) bool setup(ULONG hash)
{ {
const unsigned int slot = InternalHash::hash(length, data, m_tabl eSize); const ULONG slot = hash % m_tableSize;
for (size_t i = 0; i < m_streamCount; i++) for (ULONG i = 0; i < m_streamCount; i++)
{ {
CollisionList* const collisions = m_collisions[i * m_tabl eSize + slot]; CollisionList* const collisions = m_collisions[i * m_tabl eSize + slot];
if (!collisions) if (!collisions)
return false; return false;
if (!collisions->locate(length, data)) if (!collisions->locate(hash))
return false; return false;
} }
m_slot = slot; m_slot = slot;
return true; return true;
} }
void reset(size_t stream, ULONG length, const UCHAR* data) void reset(ULONG stream, ULONG hash)
{ {
fb_assert(stream < m_streamCount); fb_assert(stream < m_streamCount);
CollisionList* const collisions = m_collisions[stream * m_tableSi ze + m_slot]; CollisionList* const collisions = m_collisions[stream * m_tableSi ze + m_slot];
collisions->locate(length, data); collisions->locate(hash);
} }
bool iterate(size_t stream, ULONG length, const UCHAR* data, ULONG& posit ion) bool iterate(ULONG stream, ULONG hash, ULONG& position)
{ {
fb_assert(stream < m_streamCount); fb_assert(stream < m_streamCount);
CollisionList* const collisions = m_collisions[stream * m_tableSi ze + m_slot]; CollisionList* const collisions = m_collisions[stream * m_tableSi ze + m_slot];
return collisions->iterate(length, data, position); return collisions->iterate(hash, position);
} }
void sort() void sort()
{ {
for (size_t i = 0; i < m_streamCount * m_tableSize; i++) for (ULONG i = 0; i < m_streamCount * m_tableSize; i++)
{ {
CollisionList* const collisions = m_collisions[i]; CollisionList* const collisions = m_collisions[i];
if (collisions) if (collisions)
collisions->sort(); collisions->sort();
} }
} }
private: private:
const size_t m_streamCount; const ULONG m_streamCount;
const unsigned int m_tableSize; const ULONG m_tableSize;
CollisionList** m_collisions; CollisionList** m_collisions;
size_t m_slot; ULONG m_slot;
}; };
HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count, HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count,
RecordSource* const* args, NestValueArray* con st* keys) RecordSource* const* args, NestValueArray* con st* keys)
: m_args(csb->csb_pool, count - 1) : m_args(csb->csb_pool, count - 1)
{ {
fb_assert(count >= 2); fb_assert(count >= 2);
m_impure = CMP_impure(csb, sizeof(Impure)); m_impure = CMP_impure(csb, sizeof(Impure));
m_leader.source = args[0]; m_leader.source = args[0];
m_leader.keys = keys[0]; m_leader.keys = keys[0];
m_leader.keyLengths = FB_NEW_POOL(csb->csb_pool) const FB_SIZE_T leaderKeyCount = m_leader.keys->getCount();
KeyLengthArray(csb->csb_pool, m_leader.keys->getCount()); m_leader.keyLengths = FB_NEW_POOL(csb->csb_pool) ULONG[leaderKeyCount];
m_leader.totalKeyLength = 0; m_leader.totalKeyLength = 0;
for (FB_SIZE_T j = 0; j < m_leader.keys->getCount(); j++) for (FB_SIZE_T j = 0; j < leaderKeyCount; j++)
{ {
dsc desc; dsc desc;
(*m_leader.keys)[j]->getDesc(tdbb, csb, &desc); (*m_leader.keys)[j]->getDesc(tdbb, csb, &desc);
USHORT keyLength = desc.isText() ? desc.getStringLength() : desc. dsc_length; USHORT keyLength = desc.isText() ? desc.getStringLength() : desc. dsc_length;
if (IS_INTL_DATA(&desc)) if (IS_INTL_DATA(&desc))
keyLength = INTL_key_length(tdbb, INTL_INDEX_TYPE(&desc), keyLength); keyLength = INTL_key_length(tdbb, INTL_INDEX_TYPE(&desc), keyLength);
m_leader.keyLengths->add(keyLength); m_leader.keyLengths[j] = keyLength;
m_leader.totalKeyLength += keyLength; m_leader.totalKeyLength += keyLength;
} }
for (size_t i = 1; i < count; i++) for (FB_SIZE_T i = 1; i < count; i++)
{ {
RecordSource* const sub_rsb = args[i]; RecordSource* const sub_rsb = args[i];
fb_assert(sub_rsb); fb_assert(sub_rsb);
SubStream sub; SubStream sub;
sub.buffer = FB_NEW_POOL(csb->csb_pool) BufferedStream(csb, sub_r sb); sub.buffer = FB_NEW_POOL(csb->csb_pool) BufferedStream(csb, sub_r sb);
sub.keys = keys[i]; sub.keys = keys[i];
sub.keyLengths = FB_NEW_POOL(csb->csb_pool) const FB_SIZE_T subKeyCount = sub.keys->getCount();
KeyLengthArray(csb->csb_pool, sub.keys->getCount()); sub.keyLengths = FB_NEW_POOL(csb->csb_pool) ULONG[subKeyCount];
sub.totalKeyLength = 0; sub.totalKeyLength = 0;
for (FB_SIZE_T j = 0; j < sub.keys->getCount(); j++) for (FB_SIZE_T j = 0; j < subKeyCount; j++)
{ {
dsc desc; dsc desc;
(*sub.keys)[j]->getDesc(tdbb, csb, &desc); (*sub.keys)[j]->getDesc(tdbb, csb, &desc);
USHORT keyLength = desc.isText() ? desc.getStringLength() : desc.dsc_length; USHORT keyLength = desc.isText() ? desc.getStringLength() : desc.dsc_length;
if (IS_INTL_DATA(&desc)) if (IS_INTL_DATA(&desc))
keyLength = INTL_key_length(tdbb, INTL_INDEX_TYPE (&desc), keyLength); keyLength = INTL_key_length(tdbb, INTL_INDEX_TYPE (&desc), keyLength);
sub.keyLengths->add(keyLength); sub.keyLengths[j] = keyLength;
sub.totalKeyLength += keyLength; sub.totalKeyLength += keyLength;
} }
m_args.add(sub); m_args.add(sub);
} }
} }
void HashJoin::open(thread_db* tdbb) const void HashJoin::open(thread_db* tdbb) const
{ {
jrd_req* const request = tdbb->getRequest(); jrd_req* const request = tdbb->getRequest();
Impure* const impure = request->getImpure<Impure>(m_impure); Impure* const impure = request->getImpure<Impure>(m_impure);
impure->irsb_flags = irsb_open | irsb_mustread; impure->irsb_flags = irsb_open | irsb_mustread;
delete impure->irsb_arg_buffer;
delete impure->irsb_hash_table; delete impure->irsb_hash_table;
delete[] impure->irsb_leader_buffer; delete[] impure->irsb_leader_buffer;
delete[] impure->irsb_record_counts;
MemoryPool& pool = *tdbb->getDefaultPool(); MemoryPool& pool = *tdbb->getDefaultPool();
const size_t argCount = m_args.getCount(); const FB_SIZE_T argCount = m_args.getCount();
impure->irsb_arg_buffer = FB_NEW_POOL(pool) KeyBuffer(pool, KEYBUF_PREALL OCATE_SIZE);
impure->irsb_hash_table = FB_NEW_POOL(pool) HashTable(pool, argCount); impure->irsb_hash_table = FB_NEW_POOL(pool) HashTable(pool, argCount);
impure->irsb_leader_buffer = FB_NEW_POOL(pool) UCHAR[m_leader.totalKeyLen gth]; impure->irsb_leader_buffer = FB_NEW_POOL(pool) UCHAR[m_leader.totalKeyLen gth];
impure->irsb_record_counts = FB_NEW_POOL(pool) ULONG[argCount];
UCharBuffer buffer(pool);
for (FB_SIZE_T i = 0; i < argCount; i++) for (FB_SIZE_T i = 0; i < argCount; i++)
{ {
// Read and cache the inner streams. While doing that, // Read and cache the inner streams. While doing that,
// hash the join condition values and populate hash tables. // hash the join condition values and populate hash tables.
m_args[i].buffer->open(tdbb); m_args[i].buffer->open(tdbb);
ULONG& counter = impure->irsb_record_counts[i]; ULONG counter = 0;
counter = 0; UCHAR* const keyBuffer = buffer.getBuffer(m_args[i].totalKeyLengt
h, false);
while (m_args[i].buffer->getRecord(tdbb)) while (m_args[i].buffer->getRecord(tdbb))
{ {
const ULONG offset = (ULONG) impure->irsb_arg_buffer->get const ULONG hash = computeHash(tdbb, request, m_args[i],
Count(); keyBuffer);
if (offset > KEYBUF_SIZE_LIMIT) impure->irsb_hash_table->put(i, hash, counter++);
status_exception::raise(Arg::Gds(isc_imp_exc) <<
Arg::Gds(isc_blktoobig));
impure->irsb_arg_buffer->resize(offset + m_args[i].totalK
eyLength);
UCHAR* const keys = impure->irsb_arg_buffer->begin() + of
fset;
computeKeys(tdbb, request, m_args[i], keys);
impure->irsb_hash_table->put(i, m_args[i].totalKeyLength,
impure->irsb_arg_buffer,
offset, counter++);
} }
} }
impure->irsb_hash_table->sort(); impure->irsb_hash_table->sort();
m_leader.source->open(tdbb); m_leader.source->open(tdbb);
} }
void HashJoin::close(thread_db* tdbb) const void HashJoin::close(thread_db* tdbb) const
{ {
jrd_req* const request = tdbb->getRequest(); jrd_req* const request = tdbb->getRequest();
skipping to change at line 468 skipping to change at line 321
invalidateRecords(request); invalidateRecords(request);
if (impure->irsb_flags & irsb_open) if (impure->irsb_flags & irsb_open)
{ {
impure->irsb_flags &= ~irsb_open; impure->irsb_flags &= ~irsb_open;
delete impure->irsb_hash_table; delete impure->irsb_hash_table;
impure->irsb_hash_table = NULL; impure->irsb_hash_table = NULL;
delete impure->irsb_arg_buffer;
impure->irsb_arg_buffer = NULL;
delete[] impure->irsb_leader_buffer; delete[] impure->irsb_leader_buffer;
impure->irsb_leader_buffer = NULL; impure->irsb_leader_buffer = NULL;
delete[] impure->irsb_record_counts;
impure->irsb_record_counts = NULL;
for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) for (FB_SIZE_T i = 0; i < m_args.getCount(); i++)
m_args[i].buffer->close(tdbb); m_args[i].buffer->close(tdbb);
m_leader.source->close(tdbb); m_leader.source->close(tdbb);
} }
} }
bool HashJoin::getRecord(thread_db* tdbb) const bool HashJoin::getRecord(thread_db* tdbb) const
{ {
if (--tdbb->tdbb_quantum < 0) if (--tdbb->tdbb_quantum < 0)
JRD_reschedule(tdbb, 0, true); JRD_reschedule(tdbb, 0, true);
jrd_req* const request = tdbb->getRequest(); jrd_req* const request = tdbb->getRequest();
Impure* const impure = request->getImpure<Impure>(m_impure); Impure* const impure = request->getImpure<Impure>(m_impure);
if (!(impure->irsb_flags & irsb_open)) if (!(impure->irsb_flags & irsb_open))
return false; return false;
const ULONG leaderKeyLength = m_leader.totalKeyLength;
UCHAR* leaderKeyBuffer = impure->irsb_leader_buffer;
while (true) while (true)
{ {
if (impure->irsb_flags & irsb_mustread) if (impure->irsb_flags & irsb_mustread)
{ {
// Fetch the record from the leading stream // Fetch the record from the leading stream
if (!m_leader.source->getRecord(tdbb)) if (!m_leader.source->getRecord(tdbb))
return false; return false;
// Compute and hash the comparison keys // Compute and hash the comparison keys
memset(leaderKeyBuffer, 0, leaderKeyLength); impure->irsb_leader_hash =
computeKeys(tdbb, request, m_leader, leaderKeyBuffer); computeHash(tdbb, request, m_leader, impure->irsb
_leader_buffer);
// Ensure the every inner stream having matches for this hash slot. // Ensure the every inner stream having matches for this hash slot.
// Setup the hash table for the iteration through collisi ons. // Setup the hash table for the iteration through collisi ons.
if (!impure->irsb_hash_table->setup(leaderKeyLength, lead erKeyBuffer)) if (!impure->irsb_hash_table->setup(impure->irsb_leader_h ash))
continue; continue;
impure->irsb_flags &= ~irsb_mustread; impure->irsb_flags &= ~irsb_mustread;
impure->irsb_flags |= irsb_first; impure->irsb_flags |= irsb_first;
} }
// Fetch collisions from the inner streams // Fetch collisions from the inner streams
if (impure->irsb_flags & irsb_first) if (impure->irsb_flags & irsb_first)
{ {
skipping to change at line 628 skipping to change at line 472
} }
void HashJoin::nullRecords(thread_db* tdbb) const void HashJoin::nullRecords(thread_db* tdbb) const
{ {
m_leader.source->nullRecords(tdbb); m_leader.source->nullRecords(tdbb);
for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) for (FB_SIZE_T i = 0; i < m_args.getCount(); i++)
m_args[i].source->nullRecords(tdbb); m_args[i].source->nullRecords(tdbb);
} }
void HashJoin::computeKeys(thread_db* tdbb, jrd_req* request, ULONG HashJoin::computeHash(thread_db* tdbb,
const SubStream& sub, UCHAR* k jrd_req* request,
eyBuffer) const const SubStream& sub,
UCHAR* keyBuffer) const
{ {
memset(keyBuffer, 0, sub.totalKeyLength);
UCHAR* keyPtr = keyBuffer;
for (FB_SIZE_T i = 0; i < sub.keys->getCount(); i++) for (FB_SIZE_T i = 0; i < sub.keys->getCount(); i++)
{ {
dsc* const desc = EVL_expr(tdbb, request, (*sub.keys)[i]); dsc* const desc = EVL_expr(tdbb, request, (*sub.keys)[i]);
const USHORT keyLength = (*sub.keyLengths)[i]; const USHORT keyLength = sub.keyLengths[i];
if (desc && !(request->req_flags & req_null)) if (desc && !(request->req_flags & req_null))
{ {
if (desc->isText()) if (desc->isText())
{ {
dsc to; dsc to;
to.makeText(keyLength, desc->getTextType(), keyBu ffer); to.makeText(keyLength, desc->getTextType(), keyPt r);
if (IS_INTL_DATA(desc)) if (IS_INTL_DATA(desc))
{ {
// Convert the INTL string into the binar y comparable form // Convert the INTL string into the binar y comparable form
INTL_string_to_key(tdbb, INTL_INDEX_TYPE( desc), INTL_string_to_key(tdbb, INTL_INDEX_TYPE( desc),
desc, &to, INTL_KEY_UNIQUE); desc, &to, INTL_KEY_UNIQUE);
} }
else else
{ {
// This call ensures that the padding byt es are appended // This call ensures that the padding byt es are appended
MOV_move(tdbb, desc, &to); MOV_move(tdbb, desc, &to);
} }
} }
else else
{ {
// We don't enforce proper alignments inside the key buffer, // We don't enforce proper alignments inside the key buffer,
// so use plain byte copying instead of MOV_move( ) to avoid bus errors // so use plain byte copying instead of MOV_move( ) to avoid bus errors
fb_assert(keyLength == desc->dsc_length); fb_assert(keyLength == desc->dsc_length);
memcpy(keyBuffer, desc->dsc_address, keyLength); memcpy(keyPtr, desc->dsc_address, keyLength);
} }
} }
keyBuffer += keyLength; keyPtr += keyLength;
} }
fb_assert(keyPtr - keyBuffer == sub.totalKeyLength);
return InternalHash::hash(sub.totalKeyLength, keyBuffer);
} }
bool HashJoin::fetchRecord(thread_db* tdbb, Impure* impure, FB_SIZE_T stream) co nst bool HashJoin::fetchRecord(thread_db* tdbb, Impure* impure, FB_SIZE_T stream) co nst
{ {
HashTable* const hashTable = impure->irsb_hash_table; HashTable* const hashTable = impure->irsb_hash_table;
const BufferedStream* const arg = m_args[stream].buffer; const BufferedStream* const arg = m_args[stream].buffer;
const ULONG leaderKeyLength = m_leader.totalKeyLength;
const UCHAR* leaderKeyBuffer = impure->irsb_leader_buffer;
ULONG position; ULONG position;
if (hashTable->iterate(stream, leaderKeyLength, leaderKeyBuffer, position )) if (hashTable->iterate(stream, impure->irsb_leader_hash, position))
{ {
arg->locate(tdbb, position); arg->locate(tdbb, position);
if (arg->getRecord(tdbb)) if (arg->getRecord(tdbb))
return true; return true;
} }
while (true) while (true)
{ {
if (stream == 0 || !fetchRecord(tdbb, impure, stream - 1)) if (stream == 0 || !fetchRecord(tdbb, impure, stream - 1))
return false; return false;
hashTable->reset(stream, leaderKeyLength, leaderKeyBuffer); hashTable->reset(stream, impure->irsb_leader_hash);
if (hashTable->iterate(stream, leaderKeyLength, leaderKeyBuffer, position)) if (hashTable->iterate(stream, impure->irsb_leader_hash, position ))
{ {
arg->locate(tdbb, position); arg->locate(tdbb, position);
if (arg->getRecord(tdbb)) if (arg->getRecord(tdbb))
return true; return true;
} }
} }
} }
 End of changes. 67 change blocks. 
261 lines changed or deleted 97 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)