868 {
869 if(chunks.size() > 1024) {
871 return;
872 }
873
874 std::vector<XrdCl::ChunkList> hostLists;
875 for(size_t dataHosts = 0; dataHosts < objcfg.plgr.size(); dataHosts++){
877 }
878
880
881
882 char* globalBuffer = (char*)buffer;
883
884
885 std::set<std::tuple<size_t, size_t, size_t>> requestedChunks;
886
887 std::map<size_t, std::shared_ptr<block_t>> blockMap;
888
889
890 for(size_t index = 0; index < chunks.size(); index++){
891 uint32_t remainLength = chunks[index].length;
892 uint64_t currentOffset = chunks[index].offset;
893
894 while(remainLength > 0){
895 size_t blkid = currentOffset / objcfg.datasize;
896 size_t strpid = ( currentOffset % objcfg.datasize ) / objcfg.chunksize;
897 uint64_t rdoff = currentOffset - blkid * objcfg.datasize - strpid * objcfg.chunksize ;
898
899 uint32_t rdsize = objcfg.chunksize - rdoff;
900 if( rdsize > remainLength ) rdsize = remainLength;
901 if(currentOffset + rdsize >= filesize) {
902 rdsize = filesize - currentOffset;
903 remainLength = rdsize;
904 }
905
906
907 std::string fn = objcfg.GetFileName(blkid, strpid);
908
909 auto itr = urlmap.find( fn );
910 if( itr == urlmap.end() )
911 {
912 log->Dump(
XrdCl::XRootDMsg,
"EC Vector Read: No mapping of file to host found.");
913 break;
914 }
915
916 const std::string &url = itr->second;
917 auto itr2 = archiveIndices.find(url);
918 if(itr2 == archiveIndices.end())
919 {
920 log->Dump(
XrdCl::XRootDMsg,
"EC Vector Read: Couldn't find host for file.");
921 break;
922 }
923 size_t indexOfArchive = archiveIndices[url];
924
925 if (blockMap.find(blkid) == blockMap.end())
926 {
927 blockMap.emplace(blkid,
928 std::make_shared<block_t>(blkid, *this, objcfg));
929 }
930
932 XrdCl::StatInfo* info = nullptr;
933 if(dataarchs[url]->
Stat(objcfg.GetFileName(blkid, strpid), info).IsOK())
934 blockMap[blkid]->stripes[strpid].resize( info ->
GetSize() );
935
936 auto requestChunk = std::make_tuple(indexOfArchive, blkid, strpid);
937 if(requestedChunks.find(requestChunk) == requestedChunks.end())
938 {
939 uint64_t off = 0;
940 dataarchs[url]->GetOffset(objcfg.GetFileName(blkid, strpid), off);
941 hostLists[indexOfArchive].emplace_back(XrdCl::ChunkInfo(
942 off,
944 blockMap[blkid]->stripes[strpid].data()));
945
946
947 requestedChunks.emplace(requestChunk);
948
949 }
950 remainLength -= rdsize;
951 currentOffset += rdsize;
952
953 }
954 }
955
956 std::vector<XrdCl::Pipeline> hostPipes;
957 hostPipes.reserve(hostLists.size());
958 for(size_t i = 0; i < hostLists.size(); i++){
959 while(hostLists[i].size() > 0){
960 uint32_t range = hostLists[i].size() > 1024 ? 1024 : hostLists[i].size();
961 XrdCl::ChunkList partList(hostLists[i].begin(), hostLists[i].begin() + range);
962 hostLists[i].erase(hostLists[i].begin(), hostLists[i].begin() + range);
963 hostPipes.emplace_back(
964 XrdCl::VectorRead(XrdCl::Ctx<XrdCl::File>(dataarchs[objcfg.GetDataUrl(i)]->archive),
965 partList, nullptr, timeout)
966 >> [=](const XrdCl::XRootDStatus &st, XrdCl::VectorReadInfo ch) mutable
967 {
968 auto it = requestedChunks.begin();
969 while(it!=requestedChunks.end())
970 {
971 auto &args = *it;
972 size_t host = std::get<0>(args);
973 size_t blkid = std::get<1>(args);
974 size_t strpid = std::get<2>(args);
975 it++;
976 if(host == i)
977 {
978 std::shared_ptr<block_t> currentBlock = blockMap[blkid];
979
980
981 if(!st.IsOK())
982 {
983 log->Dump(XrdCl::XRootDMsg, "EC Vector Read of host %zu failed entirely.", i);
984 MissingVectorRead(currentBlock, blkid, strpid, timeout);
985 }
986 else{
987 uint32_t orgcksum = 0;
988 auto s = dataarchs[objcfg.GetDataUrl(i)]->GetCRC32( objcfg.GetFileName(blkid, strpid), orgcksum );
989
990
991
992
993 if( !st.IsOK() )
994 {
995 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't read CRC32 from CD.");
996 MissingVectorRead(currentBlock, blkid, strpid, timeout);
997 continue;
998 }
999
1000
1001
1002 uint32_t cksum = objcfg.digest( 0, currentBlock->stripes[strpid].data(), currentBlock->stripes[strpid].size() );
1003 if( orgcksum != cksum )
1004 {
1005 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Wrong checksum for block %zu stripe %zu.", blkid, strpid);
1006 MissingVectorRead(currentBlock, blkid, strpid, timeout);
1007 continue;
1008 }
1009 else{
1010 currentBlock->state[strpid] = block_t::Valid;
1011 bool recoverable = currentBlock->error_correction( currentBlock );
1012 if(!recoverable)
1013 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't recover block %zu.", blkid);
1014 }
1015 }
1016 }
1017 }
1018 }
1019 );
1020 }
1021 }
1022
1023 auto finalPipehndl = [=] (const XrdCl::XRootDStatus &st) mutable {
1024
1025 std::unique_lock<std::mutex> lk(missingChunksMutex);
1026 waitMissing.wait(lk, [=] { return missingChunksVectorRead.size() == 0;});
1027
1028 bool failed = false;
1029 for(size_t index = 0; index < chunks.size(); index++){
1030 uint32_t remainLength = chunks[index].length;
1031 uint64_t currentOffset = chunks[index].offset;
1032
1033 char *localBuffer;
1034 if (globalBuffer)
1035 localBuffer = globalBuffer;
1036 else
1037 localBuffer = (char*)(chunks[index].buffer);
1038
1039 while(remainLength > 0){
1040 size_t blkid = currentOffset / objcfg.datasize;
1041 size_t strpid = ( currentOffset % objcfg.datasize ) / objcfg.chunksize;
1042 uint64_t rdoff = currentOffset - blkid * objcfg.datasize - strpid * objcfg.chunksize ;
1043 uint32_t rdsize = objcfg.chunksize - rdoff;
1044 if( rdsize > remainLength ) rdsize = remainLength;
1045
1046
1047 if(blockMap.find(blkid) == blockMap.end() || blockMap[blkid] == nullptr){
1049 failed = true;
1050 break;
1051 }
1053 log->Dump(
XrdCl::XRootDMsg,
"EC Vector Read: Invalid stripe in block %zu stripe %zu.", blkid, strpid);
1054 failed = true;
1055 break;
1056 }
1057
1058 memcpy(localBuffer, blockMap[blkid]->stripes[strpid].data() + rdoff, rdsize);
1059
1060 remainLength -= rdsize;
1061 currentOffset += rdsize;
1062 localBuffer += rdsize;
1063 }
1064 if(globalBuffer) globalBuffer = localBuffer;
1065 }
1066 if(handler){
1067 if(failed) log->Dump(
XrdCl::XRootDMsg,
"EC Vector Read failed (at least in part).");
1069 else handler->
HandleResponse(
new XrdCl::XRootDStatus(),
nullptr);
1070 }
1071 };
1072
1075
1077
1078 }
static Log * GetLog()
Get default log.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInvalidArgs
std::vector< ChunkInfo > ChunkList
List of chunks.
VectorReadImpl< false > VectorRead(Ctx< File > file, Arg< ChunkList > chunks, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating VectorReadImpl objects.