
Statsd in Android 9 (2)

The previous part covered the two interfaces used to report to statsd: the Java side uses StatsLog, the native side uses android::util::stats_write, and both ultimately write into statsd over a socket. This part first looks at the structure of the statsd daemon itself, and then at how events are managed.
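
To recap that write path with code: below is a minimal sketch of what the client side boils down to, assuming the statsdw datagram socket declared in statsd.rc and liblog's packed header layout. The struct and function names are illustrative stand-ins, not liblog's actual implementation:

#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>

// Mirrors liblog's packed android_log_header_t (buffer id, tid, realtime).
struct __attribute__((__packed__)) LogHeader {
    uint8_t id;       // LOG_ID_STATS (5 in Android 9) routes the entry to statsd
    uint16_t tid;
    uint32_t tv_sec;
    uint32_t tv_nsec;
};

// Illustrative only: send one serialized event to statsd's ingest socket.
bool sendToStatsd(const void* payload, size_t len) {
    int fd = socket(AF_UNIX, SOCK_DGRAM | SOCK_CLOEXEC, 0);
    if (fd < 0) return false;

    sockaddr_un addr = {};
    addr.sun_family = AF_UNIX;
    strcpy(addr.sun_path, "/dev/socket/statsdw");  // dgram+passcred, owned by statsd

    LogHeader header = {};
    header.id = 5;  // LOG_ID_STATS

    iovec vec[2] = {{&header, sizeof(header)}, {const_cast<void*>(payload), len}};
    msghdr hdr = {};
    hdr.msg_name = &addr;
    hdr.msg_namelen = sizeof(addr);
    hdr.msg_iov = vec;
    hdr.msg_iovlen = 2;

    const bool ok = sendmsg(fd, &hdr, 0) == (ssize_t)(sizeof(header) + len);
    close(fd);
    return ok;
}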

statsd lives under frameworks/base/cmds/statsd.
Its directory layout:

statsd
----benchmark
----src
--------anomaly      // tracks anomaly triggers and their alarms
--------condition    // triggers new evaluations when conditions change
--------config       // configuration management
--------external     // pulled-event (puller) management
--------guardrail    // caps event and metric volume
--------logd         // reads log events in a loop and dispatches them
--------matchers     // event matching; one matcher tracks one class of events
--------metrics      // computes metrics over matched events and stores or uploads the results
--------packages     // caches app info, including package name and version
--------perfetto     // data-source configuration descriptions
--------socket       // listens on the statsd log socket
--------storage      // writes log data to files, reads configs back
--------subscriber   // event subscription notifications
----tests            // unit tests
----tools            // functional tests

As usual, let's start from main() to see how the service initializes. It has the following steps:
1. Prepare the Looper
2. Set up Binder (thread pool, scheduling)
3. Notify the Java-side StatsCompanionService that statsd is up
4. Depending on compile-time flags, read events either from logd or from statsd's own socket; as seen in the previous part, whether reports are written to logd or directly to statsd is controlled the same way
5. Loop forever, polling the Looper for events

int main(int /*argc*/, char** /*argv*/) {
    // Set up the looper
    sp<Looper> looper(Looper::prepare(0 /* opts */));

    // Set up the binder
    sp<ProcessState> ps(ProcessState::self());
    ps->setThreadPoolMaxThreadCount(9);
    ps->startThreadPool();
    ps->giveThreadPoolName();
    IPCThreadState::self()->disableBackgroundScheduling(true);

    // Create the service
    sp<StatsService> service = new StatsService(looper);
    if (defaultServiceManager()->addService(String16("stats"), service) != 0) {
        ALOGE("Failed to add service");
        return -1;
    }
    service->sayHiToStatsCompanion();
    service->Startup();

    sp<StatsSocketListener> socketListener = new StatsSocketListener(service);

    if (kUseLogd) {
        ALOGI("using logd");
        // Start the log reader thread
        status_t err = start_log_reader_thread(service);
        if (err != NO_ERROR) {
            return 1;
        }
    }
    if (kUseStatsdSocket) {
        ALOGI("using statsd socket");
        // Backlog and /proc/sys/net/unix/max_dgram_qlen set to large value
        if (socketListener->startListener(600)) {
            exit(1);
        }
    }

    // Loop forever -- the reports run on this thread in a handler, and the
    // binder calls remain responsive in their pool of one thread.
    while (true) {
        looper->pollAll(-1 /* timeoutMillis */);
    }

    ALOGW("statsd escaped from its loop.");
    return 1;
}
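
Which path is active is decided at compile time by two constants near the top of the same main.cpp; in Android 9 the statsd socket is the one used (values quoted from memory of the AOSP tree, worth verifying against your checkout):

const bool kUseLogd = false;
const bool kUseStatsdSocket = true;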

Taking the socket path as the example, let's follow where a log goes. Once startListener succeeds, events are read from the socket continuously and handed to the service. Concretely:
1. Put a buffer big enough for one message on the stack
2. recvmsg() the datagram into it, with the sender's credentials (pid/uid) delivered as SCM_CREDENTIALS ancillary data thanks to the socket's passcred option
3. Check that the message is at least as large as the log header
4. Wrap the payload into a LogEvent
5. Call the service's OnLogEvent to handle it

bool StatsSocketListener::onDataAvailable(SocketClient* cli) {
    static bool name_set;
    if (!name_set) {
        prctl(PR_SET_NAME, "statsd.writer");
        name_set = true;
    }

    // + 1 to ensure null terminator if MAX_PAYLOAD buffer is received
    char buffer[sizeof_log_id_t + sizeof(uint16_t) + sizeof(log_time) + LOGGER_ENTRY_MAX_PAYLOAD +
                1];
    struct iovec iov = {buffer, sizeof(buffer) - 1};

    alignas(4) char control[CMSG_SPACE(sizeof(struct ucred))];
    struct msghdr hdr = {
            NULL, 0, &iov, 1, control, sizeof(control), 0,
    };

    int socket = cli->getSocket();

    // To clear the entire buffer is secure/safe, but this contributes to 1.68%
    // overhead under logging load. We are safe because we check counts, but
    // still need to clear null terminator
    // memset(buffer, 0, sizeof(buffer));
    ssize_t n = recvmsg(socket, &hdr, 0);
    if (n <= (ssize_t)(sizeof(android_log_header_t))) {
        return false;
    }

    buffer[n] = 0;

    struct ucred* cred = NULL;

    struct cmsghdr* cmsg = CMSG_FIRSTHDR(&hdr);
    while (cmsg != NULL) {
        if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_CREDENTIALS) {
            cred = (struct ucred*)CMSG_DATA(cmsg);
            break;
        }
        cmsg = CMSG_NXTHDR(&hdr, cmsg);
    }

    struct ucred fake_cred;
    if (cred == NULL) {
        cred = &fake_cred;
        cred->pid = 0;
        cred->uid = DEFAULT_OVERFLOWUID;
    }

    char* ptr = ((char*)buffer) + sizeof(android_log_header_t);
    n -= sizeof(android_log_header_t);

    log_msg msg;

    msg.entry.len = n;
    msg.entry.hdr_size = kLogMsgHeaderSize;
    msg.entry.sec = time(nullptr);
    msg.entry.pid = cred->pid;
    msg.entry.uid = cred->uid;

    memcpy(msg.buf + kLogMsgHeaderSize, ptr, n + 1);
    LogEvent event(msg);

    // Call the listener
    mListener->OnLogEvent(&event, false /*reconnected, N/A in statsd socket*/);

    return true;
}
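
The size check against sizeof(android_log_header_t) and the later pointer bump both refer to the header every liblog client prepends. To the best of my recollection it is the packed struct below (from liblog's private headers; field names approximate), which is exactly the sizeof_log_id_t + sizeof(uint16_t) + sizeof(log_time) prefix the receive buffer reserves:

// Shape of liblog's android_log_header_t; log_time is two uint32_t
// (seconds, nanoseconds) from liblog's headers.
typedef struct __attribute__((__packed__)) {
    uint8_t id;         // log buffer id (sizeof_log_id_t); LOG_ID_STATS here
    uint16_t tid;       // sender thread id
    log_time realtime;  // timestamp filled in by the client
} android_log_header_t;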

StatsService is declared in its header and implements the LogListener interface; its OnLogEvent implementation simply forwards to mProcessor's OnLogEvent.

class StatsService : public BnStatsManager, public LogListener, public IBinder::DeathRecipient
--------
void StatsService::OnLogEvent(LogEvent* event, bool reconnectionStarts) {
    mProcessor->OnLogEvent(event, reconnectionStarts);
}
--------
mProcessor = new StatsLogProcessor(
        mUidMap, mAnomalyAlarmMonitor, mPeriodicAlarmMonitor, getElapsedRealtimeNs(),
        [this](const ConfigKey& key) {
            sp<IStatsCompanionService> sc = getStatsCompanionService();
            auto receiver = mConfigManager->GetConfigReceiver(key);
            if (sc == nullptr) {
                VLOG("Could not find StatsCompanionService");
                return false;
            } else if (receiver == nullptr) {
                VLOG("Statscompanion could not find a broadcast receiver for %s",
                     key.ToString().c_str());
                return false;
            } else {
                sc->sendDataBroadcast(receiver, mProcessor->getLastReportTimeNs(key));
                return true;
            }
        });

mProcessor is a StatsLogProcessor, the core of all log processing. It implements ConfigListener, so its configs can be updated dynamically at runtime.

class StatsLogProcessor : public ConfigListener {
public:
    StatsLogProcessor(const sp<UidMap>& uidMap, const sp<AlarmMonitor>& anomalyAlarmMonitor,
                      const sp<AlarmMonitor>& subscriberTriggerAlarmMonitor,
                      const int64_t timeBaseNs,
                      const std::function<bool(const ConfigKey&)>& sendBroadcast);
    virtual ~StatsLogProcessor();

    void OnLogEvent(LogEvent* event, bool reconnectionStarts);
    // for testing only.
    void OnLogEvent(LogEvent* event);

    void OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key,
                         const StatsdConfig& config);
    void OnConfigRemoved(const ConfigKey& key);

    size_t GetMetricsSize(const ConfigKey& key) const;

    void onDumpReport(const ConfigKey& key, const int64_t dumpTimeNs,
                      const bool include_current_partial_bucket,
                      const DumpReportReason dumpReportReason, vector<uint8_t>* outData);

    /* Tells MetricsManager that the alarms in alarmSet have fired. Modifies anomaly alarmSet. */
    void onAnomalyAlarmFired(
            const int64_t& timestampNs,
            unordered_set<sp<const InternalAlarm>, SpHash<InternalAlarm>> alarmSet);

    /* Tells MetricsManager that the alarms in alarmSet have fired. Modifies periodic alarmSet. */
    void onPeriodicAlarmFired(
            const int64_t& timestampNs,
            unordered_set<sp<const InternalAlarm>, SpHash<InternalAlarm>> alarmSet);

    /* Flushes data to disk. Data on memory will be gone after written to disk. */
    void WriteDataToDisk(const DumpReportReason dumpReportReason);

    // Reset all configs.
    void resetConfigs();
};

The flow inside OnLogEvent:
1. Check whether this is a reconnection; the timestamp of the last event processed is kept as a checkpoint so nothing is silently dropped
2. On reconnection, skip events up to and including the checkpoint since they were already processed; if a fresher event appears before the checkpoint is found, data was lost, so persist what we have and reset the configs
3. Handle special events (ISOLATED_UID_CHANGED updates the uid map)
4. Clear the puller cache if its interval has elapsed
5. Hand the event to every MetricsManager in mMetricsManagers
6. Flush metric data if necessary

void StatsLogProcessor::OnLogEventLocked(LogEvent* event, bool reconnected) {
#ifdef VERY_VERBOSE_PRINTING
    if (mPrintAllLogs) {
        ALOGI("%s", event->ToString().c_str());
    }
#endif
    const int64_t currentTimestampNs = event->GetElapsedTimestampNs();

    if (reconnected && mLastTimestampSeen != 0) {
        // LogReader tells us the connection has just been reset. Now we need
        // to enter reconnection state to find the last CP.
        mInReconnection = true;
    }

    if (mInReconnection) {
        // We see the checkpoint
        if (currentTimestampNs == mLastTimestampSeen) {
            mInReconnection = false;
            // Found the CP. ignore this event, and we will start to read from next event.
            return;
        }
        if (currentTimestampNs > mLargestTimestampSeen) {
            // We see a new log but CP has not been found yet. Give up now.
            mLogLossCount++;
            mInReconnection = false;
            StatsdStats::getInstance().noteLogLost(currentTimestampNs);
            // Persist the data before we reset. Do we want this?
            WriteDataToDiskLocked(CONFIG_RESET);
            // We see fresher event before we see the checkpoint. We might have lost data.
            // The best we can do is to reset.
            resetConfigsLocked(currentTimestampNs);
        } else {
            // Still in search of the CP. Keep going.
            return;
        }
    }

    mLogCount++;
    mLastTimestampSeen = currentTimestampNs;
    if (mLargestTimestampSeen < currentTimestampNs) {
        mLargestTimestampSeen = currentTimestampNs;
    }

    resetIfConfigTtlExpiredLocked(currentTimestampNs);

    StatsdStats::getInstance().noteAtomLogged(
            event->GetTagId(), event->GetElapsedTimestampNs() / NS_PER_SEC);

    // Hard-coded logic to update the isolated uid's in the uid-map.
    // The field numbers need to be currently updated by hand with atoms.proto
    if (event->GetTagId() == android::util::ISOLATED_UID_CHANGED) {
        onIsolatedUidChangedEventLocked(*event);
    }

    if (mMetricsManagers.empty()) {
        return;
    }

    int64_t curTimeSec = getElapsedRealtimeSec();
    if (curTimeSec - mLastPullerCacheClearTimeSec > StatsdStats::kPullerCacheClearIntervalSec) {
        mStatsPullerManager.ClearPullerCacheIfNecessary(curTimeSec * NS_PER_SEC);
        mLastPullerCacheClearTimeSec = curTimeSec;
    }

    if (event->GetTagId() != android::util::ISOLATED_UID_CHANGED) {
        // Map the isolated uid to host uid if necessary.
        mapIsolatedUidToHostUidIfNecessaryLocked(event);
    }

    // pass the event to metrics managers.
    for (auto& pair : mMetricsManagers) {
        pair.second->onLogEvent(*event);
        flushIfNecessaryLocked(event->GetElapsedTimestampNs(), pair.first, *(pair.second));
    }
}
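
The checkpoint dance above is easy to misread, so here is the same decision logic pulled out into a standalone sketch (hypothetical names and a free function, not statsd code):

#include <cstdint>

enum class Action { kSkip, kProcess, kResetThenProcess };

// What to do with one event while replaying after a connection reset.
// lastSeenNs is the checkpoint: the timestamp of the last event processed
// before the reset; largestSeenNs is the newest timestamp ever processed.
Action onReplayedEvent(int64_t eventNs, int64_t lastSeenNs, int64_t largestSeenNs,
                       bool* inReconnection) {
    if (!*inReconnection) return Action::kProcess;
    if (eventNs == lastSeenNs) {
        *inReconnection = false;   // checkpoint found: drop it, resume with the next event
        return Action::kSkip;
    }
    if (eventNs > largestSeenNs) {
        *inReconnection = false;   // overshot the checkpoint: events were lost in between
        return Action::kResetThenProcess;
    }
    return Action::kSkip;          // an already-processed event; keep searching
}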

The entries in mMetricsManagers are created when a config is read in, one per ConfigKey.
Each MetricsManager only accepts events from the specific set of log sources (uids) its config allows.

void StatsLogProcessor::OnConfigUpdatedLocked(
        const int64_t timestampNs, const ConfigKey& key, const StatsdConfig& config) {
    VLOG("Updated configuration for key %s", key.ToString().c_str());
    sp<MetricsManager> newMetricsManager =
            new MetricsManager(key, config, mTimeBaseNs, timestampNs, mUidMap,
                               mAnomalyAlarmMonitor, mPeriodicAlarmMonitor);
    if (newMetricsManager->isConfigValid()) {
        mUidMap->OnConfigUpdated(key);
        if (newMetricsManager->shouldAddUidMapListener()) {
            // We have to add listener after the MetricsManager is constructed because it's
            // not safe to create wp or sp from this pointer inside its constructor.
            mUidMap->addListener(newMetricsManager.get());
        }
        newMetricsManager->refreshTtl(timestampNs);
        mMetricsManagers[key] = newMetricsManager;
        VLOG("StatsdConfig valid");
    } else {
        // If there is any error in the config, don't use it.
        ALOGE("StatsdConfig NOT valid");
    }
}
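
For a feel of what such a StatsdConfig contains, here is a sketch of building one through the generated protobuf API, roughly the way statsd's unit tests do; it needs the generated statsd_config headers, and the atom, ids, and log source below are arbitrary examples:

StatsdConfig config;
config.set_id(12345);

// Only events logged by these sources pass MetricsManager's whitelist check.
config.add_allowed_log_source("AID_SYSTEM");

// Match one atom...
auto* matcher = config.add_atom_matcher();
matcher->set_id(1001);
matcher->mutable_simple_atom_matcher()->set_atom_id(
        android::util::SCREEN_STATE_CHANGED);

// ...and count how often it fires per bucket.
auto* metric = config.add_count_metric();
metric->set_id(2001);
metric->set_what(1001);         // references the matcher by id
metric->set_bucket(ONE_MINUTE);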

The structure of MetricsManager: it holds
std::vector<sp<LogMatchingTracker>>
std::vector<sp<ConditionTracker>>
std::vector<sp<MetricProducer>>
std::vector<sp<AnomalyTracker>>
std::vector<sp<AlarmTracker>>
What each is for is spelled out in the source comments below. All of these are initialized in initStatsdConfig, which is implemented in metrics_manager_util.

{
    // We only store the sp of LogMatchingTracker, MetricProducer, and ConditionTracker in
    // MetricsManager. There are relationships between them, and the relationships are denoted by
    // index instead of pointers. The reasons for this are: (1) the relationship between them are
    // complicated, so storing index instead of pointers reduces the risk that A holds B's sp, and B
    // holds A's sp. (2) When we evaluate matcher results, or condition results, we can quickly get
    // the related results from a cache using the index.

    // Hold all the atom matchers from the config.
    std::vector<sp<LogMatchingTracker>> mAllAtomMatchers;

    // Hold all the conditions from the config.
    std::vector<sp<ConditionTracker>> mAllConditionTrackers;

    // Hold all metrics from the config.
    std::vector<sp<MetricProducer>> mAllMetricProducers;

    // Hold all alert trackers.
    std::vector<sp<AnomalyTracker>> mAllAnomalyTrackers;

    // Hold all periodic alarm trackers.
    std::vector<sp<AlarmTracker>> mAllPeriodicAlarmTrackers;

    // To make the log processing more efficient, we want to do as much filtering as possible
    // before we go into individual trackers and conditions to match.
    // 1st filter: check if the event tag id is in mTagIds.
    // 2nd filter: if it is, we parse the event because there is at least one member interested.
    //             then pass to all LogMatchingTrackers (itself also filter events by ids).
    // 3rd filter: for LogMatchingTrackers that matched this event, we pass this event to the
    //             ConditionTrackers and MetricProducers that use this matcher.
    // 4th filter: for ConditionTrackers that changed value due to this event, we pass
    //             new conditions to metrics that use this condition.

    // The following map is initialized from the statsd_config.
    // maps from the index of the LogMatchingTracker to index of MetricProducer.
    std::unordered_map<int, std::vector<int>> mTrackerToMetricMap;

    // maps from LogMatchingTracker to ConditionTracker
    std::unordered_map<int, std::vector<int>> mTrackerToConditionMap;

    // maps from ConditionTracker to MetricProducer
    std::unordered_map<int, std::vector<int>> mConditionToMetricMap;

    void initLogSourceWhiteList();

    // The metrics that don't need to be uploaded or even reported.
    std::set<int64_t> mNoReportMetricIds;
}
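
To make the index wiring concrete, a toy example with invented values: suppose atom matcher #2 drives metric producers #0 and #3 plus condition tracker #1, and condition #1 gates metric #0. After initStatsdConfig the maps would hold:

#include <unordered_map>
#include <vector>

// Toy illustration (values invented, not from a real config):
std::unordered_map<int, std::vector<int>> mTrackerToMetricMap    = {{2, {0, 3}}};  // tracker 2 matched -> notify metrics 0 and 3
std::unordered_map<int, std::vector<int>> mTrackerToConditionMap = {{2, {1}}};     // tracker 2 matched -> re-evaluate condition 1
std::unordered_map<int, std::vector<int>> mConditionToMetricMap  = {{1, {0}}};     // condition 1 changed -> push new state to metric 0

So onLogEvent never needs the trackers, conditions, and producers to hold sp references to each other; everything is joined through indices into the vectors above.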

Each MetricsManager processes a log event as follows, in brief:
1. Validate the event: the breadcrumb atom APP_BREADCRUMB_REPORTED and the jank atom DAVEY_OCCURRED each get dedicated checks, and everything else must come from a whitelisted log source
2. Run every LogMatchingTracker over the event, caching each matcher's result
3. Re-evaluate the ConditionTrackers affected by the matches and record which conditions changed
4. For every matched tracker, call onMatchedLogEvent on the MetricProducers wired to it
The filtering strategy, quoting the source comments:
// 1st filter: check if the event tag id is in mTagIds.
// 2nd filter: if it is, we parse the event because there is at least one member interested.
//             then pass to all LogMatchingTrackers (itself also filter events by ids).
// 3rd filter: for LogMatchingTrackers that matched this event, we pass this event to the
//             ConditionTrackers and MetricProducers that use this matcher.
// 4th filter: for ConditionTrackers that changed value due to this event, we pass
//             new conditions to metrics that use this condition.

145
void MetricsManager::onLogEvent(const LogEvent& event) {
    if (!mConfigValid) {
        return;
    }

    if (event.GetTagId() == android::util::APP_BREADCRUMB_REPORTED) {
        // Check that app breadcrumb reported fields are valid.
        // TODO: Find a way to make these checks easier to maintain.
        status_t err = NO_ERROR;

        // Uid is 3rd from last field and must match the caller's uid,
        // unless that caller is statsd itself (statsd is allowed to spoof uids).
        long appHookUid = event.GetLong(event.size() - 2, &err);
        if (err != NO_ERROR) {
            VLOG("APP_BREADCRUMB_REPORTED had error when parsing the uid");
            return;
        }
        int32_t loggerUid = event.GetUid();
        if (loggerUid != appHookUid && loggerUid != AID_STATSD) {
            VLOG("APP_BREADCRUMB_REPORTED has invalid uid: claimed %ld but caller is %d",
                 appHookUid, loggerUid);
            return;
        }

        // The state must be from 0,3. This part of code must be manually updated.
        long appHookState = event.GetLong(event.size(), &err);
        if (err != NO_ERROR) {
            VLOG("APP_BREADCRUMB_REPORTED had error when parsing the state field");
            return;
        } else if (appHookState < 0 || appHookState > 3) {
            VLOG("APP_BREADCRUMB_REPORTED does not have valid state %ld", appHookState);
            return;
        }
    } else if (event.GetTagId() == android::util::DAVEY_OCCURRED) {
        // Daveys can be logged from any app since they are logged in libs/hwui/JankTracker.cpp.
        // Check that the davey duration is reasonable. Max length check is for privacy.
        status_t err = NO_ERROR;

        // Uid is the first field provided.
        long jankUid = event.GetLong(1, &err);
        if (err != NO_ERROR) {
            VLOG("Davey occurred had error when parsing the uid");
            return;
        }
        int32_t loggerUid = event.GetUid();
        if (loggerUid != jankUid && loggerUid != AID_STATSD) {
            VLOG("DAVEY_OCCURRED has invalid uid: claimed %ld but caller is %d", jankUid,
                 loggerUid);
            return;
        }

        long duration = event.GetLong(event.size(), &err);
        if (err != NO_ERROR) {
            VLOG("Davey occurred had error when parsing the duration");
            return;
        } else if (duration > 100000) {
            VLOG("Davey duration is unreasonably long: %ld", duration);
            return;
        }
    } else {
        std::lock_guard<std::mutex> lock(mAllowedLogSourcesMutex);
        if (mAllowedLogSources.find(event.GetUid()) == mAllowedLogSources.end()) {
            VLOG("log source %d not on the whitelist", event.GetUid());
            return;
        }
    }

    int tagId = event.GetTagId();
    int64_t eventTime = event.GetElapsedTimestampNs();
    if (mTagIds.find(tagId) == mTagIds.end()) {
        // not interesting...
        return;
    }

    vector<MatchingState> matcherCache(mAllAtomMatchers.size(), MatchingState::kNotComputed);

    for (auto& matcher : mAllAtomMatchers) {
        matcher->onLogEvent(event, mAllAtomMatchers, matcherCache);
    }

    // A bitmap to see which ConditionTracker needs to be re-evaluated.
    vector<bool> conditionToBeEvaluated(mAllConditionTrackers.size(), false);

    for (const auto& pair : mTrackerToConditionMap) {
        if (matcherCache[pair.first] == MatchingState::kMatched) {
            const auto& conditionList = pair.second;
            for (const int conditionIndex : conditionList) {
                conditionToBeEvaluated[conditionIndex] = true;
            }
        }
    }

    vector<ConditionState> conditionCache(mAllConditionTrackers.size(),
                                          ConditionState::kNotEvaluated);
    // A bitmap to track if a condition has changed value.
    vector<bool> changedCache(mAllConditionTrackers.size(), false);

    for (size_t i = 0; i < mAllConditionTrackers.size(); i++) {
        if (conditionToBeEvaluated[i] == false) {
            continue;
        }
        sp<ConditionTracker>& condition = mAllConditionTrackers[i];
        condition->evaluateCondition(event, matcherCache, mAllConditionTrackers, conditionCache,
                                     changedCache);
    }

    for (size_t i = 0; i < mAllConditionTrackers.size(); i++) {
        if (changedCache[i] == false) {
            continue;
        }
        auto pair = mConditionToMetricMap.find(i);
        if (pair != mConditionToMetricMap.end()) {
            auto& metricList = pair->second;
            for (auto metricIndex : metricList) {
                // metric cares about non sliced condition, and it's changed.
                // Push the new condition to it directly.
                if (!mAllMetricProducers[metricIndex]->isConditionSliced()) {
                    mAllMetricProducers[metricIndex]->onConditionChanged(conditionCache[i],
                                                                         eventTime);
                    // metric cares about sliced conditions, and it may have changed. Send
                    // notification, and the metric can query the sliced conditions that are
                    // interesting to it.
                } else {
                    mAllMetricProducers[metricIndex]->onSlicedConditionMayChange(conditionCache[i],
                                                                                 eventTime);
                }
            }
        }
    }

    // For matched AtomMatchers, tell relevant metrics that a matched event has come.
    for (size_t i = 0; i < mAllAtomMatchers.size(); i++) {
        if (matcherCache[i] == MatchingState::kMatched) {
            StatsdStats::getInstance().noteMatcherMatched(mConfigKey,
                                                          mAllAtomMatchers[i]->getId());
            auto pair = mTrackerToMetricMap.find(i);
            if (pair != mTrackerToMetricMap.end()) {
                auto& metricList = pair->second;
                for (const int metricIndex : metricList) {
                    // pushed metrics are never scheduled pulls
                    mAllMetricProducers[metricIndex]->onMatchedLogEvent(i, event);
                }
            }
        }
    }
}
